Spark 2.0 (final) with Scala 2.11.8. The very simple code below fails to compile with this error:
Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
The code:
import org.apache.spark.sql.SparkSession
case class SimpleTuple(id: Int, desc: String)
object DatasetTest {
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
val dataset = sparkSession.createDataset(dataList)
}
}
Spark Datasets require an Encoder for the data type that is about to be stored. For common types (atomics, product types) a number of predefined encoders are available, but you have to import them first from SparkSession.implicits to make it work:
val sparkSession: SparkSession = ???
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
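As a rough sketch of what that import brings into scope: the same implicits also cover collections of primitives and tuples, and they add a toDS() method to local sequences. The object name ImplicitsSketch, the helper counts, and the sample values are made up for illustration; SimpleTuple is the case class from the question.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Defined at the top level, like the case class in the question.
case class SimpleTuple(id: Int, desc: String)

object ImplicitsSketch {
  // Hypothetical helper: builds three small Datasets and returns their sizes.
  def counts(spark: SparkSession): (Long, Long, Long) = {
    // Brings the predefined encoders and the toDS() syntax into implicit scope.
    import spark.implicits._

    // Primitive type: resolved through the implicit Encoder[Int].
    val ints: Dataset[Int] = spark.createDataset(Seq(1, 2, 3))

    // Tuple (a Product type): the encoder is derived implicitly.
    val pairs: Dataset[(Int, String)] = Seq((5, "abc"), (6, "bcd")).toDS()

    // Case class: same mechanism as the tuple above.
    val tuples: Dataset[SimpleTuple] =
      List(SimpleTuple(5, "abc"), SimpleTuple(6, "bcd")).toDS()

    (ints.count(), pairs.count(), tuples.count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("implicits-sketch")
      .getOrCreate()
    println(counts(spark))
    spark.stop()
  }
}
```

Note that the import has to go through a stable identifier (a val or a parameter holding the session), which is why it appears after the session is available rather than at the top of the file.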
Alternatively, you can provide an Encoder for the stored type directly, either explicitly:
import org.apache.spark.sql.{Encoder, Encoders}
val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])
or as an implicit value:
implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
val dataset = sparkSession.createDataset(dataList)
Note that Encoders also provides a number of predefined Encoders for atomic types, and Encoders for complex ones can be derived with ExpressionEncoder.
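To make that note concrete, here is a minimal sketch that passes encoders by hand instead of relying on the implicits import. The object name ExplicitEncodersSketch and its helper methods are hypothetical. ExpressionEncoder lives in the catalyst package, which is generally treated as internal, so its availability and signature may differ between Spark versions.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class SimpleTuple(id: Int, desc: String)

object ExplicitEncodersSketch {
  // Predefined encoders for atomic types.
  val intEncoder: Encoder[Int] = Encoders.scalaInt
  val stringEncoder: Encoder[String] = Encoders.STRING

  // Composite encoder assembled from the atomic ones.
  val pairEncoder: Encoder[(Int, String)] =
    Encoders.tuple(intEncoder, stringEncoder)

  // Encoder derived from the type's TypeTag (catalyst API, assumed available).
  val derivedEncoder: Encoder[SimpleTuple] = ExpressionEncoder[SimpleTuple]()

  // Each method passes its encoder explicitly in the second parameter list.
  def ints(spark: SparkSession): Dataset[Int] =
    spark.createDataset(Seq(1, 2, 3))(intEncoder)

  def pairs(spark: SparkSession): Dataset[(Int, String)] =
    spark.createDataset(Seq((5, "abc"), (6, "bcd")))(pairEncoder)

  def tuples(spark: SparkSession): Dataset[SimpleTuple] =
    spark.createDataset(Seq(SimpleTuple(5, "abc"), SimpleTuple(6, "bcd")))(derivedEncoder)
}
```

For case classes, Encoders.product[SimpleTuple] from the public API is usually the simpler choice; the ExpressionEncoder variant is shown only because the note above mentions it.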
Further reading:
For Row objects, you have to provide the Encoder explicitly, as shown in "Encoder error while trying to map dataframe row to updated row".
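A minimal sketch of the Row case, assuming a Spark 2.x release where RowEncoder(schema) from the catalyst package is available (later versions changed this API, so check the documentation for the version you run). The object name RowEncoderSketch, the helper upperCased, and the schema are invented for illustration.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RowEncoderSketch {
  // Schema of the rows produced by the map below.
  val schema: StructType = StructType(Seq(
    StructField("id", IntegerType, nullable = false),
    StructField("desc", StringType, nullable = true)
  ))

  // Hypothetical helper: maps each Row to a new Row with an upper-cased desc.
  def upperCased(spark: SparkSession): Dataset[Row] = {
    import spark.implicits._
    val df = Seq((5, "abc"), (6, "bcd")).toDF("id", "desc")

    // The implicits import has no Encoder[Row], so one is supplied by hand.
    implicit val rowEncoder: Encoder[Row] = RowEncoder(schema)

    df.map(row => Row(row.getInt(0), row.getString(1).toUpperCase))
  }
}
```

Without the implicit rowEncoder, the map call fails to compile with the same "Unable to find encoder" error as in the question.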
For other users (yours is correct), note that it's also important that the case class is defined outside of the object scope. So:
Fails:
object DatasetTest {
case class SimpleTuple(id: Int, desc: String)
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
val dataset = sparkSession.createDataset(dataList)
}
}
Adding the implicits still fails with the same error:
object DatasetTest {
case class SimpleTuple(id: Int, desc: String)
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
}
}
Works:
case class SimpleTuple(id: Int, desc: String)
object DatasetTest {
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
}
}
Here's the relevant bug: https://issues.apache.org/jira/browse/SPARK-13540, so hopefully it will be fixed in the next release of Spark 2.
(Edit: Looks like that bugfix is actually in Spark 2.0.0... So I'm not sure why this still fails).