
Can I use Spark DataFrame inside regular Spark map operation?

I tried to use a Spark DataFrame that was defined earlier from inside a regular Spark map operation, like below:

import os

from pyspark.sql.functions import udf, lit
from pyspark.sql.types import BooleanType

businessJSON = os.path.join(targetDir, 'business.json')
businessDF = sqlContext.read.json(businessJSON)

reviewsJSON = os.path.join(targetDir, 'review.json')
reviewsDF = sqlContext.read.json(reviewsJSON)

# True if the given category appears in a business's list of categories
contains = udf(lambda xs, val: val in xs, BooleanType())

def selectReviews(category):
    businessesByCategory = businessDF[contains(businessDF.categories, lit(category))]
    selectedReviewsDF = reviewsDF.join(businessesByCategory,
                                       businessesByCategory.business_id == reviewsDF.business_id)
    return selectedReviewsDF.select("text").map(lambda x: x.text)

categories = ['category1', 'category2']
rdd = (sc.parallelize(categories)
       .map(lambda c: (c, selectReviews(c)))
       )

rdd.take(1)

and I got a huge error message:

Py4JError                                 Traceback (most recent call last)
<ipython-input-346-051af5183a76> in <module>()
     12        )
     13 
---> 14 rdd.take(1)

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in take(self, num)
   1275 
   1276             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1277             res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1278 
   1279             items += res

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    894         # SparkContext#runJob.
    895         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 896         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
    897                                           allowLocal)
    898         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in _jrdd(self)
   2361         command = (self.func, profiler, self._prev_jrdd_deserializer,
   2362                    self._jrdd_deserializer)
-> 2363         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
   2364         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
   2365                                              bytearray(pickled_cmd),

 /usr/local/Cellar/apache-spark/1.4.1/libexec/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
   2281     # the serialized command will be compressed by broadcast
   2282     ser = CloudPickleSerializer()
-> 2283     pickled_command = ser.dumps(command)
   2284     if len(pickled_command) > (1 << 20):  # 1M
   2285         # The broadcast will have same life cycle as created PythonRDD

 ...

/Users/igorsokolov/anaconda/lib/python2.7/pickle.pyc in save(self, obj)
    304             reduce = getattr(obj, "__reduce_ex__", None)
    305             if reduce:
--> 306                 rv = reduce(self.proto)
    307             else:
    308                 reduce = getattr(obj, "__reduce__", None)

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/Cellar/apache-spark/1.4.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    302                 raise Py4JError(
    303                     'An error occurred while calling {0}{1}{2}. Trace:\n{3}\n'.
--> 304                     format(target_id, '.', name, value))
    305         else:
    306             raise Py4JError(

Py4JError: An error occurred while calling o96495.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

I did some investigation to understand exactly which line leads to this error, and found that the minimal code that reproduces it is:

def selectReviews(category):
    return reviewsDF.select("text")

rdd = (sc.parallelize(categories)
       .map(lambda c: (c, selectReviews(c)))
       )

rdd.take(1)

So I conclude that I am somehow using the DataFrame incorrectly, but what exactly is wrong is not clear to me from the Spark documentation. I suspect that reviewsDF needs to be distributed across all machines in the cluster, but I assumed that since I created it via the SQLContext it was already available in the Spark context.

Thank you in advance.

asked by Igor Sokolov

1 Answer

Spark is not re-entrant. Specifically, workers cannot execute new RDD actions or transformations from within a step of another action or transformation.

This issue occurs because selectReviews is called inside the map's lambda function, which runs on a worker node, and selectReviews needs to execute .select() on the DataFrame reviewsDF (that is, on the RDD backing it).

The workaround is to replace sc.parallelize with a simple for loop (or similar) over categories, executed locally on the driver, as shown below. Spark still provides a speedup through the DataFrame filtering and join that happen in each call to selectReviews.
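
For illustration only, here is a minimal sketch of that workaround, assuming the selectReviews, reviewsDF, and categories defined in the question; the collect() call is my own addition and is only safe if each per-category result fits in driver memory:

# Plain driver-side loop: selectReviews runs on the driver, so each call
# launches an ordinary (distributed) Spark job instead of being pickled
# into a worker-side closure.
reviewsByCategory = {}
for c in categories:
    # selectReviews(c) returns an RDD of review texts; collect() brings
    # the texts for this category back to the driver.
    reviewsByCategory[c] = selectReviews(c).collect()

If you prefer to keep the results distributed, you can instead store the per-category RDDs in the dictionary and defer any action (collect, count, save) until you actually need it.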

answered by Paul


