My end goal is to get Apache Spark to use a JDBC connection to a MySQL database for transporting mapped RDD data in Scala. My attempt led to an error saying that the simple JDBC code I'm using could not be serialized. How do I allow the JDBC class to be serialized?
Typically, the DB session held by a database driver cannot be serialized because it involves threads and open TCP connections to the underlying DB.
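To see the problem outside Spark, here is a minimal sketch that applies plain Java serialization (which Spark uses for closures) to an object. The helper name `isJavaSerializable` and the object `SerializationCheck` are made up for this illustration, and an unconnected `java.net.Socket` stands in for the socket a live DB connection holds.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Hypothetical helper: returns true if Java serialization accepts the value,
  // false if it is rejected with NotSerializableException.
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

Connection settings such as a JDBC URL string pass this check, while `new java.net.Socket()` does not, which is why the usual advice is to ship the settings and open the connection on the executor.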
As @aaronman mentions, the easiest way at the moment is to create the connection inside the closure passed to foreachPartition. That way the connection is opened on the executor and never has to be serialized.
This is a skeleton of how this can be done (the example uses the Cassandra driver, but the pattern is the same for any DB client):
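rdd.foreachPartition { msgIterator =>
  // Created on the executor, once per partition, so nothing is serialized
  val cluster = Cluster.builder.addContactPoint(host).build()
  val session = cluster.connect(db)
  try {
    msgIterator.foreach { msg =>
      ...
      session.execute(statement)
    }
  } finally {
    // Release the connection even if a write fails
    session.close()
    cluster.close()
  }
}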
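Since the question is about JDBC and MySQL, here is a sketch of the same per-partition pattern with plain `java.sql`. The table `messages`, its columns, the element type `(Int, String)`, and the names `JdbcPartitionWriter` and `save` are assumptions made for the example; the MySQL JDBC driver is assumed to be on the executor classpath.

```scala
import java.sql.DriverManager
import org.apache.spark.rdd.RDD

object JdbcPartitionWriter {
  // Hypothetical table and columns; adjust to your schema.
  val insertSql = "INSERT INTO messages (id, body) VALUES (?, ?)"

  // Only the three strings are captured by the closure, and strings serialize fine.
  def save(rdd: RDD[(Int, String)], url: String, user: String, password: String): Unit = {
    val sql = insertSql
    rdd.foreachPartition { rows =>
      // Opened on the executor, one connection per partition
      val conn = DriverManager.getConnection(url, user, password)
      try {
        val stmt = conn.prepareStatement(sql)
        try {
          rows.foreach { case (id, body) =>
            stmt.setInt(1, id)
            stmt.setString(2, body)
            stmt.addBatch()
          }
          // Send the whole partition as one batch
          stmt.executeBatch()
        } finally {
          stmt.close()
        }
      } finally {
        conn.close()
      }
    }
  }
}
```

A call might look like `JdbcPartitionWriter.save(mapped, "jdbc:mysql://localhost:3306/testdb", "username", "password")`, where the URL and credentials are placeholders. For very large partitions you would probably want to flush the batch every few thousand rows instead of once at the end.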
As SparkSQL continues to evolve, I expect improved support for DB connectivity in the future. For example, DataStax created a Cassandra-Spark driver that abstracts out the connection creation per worker in an efficient way, improving resource usage.
Also look at JdbcRDD, which takes the connection handling as a function that is executed on the workers.
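A rough sketch of JdbcRDD usage follows. Note that JdbcRDD is for reading from a database, not writing to it. The query must contain two `?` placeholders, which Spark fills with the bounds of each partition. The table `messages`, the id range 1 to 1000, and the names `JdbcRddExample` and `load` are assumptions for the example.

```scala
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object JdbcRddExample {
  def load(sc: SparkContext, url: String, user: String, password: String): JdbcRDD[(Long, String)] =
    new JdbcRDD[(Long, String)](
      sc,
      // Connection factory: invoked on the workers, never serialized as a live connection
      () => DriverManager.getConnection(url, user, password),
      // Hypothetical table; the two placeholders receive each partition's id range
      "SELECT id, body FROM messages WHERE id >= ? AND id <= ?",
      lowerBound = 1L,
      upperBound = 1000L,
      numPartitions = 4,
      mapRow = (rs: ResultSet) => (rs.getLong(1), rs.getString(2))
    )
}
```

Only the function that opens the connection travels with the job, so each worker opens and closes its own connection.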
A JDBC connection object is tied to a specific TCP connection and socket port and hence cannot be serialized. So you should create the JDBC connection in the remote executor JVM process, not in the driver JVM process.
One way of achieving this is to hold the connection object as a field in a singleton object in Scala (or a static field in Java), as shown below. In the snippet below, the statement val session = ExecutorSingleton.session is not executed in the driver; it is shipped to the executor as part of the closure and executed there, so the connection is created once per executor JVM.
case class ConnectionProfile(host: String, username: String, password: String)

object ExecutorSingleton {
  var profile: ConnectionProfile = _
  // Initialized lazily, once per executor JVM, on first access
  lazy val session = createJDBCSession(profile)
  def createJDBCSession(profile: ConnectionProfile) = { ... }
}

rdd.foreachPartition { msgIterator =>
  // Runs on the executor: set the profile before the lazy val is first read
  ExecutorSingleton.profile = ConnectionProfile("host", "username", "password")
  msgIterator.foreach { msg =>
    val session = ExecutorSingleton.session
    session.execute(msg)
  }
}