Suppose I have a PySpark DataFrame with columns date, item_id, item_type, and item_vol. Below is the sample input:
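+----------+-------+---------+--------+
|      date|item_id|item_type|item_vol|
+----------+-------+---------+--------+
|2019-01-01|  item3|       aa|       1|
|2019-01-01|  item3|       bb|       2|
|2019-01-01| item67|       cc|       4|
|2019-01-01| item67|       dd|    null|
|2019-01-01| item68|      gas|       9|
+----------+-------+---------+--------+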

And I am trying to get the output as below:
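+----------+-------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+
|      date|item_id|item_type_1|item_vol_1|item_type_2|item_vol_2|item_type_3|item_vol_3|item_type_4|item_vol_4|item_type_5|item_vol_5|
+----------+-------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+
|2019-01-01|  item3|         aa|         1|         bb|         2|       null|      null|       null|      null|       null|      null|
|2019-01-01| item67|         cc|         4|         dd|      null|       null|      null|       null|      null|       null|      null|
|2019-01-01| item68|        gas|         9|       null|      null|       null|      null|       null|      null|       null|      null|
+----------+-------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+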

Here, each item_id can have multiple item_types and item_vols. I want to transform this DataFrame into a wide format where each row represents a unique combination of date and item_id, and columns item_type_X and item_vol_X hold the Xth item type and item volume for that item ID, respectively. If an item ID has fewer than X item types or item volumes, the corresponding columns should be filled with null. X is capped at 5, so the last columns are item_type_5 and item_vol_5.
I was using the code below.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import when
from pyspark.sql.functions import col, collect_list
# create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
# create the schema for the DataFrame
schema = StructType([
    StructField("date", StringType(), True),
    StructField("item_id", StringType(), True),
    StructField("item_type", StringType(), True),
    StructField("item_vol", IntegerType(), True)
])
# create the DataFrame
data = [
    ('2019-01-01', 'item3', 'aa', 1),
    ('2019-01-01', 'item3', 'bb', 2),
    ('2019-01-01', 'item67', 'cc', 4),
    ('2019-01-01', 'item67', 'dd', None),
    ('2019-01-01', 'item68', 'gas', 9),
]
new_df = spark.createDataFrame(data, schema).orderBy("date", "item_id", "item_type", "item_vol")
# show the DataFrame
new_df.show()
# group by date and item_id and collect lists of item_type and item_vol
ordered_df = new_df \
    .groupBy("date", "item_id") \
    .agg(collect_list("item_type").alias("item_type_list"),
         collect_list(when(col("item_vol").isNull(), "null").otherwise(col("item_vol"))).alias("item_vol_list"))
ordered_df.show(5, False)
from pyspark.sql.functions import split, col
num_cols = 5
pivot_cols = []
for i in range(num_cols):
    item_type_expr = col("item_type_list").getItem(i).alias(f"item_type_{i+1}")
    item_vol_expr = col("item_vol_list").getItem(i).alias(f"item_vol_{i+1}")
    pivot_cols.extend([item_type_expr, item_vol_expr])
# index into the collected lists and store each element in its own column
split_df = ordered_df.select("date", "item_id", *pivot_cols)
# show the resulting DataFrame
split_df.show(10, False)
However, collect_list does not guarantee the order of the collected rows. I want item_type to be in ascending order, but using collect_list inside groupBy sometimes scrambles the order of the list, like below:

It would be appreciated if someone could help. Thanks.
You can assign a sequential row number per (date, item_id), ordered by item_type, and then pivot on that number:
from pyspark.sql import Window
from pyspark.sql import functions as F
# Define a window spec
W = Window.partitionBy('date', 'item_id').orderBy('item_type')
# Assign sequential numbers to uniquely identify rows per date and item_id,
# such that row numbers follow ascending order of item_type
new_df = new_df.withColumn('col', F.row_number().over(W))
# group by and reshape with pivot
result = (
    new_df
    .groupBy('date', 'item_id')
    .pivot('col')
    .agg(F.first('item_type').alias('item_type'),
         F.first('item_vol').alias('item_vol'))
)
# Pivot names the columns '<n>_item_type' / '<n>_item_vol'; rename them
# and fill the missing columns with null
for i in range(1, 6):
    if f'{i}_item_type' in result.columns:
        result = result.withColumnRenamed(f'{i}_item_type', f'item_type_{i}')
        result = result.withColumnRenamed(f'{i}_item_vol', f'item_vol_{i}')
    else:
        result = result.withColumn(f'item_type_{i}', F.lit(None))
        result = result.withColumn(f'item_vol_{i}', F.lit(None))
+----------+-------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+
|      date|item_id|item_type_1|item_vol_1|item_type_2|item_vol_2|item_type_3|item_vol_3|item_type_4|item_vol_4|item_type_5|item_vol_5|
+----------+-------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+
|2019-01-01|  item3|         aa|         1|         bb|         2|       null|      null|       null|      null|       null|      null|
|2019-01-01| item67|         cc|         4|         dd|      null|       null|      null|       null|      null|       null|      null|
|2019-01-01| item68|        gas|         9|       null|      null|       null|      null|       null|      null|       null|      null|
+----------+-------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+-----------+----------+
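Alternatively, if you prefer to stay with collect_list, here is a minimal sketch of another way to make the ordering deterministic: collect (item_type, item_vol) pairs as structs, sort the array with sort_array (structs compare field by field, so with item_type declared first the pairs sort by item_type), and then index into the sorted array. The names agg_df, wide_cols, and alt_result below are just illustrative.

from pyspark.sql import functions as F

# Collect (item_type, item_vol) pairs as structs and sort the array,
# so the result no longer depends on the order in which collect_list gathered the rows
agg_df = (
    new_df
    .groupBy('date', 'item_id')
    .agg(F.sort_array(F.collect_list(F.struct('item_type', 'item_vol'))).alias('pairs'))
)

# Index into the sorted array; out-of-range positions come back as null
wide_cols = []
for i in range(5):
    wide_cols.append(F.col('pairs').getItem(i).getField('item_type').alias(f'item_type_{i+1}'))
    wide_cols.append(F.col('pairs').getItem(i).getField('item_vol').alias(f'item_vol_{i+1}'))

alt_result = agg_df.select('date', 'item_id', *wide_cols)
alt_result.show(10, False)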