Aggregation on a Spark DataFrame with multiple dynamic aggregation operations
I want to aggregate a Spark DataFrame using Scala, with multiple dynamic aggregation operations supplied by the user in JSON. I'm converting the JSON to a Map.
Below is some sample data:
colA colB colC colD
   1    2    3    4
   5    6    7    8
   9   10   11   12
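For reproducibility, here is a minimal sketch that builds this data as currentDF, assuming a local SparkSession (the session setup is illustrative):

import org.apache.spark.sql.SparkSession

// Minimal local session to reproduce the sample data.
val spark = SparkSession.builder().master("local[*]").appName("aggDemo").getOrCreate()
import spark.implicits._

val currentDF = Seq(
  (1, 2, 3, 4),
  (5, 6, 7, 8),
  (9, 10, 11, 12)
).toDF("colA", "colB", "colC", "colD")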
The Spark aggregation code I am using:
val cols = Seq("colA", "colB")
val aggFuncMap = Map("colC" -> "sum", "colD" -> "countDistinct")
val aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)
I have to pass aggFuncMap as a Map, so that the user can pass any number of aggregations through the JSON configuration.
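For context, a minimal sketch of the JSON-to-Map conversion, assuming the configuration looks like {"colC": "sum", "colD": "countDistinct"} and using json4s (which ships with Spark); the variable names are illustrative:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Hypothetical user-supplied JSON configuration: column -> aggregation name.
val configJson = """{"colC": "sum", "colD": "countDistinct"}"""

implicit val formats: Formats = DefaultFormats
val aggFuncMap: Map[String, String] = parse(configJson).extract[Map[String, String]]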
The above code works fine for several aggregations, including sum, min, max, avg, and count.
However, it fails for countDistinct (maybe because it is camel case?).
When running the above code, I am getting this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Undefined function: 'countdistinct'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'
Any help will be appreciated!
It's currently not possible to use agg with countDistinct inside a Map. From the documentation we see:
The available aggregate methods are avg, max, min, sum, count.
A possible fix would be to change the Map to a Seq[Column]:
import org.apache.spark.sql.functions.{countDistinct, sum}

val cols = Seq("colA", "colB")
val aggFuncs = Seq(sum("colC"), countDistinct("colD"))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)
but that won't help very much if the user is to specify the aggregations in a configuration file.
Another approach is to use expr; this function evaluates a string and gives back a Column. However, expr won't accept "countDistinct"; instead, "count(distinct(...))" needs to be used.
This could be coded as follows:
import org.apache.spark.sql.functions.expr

// Each string is parsed by the SQL expression parser into a Column.
val aggFuncs = Seq("sum(colC)", "count(distinct(colD))").map(e => expr(e))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)
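Since the aggregations arrive as a Map from the JSON configuration, the expression strings can be built dynamically. A minimal sketch, assuming the map values are valid Spark SQL function names apart from "countDistinct", which is special-cased; the helper name toExprString is illustrative:

import org.apache.spark.sql.functions.expr

// Hypothetical helper: translate a (column, function) pair into a SQL expression string.
// "countDistinct" is special-cased because the parser expects count(distinct ...),
// not a function named countdistinct.
def toExprString(col: String, func: String): String = func match {
  case "countDistinct" => s"count(distinct $col)"
  case other           => s"$other($col)"
}

val aggFuncs = aggFuncMap.map { case (col, func) => expr(toExprString(col, func)) }.toSeq
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)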