I tried to use a previously defined Spark DataFrame from inside a regular Spark map operation, like this:
import os

from pyspark.sql.functions import udf, lit
from pyspark.sql.types import BooleanType

businessJSON = os.path.join(targetDir, 'business.json')
businessDF = sqlContext.read.json(businessJSON)

reviewsJSON = os.path.join(targetDir, 'review.json')
reviewsDF = sqlContext.read.json(reviewsJSON)

contains = udf(lambda xs, val: val in xs, BooleanType())
def selectReviews(category):
    businessesByCategory = businessDF[contains(businessDF.categories, lit(category))]
    selectedReviewsDF = reviewsDF.join(businessesByCategory,
                                       businessesByCategory.business_id == reviewsDF.business_id)
    return selectedReviewsDF.select("text").map(lambda x: x.text)
categories = ['category1', 'category2']
rdd = (sc.parallelize(categories)
       .map(lambda c: (c, selectReviews(c)))
      )
rdd.take(1)
and I got a huge error message:
Py4JError Traceback (most recent call last)
<ipython-input-346-051af5183a76> in <module>()
12 )
13
---> 14 rdd.take(1)
/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in take(self, num)
1275
1276 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1277 res = self.context.runJob(self, takeUpToNumLeft, p, True)
1278
1279 items += res
/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
894 # SparkContext#runJob.
895 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 896 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
897 allowLocal)
898 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in _jrdd(self)
2361 command = (self.func, profiler, self._prev_jrdd_deserializer,
2362 self._jrdd_deserializer)
-> 2363 pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
2364 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
2365 bytearray(pickled_cmd),
/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
2281 # the serialized command will be compressed by broadcast
2282 ser = CloudPickleSerializer()
-> 2283 pickled_command = ser.dumps(command)
2284 if len(pickled_command) > (1 << 20): # 1M
2285 # The broadcast will have same life cycle as created PythonRDD
...
/Users/igorsokolov/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
304 reduce = getattr(obj, "__reduce_ex__", None)
305 if reduce:
--> 306 rv = reduce(self.proto)
307 else:
308 reduce = getattr(obj, "__reduce__", None)
/usr/local/Cellar/apache-spark/1.4.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/usr/local/Cellar/apache-spark/1.4.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
302 raise Py4JError(
303 'An error occurred while calling {0}{1}{2}. Trace:\n{3}\n'.
--> 304 format(target_id, '.', name, value))
305 else:
306 raise Py4JError(
Py4JError: An error occurred while calling o96495.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
I did some investigation to find out exactly which line leads to this error, and the minimal code that reproduces it is:
def selectReviews(category):
    return reviewsDF.select("text")

rdd = (sc.parallelize(categories)
       .map(lambda c: (c, selectReviews(c)))
      )
rdd.take(1)
So I conclude that I am somehow using the DataFrame incorrectly, but exactly how is not clear from the Spark documentation. I suspect that reviewsDF needs to be distributed across all machines in the cluster, but I assumed that, since I created it with sqlContext, it would already be registered with the Spark context.
Thank you in advance.
Spark is not re-entrant. Specifically, workers cannot execute new RDD actions or transformations within a step of another action or transformation.
This issue occurs because selectReviews is called in the map's lambda function, which runs on a worker node, and selectReviews requires executing .select() on the RDD backing reviewsDF.
The workaround is to replace sc.parallelize with a simple for loop (or similar) over categories, executed locally on the driver. You still get the speedup from Spark in the DataFrame filtering that occurs within each call to selectReviews.
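As a rough sketch (reusing categories and the selectReviews function from your question, and assuming you only want a small sample of texts per category), the driver-side loop could look like this:

results = {}
for category in categories:
    # This loop runs on the driver; the join and select inside selectReviews
    # are still executed by Spark across the cluster.
    textsRDD = selectReviews(category)
    # take(5) is just an illustrative way to materialize a few review texts;
    # collect(), count(), or saveAsTextFile() would work the same way.
    results[category] = textsRDD.take(5)

This keeps every RDD/DataFrame action on the driver, so nothing is launched from inside a worker task.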