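I am trying to read a table in a Postgres database using Spark JDBC. For that I have come up with the following code:
import java.io.FileInputStream
import java.util.Properties
import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object PartitionRetrieval {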
var conf = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.default.parallelism", "20")
val log = LogManager.getLogger("Spark-JDBC Program")
Logger.getLogger("org").setLevel(Level.ERROR)
val conFile = "/home/myuser/ReconTest/inputdir/testconnection.properties"
val properties = new Properties()
properties.load(new FileInputStream(conFile))
val connectionUrl = properties.getProperty("gpDevUrl")
val devUserName = properties.getProperty("devUserName")
val devPassword = properties.getProperty("devPassword")
val driverClass = properties.getProperty("gpDriverClass")
val tableName = "base.ledgers"
try {
Class.forName(driverClass).newInstance()
} catch {
case cnf: ClassNotFoundException =>
log.error("Driver class: " + driverClass + " not found")
System.exit(1)
case e: Exception =>
log.error("Exception while loading driver class: " + driverClass, e) // pass the Throwable so the stack trace is logged
System.exit(1)
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
import spark.implicits._
val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()
val rc = gpTable.filter(gpTable("source_system_name")==="ORACLE" && gpTable("period_year")==="2017").count()
println("gpTable Count: " + rc)
}
}
Right now, I am fetching the row count just to check whether the connection succeeds. It is a huge table and the count runs slowly, which I understand is because no partition count or partition column is given for splitting the read.
In a lot of places, I see the jdbc object created in the following way:
val gpTable2 = spark.read.jdbc(connectionUrl, tableName, connectionProperties)
and I created it in another format using options.
I am unable to understand how to specify numPartitions and the partition column name when the jdbc connection is built using 'options':
val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()
Could anyone let me know:
1. How do I add the parameters numPartitions, lowerBound and upperBound to the jdbc object written in this way?
2. How do I add just the column name and numPartitions? I want to fetch all the rows from the year 2017 and I don't want a range of rows to be picked (lowerBound, upperBound).
The options numPartitions, lowerBound, upperBound and partitionColumn control the parallel read in Spark. You need an integral column for partitionColumn. If your table has no suitable column, you can use ROW_NUMBER as your partition column.
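To make it more concrete how these four options drive the parallel read, here is a simplified sketch in plain Scala (no Spark needed). It is a model of the idea, not Spark's actual source code, and the names StrideSketch and partitionClauses are made up for illustration. It assumes at least two partitions and a range that is at least as wide as the partition count.

```scala
object StrideSketch {
  // Simplified model: turn (column, lowerBound, upperBound, numPartitions)
  // into one WHERE-clause fragment per partition. This is an illustration,
  // not a copy of Spark's implementation.
  def partitionClauses(column: String,
                       lowerBound: Long,
                       upperBound: Long,
                       numPartitions: Int): Seq[String] = {
    require(numPartitions >= 2, "this sketch assumes at least two partitions")
    require(upperBound - lowerBound >= numPartitions, "range must cover the partitions")
    // Width of each slice of the partition column.
    val stride = (upperBound - lowerBound) / numPartitions
    (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lo + stride
      if (i == 0) s"$column < $hi OR $column IS NULL"      // open-ended below
      else if (i == numPartitions - 1) s"$column >= $lo"   // open-ended above
      else s"$column >= $lo AND $column < $hi"
    }
  }

  def main(args: Array[String]): Unit =
    partitionClauses("RNO", 1L, 1000L, 4).foreach(println)
}
```

Note that the first and last clauses are open-ended. In Spark, lowerBound and upperBound only decide the partition stride and do not filter rows, so values outside the bounds still land in the first or last partition.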
For the ROW_NUMBER approach, give this a try. First fetch the number of rows that match your filter:
// The subquery is passed as the table, so Postgres needs an alias for it (here "t")
val rowCount = spark.read.format("jdbc").option("url", connectionUrl)
.option("dbtable", s"(select count(*) AS count from $tableName where source_system_name = 'ORACLE' AND period_year = '2017') AS t")
.option("user",devUserName)
.option("password",devPassword)
.load()
.collect()
.map(row => row.getAs[Long]("count")).head // count(*) is a bigint in Postgres, so read it as Long
This gives the number of rows returned for the provided predicate, which can be used as the upperBound.
val gpTable = spark.read.format("jdbc").option("url", connectionUrl)
.option("dbtable", s"(select ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS RNO, * from $tableName where source_system_name = 'ORACLE' AND period_year = '2017') AS t")
.option("user",devUserName)
.option("password",devPassword)
.option("numPartitions", 10)
.option("partitionColumn", "RNO")
.option("lowerBound", 1)
.option("upperBound", rowCount)
.load()
Choose numPartitions based on the number of parallel connections your Postgres DB can handle, since each partition is read over its own connection. You can adjust it based on the parallelism you need while reading from your DB.
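If you would rather not use a numeric range at all, one alternative (not part of the approach above) is the spark.read.jdbc overload that takes an array of predicates, where each predicate becomes the WHERE clause of one partition. The following is only a sketch under assumptions: ledger_id is a hypothetical non-null, non-negative integer column, and the names PredicateRead, buildPredicates and readByPredicates are made up for illustration.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object PredicateRead {
  // One WHERE-clause fragment per partition. `ledger_id` is a hypothetical
  // column; rows where it is NULL or negative would match no bucket, so
  // substitute an expression that covers every row in your table.
  def buildPredicates(buckets: Int): Array[String] =
    (0 until buckets).map { i =>
      s"source_system_name = 'ORACLE' AND period_year = '2017' AND mod(ledger_id, $buckets) = $i"
    }.toArray

  // Reads the table with one partition (and one JDBC query) per predicate.
  def readByPredicates(spark: SparkSession,
                       url: String,
                       table: String,
                       user: String,
                       password: String,
                       buckets: Int): DataFrame = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    spark.read.jdbc(url, table, buildPredicates(buckets), props)
  }
}
```

With the values from the question this could be called as PredicateRead.readByPredicates(spark, connectionUrl, tableName, devUserName, devPassword, 10). The predicates must not overlap and must together cover every row you want, otherwise rows are duplicated or dropped.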