Spark Streaming: NullPointerException inside foreachPartition

I have a Spark Streaming job which reads from Kafka and does some comparisons with an existing table in Postgres before writing to Postgres again. This is what it looks like:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {

  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)
    println("First")

    kafkaDF.foreachPartition(
      i =>{
        val jdbcDF = sqlContext.read.format("jdbc").options(
          Map("url" -> "jdbc:postgresql://...",
            "dbtable" -> "table", "user" -> "user", "password" -> "pwd" )).load()

        createConnection()
        i.foreach(
          row =>{
            println("Second")
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )
  }
})

This code is giving me a NullPointerException at the line val jdbcDF = ...

What am I doing wrong? Also, my log "First" prints, but "Second" never shows up anywhere in the logs. I tried the entire code with kafkaDF.collect().foreach(...) and it works perfectly, but performance is very poor, so I am looking to replace it with foreachPartition.

Thanks

asked by void
2 Answers

It is not clear whether there are any issues inside createConnection, closeConnection, or connection.sendToTable, but the fundamental problem is the attempt to nest actions / transformations. This is not supported in Spark, and Spark Streaming is no different.

It means that the nested DataFrame initialization (val jdbcDF = sqlContext.read.format ...) simply cannot work and should be removed. If you need it as a reference, it should be created at the same level as kafkaDF and referenced using standard transformations (unionAll, join, ...), as in the sketch below.
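A minimal sketch of that restructuring, reusing the connection options from the question; the join column "id" is a hypothetical placeholder for whatever key relates the two tables:

message.foreachRDD( rdd => {
  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)

    // Created at the same level as kafkaDF, on the driver -- not inside a closure.
    val jdbcDF = sqlContext.read.format("jdbc").options(
      Map("url" -> "jdbc:postgresql://...",
        "dbtable" -> "table", "user" -> "user", "password" -> "pwd" )).load()

    // Reference it with a standard transformation; "id" is a placeholder column.
    val joined = kafkaDF.join(jdbcDF, "id")

    joined.foreachPartition( i => {
      // per-partition work that no longer touches sqlContext
    })
  }
})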

If for some reason that is not an acceptable solution, you can create a plain JDBC connection inside foreachPartition and operate on the PostgreSQL table directly (I guess that is what you already do inside sendToTable).
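A sketch of that alternative, using java.sql.DriverManager directly; the SELECT statement and the "key" column are placeholders for whatever comparison your code actually performs:

kafkaDF.foreachPartition( i => {
  // One plain JDBC connection per partition; no sqlContext in here.
  val connection = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://...", "user", "pwd")
  try {
    val stmt = connection.prepareStatement(
      "SELECT value FROM table WHERE key = ?")  // hypothetical comparison query
    i.foreach( row => {
      stmt.setString(1, row.getAs[String]("key"))
      val rs = stmt.executeQuery()
      // ... compare, then write back over the same connection ...
      rs.close()
    })
    stmt.close()
  } finally {
    connection.close()
  }
})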

answered by zero323

As @zero323 correctly pointed out, you can't broadcast your JDBC connection around, and you cannot create nested RDDs either. Spark simply does not support using sparkContext or sqlContext within an existing closure, i.e. inside foreachPartition, hence the NullPointerException.

The only way to solve this efficiently is to create a JDBC connection within foreachPartition and execute SQL directly on it to do whatever you intended, then use that same connection to write the records back.
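A minimal sketch of the write-back half, with hypothetical table and column names ("table", "key", "value"); the point is that one connection per partition serves both the lookups and a batched write:

kafkaDF.foreachPartition( i => {
  val connection = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://...", "user", "pwd")
  try {
    val insert = connection.prepareStatement(
      "INSERT INTO table (key, value) VALUES (?, ?)")
    i.foreach( row => {
      insert.setString(1, row.getAs[String]("key"))
      insert.setString(2, row.getAs[String]("value"))
      insert.addBatch()
    })
    insert.executeBatch()  // one round trip per partition instead of per row
    insert.close()
  } finally {
    connection.close()
  }
})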

As to your second, edited, question:

Change:

kafkaDF.foreachPartition(..)

to

kafkaDF.repartition(numPartition).foreachPartition(..)

where numPartition is the desired number of partitions. This will increase the number of partitions. If you have multiple executors (and multiple tasks per executor), these will run in parallel.

answered by Erik Schmiegelow