Disclaimer: just starting to play with Spark.
I'm having trouble understanding the famous "Task not serializable" exception, but my question is a little different from those I see on SO (or so I think).
I have a tiny custom RDD (TestRDD). It has a field which stores objects whose class does not implement Serializable (NonSerializable). I've set the "spark.serializer" config option to use Kryo. However, when I try count() on my RDD, I get the following:
Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
When I look inside DAGScheduler.submitMissingTasks I see that it uses its closure serializer on my RDD, which is the Java serializer, not the Kryo serializer which I'd expect. I've read that Kryo has issues serializing closures and Spark always uses the Java serializer for closures but I don't quite understand how closures come into play here at all. All I'm doing here is this:
SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext sc = new JavaSparkContext(conf);
TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());
That is, no mappers or anything which would require serialization of closures. OTOH this works:
sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()
The Kryo serializer is used as expected, the closure serializer is not involved. If I didn't set the serializer property to Kryo, I'd get an exception here as well.
I appreciate any pointers explaining where the closure comes from and how to ensure that I can use Kryo to serialize custom RDDs.
UPDATE: here's TestRDD with its non-serializable field mNS:
class TestRDD extends RDD<String> {
    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);
    NonSerializable mNS = new NonSerializable();
    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }
    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }
    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }
    static class TestPartition implements Partition {
        final int mIndex;
        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }
        public int index() {
            return mIndex;
        }
    }
}
"When I look inside DAGScheduler.submitMissingTasks I see that it uses its closure serializer on my RDD, which is the Java serializer, not the Kryo serializer which I'd expect."
SparkEnv supports two serializers. One, named serializer, is used for serializing your data, checkpointing, messaging between workers, etc., and is set through the spark.serializer configuration flag. The other, named closureSerializer and set through spark.closure.serializer, is used to check that your object is in fact serializable and to serialize tasks. It is configurable for Spark <= 1.6.2 (but nothing other than JavaSerializer actually works) and hardcoded to JavaSerializer from 2.0.0 onward.
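As for where the "closure" comes from: the serialization stack in the question shows a scala.Tuple2 of (TestRDD, <function2>), so the RDD itself travels together with the job's function through the closure serializer. JavaSerializer is built on java.io.ObjectOutputStream, so the failure can be reproduced without Spark. The following is only a minimal sketch of that mechanism: FakeRdd, FakeTask and trySerialize are hypothetical stand-ins I made up for illustration, not Spark classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class TaskSerializationSketch {
    // Hypothetical stand-in for the question's NonSerializable class.
    static class NonSerializable {}

    // Hypothetical stand-in for TestRDD: Serializable itself (as Spark's RDD is),
    // but holding a field whose class is not.
    static class FakeRdd implements Serializable {
        private static final long serialVersionUID = 1L;
        NonSerializable mNS = new NonSerializable();
    }

    // Hypothetical stand-in for the (rdd, function) pair seen in the stack trace.
    static class FakeTask implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object rdd;
        final Object func;

        FakeTask(Object rdd, Object func) {
            this.rdd = rdd;
            this.func = func;
        }
    }

    // Writes the object with plain Java serialization and reports what happened.
    static String trySerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The whole object graph is walked, so the RDD's field is reached and rejected.
        System.out.println(trySerialize(new FakeTask(new FakeRdd(), "count")));
    }
}
```

Nothing in this path consults spark.serializer, which is why setting it to Kryo does not change the outcome for fields of the RDD itself.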
The Kryo closure serializer has a bug that makes it unusable; see SPARK-7708 (this may be fixed with Kryo 3.0.0, but Spark is currently pinned to a specific version of Chill, which in turn is pinned to Kryo 2.2.1). Further, for Spark 2.0.x the JavaSerializer is hardcoded rather than configurable (you can see it in this pull request). This means that effectively we're stuck with the JavaSerializer for closure serialization.
Is it weird that we use one serializer to submit tasks and another to serialize data between workers and such? Definitely, but this is what we have.
To sum up, if you set the spark.serializer configuration or use SparkConf.registerKryoClasses, you'll be using Kryo for most of your serialization in Spark. Having said that, for checking whether a given class is serializable and for serializing tasks to workers, Spark will use JavaSerializer.
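Since the RDD object goes through Java serialization, one possible workaround (my assumption, not something tested against the TestRDD in the question) is to mark the non-serializable field transient and re-create it lazily wherever it is needed. A minimal sketch with plain java.io, where FakeRdd, ns(), hasNs() and roundTrip are hypothetical names used only for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {
    // Hypothetical stand-in for the question's NonSerializable class.
    static class NonSerializable {}

    // Hypothetical stand-in for TestRDD with the field excluded from serialization.
    static class FakeRdd implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient NonSerializable mNS;

        // Lazily (re)creates the field; after deserialization it starts out null.
        NonSerializable ns() {
            if (mNS == null) {
                mNS = new NonSerializable();
            }
            return mNS;
        }

        boolean hasNs() {
            return mNS != null;
        }
    }

    // Serializes and deserializes with plain Java serialization.
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeRdd original = new FakeRdd();
        original.ns();                          // field is populated on the sending side
        FakeRdd copy = roundTrip(original);     // no NotSerializableException this time
        System.out.println(copy.hasNs());       // transient field was not carried over
        System.out.println(copy.ns() != null);  // re-created on first use
    }
}
```

The trade-off is that the field's state is not carried over, so this only fits when the object can be rebuilt on the receiving side. If the state must travel with the RDD, the class would need to implement Serializable (or be wrapped in something that does).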