I am trying to measure the performance impact of having to copy a DataFrame from Scala to Python and back in a large pipeline. For that purpose I have created this rather artificial transformer:
from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import random
class RandomColAdderTransformer(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, bogusarg=None):
        super(RandomColAdderTransformer, self).__init__()
        self.bogusarg = None
        self._setDefault(bogusarg=set())
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        cur_col = self.getInputCol()

        def randGet(col):  # UDF crashes with no arguments
            a = col * random.random()  # Ensure we are reading and copying to Python space
            return a  # It runs only once?

        sparktype = FloatType()
        return dataset.withColumn("randFloat", udf(randGet, sparktype)(cur_col))
The goal of this transformer is to ensure that some numbers are generated from Python: it accesses the DataFrame, does a multiplication (in Python), and then adds a column to the DataFrame for the next stage of the pipeline.
However I am seeing some weirdness. When testing my code, the same random number is generated for all rows:
df = sqlContext.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
myTestTransformer = RandomColAdderTransformer()
myTestTransformer.setInputCol("x3")
transformedDF = myTestTransformer.transform(df)
transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 0.95878977|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
And then consecutive invocations of transformedDF.show() actually change the values!?
transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 0.95878977|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
In [3]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 2.9191132|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
In [4]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2| x3| randFloat|
+---+---+-----+-----------+
| 1| a| 23.0| 0.95878977|
| 3| B|-23.0|-0.95878977|
+---+---+-----+-----------+
In [5]: transformedDF.show()
+---+---+-----+----------+
| x1| x2| x3| randFloat|
+---+---+-----+----------+
| 1| a| 23.0| 16.033003|
| 3| B|-23.0|-2.9191132|
+---+---+-----+----------+
Is this behavior expected? Does .show() actually trigger the computation? AFAIK I am running on a single node, so everything should run in a single thread and share the random seed? I know a built-in PySpark RNG exists, but it is not suitable for my purpose, as it would not actually be generating the data from Python space.
Well, "expected" is rather relative here, but it is not something that cannot be explained. In particular, the state of the RNG is inherited from the parent process. You can easily prove that by running the following simple snippet in local mode:
import random

def roll_and_get_state(*args):
    random.random()
    return [random.getstate()]

states = sc.parallelize([], 10).mapPartitions(roll_and_get_state).collect()
len(set(states))
## 1
As you can see, each partition is using its own RNG, but all of them have the same state.
In general ensuring correct Python RNG behavior in Spark without a serious performance penalty, especially if you need reproducible results, is rather tricky.
One possible approach is to instantiate a separate Random instance per partition, with a seed generated from cryptographically safe random data (os.urandom).
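As a sketch of that idea (the helper name `add_rand` is mine, not part of any Spark API), a per-partition generator seeded from `os.urandom` could look like this:

```python
import os
import random

def add_rand(partition):
    # One dedicated Random instance per partition, seeded from the OS
    # entropy pool, so partitions never share inherited RNG state.
    rng = random.Random(os.urandom(16))
    for row in partition:
        yield row + (rng.random(),)

# Hypothetical usage on an RDD of tuples:
# rdd.mapPartitions(add_rand)
```

Because the seed comes from `os.urandom`, every partition (and every re-execution of a failed task) draws from an independent stream, at the cost of reproducibility.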
If you need reproducible results, you can generate RNG seeds based on global state and partition data. Unfortunately this information is not easily accessible at runtime from Python (ignoring special cases like mapPartitionsWithIndex).
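A minimal sketch of that reproducible variant, assuming a job-level integer seed (the name `GLOBAL_SEED` and the mixing constant are mine):

```python
import random

GLOBAL_SEED = 42  # hypothetical seed fixed for the whole job

def add_rand_reproducible(index, partition):
    # Derive a deterministic per-partition seed from the global seed and
    # the partition index; re-running the job reproduces the same values.
    rng = random.Random(GLOBAL_SEED * 1000003 + index)
    for row in partition:
        yield row + (rng.random(),)

# rdd.mapPartitionsWithIndex(add_rand_reproducible)
```

The same (seed, index) pair always yields the same stream, so results survive task retries, while different partitions still get different streams.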
Since partition-level operations are not always applicable (as in the case of a UDF), you can achieve a similar result by using a singleton module or the Borg pattern to initialize the RNG once per executor.
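One way to sketch the Borg variant for use inside a UDF (the class and function names here are mine, not from the question's code):

```python
import os
import random

class RngBorg(object):
    # Borg pattern: every instance shares the same __dict__, so the RNG is
    # created at most once per Python worker process (i.e. per executor).
    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state
        if "rng" not in self.__dict__:
            self.rng = random.Random(os.urandom(16))

def rand_mult(x):
    # UDF body: every call on the same worker reuses the shared RNG
    # instead of the module-level random inherited from the parent.
    return x * RngBorg().rng.random()

# udf(rand_mult, FloatType()) could then stand in for randGet above.
```

Each worker process lazily seeds its own generator on first use, which avoids the shared inherited state without requiring partition-level access.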