
Spark SQL can use FIRST_VALUE and LAST_VALUE in a GROUP BY aggregation (but it's not standard)

(Tested on Spark 2.2 and 2.3)

I am using Spark to aggregate stock trading ticks into daily OHLC (open-high-low-close) records.

The input data looks like this:

val data = Seq(
  ("2018-07-11 09:01:00", 34.0),
  ("2018-07-11 09:04:00", 32.0),
  ("2018-07-11 09:02:00", 35.0),
  ("2018-07-11 09:03:00", 30.0),
  ("2018-07-11 09:00:00", 33.0),
  ("2018-07-12 09:01:00", 56.0),
  ("2018-07-12 09:04:00", 54.0),
  ("2018-07-12 09:02:00", 51.0),
  ("2018-07-12 09:03:00", 50.0),
  ("2018-07-12 09:00:00", 51.0)
).toDF("time", "price")

data.createOrReplaceTempView("ticks")

data.show

shown as

+-------------------+-----+
|               time|price|
+-------------------+-----+
|2018-07-11 09:01:00| 34.0|
|2018-07-11 09:04:00| 32.0|
|2018-07-11 09:02:00| 35.0|
|2018-07-11 09:03:00| 30.0|
|2018-07-11 09:00:00| 33.0|
|2018-07-12 09:01:00| 56.0|
|2018-07-12 09:04:00| 54.0|
|2018-07-12 09:02:00| 51.0|
|2018-07-12 09:03:00| 50.0|
|2018-07-12 09:00:00| 51.0|
+-------------------+-----+

The desired output is

+----------+----+----+----+-----+
|      date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+

Many SQL solutions for this already exist, typically based on window functions, for example:

SELECT DISTINCT
    TO_DATE(time) AS date,
    FIRST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS open,
    MAX(price) OVER (PARTITION BY TO_DATE(time)) AS high,
    MIN(price) OVER (PARTITION BY TO_DATE(time)) AS low,
    LAST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS close
FROM ticks

Because of this limitation of standard SQL, these solutions are cumbersome: every column needs its own window specification, and a DISTINCT is required to collapse the per-row window results back into one row per day.

Today, I found that Spark SQL accepts FIRST_VALUE and LAST_VALUE as aggregate functions in a GROUP BY, which standard SQL does not allow.

This relaxation in Spark SQL permits a neat and tidy solution like this:

SELECT
    TO_DATE(time) AS date,
    FIRST_VALUE(price) AS open,
    MAX(price) AS high,
    MIN(price) AS low,
    LAST_VALUE(price) AS close
FROM ticks
GROUP BY TO_DATE(time)

You can try it (FIRST and LAST are Spark SQL aliases for FIRST_VALUE and LAST_VALUE):

spark.sql("SELECT TO_DATE(time) AS date, FIRST(price) AS open, MAX(price) AS high, MIN(price) AS low, LAST(price) AS close FROM ticks GROUP BY TO_DATE(time)").show

shown as

+----------+----+----+----+-----+
|      date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|34.0|35.0|30.0| 33.0|
|2018-07-12|56.0|56.0|50.0| 51.0|
+----------+----+----+----+-----+

However, the above result is incorrect (compare it with the desired output above).

FIRST_VALUE and LAST_VALUE need a deterministic ordering to produce deterministic results.

I can correct it by adding an orderBy before the groupBy:

import org.apache.spark.sql.functions._

data.orderBy("time").groupBy(expr("TO_DATE(time)").as("date")).agg(first("price").as("open"), max("price").as("high"), min("price").as("low"), last("price").as("close")).show

shown as

+----------+----+----+----+-----+
|      date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+

which is correct, as desired!

My question is: is this 'orderBy then groupBy' approach valid? Is the ordering guaranteed? Can we rely on this non-standard behavior in serious production code?

The point of this question is that, in standard SQL, we can only GROUP BY and then ORDER BY to sort the aggregated result, not ORDER BY and then GROUP BY; the GROUP BY simply ignores any ordering established by a preceding ORDER BY.

I also wonder: if Spark SQL can perform such a GROUP BY under a desired ordering, could standard SQL also introduce a syntax for this?
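As a side note, one way to avoid relying on input ordering for open and close is to aggregate over structs, because Spark compares structs field by field, so min(struct(time, price)) carries the price of the earliest tick and max(struct(time, price)) the price of the latest one. This is only a sketch and assumes min/max over struct columns is supported by the Spark version in use:

import org.apache.spark.sql.functions._

data
  .groupBy(to_date(col("time")).as("date"))
  .agg(
    min(struct(col("time"), col("price"))).getField("price").as("open"),   // price at the earliest time of the day
    max(col("price")).as("high"),
    min(col("price")).as("low"),
    max(struct(col("time"), col("price"))).getField("price").as("close")   // price at the latest time of the day
  )
  .orderBy("date")
  .show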

P.S.

I can think of other aggregations that depend on a deterministic ordering, sketched here in a hypothetical 'WITH ORDER BY' syntax:

WITH ORDER BY time SELECT COLLECT_LIST(price) GROUP BY stockID

WITH ORDER BY time SELECT SUM(SQUARE(price - LAG(price, 1, 0))) GROUP BY stockID

Without the WITH ORDER BY time, how could we sort the collected list in standard SQL? (One Spark workaround is sketched below.)

These examples show that "GROUP BY under desired ordering" is still useful.
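
To make the collected list deterministic without such a syntax, one common Spark workaround is to collect structs keyed by the ordering column and sort the resulting array afterwards. A sketch using the ticks data above (grouping by date in place of stockID), not a definitive solution:

import org.apache.spark.sql.functions._

data
  .groupBy(to_date(col("time")).as("date"))
  // structs compare field by field, so sorting by (time, price) orders the list by time
  .agg(sort_array(collect_list(struct(col("time"), col("price")))).as("ticks"))
  // extract just the prices, now in time order
  .withColumn("prices", col("ticks").getField("price"))
  .select("date", "prices")
  .show(false)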

Asked Jul 11 '18 by John Lin




1 Answer

Ordering in groupBy/agg is not guaranteed. You can use a window function with PARTITION BY the grouping key and ORDER BY time.
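
A minimal sketch of that window-function approach, using the ticks DataFrame from the question (the frame is widened to the whole partition so that last sees the final row of each day):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// One window per trading day, ordered by time and covering the entire day.
val w = Window
  .partitionBy(to_date(col("time")))
  .orderBy("time")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

data
  .withColumn("date", to_date(col("time")))
  .withColumn("open", first("price").over(w))
  .withColumn("high", max("price").over(w))
  .withColumn("low", min("price").over(w))
  .withColumn("close", last("price").over(w))
  .select("date", "open", "high", "low", "close")
  .distinct()      // collapse the per-tick rows into one row per day
  .orderBy("date")
  .show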

Answered Oct 22 '22 by K. Kostikov


