I am trying to fetch rows from a lookup table (3 rows, 3 columns), iterate over them row by row, and pass the values in each row to a Spark SQL query as parameters.
DB | TBL   | COL
---|-------|-----
db | txn   | ID
db | sales | ID
db | fee   | ID
I tried this in the spark-shell for one row and it worked, but I am finding it difficult to iterate over the rows.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val db_name: String = "db"
val tbl_name: String = "transaction"
val unique_col: String = "transaction_number"
val dupDf = sqlContext.sql(s"select count(*), transaction_number from $db_name.$tbl_name group by $unique_col having count(*)>1")
How can I iterate over the rows and pass the values in each row as parameters?
The two approaches above may be right in general, but I don't like collecting the data, for performance reasons, especially if the data is huge.
org.apache.spark.util.CollectionAccumulator is the right candidate for this kind of requirement (see the docs).
Also, if the data is huge, foreachPartition is the right candidate, again for performance reasons.
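For reference, here is a minimal sketch of that accumulator pattern in isolation (spark, someDf and the accumulator name are illustrative placeholders, not part of the implementation below): register a CollectionAccumulator on the driver, add to it from inside foreachPartition on the executors, and read the gathered values back on the driver.
import org.apache.spark.sql.Row
import org.apache.spark.util.CollectionAccumulator
import scala.collection.JavaConverters._

val acc: CollectionAccumulator[String] =
  spark.sparkContext.collectionAccumulator[String]("queries")   // registered on the driver
someDf.foreachPartition { rows: Iterator[Row] =>
  rows.foreach(row => acc.add(row.mkString("|")))               // runs on the executors
}
val gathered: Seq[String] = acc.value.asScala                   // read back on the driver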
Below is the full implementation:
package examples

import org.apache.log4j.Level
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.CollectionAccumulator

import scala.collection.JavaConverters._
import scala.collection.mutable

object TableTest extends App {

  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  import spark.implicits._

  // Lookup table: database, table and unique-key column for each check
  val lookup =
    Seq(("db", "txn", "ID"), ("db", "sales", "ID"), ("db", "fee", "ID"))
      .toDF("DB", "TBL", "COL")

  // Accumulator that gathers the generated SQL strings from the executors
  val collAcc: CollectionAccumulator[String] =
    spark.sparkContext.collectionAccumulator[String]("mySQL Accumulator")

  // Build one duplicate-check query per lookup row, partition by partition
  lookup.foreachPartition { partition: Iterator[Row] =>
    partition.foreach { record =>
      val selectString =
        s"select count(*), ${record.getAs[String]("COL")} from ${record.getAs[String]("DB")}.${record.getAs[String]("TBL")} group by ${record.getAs[String]("COL")} having count(*)>1"
      collAcc.add(selectString)
      println(selectString)
    }
  }

  // Back on the driver: run each accumulated query and union the results
  val mycollectionOfSelects: mutable.Seq[String] = collAcc.value.asScala
  val finaldf = mycollectionOfSelects.map(q => spark.sql(q)).reduce(_ union _)
  finaldf.show
}
Sample result:
[2019-08-13 12:11:16,458] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[Stage 0:> (0 + 0) / 2]
select count(*), ID from db.txn group by ID having count(*)>1
select count(*), ID from db.sales group by ID having count(*)>1
select count(*), ID from db.fee group by ID having count(*)>1
Note: since those are pseudo tables, I have not displayed the resulting dataframe.
Here is the collect-based approach referred to above; the lookup table is tiny, so it can simply be collected to the driver and the queries built there:
val lookup =
  Seq(("db", "txn", "ID"), ("db", "sales", "ID")).toDF("DB", "TBL", "COL")

val data = lookup
  .collect()                                   // the lookup is tiny, so collecting is cheap
  .map(x =>
    (x.getAs[String]("DB"), x.getAs[String]("TBL"), x.getAs[String]("COL"))
  )
  .map(y =>
    sparkSession.sql(
      s"select count(*), ${y._3} from ${y._1}.${y._2} group by ${y._3} having count(*)>1"
    )
  )
  .reduce(_ union _)
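As a small usage sketch (not in the original snippet), once the per-table queries are unioned, the result can be inspected on the driver, assuming the tables behind the lookup entries actually exist:
data.show(false)   // one row per duplicated key value, per table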