I know how to insert rows:
df.write \
    .format('jdbc') \
    .option("url", url) \
    .option("dbtable", table) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .mode('append') \
    .save()
But how do I delete rows? For example:
df = [Row(id=1), Row(id=2), ... ]
=> DELETE FROM table WHERE id IN (ids from df) ...
Is it possible?
Spark's JDBC data source doesn't support it. But I've done it with foreachPartition, using the DataFrame's data directly over a plain JDBC connection, like in Does Apache Spark SQL support MERGE clause?
import java.sql.{Connection, DriverManager, PreparedStatement}

df.rdd.coalesce(2).foreachPartition(partition => {
  // brConnect is a broadcast variable holding the JDBC connection properties
  val connectionProperties = brConnect.value
  val jdbcUrl = connectionProperties.getProperty("jdbcurl")
  val user = connectionProperties.getProperty("user")
  val password = connectionProperties.getProperty("password")
  val driver = connectionProperties.getProperty("driver")

  Class.forName(driver)
  val dbc: Connection = DriverManager.getConnection(jdbcUrl, user, password)
  dbc.setAutoCommit(false) // commit manually, once per batch

  val db_batchsize = 1000
  val sqlString = "INSERT INTO employee (id, fname, lname, userid) VALUES (?, ?, ?, ?)"
  val pstmt: PreparedStatement = dbc.prepareStatement(sqlString)

  partition.grouped(db_batchsize).foreach(batch => {
    batch.foreach { row =>
      pstmt.setLong(1, row.getAs[Long]("id"))
      pstmt.setString(2, row.getAs[String]("fname"))
      pstmt.setString(3, row.getAs[String]("lname"))
      pstmt.setString(4, row.getAs[String]("userid"))
      pstmt.addBatch()
    }
    pstmt.executeBatch()
    dbc.commit()
  })

  pstmt.close()
  dbc.close()
})
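Since your write snippet is PySpark, here is a minimal Python sketch of the same foreachPartition idea applied to deletes. The table name `employee`, the `id` key column, and the psycopg2 connection settings are assumptions for illustration; psycopg2 would need to be installed on the executors.

```python
def build_delete_sql(table: str, key: str, n: int) -> str:
    """Build a parameterized DELETE ... WHERE key IN (%s, ..., %s) for n ids."""
    placeholders = ", ".join(["%s"] * n)
    return f"DELETE FROM {table} WHERE {key} IN ({placeholders})"

def delete_partition(rows):
    # Runs on each executor. psycopg2 and the connection settings below
    # (host, dbname, user, password) are placeholders for this sketch.
    import psycopg2
    ids = [row.id for row in rows]
    if not ids:
        return
    conn = psycopg2.connect(host="dbhost", dbname="mydb",
                            user="user", password="password")
    try:
        with conn.cursor() as cur:
            cur.execute(build_delete_sql("employee", "id", len(ids)), ids)
        conn.commit()
    finally:
        conn.close()

# Usage (driver side): ship only the key column to the executors
# df.select("id").rdd.foreachPartition(delete_partition)
```

For very large DataFrames, a common alternative is to append the ids to a staging table with the normal JDBC writer and then run a single `DELETE ... WHERE id IN (SELECT id FROM staging)` on the database side.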