How to serialize a JDBC connection for Spark node distribution in a foreach

My end goal is to get Apache Spark to use a JDBC connection to a MySQL database for transporting mapped RDD data in Scala. Attempting this leads to an error saying that the simple JDBC code I'm using could not be serialized. How do I allow the JDBC class to be serialized?

asked Sep 03 '25 17:09 by CodingIsAwesome



2 Answers

Typically, a DB driver's session object cannot be serialized because it holds threads and open TCP connections to the underlying database.
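A small, Spark-free sketch of that failure, assuming only the JDK and the Scala standard library: Spark ships closures to executors using Java serialization, so anything a closure captures must be serializable. Here a plain tuple stands in for a session object that holds a live socket; the JDBC URL is a hypothetical placeholder and no network connection is made.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.net.Socket

// Java-serializes a value, similar to what happens to a captured closure.
def serialize(value: AnyRef): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(value) finally out.close()
  bytes.toByteArray
}

// Stand-in for a DB session: connection settings plus a live TCP socket.
val socket = new Socket() // unconnected, so no network access is needed
val session = ("jdbc:mysql://db-host/mydb", socket) // hypothetical URL

// Serializing the "session" fails because java.net.Socket is not Serializable.
val failure: Option[String] =
  try { serialize(session); None }
  catch { case e: NotSerializableException => Some(e.getMessage) }
socket.close()

// Plain connection settings (just strings) serialize without trouble.
val settingsBytes = serialize(("jdbc:mysql://db-host/mydb", "username")).length
```

The contrast is the point: configuration values can travel to the workers, while the live connection has to be created where it is used.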

As @aaronman mentions, the easiest way at the moment is to create the driver connection inside the closure passed to foreachPartition. That way the connection is built on the worker and never has to be serialized.

This is skeleton code showing how this can be done (it uses the Cassandra driver's session API, but the same pattern applies to a JDBC connection):

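import com.datastax.driver.core.Cluster

rdd.foreachPartition { msgIterator =>
  // Runs on the executor: the connection is created there, so nothing
  // non-serializable has to be shipped from the Spark driver.
  val cluster = Cluster.builder().addContactPoint(host).build()
  val session = cluster.connect(db)
  try {
    msgIterator.foreach { msg =>
      ...
      session.execute(statement)
    }
  } finally {
    // Release the connection even if a statement fails.
    session.close()
    cluster.close()
  }
}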
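To see why the per-partition placement matters, here is a local simulation that needs no Spark or database. FakeConnection and the foreachPartition helper are hypothetical stand-ins for a real connection and for RDD.foreachPartition; each inner Seq plays the role of one partition. The sketch shows that the number of connections opened equals the number of partitions, not the number of records, and that each one is closed afterwards.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a DB connection that records what happens to it.
final class FakeConnection {
  val executed = ArrayBuffer.empty[String]
  var closed = false
  def execute(statement: String): Unit = { executed += statement }
  def close(): Unit = { closed = true }
}

// Hypothetical local stand-in for rdd.foreachPartition.
def foreachPartition[T](partitions: Seq[Seq[T]])(f: Iterator[T] => Unit): Unit =
  partitions.foreach(p => f(p.iterator))

val partitions = Seq(Seq("a", "b", "c"), Seq("d", "e"), Seq("f"))
val opened = ArrayBuffer.empty[FakeConnection]

foreachPartition(partitions) { msgIterator =>
  // Created inside the closure, i.e. where the partition is processed,
  // so no connection object ever has to be serialized.
  val conn = new FakeConnection
  opened += conn
  try msgIterator.foreach(msg => conn.execute(s"insert $msg"))
  finally conn.close()
}
```

Opening the connection inside msgIterator.foreach instead would open one connection per record, which is usually far more expensive.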

As Spark SQL continues to evolve, I expect improved support for DB connectivity. For example, DataStax created a Cassandra-Spark driver that abstracts away per-worker connection creation and manages those connections efficiently, improving resource usage.

Also look at JdbcRDD, which takes the connection setup as a function (its getConnection parameter) that is executed on the workers.

answered Sep 07 '25 07:09 by maasg



A JDBC connection object is associated with a specific TCP connection and socket port, and hence cannot be serialized. So you should create the JDBC connection in the remote executor JVM process, not in the driver JVM process.

One way of achieving this is to hold the connection object in a field of a singleton object in Scala (or a static field in Java), as shown below. In the snippet below, the statement val session = ExecutorSingleton.session is not executed in the driver; it is shipped to the executor and executed there.

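import java.sql.{Connection, DriverManager}

case class ConnectionProfile(host: String, username: String, password: String)

object ExecutorSingleton {
  // Must be set (on the executor) before `session` is first accessed.
  var profile: ConnectionProfile = _
  // Created lazily, once per executor JVM, the first time a task touches it.
  lazy val session: Connection = createJDBCSession(profile)
  // Assumes MySQL Connector/J is on the executor classpath.
  def createJDBCSession(profile: ConnectionProfile): Connection =
    DriverManager.getConnection(s"jdbc:mysql://${profile.host}/", profile.username, profile.password)
}

rdd.foreachPartition { msgIterator =>
  // This closure runs on the executor, so the connection is opened there.
  ExecutorSingleton.profile = ConnectionProfile("host", "username", "password")
  msgIterator.foreach { msg =>
    val session = ExecutorSingleton.session
    // Assumes each msg is a SQL string; JDBC runs it through a Statement.
    val stmt = session.createStatement()
    try stmt.execute(msg) finally stmt.close()
  }
}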
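The lazy-singleton behaviour can be checked locally without Spark. This sketch, with hypothetical ConnectionProfile and FakeConnection stand-ins, processes three "partitions" one after another in the same JVM: the lazy val is evaluated only on first access, so every record in every partition sees the same connection and only one is created. In a real cluster each executor JVM loads its own copy of the object, which gives one connection per executor JVM.

```scala
// Hypothetical stand-ins; no real database is contacted.
final case class ConnectionProfile(host: String, username: String, password: String)
final class FakeConnection(val profile: ConnectionProfile)

object ExecutorSingleton {
  // Must be set before `session` is first read.
  var profile: ConnectionProfile = null
  // Counts how many connections were actually created.
  var created = 0
  // Evaluated on first access only; every later access returns the same instance.
  lazy val session: FakeConnection = {
    created += 1
    new FakeConnection(profile)
  }
}

// Three "partitions" processed one after another in the same JVM.
val partitions = Seq(Seq("q1", "q2"), Seq("q3"), Seq("q4", "q5"))
val sessionsSeen = partitions.flatMap { partition =>
  ExecutorSingleton.profile = ConnectionProfile("host", "username", "password")
  partition.map(_ => ExecutorSingleton.session)
}
```

One consequence of this design: because the lazy val is initialized only once, assigning a different profile in a later partition has no effect on the already-created connection.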

answered Sep 07 '25 08:09 by rogue-one
