
How to use groupBy, collect_list, arrays_zip, and explode together in PySpark to solve a certain business problem

I am new to the PySpark world.
I want to join two DataFrames, df and df_sd, on column days. While joining, it should also use column Name from the df DataFrame. If there is no matching value for a Name and days combination in the df DataFrame, then Number should be null. Please see the code below and the desired output for a better understanding.

import findspark
findspark.init("/opt/spark")

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType

Mydata = Row("Name", "Number", "days")

spark = SparkSession \
    .builder \
    .appName("DataFrame Learning") \
    .getOrCreate()

sqlContext = SQLContext(spark)

mydata1 = Mydata("A", 100, 1)
mydata2 = Mydata("A", 200, 2)
mydata3 = Mydata("B", 300, 1)
mydata4 = Mydata("B", 400, 2)
mydata5 = Mydata("B", 500, 3)
mydata6 = Mydata("C", 600, 1)
myDataAll = [mydata1, mydata2, mydata3, mydata4, mydata5, mydata6]

STANDARD_TENORS = [1, 2, 3]

df_sd = spark.createDataFrame(STANDARD_TENORS, IntegerType())
df_sd = df_sd.withColumnRenamed("value", "days")

df_sd.show()

df = spark.createDataFrame(myDataAll)
df.show()
# +----+
# |days|
# +----+
# |   1|
# |   2|
# |   3|
# +----+
# 
# +----+------+----+
# |Name|Number|days|
# +----+------+----+
# |   A|   100|   1|
# |   A|   200|   2|
# |   B|   300|   1|
# |   B|   400|   2|
# |   B|   500|   3|
# |   C|   600|   1|
# +----+------+----+

Please see below the expected results from the join:


# +----+------+----+
# |Name|Number|days|
# +----+------+----+
# |   A|   100|   1|
# |   A|   200|   2|
# |   A|  null|   3|
# |   B|   300|   1|
# |   B|   400|   2|
# |   B|   500|   3|
# |   C|   600|   1|
# |   C|  null|   2|
# |   C|  null|   3|
# +----+------+----+


1 Answer

If df_sd is not a huge list and you have Spark 2.4, you can do this by creating a new column in df with the list of days (1, 2, 3) and then using groupBy, collect_list, arrays_zip, and explode. The orderBy before the groupBy is there to ensure that the list gets collected in the right order.
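
The behaviour this relies on is that arrays_zip pads the shorter array with null up to the length of the longer one, which is where the missing-day rows come from. A minimal sketch with my own toy data, reusing the spark session from the question:

from pyspark.sql import functions as F

# arrays_zip pads the shorter array (Number) with null to match the length of days
demo = spark.createDataFrame([([100, 200], [1, 2, 3])], ["Number", "days"])
demo.select(F.explode(F.arrays_zip("Number", "days")).alias("z")).select("z.*").show()
# -> rows: (100, 1), (200, 2), (null, 3)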

df.show()
+----+------+----+
|Name|Number|days|
+----+------+----+
|   A|   100|   1|
|   A|   200|   2|
|   B|   300|   1|
|   B|   400|   2|
|   B|   500|   3|
|   C|   600|   1|
+----+------+----+
STANDARD_TENORS #->  [1, 2, 3] 
                #-> should be ordered



from pyspark.sql import functions as F
(df.withColumn("days2", F.array(*[F.lit(x) for x in STANDARD_TENORS]))   # full list of standard days on every row
   .orderBy("Name", "days")
   .groupBy("Name").agg(F.collect_list("Number").alias("Number"),        # Numbers per Name (in day order due to the orderBy)
                        F.first("days2").alias("days"))
   # arrays_zip pads the shorter Number array with null up to the length of days
   .withColumn("zipped", F.explode(F.arrays_zip("Number", "days")))
   .select("Name", "zipped.*").orderBy("Name", "days").show())


+----+------+----+
|Name|Number|days|
+----+------+----+
|   A|   200|   1|
|   A|   100|   2|
|   A|  null|   3|
|   B|   300|   1|
|   B|   400|   2|
|   B|   500|   3|
|   C|   600|   1|
|   C|  null|   2|
|   C|  null|   3|
+----+------+----+

If you want to use join, you can do it in a similar manner:

from pyspark.sql import functions as F
(df_sd.agg(F.collect_list("days").alias("days"))                       # one row holding [1, 2, 3]
      .join(df.orderBy("Name", "days").groupBy("Name")
              .agg(F.collect_list("Number").alias("Number"),
                   F.collect_list("days").alias("days1")),
            F.size("days") >= F.size("days1"))                         # join condition: full days list covers each Name's list
      .drop("days1")
      .withColumn("zipped", F.explode(F.arrays_zip("Number", "days")))
      .select("Name", "zipped.*")
      .orderBy("Name", "days")
      .show())
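
An alternative that produces the same expected table without the array functions (a sketch of my own, not part of the original answer) is to cross join the distinct names with df_sd and then left join df back on Name and days:

# Build every (Name, days) combination, then left join df so that Number is
# null wherever a Name has no row for that day.
all_pairs = df.select("Name").distinct().crossJoin(df_sd)
(all_pairs.join(df, ["Name", "days"], "left")
          .select("Name", "Number", "days")
          .orderBy("Name", "days")
          .show())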

UPDATE:

I have updated the code to handle any row order and any value present in Number. I could have made it a little more concise, but I kept it like this so that you can see all the columns I used and follow the logic. Feel free to ask any questions.

df.show()
# new sample DataFrame
+----+------+----+
|Name|Number|days|
+----+------+----+
|   A|   100|   1|
|   A|   200|   2|
|   B|   300|   1|
|   B|   400|   2|
|   B|   500|   3|
|   C|   600|   3|
+----+------+----+
#STANDARD_TENORS = [1, 2, 3]

from pyspark.sql import functions as F
(df.withColumn("days2", F.array(*[F.lit(x) for x in STANDARD_TENORS]))   # full list of standard days
   # collect each Name's Numbers together with the days that are actually present
   .groupBy("Name").agg(F.collect_list("Number").alias("col1"),
                        F.first("days2").alias("days2"),
                        F.collect_list("days").alias("x"))
   # pair each collected Number with its own day
   .withColumn("days3", F.arrays_zip(F.col("col1"), F.col("x")))
   # days from STANDARD_TENORS that are missing for this Name
   .withColumn("days4", F.array_except("days2", "x"))
   # build (-1, missing_day) placeholder structs for the missing days
   .withColumn("day5", F.expr("""transform(days4, x -> struct(bigint(-1), x))"""))
   # merge present and placeholder pairs, then explode into one row per day
   .withColumn("days3", F.explode(F.array_union("days3", "day5"))).select("Name", "days3.*")
   # turn the -1 placeholder back into null
   .withColumn("Number", F.when(F.col("col1") == -1, F.lit(None)).otherwise(F.col("col1"))).drop("col1")
   .select("Name", "Number", F.col("x").alias("days"))
   .orderBy("Name", "days")
   .show(truncate=False))
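
For reference, the transform(...) inside F.expr above is Spark SQL's higher-order array function (available from Spark 2.4), which applies a lambda to every element of an array column. A tiny standalone illustration with made-up data:

from pyspark.sql import functions as F

# Apply a lambda to each element of an array column via the SQL transform() function
demo = spark.createDataFrame([([1, 2, 3],)], ["nums"])
demo.select(F.expr("transform(nums, n -> n * 10)").alias("times_ten")).show()
# -> [10, 20, 30]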

