
Why Iterator of Series to Iterator of Series pandasUDF (PandasUDFType.SCALAR_ITER) when Series to Series (PandasUDFType.SCALAR) is available?

There are different kinds of PandasUDFType depending on the input and output types of the function.

There is:

Series to Series PandasUDFType.SCALAR:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

spark.range(10).select(pandas_plus_one("id")).show()

And there is also Iterator of Series to Iterator of Series PandasUDFType.SCALAR_ITER:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(iterator):
    return map(lambda s: s + 1, iterator)

spark.range(10).select(pandas_plus_one("id")).show()

Can you please give me a simple use case that cannot be solved by Series to Series (PandasUDFType.SCALAR) but can be solved by Iterator of Series to Iterator of Series (PandasUDFType.SCALAR_ITER)? I cannot seem to understand why one is needed when the other already exists.

asked Jan 30 '26 by Nitin Siwach

1 Answer

According to the official documentation and the Databricks docs, these two types of Pandas UDFs are quite similar but differ in a few aspects. Besides the input and output type differences, an Iterator of Series to Iterator of Series UDF can only take a single column as input, whereas a Scalar UDF can take multiple input columns. To make an iterator UDF take multiple Spark columns, you need the Iterator of multiple Series to Iterator of Series UDF, which is essentially the same but takes an iterator of tuples of pd.Series as its parameter.
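
For reference, a minimal sketch of that multiple-Series variant (assuming Spark 3.0+ type hints and the same spark session as in the question; multiply is just an illustrative name):

from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def multiply(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # each element of the iterator is a tuple with one batch of each input column
    for a, b in iterator:
        yield a * b

spark.range(10).selectExpr("id", "id AS id2").select(multiply("id", "id2")).show()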

Iterator UDFs are said to be useful when:

  • You need to prefetch the input iterator (here consume and func are placeholders for a queue-filling helper and the per-batch transformation):

from queue import Queue
from threading import Thread

@pandas_udf("long")
def do_something(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    q = Queue()
    # drain the input iterator into the queue from a background thread
    Thread(target=consume, args=(iterator, q), daemon=True).start()
    for s in iter(q.get, None):  # assumes consume() puts None when done
        yield func(s)

  • You need to do some expensive state initialization once, before processing the batches (see the runnable sketch after this list):

@pandas_udf("long")
def do_something(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    s = some_initialization()  # initialize the state once per task
    for x in iterator:
        yield func(x, s)       # reuse the state for the whole iterator
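
For a concrete use case that answers the question: with a Series to Series UDF there is no hook that runs once per task, so any expensive setup written inside the function body is repeated for every batch, while the iterator variant pays that cost once and reuses it across all batches. A minimal runnable sketch, where SlowModel is a hypothetical stand-in for any expensive-to-build object such as a large ML model:

import time
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

class SlowModel:
    def __init__(self):
        time.sleep(5)  # simulate loading a big model from disk
    def predict(self, s: pd.Series) -> pd.Series:
        return s * 2

@pandas_udf("long")
def predict_udf(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = SlowModel()  # built once per task, not once per batch
    for batch in batches:
        yield model.predict(batch)

spark.range(10).select(predict_udf("id")).show()

Written as a SCALAR UDF, the SlowModel() call would sit inside a function that Spark invokes once per batch, so the five-second setup would be repeated for every batch.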

However, there is a quote from the docs which creates some confusion, as it states that internally it works identically to the Series to Series case:

It is also useful when the UDF execution requires initializing some states although internally it works identically as Series to Series case

Presumably this means that the data arrives in the same Arrow batches either way; the iterator variant simply exposes the loop over those batches to the UDF author, which is what makes one-time initialization and prefetching possible.

answered Feb 02 '26 by blackbishop


