The purpose of the following examples is to understand the difference between the two encoders available for a Spark Dataset.
I can do this:
val df = Seq((1, "a"), (2, "d")).toDF("id", "value")
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
val myStructType = StructType(Seq(StructField("id", IntegerType), StructField("value", StringType)))
implicit val myRowEncoder = RowEncoder(myStructType)
val ds = df.map{case row => row}
ds.show
//+---+-----+
//| id|value|
//+---+-----+
//| 1| a|
//| 2| d|
//+---+-----+
I can also do this:
val df = Seq((1, "a"), (2, "d")).toDF("id", "value")
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
implicit val myKryoEncoder: Encoder[Row] = Encoders.kryo[Row]
val ds = df.map{case row => row}
ds.show
//+--------------------+
//| value|
//+--------------------+
//|[01 00 6F 72 67 2...|
//|[01 00 6F 72 67 2...|
//+--------------------+
The only difference between the two snippets is the encoder: one uses a Kryo encoder, the other a RowEncoder.
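To make that difference concrete, here is a side-by-side sketch (assuming a spark-shell session where `spark` and its implicits are in scope, as in the snippets above):

```scala
import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import spark.implicits._

val df = Seq((1, "a"), (2, "d")).toDF("id", "value")
val schema = StructType(Seq(StructField("id", IntegerType), StructField("value", StringType)))

// Passing the encoder explicitly instead of via an implicit val:
val rowDs  = df.map(row => row)(RowEncoder(schema))   // keeps the id, value columns
val kryoDs = df.map(row => row)(Encoders.kryo[Row])   // one opaque binary column

rowDs.schema.fieldNames   // Array(id, value)
kryoDs.schema.fieldNames  // Array(value)
```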
Question:
Encoders.kryo simply creates an encoder that serializes objects of type T using Kryo
RowEncoder is a Scala object with apply and other factory methods. RowEncoder can create an ExpressionEncoder[Row] from a schema. Internally, apply creates a BoundReference for the Row type, a CreateNamedStruct serializer (using the internal serializerFor method), and a deserializer for the schema, and returns an ExpressionEncoder[Row] for the input schema.
RowEncoder knows about schema and uses it for serialization and deserialization.
Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.
Kryo is a good fit for efficiently storing large datasets and for network-intensive applications.
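The "register the classes in advance" part happens on the SparkConf, not on the Dataset API; a minimal sketch of what that looks like (the class name is just an illustration):

```scala
import org.apache.spark.SparkConf

case class Point(a: Int, b: Int)

// Use Kryo for Spark's internal (shuffle / RDD) serialization, and
// pre-register classes so Kryo writes a small numeric id instead of
// the fully-qualified class name with every record.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Point]))
```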
For more information, you can refer to these links:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-RowEncoder.html
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoders.html
https://medium.com/@knoldus/kryo-serialization-in-spark-55b53667e7ab
https://stackoverflow.com/questions/58946987/what-are-the-pros-and-cons-of-java-serialization-vs-kryo-serialization#:~:text=Kryo%20is%20significantly%20faster%20and,in%20advance%20for%20best%20performance.
According to Spark's documentation, Spark SQL does NOT use Kryo or Java serialization (by default).
Kryo is for RDDs, not DataFrames or Datasets. Hence the question is a little off-beam, afaik.
Does Kryo help in SparkSQL? This elaborates on custom objects, but...
UPDATED Answer after some free time
Your example was not really what I would call a custom type. They are just structs with primitives. No issue.
Kryo is a serializer; Datasets and DataFrames use Encoders for their columnar advantage. Kryo is used internally by Spark for shuffling.
This user-defined example
case class Foo(name: String, position: Point)
is one that we can handle with a DS or DF, or via Kryo. But what is the point, given that Tungsten and Catalyst work by "understanding the structure of the data" and are thus able to optimize? With Kryo you also get a single binary value, and I have found few examples of how to work with it successfully, e.g. a JOIN.
KRYO Example
import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._
case class Point(a: Int, b: Int)
case class Foo(name: String, position: Point)
implicit val PointEncoder: Encoder[Point] = Encoders.kryo[Point]
implicit val FooEncoder: Encoder[Foo] = Encoders.kryo[Foo]
val ds = Seq(new Foo("bar", new Point(0, 0))).toDS
ds.show()
returns:
+--------------------+
| value|
+--------------------+
|[01 00 D2 02 6C 6...|
+--------------------+
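A consequence worth spelling out: with the Kryo encoder the whole object lives in that single binary "value" column, so column-level operations stop working (same spark-shell assumptions as the example above):

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._

case class Point(a: Int, b: Int)
case class Foo(name: String, position: Point)
implicit val fooEncoder: Encoder[Foo] = Encoders.kryo[Foo]

val ds = Seq(Foo("bar", Point(0, 0))).toDS

ds.columns            // Array(value) -- one opaque binary column
// ds.select("name")  // would fail at analysis time: cannot resolve 'name'
```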
Encoder for DS using case class Example
import spark.implicits._
case class Point(a: Int, b: Int)
case class Foo(name: String, position: Point)
val ds = Seq(new Foo("bar", new Point(0, 0))).toDS
ds.show()
returns:
+----+--------+
|name|position|
+----+--------+
| bar| [0, 0]|
+----+--------+
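With the plain case-class encoder, Catalyst sees the structure, so fields, including nested ones, are addressable by name; a small sketch of what that buys you (again assuming a spark-shell session):

```scala
import spark.implicits._

case class Point(a: Int, b: Int)
case class Foo(name: String, position: Point)

val ds = Seq(Foo("bar", Point(0, 0))).toDS

// Nested struct fields can be selected and filtered directly,
// which the single binary Kryo column cannot offer.
ds.select("position.a").show()
ds.filter($"name" === "bar").count()  // 1
```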
This strikes me as the way to go with Spark, Tungsten, Catalyst.
Now, more complicated stuff is this when an Any is involved, but Any is not a good thing:
import org.apache.spark.sql.types._

val data = Seq(
  ("sublime", Map(
    "good_song" -> "santeria",
    "bad_song" -> "doesn't exist")
  ),
  ("prince_royce", Map(
    "good_song" -> 4,
    "bad_song" -> "back it up")
  )
)
val schema = List(
  ("name", StringType, true),
  ("songs", MapType(StringType, StringType, true), true)
)
val rdd = spark.sparkContext.parallelize(data)
rdd.collect
val df = spark.createDataFrame(rdd)
df.show()
df.printSchema()
returns:
java.lang.UnsupportedOperationException: No Encoder found for Any.
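One way out of the Any problem (a sketch, not the only option) is to normalize the map values to a single supported type before Spark has to derive an encoder:

```scala
import spark.implicits._

val data = Seq(
  ("sublime",      Map("good_song" -> "santeria", "bad_song" -> "doesn't exist")),
  ("prince_royce", Map("good_song" -> 4,          "bad_song" -> "back it up"))
)

// The mixed String/Int values make Scala infer Map[String, Any];
// stringifying the values yields Map[String, String], which Spark supports.
val normalized = data.map { case (name, songs) =>
  (name, songs.map { case (k, v) => k -> v.toString })
}
val df2 = normalized.toDF("name", "songs")
df2.printSchema()
```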
Then this example is interesting, as it is a valid custom-object use case: Spark No Encoder found for java.io.Serializable in Map[String, java.io.Serializable]. But I would stay away from such constructs.
Conclusions
Kryo vs Encoder vs Java Serialization in Spark? states that Kryo is for RDDs, but that is about the legacy API; internally Spark can still use it. Not 100% correct, but to the point.
Spark: Dataset Serialization is also an informative link.
Things have evolved, and the spirit now is to not use Kryo for a DS or DF.
Hope this helps.