I'm trying to return a specific structure from a pandas_udf. It worked on one cluster but fails on another. I try to run a udf on groups, which requires the return type to be a data frame.
from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql.types import *
schema = StructType([
  StructField("Distance", FloatType()),
  StructField("CarId", IntegerType())
])
def haversine(lon1, lat1, lon2, lat2):
    #Calculate distance, return scalar
    return 3.5 # Removed logic to facilitate reading
@pandas_udf(schema)
def totalDistance(oneCar):
    dist = haversine(oneCar.Longtitude.shift(1),
                     oneCar.Latitude.shift(1),
                     oneCar.loc[1:, 'Longitude'], 
                     oneCar.loc[1:, 'Latitude'])
    return pd.DataFrame({"CarId":oneCar['CarId'].iloc[0],"Distance":np.sum(dist)},index = [0])
## Calculate the overall distance made by each car
distancePerCar= df.groupBy('CarId').apply(totalDistance)
This is the exception I'm getting:
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
    114             try:
--> 115                 to_arrow_type(self._returnType_placeholder)
    116             except TypeError:
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in to_arrow_type(dt)
   1641     else:
-> 1642         raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
   1643     return arrow_type
TypeError: Unsupported type in conversion to Arrow: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true)))
During handling of the above exception, another exception occurred:
NotImplementedError                       Traceback (most recent call last)
<ipython-input-35-4f2194cfb998> in <module>()
     18     km = 6367 * c
     19     return km
---> 20 @pandas_udf("CarId: int, Distance: float")
     21 def totalDistance(oneUser):
     22     dist = haversine(oneUser.Longtitude.shift(1), oneUser.Latitude.shift(1),
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _create_udf(f, returnType, evalType)
     62     udf_obj = UserDefinedFunction(
     63         f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
---> 64     return udf_obj._wrapped()
     65 
     66 
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _wrapped(self)
    184 
    185         wrapper.func = self.func
--> 186         wrapper.returnType = self.returnType
    187         wrapper.evalType = self.evalType
    188         wrapper.deterministic = self.deterministic
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
    117                 raise NotImplementedError(
    118                     "Invalid returnType with scalar Pandas UDFs: %s is "
--> 119                     "not supported" % str(self._returnType_placeholder))
    120         elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    121             if isinstance(self._returnType_placeholder, StructType):
NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true))) is not supported
I've also tried changing the schema to
@pandas_udf("<CarId:int,Distance:float>")
and
@pandas_udf("CarId:int,Distance:float")
but get the same exception. I suspect it has to do with my pyarrow version, which isn't compatible with my pyspark version.
Any help would be appreciated. Thanks!
As reported in the error message ("Invalid returnType with scalar Pandas UDFs"), you are trying to create a SCALAR vectorized pandas UDF, but using a StructType schema and returning a pandas DataFrame.
You should rather declare your function as a GROUPED MAP pandas UDF, i.e.:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
Difference between scalar and grouped vectorized UDFs is explained in the pyspark doc: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf.
A scalar UDF defines a transformation: One or more pandas.Series -> A pandas.Series. The returnType should be a primitive data type, e.g., DoubleType(). The length of the returned pandas.Series must be of the same as the input pandas.Series.
To summarize, a scalar pandas UDF processes a column at a time (a pandas Series), leading to better performance than traditional UDFs that process one row element at a time. Note that the performance improvement is due to efficient python serialization using PyArrow.
A grouped map UDF defines transformation: A pandas.DataFrame -> A pandas.DataFrame The returnType should be a StructType describing the schema of the returned pandas.DataFrame. The length of the returned pandas.DataFrame can be arbitrary and the columns must be indexed so that their position matches the corresponding field in the schema.
A grouped pandas UDF processes multiple rows and columns at a time (using a pandas DataFrame, not to be confused with a Spark DataFrame), and is extremely useful and efficient for multivariate operations (especially when using local python numerical analysis and machine learning libraries like numpy, scipy, scikit-learn etc.). In this case, the output is a single-row DataFrame with several columns.
Note that I did not check the internal logic of the code, only the methodology.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With