When importing data from CSV files in Spark, you may need to convert the text values to other types.
I had an especially hard time with date-time values. The Spark SQL module supports java.sql.Timestamp, so the question becomes: how do you create a java.sql.Timestamp from a string when you need to specify its format explicitly?
The spark-datetime package uses Joda-Time and a special serializable SparkDateTimeUDT (composed of a long millis value and a time zone) to make date-time values serializable. That works, but spark-datetime only supports Scala 2.10 out of the box, and I wanted to use 2.11; I had compiled Spark 1.5 to support Scala 2.11.
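To make the "millis plus time zone" idea concrete, here is a minimal sketch of a value built only from plain serializable fields, written with java.time rather than Joda-Time. `MillisAndZone` is a hypothetical name; this is not spark-datetime's actual class, just an illustration of why such a representation serializes without trouble (a case class of a Long and a String is Serializable).

```scala
import java.sql.Timestamp
import java.time.{Instant, ZoneId, ZonedDateTime}

// Hypothetical illustration (not SparkDateTimeUDT): store only epoch millis
// and a time-zone id, both of which serialize trivially.
case class MillisAndZone(millis: Long, zoneId: String) {
  // Rebuild a full zoned date-time from the stored fields.
  def toZonedDateTime: ZonedDateTime =
    Instant.ofEpochMilli(millis).atZone(ZoneId.of(zoneId))

  // java.sql.Timestamp only needs the instant, so the zone is dropped here.
  def toTimestamp: Timestamp = new Timestamp(millis)
}

object MillisAndZone {
  def from(zdt: ZonedDateTime): MillisAndZone =
    MillisAndZone(zdt.toInstant.toEpochMilli, zdt.getZone.getId)
}
```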
In Java 8, the java.time package covers most of what Joda-Time offers, but DateTimeFormatter is not serializable, so when you define a date-time formatter for parsing like this:
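import java.time.LocalDateTime, java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf
val f = DateTimeFormatter.ofPattern("dd-MMM-yyyy HH:mm:ss")
val toDT = udf { (v: String) => java.sql.Timestamp.valueOf(LocalDateTime.parse(v, f)) }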
you get the dreaded "not serializable" error when you try to use it in a sql query via sqlContext.sql("...").
The underlying issue is that the closure Spark ships to the executors on the other nodes captures f, which was defined outside it, and a DateTimeFormatter is not serializable. Whatever code carries your logic to the cluster must be serializable so that the Spark infrastructure can send it to the nodes.
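You can see the difference without a cluster. The minimal sketch below runs plain Java serialization over two function literals, which roughly mirrors the check Spark makes before shipping a closure (Spark's own ClosureCleaner and serializer do more than this). The names `ClosureCheck`, `capturing`, `selfContained` and `isSerializable` are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Serializable so that only the captured formatter, never the enclosing
// object, can make a closure fail to serialize.
object ClosureCheck extends Serializable {
  // Try Java serialization of an object into an in-memory buffer.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The formatter is built outside the function literal, so the closure captures it.
  def capturing(): String => LocalDateTime = {
    val f = DateTimeFormatter.ofPattern("dd-MMM-yyyy HH:mm:ss")
    (v: String) => LocalDateTime.parse(v, f)
  }

  // The formatter is built inside the function literal; nothing is captured.
  def selfContained(): String => LocalDateTime =
    (v: String) => LocalDateTime.parse(v, DateTimeFormatter.ofPattern("dd-MMM-yyyy HH:mm:ss"))
}
```

The first closure fails with a NotSerializableException for java.time.format.DateTimeFormatter; the second serializes because it carries no state at all.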
The fix is easy, though it may cost some performance because the formatter is rebuilt on every call. Here is a version that can be serialized:
sqlContext.udf.register("toTimestamp", (v:String) => {java.sql.Timestamp.valueOf(LocalDateTime.parse(v, DateTimeFormatter.ofPattern("dd-MMM-yyyy HH:mm:ss"))) })
The DateTimeFormatter is still used, but now it is created inside the closure, so the closure no longer captures a non-serializable object. A quick test shows that it works:
scala> sqlContext.sql("""select toTimestamp("01-Jan-2015 00:00:00")""").show()
+--------------------+
|                 _c0|
+--------------------+
|2015-01-01 00:00:...|
+--------------------+
scala> sqlContext.sql("""select toTimestamp("01-Jan-2015 00:00:00")""").printSchema()
root
|-- _c0: timestamp (nullable = true)
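If rebuilding the formatter on every row turns out to be too slow, a common Spark idiom (not the approach used above) is to serialize only the pattern string and build the formatter lazily in a `@transient lazy val`, so each deserialized copy creates it once. A minimal sketch, assuming a hypothetical `TimestampParser` class; I also pass `Locale.ENGLISH` explicitly so that `MMM` matches "Jan" regardless of the JVM's default locale, whereas the code above relies on the default.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

// Hypothetical helper: only `pattern` is serialized. The formatter is marked
// @transient, so it is skipped during serialization and rebuilt on first use
// after deserialization.
class TimestampParser(pattern: String) extends Serializable {
  @transient private lazy val formatter =
    DateTimeFormatter.ofPattern(pattern, Locale.ENGLISH)

  def parse(v: String): Timestamp =
    Timestamp.valueOf(LocalDateTime.parse(v, formatter))
}
```

In Spark it could then be registered much like the UDF above, e.g. `val parser = new TimestampParser("dd-MMM-yyyy HH:mm:ss")` followed by `sqlContext.udf.register("toTimestamp", (v: String) => parser.parse(v))`; this registration is untested here, but the closure would then carry only the parser with its pattern string.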