 

Spark Dataset appending unique ID

I'm looking for an "already implemented" alternative to appending unique IDs to a Spark Dataset.

My scenario: I have an incremental job that runs each day, processing a batch of information. In this job, I create a dimension table of something and assign unique IDs to each row using monotonically_increasing_id(). On the next day, I want to append some rows to that something table and generate unique IDs for those rows that don't collide with the existing ones.

Example:

day 1:

something_table    
uniqueId   name
100001     A
100002     B

day 2:

something_table
uniqueId   name
100001     A
100002     B
100003     C -- new data that must be created on day 2

Code snippet for day 1:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._

case class BasicSomething(name: String)
// The field must be named uniqueId so that .as[SomethingTable] below can
// resolve the generated column (a field named `id` would not match it).
case class SomethingTable(uniqueId: Long, name: String)

val ds: Dataset[BasicSomething] =
  spark.createDataset(Seq(BasicSomething("A"), BasicSomething("B")))

ds.withColumn("uniqueId", monotonically_increasing_id())
  .as[SomethingTable]
  .write.csv("something")

I have no idea how to keep state for monotonically_increasing_id() so that, on the next day, it knows the unique IDs already present in something_table and continues from them.

asked Nov 15 '25 by Henrique dos Santos Goulart

1 Answer

You can always get the last uniqueId of the dataset you created previously. You can then use that uniqueId as an offset to monotonically_increasing_id() and create new, non-conflicting uniqueIds.

ds.withColumn("uniqueId", monotonically_increasing_id()+last uniqueId of previous dataframe)
answered Nov 18 '25 by Ramesh Maharjan