Pyspark group elements by column and creating dictionaries

I have a Spark DataFrame read from a CSV file like this:

df = ss.read \
     .format("csv") \
     .option("delimiter", ";") \
     .option("header", "false") \
     .option("inferSchema", "true") \
     .option("escape", "\"") \
     .option("multiline", "true") \
     .option("wholeFile", "true") \
     .load(file_path)

The Dataframe is like this one:

|cod_cli|article_name|rank|
|123    |art_1       |1   |
|123    |art_2       |2   |
|123    |art_3       |3   |
|456    |art_4       |1   |
|456    |art_5       |2   |
|456    |art_6       |3   |

I want to group the rows by the cod_cli column and create one column per product in each group. Each cell should contain a key-value dictionary whose keys are the original column names and whose values are the values from that row, like this:

|cod_cli|Product 1                  |Product 2                  |Product 3                  |
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
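Outside Spark, the target reshaping can be sketched in plain Python. This is only an illustration of the intended output, using the sample rows from the tables above (the dict key cod_art follows the desired output, even though the source column is article_name):

```python
# Group the sample rows by cod_cli and build one {cod_art, rank}
# dict per product, ordered by rank.
rows = [
    ("123", "art_1", 1), ("123", "art_2", 2), ("123", "art_3", 3),
    ("456", "art_4", 1), ("456", "art_5", 2), ("456", "art_6", 3),
]

grouped = {}
for cod_cli, article_name, rank in rows:
    grouped.setdefault(cod_cli, []).append(
        {"cod_art": article_name, "rank": rank}
    )

# One list of product dicts per cod_cli, sorted by rank
for products in grouped.values():
    products.sort(key=lambda p: p["rank"])
```

Each list position then corresponds to one of the desired Product columns.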

The dictionary value can be either a string (preferred) or a map. I tried this:

df = df \
     .groupBy(F.col("cod_cli")) \
     .agg(F.collect_list(F.array("cod_art","rank")))

But this way I get a single column containing an array of all the grouped elements, not one column per product.

Can anyone help me?

Thank you

UPDATE

The solution I ended up with is this:

df = df.withColumn(
            "Product",
            F.to_json(
                F.struct(F.col("cod_art"), F.col("rank"))
            )
        )

This creates a "Product" column with the desired JSON string, for example {"cod_art":"art_1","rank":1}.
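Note that F.to_json emits proper JSON, so keys and string values come out quoted and the output is compact. The plain-Python equivalent for one row (json.dumps with compact separators) produces the same shape:

```python
import json

# What to_json(struct(cod_art, rank)) yields for one row:
# a compact JSON object string with quoted keys.
row = {"cod_art": "art_1", "rank": 1}
product = json.dumps(row, separators=(",", ":"))
# product == '{"cod_art":"art_1","rank":1}'
```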

Then:

df = df \
     .groupBy(F.col("cod_cli")) \
     .pivot("rank") \
     .agg(F.first("Product"))

This creates one column per product, grouped by the cod_cli attribute, and also handles situations where there are more than 3 products per customer:

|cod_cli|1                          |2                          |3                          |
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
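Concretely, groupBy plus pivot produces one output column per distinct value of the pivot column, and F.first keeps the first Product string that lands in each (cod_cli, rank) cell. A plain-Python sketch of that mechanic, on hypothetical sample data:

```python
# Sketch of groupBy("cod_cli").pivot("rank").agg(F.first("Product")):
# one column per distinct rank, first value wins per cell.
rows = [
    ("123", 1, '{"cod_art":"art_1","rank":1}'),
    ("123", 2, '{"cod_art":"art_2","rank":2}'),
    ("456", 1, '{"cod_art":"art_4","rank":1}'),
]

ranks = sorted({rank for _, rank, _ in rows})  # distinct pivot values

table = {}
for cod_cli, rank, product in rows:
    # setdefault keeps the first value seen, like F.first
    table.setdefault(cod_cli, {}).setdefault(rank, product)
```

A cell with no matching row (here, rank 2 for 456) simply stays empty, which Spark renders as null.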
asked Oct 21 '25 09:10 by br1
1 Answer

You can do this without pivot (an expensive operation), using collect_list of struct, then to_json with create_map.

from pyspark.sql import functions as F

df.groupBy("cod_cli") \
  .agg(F.collect_list(F.struct("article_name", "rank")).alias("arr")) \
  .select(
      "cod_cli",
      *(F.to_json(
            F.create_map(
                F.lit("cod_art"), F.col("arr.article_name")[x],
                F.lit("rank"), F.col("arr.rank")[x],
            )
        ).alias("Product{}".format(x + 1))
        for x in range(3))
  ) \
  .show(truncate=False)

#+-------+------------------------------+------------------------------+------------------------------+
#|cod_cli|Product1                      |Product2                      |Product3                      |
#+-------+------------------------------+------------------------------+------------------------------+
#|123    |{"cod_art":"art_1","rank":"1"}|{"cod_art":"art_2","rank":"2"}|{"cod_art":"art_3","rank":"3"}|
#|456    |{"cod_art":"art_4","rank":"1"}|{"cod_art":"art_5","rank":"2"}|{"cod_art":"art_6","rank":"3"}|
#+-------+------------------------------+------------------------------+------------------------------+
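The range(3) hard-codes three product columns; in Spark, indexing past the end of the collected array simply yields null. Also note that create_map coerces all map values to one common type, which is why rank appears as a string in the output above. Both behaviors can be sketched in plain Python (the helper name product_col is hypothetical):

```python
import json

# One customer's collected (article_name, rank) list
arr = [("art_1", 1), ("art_2", 2), ("art_3", 3)]

def product_col(arr, x):
    # Out-of-range index -> None, mirroring Spark's null for arr[x]
    if x >= len(arr):
        return None
    name, rank = arr[x]
    # create_map coerces values to a common type, so rank becomes a string
    return json.dumps({"cod_art": name, "rank": str(rank)},
                      separators=(",", ":"))

cols = {"Product{}".format(x + 1): product_col(arr, x) for x in range(3)}
```

With fewer than three products, the trailing columns simply come out as None/null.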
answered Oct 23 '25 00:10 by murtihash


