Implicit schema for pandas_udf in PySpark?

This answer nicely explains how to use PySpark's groupby and pandas_udf to do custom aggregations. However, I cannot possibly declare my schema manually as shown in this part of the example:

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

since I will be returning 100+ columns with names that are automatically generated. Is there any way to tell PySpark to just implicitly use the schema returned by my function and assume it's going to be the same for all worker nodes? The schema will also change between runs, since I will be experimenting with which predictors to use, so an automated process for schema generation might be an option...

asked Oct 16 '25 by Thomas

1 Answer

Based on Sanxofon's comment, I got an idea of how to implement this myself:

from pyspark.sql.types import *

mapping = {"float64": DoubleType,
           "object":StringType,
           "int64":IntegerType} # Incomplete - extend with your types.

def createUDFSchemaFromPandas(dfp):
  column_types  = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
  schema = StructType(column_types)
  return schema
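As a quick sanity check, the function can be run on a toy pandas frame covering the three mapped dtypes (the column names here are just illustrative):

import pandas as pd

sample = pd.DataFrame({"key": ["a", "b"],
                       "avg_min": [1.0, 2.0],
                       "n": [3, 4]})

print(createUDFSchemaFromPandas(sample).simpleString())
# struct<key:string,avg_min:double,n:int>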

What I do is take a small sample as a pandas df, pass it through the function, and build the schema from what it returns:

dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)

This seems to work for me. The catch is that it is somewhat circular: you need to run the function to obtain the schema, but you need the schema to register the function as a UDF. I solved this by creating a "wrapper" UDF that simply passes the dataframe through.
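For reference, here is a minimal sketch of that wrapper pattern on Spark 2.3/2.4, assuming a grouping column named "key" (illustrative) and that my_UDF_function takes and returns a pandas DataFrame, as in the snippets above:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Derive the schema once from a small sample, as shown above.
dfp = df_total.limit(100).toPandas()
schema = createUDFSchemaFromPandas(my_UDF_function(dfp))

# The wrapper exists only so the decorator can be given the derived schema.
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def wrapper(pdf):
    return my_UDF_function(pdf)

result = df_total.groupby("key").apply(wrapper)

On Spark 3.x the decorator is unnecessary: df_total.groupby("key").applyInPandas(my_UDF_function, schema=schema) achieves the same thing.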

answered Oct 17 '25 by Thomas


