This answer nicely explains how to use PySpark's groupby and pandas_udf to do custom aggregations. However, I cannot realistically declare my schema manually, as shown in this part of the example:
from pyspark.sql.types import *
schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])
since I will be returning 100+ columns with names that are automatically generated. Is there any way to tell PySpark to just implicitly use the schema returned by my function and assume it's going to be the same for all worker nodes? The schema will also change between runs, since I will have to experiment with the predictors I want to use, so an automated process for schema generation might be an option...
Based on Sanxofon's comment, I got an idea on how to implement this myself:
from pyspark.sql.types import *
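# Map pandas dtype names to Spark types. Incomplete - extend with your types.
mapping = {"float64": DoubleType,
           "object": StringType,
           "int64": LongType}  # int64 is 64-bit, so LongType rather than the 32-bit IntegerType

def createUDFSchemaFromPandas(dfp):
    # One StructField per column, typed from that column's pandas dtype.
    column_types = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
    schema = StructType(column_types)
    return schema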
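As a variation on the same idea, `pandas_udf` also accepts a DDL-formatted string such as `"key string, avg_min double"` as its return type, so the schema can be assembled as plain text. The sketch below is only an illustration: the helper name `ddl_schema_from_dtypes` and the dtype table are made up for this example, the table is as incomplete as the one above, and the backtick quoting assumes that generated column names may contain characters Spark would otherwise reject.

```python
# Hypothetical lookup table: pandas dtype name -> Spark SQL type name.
# Incomplete on purpose; extend it with the dtypes your function returns.
PANDAS_TO_SPARK_DDL = {
    "float64": "double",
    "float32": "float",
    "int64": "bigint",
    "int32": "int",
    "bool": "boolean",
    "object": "string",  # assumes object columns hold strings
    "datetime64[ns]": "timestamp",
}


def ddl_schema_from_dtypes(dtype_names):
    """Build a DDL schema string from a {column name: pandas dtype name} mapping."""
    fields = []
    for column, dtype_name in dtype_names.items():
        if dtype_name not in PANDAS_TO_SPARK_DDL:
            # Fail loudly instead of guessing a type for an unmapped dtype.
            raise TypeError(
                f"no Spark type mapped for column {column!r} with dtype {dtype_name!r}"
            )
        # Backtick-quote the name and escape embedded backticks by doubling them.
        quoted = "`" + str(column).replace("`", "``") + "`"
        fields.append(f"{quoted} {PANDAS_TO_SPARK_DDL[dtype_name]}")
    return ", ".join(fields)


print(ddl_schema_from_dtypes({"key": "object", "avg_min": "float64"}))
# -> `key` string, `avg_min` double
```

With a pandas DataFrame `df_return` at hand, the input mapping could be built as `{name: str(dtype) for name, dtype in df_return.dtypes.items()}`.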
What I do is take a sample pandas DataFrame, run my UDF function on it, and pass the result to createUDFSchemaFromPandas:
dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)
This seems to work for me. The problem is that it is somewhat circular: I need the function defined in order to get the schema, but I need the schema in order to declare the function as a UDF. I solved this by creating a "wrapper" UDF that simply passes the DataFrame through to the actual function.
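Another way around this chicken-and-egg problem, assuming Spark 3.0 or later, is `groupBy(...).applyInPandas(func, schema)`, which takes a plain Python function together with a schema, so nothing has to be decorated before the schema exists. The following is a minimal sketch, not the wrapper from this answer: `apply_with_inferred_schema` and its parameters are hypothetical names, and it assumes the function returns the same columns and dtypes for every group as it does for the sample.

```python
def apply_with_inferred_schema(sdf, key, func, schema_from_pandas, sample_rows=100):
    """Infer the output schema of `func` from a small sample, then apply it per group.

    sdf                -- a Spark DataFrame
    key                -- grouping column name (or list of names)
    func               -- plain function: pandas DataFrame -> pandas DataFrame
    schema_from_pandas -- callable turning a pandas DataFrame into a Spark schema,
                          e.g. createUDFSchemaFromPandas from this answer
    """
    # Run the plain function once on the driver to see what it returns.
    sample_pdf = sdf.limit(sample_rows).toPandas()
    schema = schema_from_pandas(func(sample_pdf))
    # The schema now exists, so the same undecorated function can be applied per group.
    return sdf.groupBy(key).applyInPandas(func, schema=schema)
```

A call might look like `apply_with_inferred_schema(df_total, "key", my_UDF_function, createUDFSchemaFromPandas)`, where `"key"` stands in for whatever column the data is grouped by.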