Add all the weekly dates between two dates as new rows in Spark Scala

Convert this Spark DataFrame:

+----+---------+------+
|name|date     |amount|
+----+---------+------+
|Jhon|4/6/2018 |   100|
|Jhon|4/6/2018 |   200|
|Jhon|4/13/2018|   300|
|Jhon|4/20/2018|   500|
|Lee |5/4/2018 |   100|
|Lee |4/4/2018 |   200|
|Lee |5/4/2018 |   300|
|Lee |4/11/2018|   700|
+----+---------+------+

to the expected DataFrame:

+----+---------+------+
|name|date     |amount|
+----+---------+------+
|Jhon|4/6/2018 |   100|
|Jhon|4/6/2018 |   200|
|Jhon|4/13/2018|   100|
|Jhon|4/13/2018|   200|
|Jhon|4/13/2018|   300|
|Jhon|4/20/2018|   100|
|Jhon|4/20/2018|   200|
|Jhon|4/20/2018|   300|
|Jhon|4/20/2018|   500|
|Lee |5/4/2018 |   100|
|Lee |5/4/2018 |   200|
|Lee |5/4/2018 |   300|
|Lee |5/11/2018|   100|
|Lee |4/11/2018|   200|
|Lee |5/11/2018|   300|
|Lee |4/11/2018|   700|
+----+---------+------+

So here 300 is the new value for 04/13/2018, and the 100 and 200 from 04/06/2018 should also be shown for 04/13/2018, and similarly for the following Friday dates, per distinct name. Is there any way to do this in Spark Scala? Any help will be greatly appreciated.

My code works only for the name 'Jhon' and only for the Friday dates '4/6/2018' and '4/13/2018':

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("Excel-read-write").setMaster("local")
  val sc = new SparkContext(conf)
  val sqlc = new org.apache.spark.sql.SQLContext(sc)
  val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
  import ss.sqlContext.implicits._

  var df1 = sqlc.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("oldRecords.csv")
  df1.show(false)
  println("---- df1 row count ----" + df1.count())

  if (df1.count() > 0) {
    for (i <- 0 until df1.count().toInt - 1) {
      // duplicate the current rows so lag() has a previous row to pull from
      var df2 = df1.unionAll(df1)
      // shift amount and date by one row, ordered by date (no partitioning by name)
      var w1 = org.apache.spark.sql.expressions.Window.orderBy("date")
      var df3 = df2.withColumn("previousAmount", lag("amount", 1).over(w1))
                   .withColumn("newdate", lag("date", 1).over(w1))
      var df4 = df3.filter(df3.col("newdate").isNotNull)
      var df5 = df4.select("name", "amount", "newdate").distinct()
      df5.show(false)
      df1 = df5.withColumnRenamed("newdate", "date")
    }
  }
}


1 Answer

As per your question, if you are trying to add all the weekly dates up to the highest date for each name, here is what you can do.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.joda.time.LocalDate

// toDF and the $ column syntax below assume the implicits of an existing SparkSession named `spark`
import spark.implicits._

// input data
val dataDF  = Seq(
  ("Jhon", "4/6/2018", 100),
  ("Jhon", "4/6/2018", 200),
  ("Jhon", "4/13/2018", 300),
  ("Jhon", "4/20/2018", 500),
  ("Lee", "5/4/2018", 100),
  ("Lee", "4/4/2018", 200),
  ("Lee", "5/4/2018", 300),
  ("Lee", "4/11/2018", 700)
).toDF("name", "date", "amount")
  .withColumn("date", to_date($"date", "MM/dd/yyyy"))

val window = Window.partitionBy($"name")

//find the maximum date of each name
val df = dataDF.withColumn("maxDate", max($"date").over(window))
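
At this point maxDate should hold the latest date seen for each name. As an optional sanity check (the commented values are simply what the sample input above should produce):

df.select($"name", $"maxDate").distinct().show(false)
// expected, roughly:
// +----+----------+
// |name|maxDate   |
// +----+----------+
// |Jhon|2018-04-20|
// |Lee |2018-05-04|
// +----+----------+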

Create a UDF to find all the weekly dates between two dates:

val calculateDate = udf((min: String, max: String) => {
  // to collect all the dates
  val totalDates = scala.collection.mutable.MutableList[LocalDate]()
  var start = LocalDate.parse(min)
  val end = LocalDate.parse(max)
  while ( {
    !start.isAfter(end)
  }) {
    totalDates += start
    start = start.plusWeeks(1)
  }
  totalDates.map(_.toString("MM/dd/yyyy"))
})
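
For intuition, here is the same week-stepping loop run as plain joda-time code on one sample pair taken from the input above (the literal dates are only an illustration):

import org.joda.time.LocalDate

var start = LocalDate.parse("2018-04-06")
val end = LocalDate.parse("2018-04-20")
val weeks = scala.collection.mutable.MutableList[String]()
while (!start.isAfter(end)) {
  // collect the current Friday, then jump one week forward
  weeks += start.toString("MM/dd/yyyy")
  start = start.plusWeeks(1)
}
// weeks: MutableList(04/06/2018, 04/13/2018, 04/20/2018)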

Now apply the UDF and explode the resulting array:

val finalDf = df.withColumn("date", explode(calculateDate($"date", $"maxDate")))
                .drop("maxDate")

Output:

+----+----------+------+
|name|date      |amount|
+----+----------+------+
|Jhon|04/06/2018|100   |
|Jhon|04/13/2018|100   |
|Jhon|04/20/2018|100   |
|Jhon|04/06/2018|200   |
|Jhon|04/13/2018|200   |
|Jhon|04/20/2018|200   |
|Jhon|04/13/2018|300   |
|Jhon|04/20/2018|300   |
|Jhon|04/20/2018|500   |
|Lee |05/04/2018|100   |
|Lee |04/04/2018|200   |
|Lee |04/11/2018|200   |
|Lee |04/18/2018|200   |
|Lee |04/25/2018|200   |
|Lee |05/02/2018|200   |
|Lee |05/04/2018|300   |
|Lee |04/11/2018|700   |
|Lee |04/18/2018|700   |
|Lee |04/25/2018|700   |
|Lee |05/02/2018|700   |
+----+----------+------+
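
If you are on Spark 2.4 or later, a similar result can be sketched without a UDF by generating the weekly dates with the built-in sequence function and exploding the array. This is just a sketch reusing the column names above; the date_format call only reproduces the MM/dd/yyyy formatting and can be dropped:

val finalDf2 = df
  .withColumn("date", explode(expr("sequence(date, maxDate, interval 1 week)")))
  .withColumn("date", date_format($"date", "MM/dd/yyyy"))
  .drop("maxDate")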

I hope this helps!
