I have my timestamp in UTC and ISO 8601 format, but with Structured Streaming it gets automatically converted to local time. Is there a way to stop this conversion? I would like to keep it in UTC.
I'm reading JSON data from Kafka and then parsing it with the from_json Spark function.
Input:
{"Timestamp":"2015-01-01T00:00:06.222Z"}
Flow:
SparkSession
  .builder()
  .master("local[*]")
  .appName("my-app")
  .getOrCreate()
  .readStream()
  .format("kafka")
  ... //some magic
  .writeStream()
  .format("console")
  .start()
  .awaitTermination();
Schema:
StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true),});
Output:
+--------------------+
|           Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+
As you can see, the hour has incremented by itself.
PS: I tried to experiment with the from_utc_timestamp Spark function, but no luck.
According to the definition of TIMESTAMP WITH SESSION TIME ZONE, Spark stores local timestamps in the UTC time zone, and uses the session time zone while extracting date-time fields or converting the timestamps to strings.
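To see why the hour in the question appears to shift, here is a minimal plain-JDK sketch (no Spark involved) that renders the same instant in two zones. The class name SessionZoneRendering and the zone Europe/Paris are assumptions made for illustration; the asker's actual local zone is not stated, only that it is one hour ahead of UTC in January.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Spark: shows that one stored instant
// produces different strings depending on the zone used for rendering.
final class SessionZoneRendering {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Formats the instant as a wall-clock string in the given zone.
    static String render(Instant instant, String zoneId) {
        return FORMAT.withZone(ZoneId.of(zoneId)).format(instant);
    }

    public static void main(String[] args) {
        Instant stored = Instant.parse("2015-01-01T00:00:06.222Z");
        System.out.println(render(stored, "UTC"));          // 2015-01-01 00:00:06.222
        System.out.println(render(stored, "Europe/Paris")); // 2015-01-01 01:00:06.222
    }
}
```

The underlying value is identical in both cases; only the string conversion differs, which is consistent with the output shown in the question.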
For me it worked to use:
spark.conf.set("spark.sql.session.timeZone", "UTC")
It tells Spark SQL to use UTC as the default time zone for timestamps. I used it in Spark SQL, for example:
select *, cast('2017-01-01 10:10:10' as timestamp) from someTable
I know it does not work in Spark 2.0.1, but it works in Spark 2.2. I also used it in SQLTransformer and it worked.
I am not sure about streaming, though.
Note:
This answer is useful primarily in Spark < 2.2. For newer Spark versions, see the answer by astro-asz.
However, we should note that as of Spark 2.4.0, spark.sql.session.timeZone doesn't set user.timezone (java.util.TimeZone.getDefault). So setting spark.sql.session.timeZone alone can result in a rather awkward situation where SQL and non-SQL components use different time zone settings.
Therefore I still recommend setting user.timezone explicitly, even if spark.sql.session.timeZone is set.
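As a rough illustration of the non-SQL side, the plain-JDK sketch below shows that java.sql.Timestamp.toString() follows the JVM default time zone, which is what -Duser.timezone controls at startup. The class DefaultZoneCheck and its method are hypothetical names, and Europe/Paris is only an assumed example zone. Calling TimeZone.setDefault here is for demonstration; it affects only the current JVM and would not by itself change Spark executors.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical helper: renders an epoch value through java.sql.Timestamp
// while a given zone is temporarily the JVM default.
final class DefaultZoneCheck {
    static String renderWithDefaultZone(long epochMillis, String zoneId) {
        TimeZone previous = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
            // Timestamp.toString() uses the JVM default time zone.
            return new Timestamp(epochMillis).toString();
        } finally {
            // Restore the original default so other code is unaffected.
            TimeZone.setDefault(previous);
        }
    }

    public static void main(String[] args) {
        long millis = 1420070406222L; // 2015-01-01T00:00:06.222Z
        System.out.println(renderWithDefaultZone(millis, "UTC"));
        System.out.println(renderWithDefaultZone(millis, "Europe/Paris"));
    }
}
```

If the two printed strings differ on your machine, any code path that goes through the JVM default zone can disagree with a session time zone that is set to UTC.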
TL;DR Unfortunately this is how Spark handles timestamps right now and there is really no built-in alternative, other than operating on epoch time directly, without using date/time utilities.
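For the epoch-time route mentioned above, a minimal plain-JDK sketch might look like the following. It only shows the conversion between the ISO 8601 string from the question and epoch milliseconds; the class and method names are hypothetical, and how the resulting long would be carried through a Spark schema is left open here.

```java
import java.time.Instant;

// Hypothetical helper: converts between ISO 8601 UTC strings and epoch
// milliseconds, which carry no time zone and so cannot be shifted.
final class EpochTimestamps {
    // Parses an ISO 8601 instant such as "2015-01-01T00:00:06.222Z".
    static long toEpochMillis(String iso8601) {
        return Instant.parse(iso8601).toEpochMilli();
    }

    // Renders epoch milliseconds back as an ISO 8601 instant in UTC.
    static String toIsoUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        long millis = toEpochMillis("2015-01-01T00:00:06.222Z");
        System.out.println(millis);           // 1420070406222
        System.out.println(toIsoUtc(millis)); // 2015-01-01T00:00:06.222Z
    }
}
```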
You can find an insightful discussion on the Spark developers list: SQL TIMESTAMP semantics vs. SPARK-18350
The cleanest workaround I've found so far is to set -Duser.timezone to UTC for both the driver and executors. For example, when launching from the command line:
bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
                --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"
or by adjusting configuration files (spark-defaults.conf):
spark.driver.extraJavaOptions      -Duser.timezone=UTC
spark.executor.extraJavaOptions    -Duser.timezone=UTC