
Heavy stateful UDF in PySpark

I have to run a really heavy Python function as a UDF in Spark, and I want to cache some data inside the UDF. The case is similar to the one mentioned here

I am aware that it is slow and wrong. But the existing infrastructure is in Spark, and I don't want to set up new infrastructure and deal with data loading/parallelization/fail safety separately for this case.

This is what my Spark program looks like:

from mymodule import my_function  # here is my function
from pyspark.sql.functions import udf
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType().add("input", "string")
df = spark.read.format("json").schema(schema).load("s3://input_path")

# wrap my_function as a UDF that returns a single-field struct
udf1 = udf(my_function, StructType().add("output", "string"))
df.withColumn("result", udf1(df.input)).write.json("s3://output_path/")

my_function internally calls a method of an object whose constructor is slow. I therefore don't want the object to be initialized for every row, so I am trying to cache it:

from my_slow_class import SlowClass
from cachetools import cached

@cached(cache={}) 
def get_cached_object():
    # this call is really slow therefore I am trying 
    # to cache it with cachetools
    return SlowClass() 

def my_function(input):
    slow_object = get_cached_object()
    output = slow_object.call(input)
    return {'output': output}

mymodule and my_slow_class are installed as modules on each Spark machine.
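
(For reference, the same pattern can be written with only the standard library instead of cachetools. This is just a sketch of the idea, assuming SlowClass is importable on the workers:

from functools import lru_cache

from my_slow_class import SlowClass

@lru_cache(maxsize=1)
def get_cached_object():
    # constructed at most once per Python worker process; the cache
    # is per-process, so each Spark worker builds its own instance
    return SlowClass()

)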

It seems to work: the constructor is called only a few times (10-20 times for 100k rows in the input dataframe), and that is what I want.

My concern is multithreading/multiprocessing inside the Spark executors, and whether the cached SlowClass instance is shared between many parallel my_function calls.

Can I rely on my_function being called one at a time inside the Python processes on the worker nodes? Does Spark use any multiprocessing/multithreading in the Python process that executes my UDF?
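
To check this empirically, I could run a small diagnostic sketch like the one below (the logging line is only for this experiment and is not part of my real job; print output from a UDF ends up in the executor logs, not on the driver):

import os
import threading

def my_function(input):
    # record which process and thread executes each call; if each
    # pid only ever appears with a single thread id, calls within
    # that worker process are not concurrent
    print(f"pid={os.getpid()} thread={threading.get_ident()}")
    slow_object = get_cached_object()
    return {'output': slow_object.call(input)}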

asked Oct 23 '25 by sergem


1 Answer

Spark forks a separate Python worker process for each executor core, but all processing inside an individual worker process is sequential, unless multithreading or multiprocessing is used explicitly by the user-defined function itself.

So as long as the state is used only for caching and slow_object.call is otherwise a pure function, you have nothing to worry about.
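
If you ever do add threads on top of this yourself, a defensive, lock-protected version of the cache could look like the following sketch (it reuses the get_cached_object name from the question and needs only the standard library):

import threading

from my_slow_class import SlowClass

_lock = threading.Lock()
_instance = None

def get_cached_object():
    # double-checked locking: only one thread ever constructs
    # SlowClass, and subsequent calls skip the lock entirely
    global _instance
    if _instance is None:
        with _lock:
            if _instance is None:
                _instance = SlowClass()
    return _instance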

answered Oct 25 '25 by 10465355


