There are different kinds of PandasUDFType depending on the input and output types of the function. There is:
Series to Series (PandasUDFType.SCALAR):

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

spark.range(10).select(pandas_plus_one("id")).show()
And there is also Iterator of Series to Iterator of Series (PandasUDFType.SCALAR_ITER):

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(iterator):
    return map(lambda s: s + 1, iterator)

spark.range(10).select(pandas_plus_one("id")).show()
Can you please give me a simple use case that cannot be solved by the Series to Series PandasUDFType.SCALAR but can be solved by the Iterator of Series to Iterator of Series PandasUDFType.SCALAR_ITER? I cannot seem to understand why we need one while the other already exists.
According to the official documentation and the Databricks docs, these two types of Pandas UDFs are quite similar but differ in some aspects. Besides the input and output type differences, an Iterator of Series to Iterator of Series UDF can only take a single column as input, whereas a Scalar UDF can take multiple input columns. To make an Iterator UDF take multiple Spark columns, you need the Iterator of multiple Series to Iterator of Series UDF, which works the same way as the Iterator of Series to Iterator of Series UDF but takes an iterator of tuples of pd.Series as its parameter.
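For completeness, here is a sketch of that multiple-columns variant. The function name multiply_two and the hand-built batches are my own illustration; in Spark you would decorate the function with @pandas_udf('long') and call it as multiply_two("a", "b"), but since the body is plain Python we can exercise it directly on pandas Series:

```python
from typing import Iterator, Tuple
import pandas as pd

# In Spark this would carry @pandas_udf("long"); each element of the
# input iterator is a tuple of pd.Series, one per input column.
def multiply_two(batches: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in batches:
        yield a * b

# Simulate two input batches as Spark would feed them to the UDF
batches = iter([(pd.Series([1, 2]), pd.Series([10, 20])),
                (pd.Series([3]), pd.Series([30]))])
result = list(multiply_two(batches))
print(result[0].tolist(), result[1].tolist())  # [10, 40] [90]
```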
Iterator UDFs are said to be useful when the UDF execution requires prefetching the input batches or initializing some expensive state:
@pandas_udf("long")
def do_something(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    q = Queue()
    # Prefetch the iterator in a background thread; consume() puts each
    # batch on q and a final None when the input is exhausted.
    Thread(target=consume, args=(iterator, q), daemon=True).start()
    for s in iter(q.get, None):
        yield func(s)
@pandas_udf("long")
def do_something(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    s = some_initialization()  # initialize the state once
    for x in iterator:
        yield func(x, s)       # reuse the state for the whole iterator
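A concrete case for that second pattern is loading an expensive resource (a model, a lookup table, a DB connection) once per stream of batches instead of once per batch, which is what a SCALAR UDF would do since it is invoked separately for every batch. The names load_lookup and plus_offset below are illustrative; the body is plain Python, so it can be exercised without a Spark session:

```python
from typing import Iterator
import pandas as pd

def load_lookup():
    # Stand-in for an expensive one-time initialization (model load, etc.)
    return {"offset": 100}

# In Spark: @pandas_udf("long") on top of this function
def plus_offset(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    state = load_lookup()          # runs ONCE for the whole stream...
    for s in batches:
        yield s + state["offset"]  # ...and is reused for every batch

out = list(plus_offset(iter([pd.Series([1, 2]), pd.Series([3])])))
print(out[0].tolist(), out[1].tolist())  # [101, 102] [103]
```

With the Series to Series SCALAR form, load_lookup() would have to run inside the function body and therefore once per batch, which is exactly the overhead SCALAR_ITER lets you avoid.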
However, there is this quote from the docs which causes some confusion, as it states that internally it works identically to the Series to Series case:
It is also useful when the UDF execution requires initializing some states although internally it works identically as Series to Series case