pyspark: Transformer that generates a random number always generates the same number

I am trying to measure the performance impact of having to copy a dataframe from Scala to Python and back in a large pipeline. For that purpose I have created this rather artificial transformer:

from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

import random

class RandomColAdderTransformer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, bogusarg=None):
        super(RandomColAdderTransformer, self).__init__()
        self.bogusarg = bogusarg  # plain attribute, not a Spark Param
        kwargs = self.__init__._input_kwargs
        kwargs.pop("bogusarg", None)  # setParams does not accept bogusarg
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        cur_col = self.getInputCol()

        def randGet(col):  # a zero-argument UDF crashes, so take the column
            a = col * random.random()  # read the value and copy it into Python space
            return a  # it seems to run only once?

        sparktype = FloatType()
        return dataset.withColumn("randFloat", udf(randGet, sparktype)(cur_col))

The goal of this transformer is to ensure that some numbers are generated from Python: it accesses the dataframe, does a multiplication (in Python), and then adds a column to the dataframe for the next stage of the pipeline.

However I am seeing some weirdness. When testing my code, the same random number is generated for all rows:

df = sqlContext.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
myTestTransformer = RandomColAdderTransformer()
myTestTransformer.setInputCol("x3")
transformedDF = myTestTransformer.transform(df)
transformedDF.show()

+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+

And then consecutive invocations of transformedDF.show() actually change the values!?

transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [3]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0|  2.9191132|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [4]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [5]: transformedDF.show()
+---+---+-----+----------+
| x1| x2|   x3| randFloat|
+---+---+-----+----------+
|  1|  a| 23.0| 16.033003|
|  3|  B|-23.0|-2.9191132|
+---+---+-----+----------+

Is this behavior expected? Does .show() actually trigger the computation? AFAIK I am running on a single node, so the tasks should run in a single thread and share the random seed, right? I know a built-in PySpark RNG exists, but it is not suitable for my purpose as it wouldn't actually generate the data from Python space.

asked by XapaJIaMnu

1 Answer

Well, "expected" is rather relative here, but it is not something that cannot be explained. In particular, the state of the RNG is inherited from the parent process. You can easily prove that by running the following simple snippet in local mode:

import random 

def roll_and_get_state(*args):
    random.random()
    return [random.getstate()]

states = sc.parallelize([], 10).mapPartitions(roll_and_get_state).collect()
len(set(states))
## 1

As you can see, each partition uses its own RNG, but all of them have the same state.

In general, ensuring correct Python RNG behavior in Spark without a serious performance penalty, especially if you need reproducible results, is rather tricky.

One possible approach is to instantiate a separate Random instance per partition, with a seed generated from cryptographically safe random data (os.urandom).
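
A minimal sketch of that idea (assuming a SparkContext named sc, as in the snippet above; add_random is a hypothetical helper):

import os
import random

def add_random(rows):
    # Seed a fresh Random instance per partition from os.urandom,
    # so the state inherited from the parent process never matters.
    rng = random.Random(os.urandom(16))
    for x in rows:
        yield (x, rng.random())

sc.parallelize(range(4), 2).mapPartitions(add_random).collect()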

If you need reproducible results, you can generate RNG seeds based on global state and partition data. Unfortunately this information is not easily accessible at runtime from Python (ignoring special cases like mapPartitionsWithIndex).
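
For example, a sketch of reproducible seeding with mapPartitionsWithIndex (add_reproducible_random is a hypothetical helper; 42 is an arbitrary global seed):

import random

def add_reproducible_random(global_seed):
    def f(index, rows):
        # Derive a per-partition seed from the fixed global seed and
        # the partition index, so reruns yield identical sequences.
        rng = random.Random((global_seed << 32) | index)
        for x in rows:
            yield (x, rng.random())
    return f

sc.parallelize(range(4), 2).mapPartitionsWithIndex(
    add_reproducible_random(42)).collect()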

Since partition-level operations are not always applicable (as in the case of UDFs), you can achieve a similar result by using a singleton module or the Borg pattern to initialize the RNG for each executor.
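
A minimal sketch of the singleton-module variant (rng_singleton is a hypothetical module that would be shipped to the executors, e.g. via SparkContext.addPyFile):

# rng_singleton.py -- the module body runs once per Python worker
# process, so every UDF call in that worker reuses the same RNG.
import os
import random

rng = random.Random(os.urandom(16))

The UDF then imports the module instead of using the inherited global RNG:

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def rand_get(col):
    import rng_singleton  # cached after the first import per worker
    return col * rng_singleton.rng.random()

rand_udf = udf(rand_get, FloatType())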

See also:

  • Random numbers generation in PySpark
  • Filtering Spark DataFrame on new column
answered by zero323

