
Adding a constant value to each partition using Spark Scala

I'm trying to add an id to every single group of dates using Spark Scala.

For example, if the input was:

date
2019-01-29
2019-01-29
2019-07-31
2019-01-29
2019-07-31

The output would be:

id, date
ABC1, 2019-01-29
ABC1, 2019-01-29
ABC1, 2019-01-29
ABC2, 2019-07-31
ABC2, 2019-07-31

Can anyone help me with this?

I was able to add sequential line numbers within each partition, but I would like a constant value for each partition instead.

df.withColumn(lineNumColName, row_number().over(Window.partitionBy(partitionByCol).orderBy(orderByCol)))
  .repartition(1)
  .orderBy(orderByCol, lineNumColName)

1 Answer

Option 1 (small dataset):

If your dataset is not too large, you can use Window and dense_rank as shown below:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{concat, lit, dense_rank}
import spark.implicits._ // for $ and toDF (already in scope in the spark-shell)

val df = Seq(("2019-01-29"),
  ("2019-01-29"),
  ("2019-07-31"),
  ("2019-01-29"),
  ("2019-07-31")).toDF("date")

// dense_rank over a global ordering by date: rows with the same date get the same rank
val w = Window.orderBy($"date")
val d_rank = dense_rank().over(w)

// Prefix the rank with "ABC" to build the constant per-group id
df.withColumn("id", concat(lit("ABC"), d_rank)).show(false)

Output:

+----------+----+
|date      |id  |
+----------+----+
|2019-01-29|ABC1|
|2019-01-29|ABC1|
|2019-01-29|ABC1|
|2019-07-31|ABC2|
|2019-07-31|ABC2|
+----------+----+

Since we don't specify anything for partitionBy, the window moves all rows into a single partition, so this approach becomes very inefficient for large datasets.
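
A quick way to observe this, as a sketch assuming the df and d_rank values defined above:

// The unpartitioned window shuffles every row into one partition;
// Spark typically also logs a WindowExec warning about moving all data
// to a single partition when this plan executes.
val ranked = df.withColumn("id", concat(lit("ABC"), d_rank))
println(ranked.rdd.getNumPartitions)  // 1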

Option 2 (large dataset):

A more efficient approach would be to assign ids to a large dataset using the zipWithIndex function:

val df_d = df.distinct.rdd
  .zipWithIndex()
  .map { r => (r._1.getString(0), r._2 + 1) }
  .toDF("date", "id")
df_d.show

// Output:
+----------+---+
|      date| id|
+----------+---+
|2019-01-29|  1|
|2019-07-31|  2|
+----------+---+

First we get the unique dates with distinct, then we call zipWithIndex to assign a unique id to each distinct date.
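
If strictly consecutive ids are not required, a variation of the same idea (my sketch, not part of the original answer) could use zipWithUniqueId, which avoids the extra job zipWithIndex runs to compute per-partition offsets:

// zipWithUniqueId assigns unique (but not necessarily consecutive) ids
// without the extra Spark job that zipWithIndex needs to count partition sizes.
val df_u = df.distinct.rdd
  .zipWithUniqueId()
  .map { case (row, id) => (row.getString(0), id + 1) }
  .toDF("date", "id")
df_u.show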

Finally we join the two datasets:

df.join(df_d, Seq("date"))
.withColumn("id",  concat(lit("ABC"), $"id"))
.show

// Output:
+----------+----+
|      date|  id|
+----------+----+
|2019-01-29|ABC1|
|2019-01-29|ABC1|
|2019-01-29|ABC1|
|2019-07-31|ABC2|
|2019-07-31|ABC2|
+----------+----+
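
Since df_d only contains the distinct dates it is usually small, so a broadcast join is a reasonable refinement (a hedged sketch, not part of the original answer):

import org.apache.spark.sql.functions.broadcast

// Broadcasting the small distinct-dates table avoids shuffling the full df for the join.
df.join(broadcast(df_d), Seq("date"))
  .withColumn("id", concat(lit("ABC"), $"id"))
  .show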