(Tested on Spark 2.2 and 2.3)
I am using Spark to aggregate stock trading ticks into daily OHLC (open-high-low-close) records.
The input data looks like this:
val data = Seq(
  ("2018-07-11 09:01:00", 34.0), ("2018-07-11 09:04:00", 32.0),
  ("2018-07-11 09:02:00", 35.0), ("2018-07-11 09:03:00", 30.0),
  ("2018-07-11 09:00:00", 33.0), ("2018-07-12 09:01:00", 56.0),
  ("2018-07-12 09:04:00", 54.0), ("2018-07-12 09:02:00", 51.0),
  ("2018-07-12 09:03:00", 50.0), ("2018-07-12 09:00:00", 51.0)
).toDF("time", "price")
data.createOrReplaceTempView("ticks")
data.show
shown as
+-------------------+-----+
| time|price|
+-------------------+-----+
|2018-07-11 09:01:00| 34.0|
|2018-07-11 09:04:00| 32.0|
|2018-07-11 09:02:00| 35.0|
|2018-07-11 09:03:00| 30.0|
|2018-07-11 09:00:00| 33.0|
|2018-07-12 09:01:00| 56.0|
|2018-07-12 09:04:00| 54.0|
|2018-07-12 09:02:00| 51.0|
|2018-07-12 09:03:00| 50.0|
|2018-07-12 09:00:00| 51.0|
+-------------------+-----+
Desired output is
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+
There are many SQL solutions based on window functions, such as this and this:
SELECT DISTINCT
    TO_DATE(time) AS date,
    FIRST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS open,
    MAX(price) OVER (PARTITION BY TO_DATE(time)) AS high,
    MIN(price) OVER (PARTITION BY TO_DATE(time)) AS low,
    LAST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS close
FROM ticks
Due to the limitations of standard SQL, these solutions are cumbersome.
Today, I found that Spark SQL allows FIRST_VALUE and LAST_VALUE in a GROUP BY context, which standard SQL does not allow.
This relaxation in Spark SQL leads to a neat and tidy solution like this:
SELECT
TO_DATE(time) AS date,
FIRST_VALUE(price) AS open,
MAX(price) AS high,
MIN(price) AS low,
LAST_VALUE(price) AS close
FROM ticks
GROUP BY TO_DATE(time)
You can try it
spark.sql("SELECT TO_DATE(time) AS date, FIRST(price) AS open, MAX(price) AS high, MIN(price) AS low, LAST(price) AS close FROM ticks GROUP BY TO_DATE(time)").show
shown as
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|34.0|35.0|30.0| 33.0|
|2018-07-12|56.0|56.0|50.0| 51.0|
+----------+----+----+----+-----+
However, the above result is incorrect (compare it with the desired output above).
FIRST_VALUE and LAST_VALUE need a deterministic ordering to get deterministic results.
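Here is a minimal sketch that makes the non-determinism visible (reusing the data DataFrame above; the repartition(8) is just an arbitrary shuffle added for illustration): the open and close columns may change between runs, because FIRST and LAST simply return whatever row each task happens to see first and last.
import org.apache.spark.sql.functions._

// Shuffle the rows into arbitrary partitions, then aggregate without any ordering.
// The open/close values may differ from run to run and from the desired result.
data.repartition(8)
  .groupBy(expr("TO_DATE(time)").as("date"))
  .agg(first("price").as("open"), last("price").as("close"))
  .show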
I can correct it by adding an orderBy before the groupBy:
import org.apache.spark.sql.functions._
data.orderBy("time").groupBy(expr("TO_DATE(time)").as("date")).agg(
  first("price").as("open"),
  max("price").as("high"),
  min("price").as("low"),
  last("price").as("close")
).show
shown as
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+
which is exactly the desired result!
My question is: is the above 'orderBy then groupBy' code valid? Is the ordering guaranteed? Can we rely on this non-standard behavior in serious production code?
The point of this question is that, in standard SQL, we can only do a GROUP BY then ORDER BY to sort the aggregation, but not ORDER BY then GROUP BY.
The GROUP BY will ignore the ordering of ORDER BY.
I also wonder: if Spark SQL can perform such a GROUP BY under a desired ordering, could standard SQL introduce a syntax for this as well?
P.S.
I can think of some aggregation functions that depend on deterministic ordering.
WITH ORDER BY time SELECT COLLECT_LIST(price) GROUP BY stockID
WITH ORDER BY time SELECT SUM(SQUARE(price - LAG(price, 1, 0))) GROUP BY stockID
Without the WITH ORDER BY time, how can we sort the collected list in standard SQL? (One Spark workaround is sketched below.)
These examples show that "GROUP BY under desired ordering" is still useful.
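For the COLLECT_LIST example, one common workaround in Spark (a sketch, assuming a hypothetical ticksWithId DataFrame that carries the stockID column used above) is to collect (time, price) structs and sort the resulting array afterwards, since collect_list itself gives no ordering guarantee:
import org.apache.spark.sql.functions._

// sort_array orders the structs by their first field (time), so the prices
// inside each list end up in chronological order.
ticksWithId
  .groupBy(col("stockID"))
  .agg(sort_array(collect_list(struct(col("time"), col("price")))).as("ordered_ticks"))
  .show(false)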
Ordering in groupBy/agg is not guaranteed; you can use a window function instead, partitioned by the key and ordered by time, as sketched below.
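A sketch of that window-based approach with the DataFrame API (reusing the data DataFrame from the question; the column names match the desired output): first and last over an explicitly ordered, full-partition window give the day's open and close deterministically, and a final groupBy collapses the per-tick rows to one row per day.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// One window per trading day, ordered by tick time and spanning the whole day,
// so that last() really returns the day's final price.
val w = Window.partitionBy(to_date(col("time")))
  .orderBy(col("time"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

data.withColumn("date", to_date(col("time")))
  .withColumn("open", first(col("price")).over(w))
  .withColumn("close", last(col("price")).over(w))
  .groupBy(col("date"), col("open"), col("close"))
  .agg(max(col("price")).as("high"), min(col("price")).as("low"))
  .select("date", "open", "high", "low", "close")
  .show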