
SPARK - Use RDD.foreach to Create a Dataframe and execute actions on the Dataframe

I am new to Spark and am trying to work out the best way to handle the following scenario. A database table contains three fields: Category, Amount, Quantity. First, I pull all the distinct categories from the database.

 val categories:RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)

Now, for each category, I want to execute a pipeline that builds a DataFrame for that category and applies some machine learning to it.

 categories.foreach(executePipeline)

 def executePipeline(category: String): Unit = {
   val dfCategory = sqlCtxt.read.jdbc(JDBC_URL,
     "(SELECT * FROM TABLE_NAME WHERE CATEGORY='" + category + "') AS t",
     new java.util.Properties())
   dfCategory.show()
 }

Is it possible to do something like this ? Or is there any better alternative ?

asked Dec 07 '25 by atreya biswas

1 Answer

// You could get all your data with a single query and convert it to an RDD.
// Note: read.jdbc expects a table name or a parenthesized subquery, plus
// connection properties.
val data = sqlCtxt.read.jdbc(JDBC_URL, "(SELECT * FROM TABLE_NAME) AS t", new java.util.Properties()).rdd

// then group the data by category
val groupedData = data.groupBy(row => row.getAs[String]("category"))

// this gives you an RDD[(String, Iterable[org.apache.spark.sql.Row])]
// that you can iterate over to run your pipeline. Use foreach (an action)
// rather than map (a lazy transformation), otherwise nothing executes.
groupedData.foreach { case (categoryName, items) =>
  executePipeline(categoryName, items)
}
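To make the idea above concrete, here is a minimal, self-contained sketch. It assumes Spark 2.x with a local `SparkSession`, and it substitutes a small in-memory table for the JDBC read (the table name, column names, and `executePipeline` body are illustrative placeholders, not part of the original question):

```scala
import org.apache.spark.sql.SparkSession

object GroupedPipeline {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("grouped-pipeline")
      .getOrCreate()

    // Stand-in for the JDBC read: an in-memory table with the same three fields.
    val df = spark.createDataFrame(Seq(
      ("books", 10.0, 2),
      ("books", 5.0, 1),
      ("toys", 7.5, 3)
    )).toDF("category", "amount", "quantity")

    // Group rows by category on the cluster, then run the per-category work
    // inside foreach, which is an action and therefore actually executes.
    df.rdd
      .groupBy(row => row.getAs[String]("category"))
      .foreach { case (category, rows) =>
        // Placeholder for executePipeline. Note this closure runs on the
        // executors, so it must not touch the SparkSession or create new
        // DataFrames; it only sees the already-collected rows for one category.
        println(s"$category -> ${rows.size} rows")
      }

    spark.stop()
  }
}
```

This also explains why the original question's approach fails: calling `sqlCtxt.read.jdbc` inside `RDD.foreach` attempts to use the SQLContext on the executors, where it is not available. Pulling the data once on the driver and grouping it, as the answer suggests, avoids that.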
answered Dec 10 '25 by Daniel B.

