
How to get the result below from a source DataFrame in PySpark

Tags:

pyspark

I have a dataframe like this:

+------------+--------------------+-------------+
|PERSON_NBR  |PERSON_VERSION_NBR  |RECRD_TYPE_CD|
+------------+--------------------+-------------+
|  0065321901|                   1|            1|
|  0065321901|                   2|            4|
|  0065321901|                   3|            5|
|  0065321901|                   4|            2|
|  0065321901|                   5|            6|
|  0065321901|                   6|            7|
|  0065321901|                   7|            2|
|  0065321901|                   8|            8|
|  0775123711|                   1|            1|
|  0775123711|                   2|            3|
|  0775123711|                   3|            2|
|  1237251722|                   1|            1|
|  1237251722|                   2|            3|
+------------+--------------------+-------------+

I want to add a new column NEW_VERSION that starts at 1 for each PERSON_NBR. Whenever RECRD_TYPE_CD is 2, the value should increase by 1 starting from the next record of that person.

Output:

+------------+--------------------+-------------+----------------+
|PERSON_NBR  |PERSON_VERSION_NBR  |RECRD_TYPE_CD|     NEW_VERSION|
+------------+--------------------+-------------+----------------+
|  0065321901|                   1|            1|               1|
|  0065321901|                   2|            4|               1|
|  0065321901|                   3|            5|               1|
|  0065321901|                   4|            2|               1|
|  0065321901|                   5|            6|               2|
|  0065321901|                   6|            7|               2|
|  0065321901|                   7|            2|               2|
|  0065321901|                   8|            8|               3|
|  0775123711|                   1|            1|               1|
|  0775123711|                   2|            3|               1|
|  0775123711|                   3|            2|               1|
|  1237251722|                   1|            1|               1|
|  1237251722|                   2|            3|               1|
+------------+--------------------+-------------+----------------+

Thank you very much.

Suman asked Jan 01 '26 19:01


1 Answer

Code

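from pyspark.sql import Window
from pyspark.sql import functions as F

# Window per person, ordered by version number
W = Window.partitionBy('PERSON_NBR').orderBy('PERSON_VERSION_NBR')

# Flag rows whose record type is 2
is2 = F.col('RECRD_TYPE_CD') == 2

# Running count of type-2 rows per PERSON_NBR (includes the current row)
cs = F.sum(is2.cast('int')).over(W)

# Shift the counter down one row so a type-2 row only affects the rows after it
lagged_cs = F.lag(cs).over(W)

# The first row of each person has no previous row: treat null as 0, then start at 1
lagged_cs = F.coalesce(lagged_cs, F.lit(0)) + 1

# Append the new column
df = df.select('*', lagged_cs.alias('VERSION'))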

Result

df.show()
+----------+------------------+-------------+-------+
|PERSON_NBR|PERSON_VERSION_NBR|RECRD_TYPE_CD|VERSION|
+----------+------------------+-------------+-------+
|0065321901|                 1|            1|      1|
|0065321901|                 2|            4|      1|
|0065321901|                 3|            5|      1|
|0065321901|                 4|            2|      1|
|0065321901|                 5|            6|      2|
|0065321901|                 6|            7|      2|
|0065321901|                 7|            2|      2|
|0065321901|                 8|            8|      3|
|0775123711|                 1|            1|      1|
|0775123711|                 2|            3|      1|
|0775123711|                 3|            2|      1|
|1237251722|                 1|            1|      1|
|1237251722|                 2|            3|      1|
+----------+------------------+-------------+-------+
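
To sanity-check the logic on a small sample without a Spark session, the same idea can be sketched in plain Python. This is only an illustration, not part of the answer's PySpark code: `add_version` is a hypothetical helper, the rows are copied from the Result table above, and it assumes PERSON_VERSION_NBR is unique within each PERSON_NBR.

```python
from itertools import groupby
from operator import itemgetter

# Rows as shown in the Result table: (PERSON_NBR, PERSON_VERSION_NBR, RECRD_TYPE_CD)
rows = [
    ("0065321901", 1, 1), ("0065321901", 2, 4), ("0065321901", 3, 5),
    ("0065321901", 4, 2), ("0065321901", 5, 6), ("0065321901", 6, 7),
    ("0065321901", 7, 2), ("0065321901", 8, 8),
    ("0775123711", 1, 1), ("0775123711", 2, 3), ("0775123711", 3, 2),
    ("1237251722", 1, 1), ("1237251722", 2, 3),
]


def add_version(rows):
    """Hypothetical helper: VERSION = 1 + number of earlier type-2 rows per person."""
    out = []
    # Sort by the partition key, then by the ordering key (mirrors the window spec)
    ordered = sorted(rows, key=itemgetter(0, 1))
    for _, group in groupby(ordered, key=itemgetter(0)):
        seen_type_2 = 0  # type-2 rows strictly before the current row
        for person, version_nbr, record_type in group:
            out.append((person, version_nbr, record_type, seen_type_2 + 1))
            if record_type == 2:
                seen_type_2 += 1  # takes effect only from the next row on
    return out


versions = [row[3] for row in add_version(rows)]
```

The counter is incremented after the current row is emitted, which plays the same role as the lag step in the PySpark version: a type-2 row keeps its current version and only the following rows move up.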

PS: This assumes that PERSON_VERSION_NBR is unique within each PERSON_NBR, so that the window has a well-defined order.

Shubham Sharma answered Jan 06 '26 22:01


