
Cannot convert Catalyst type IntegerType to Avro type ["null","int"]

I have a Spark Structured Streaming process built with PySpark that reads Avro messages from a Kafka topic, makes some transformations, and loads the data as Avro into a target topic.

I use the ABRiS package (https://github.com/AbsaOSS/ABRiS) to serialize/deserialize Confluent Avro, integrating with the Schema Registry.
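
The serialization step follows the standard ABRiS pattern for Confluent Avro, roughly like the sketch below (shown in Scala with the ABRiS 4.x-style config; the topic name, registry URL and transformedDf are placeholders, and my actual job drives the equivalent from PySpark):

import org.apache.spark.sql.functions.{col, struct}
import za.co.absa.abris.avro.functions.to_avro
import za.co.absa.abris.config.AbrisConfig

// Fetch the latest value schema for the target topic from the Schema Registry
val toAvroConfig = AbrisConfig
  .toConfluentAvro
  .downloadSchemaByLatestVersion
  .andTopicNameStrategy("target-topic")                 // placeholder topic
  .usingSchemaRegistry("http://schema-registry:8081")   // placeholder URL

// Serialize all columns of the transformed frame into the Kafka 'value' payload
val avroFrame = transformedDf
  .select(to_avro(struct(transformedDf.columns.map(col): _*), toAvroConfig) as "value")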

The schema contains integer columns as follows:

{
  "name": "total_images",
  "type": [
    "null",
    "int"
  ],
  "default": null
},
{
  "name": "total_videos",
  "type": [
    "null",
    "int"
  ],
  "default": null
},

The process raises the following error: Cannot convert Catalyst type IntegerType to Avro type ["null","int"].

I've tried converting the columns to nullable, but the error persists.

If someone has a suggestion, I would appreciate it.

Matheus Dantas asked Oct 31 '25 08:10


1 Answer

I burned hours on this one.

Actually, it is unrelated to the ABRiS dependency (the behaviour is the same with the native spark-avro APIs).

There may be several root causes, but in my case (Spark 3.0.1, Scala with Dataset) it was related to the encoder and a wrong type in the case class holding the data.

In short, an Avro field defined with "type": ["null","int"] can't be mapped to a Scala Int; it needs Option[Int].

Using the following code:

test("Avro Nullable field") {
val schema: String =
  """
    |{
    | "namespace": "com.mberchon.monitor.dto.avro",
    | "type": "record",
    | "name": "TestAvro",
    | "fields": [
    |  {"name": "strVal", "type": ["null", "string"]},
    |  {"name": "longVal",  "type": ["null", "long"]}
    |  ]
    |}
  """.stripMargin
val topicName = "TestNullableAvro"
val testInstance = TestAvro("foo",Some(Random.nextInt()))

import sparkSession.implicits._

val dsWrite:Dataset[TestAvro] = Seq(testInstance).toDS
val allColumns = struct(dsWrite.columns.head, dsWrite.columns.tail: _*)

dsWrite
  .select(to_avro(allColumns,schema) as 'value)
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrap)
  .option("topic", topicName)
  .save()

val dsRead:Dataset[TestAvro] = sparkSession.read
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrap)
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro(col("value"), schema) as 'Metric)
  .select("Metric.*")
  .as[TestAvro]

assert(dsRead.collect().contains(testInstance))

}

It fails if the case class is defined as follows:

case class TestAvro(strVal:String,longVal:Long)

Cannot convert Catalyst type LongType to Avro type ["null","long"].
org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type LongType to Avro type ["null","long"].
  at org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219)
  at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)

It works properly with:

case class TestAvro(strVal:String,longVal:Option[Long])
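
Applied to the integer fields from the question, that means mapping them to Option[Int] rather than Int in whatever case class backs the typed Dataset. A sketch (keeping the Avro field names so the encoder lines up; the case class name is just illustrative):

case class MediaStats(
  total_images: Option[Int],  // Avro union ["null","int"] -> Option[Int], not Int
  total_videos: Option[Int]
)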

By the way, it would be more than nice to have support for SpecificRecord within Spark encoders (you can use Kryo, but it is sub-efficient), since in order to use typed Datasets efficiently with my Avro data I need to create additional case classes (which are duplicates of my SpecificRecords).

mberchon answered Nov 02 '25 12:11