
Loop through large dataframe in Pyspark - alternative

df_hrrchy

|lefId  |Lineage                               |
|-------|--------------------------------------|
|36326  |["36326","36465","36976","36091","82"]|
|36121  |["36121","36908","36976","36091","82"]|
|36380  |["36380","36465","36976","36091","82"]|
|36448  |["36448","36465","36976","36091","82"]|
|36683  |["36683","36465","36976","36091","82"]|
|36949  |["36949","36908","36976","36091","82"]|
|37349  |["37349","36908","36976","36091","82"]|
|37026  |["37026","36908","36976","36091","82"]|
|36879  |["36879","36465","36976","36091","82"]|

df_trans

|tranID     |   T_Id                                                                  |
|-----------|-------------------------------------------------------------------------|
|1000540    |["36121","36326","37349","36949","36380","37026","36448","36683","36879"]|


df_creds

|T_Id   |T_val  |T_Goal |Parent_T_Id    |Parent_Val      |parent_Goal|
|-------|-------|-------|---------------|----------------|-----------|
|36448  |100    |1      |36465          |200             |1          |
|36465  |200    |1      |36976          |300             |2          |
|36326  |90     |1      |36465          |200             |1          |
|36091  |500    |19     |82             |600             |4          |
|36121  |90     |1      |36908          |200             |1          |
|36683  |90     |1      |36465          |200             |1          |
|36908  |200    |1      |36976          |300             |2          |
|36949  |90     |1      |36908          |200             |1          |
|36976  |300    |2      |36091          |500             |19         |
|37026  |90     |1      |36908          |200             |1          |
|37349  |100    |1      |36908          |200             |1          |
|36879  |90     |1      |36465          |200             |1          |
|36380  |90     |1      |36465          |200             |1          |

Desired Result

|T_id   |children                                  |T_Val  |T_Goal |parent_T_id    |parent_Goal|trans_id|
|-------|------------------------------------------|-------|-------|---------------|-----------|--------|
|36091  |["36976"]                                 |500    |19     |82             |4          |1000540 |
|36465  |["36448","36326","36683","36879","36380"]|200    |1      |36976          |2          |1000540 |
|36908  |["36121","36949","37026","37349"]         |200    |1      |36976          |2          |1000540 |
|36976  |["36465","36908"]                         |300    |2      |36091          |19         |1000540 |
|36683  |null                                      |90     |1      |36465          |1          |1000540 |
|37026  |null                                      |90     |1      |36908          |1          |1000540 |
|36448  |null                                      |100    |1      |36465          |1          |1000540 |
|36949  |null                                      |90     |1      |36908          |1          |1000540 |
|36326  |null                                      |90     |1      |36465          |1          |1000540 |
|36380  |null                                      |90     |1      |36465          |1          |1000540 |
|36879  |null                                      |90     |1      |36465          |1          |1000540 |
|36121  |null                                      |90     |1      |36908          |1          |1000540 |
|37349  |null                                      |100    |1      |36908          |1          |1000540 |

Code Tried

from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, collect_set, expr, col, collect_list, array_contains, lit


dfs = []
for row in df_trans.rdd.toLocalIterator():
    # Collect every T_Id in the hierarchy of this transaction's territories
    df_hy_set = (df_hrrchy.filter(df_hrrchy.lefId.isin(row["T_Id"]))
                 .select(explode("Lineage").alias("Terrs"))
                 .agg(collect_set(col("Terrs")).alias("hierarchy_list"))
                 .select(F.lit(row["tranID"]).alias("trans_id"), "hierarchy_list"))

    # Children per parent within that hierarchy
    df_childrens = (df_creds.join(df_hy_set, expr("array_contains(hierarchy_list, T_Id)"))
                    .select("T_Id", "T_val", "T_Goal", "Parent_T_Id", "parent_Goal", "trans_id")
                    .groupBy("Parent_T_Id").agg(collect_list("T_Id").alias("children")))

    # Credit rows restricted to that hierarchy
    df_filter_creds = (df_creds.join(df_hy_set, expr("array_contains(hierarchy_list, T_Id)"))
                       .select("T_Id", "T_val", "T_Goal", "Parent_T_Id", "parent_Goal", "trans_id"))

    df_nodemap = (df_filter_creds.alias("A")
                  .join(df_childrens.alias("B"), col("A.T_Id") == col("B.Parent_T_Id"), "left")
                  .select("A.T_Id", "B.children", "A.T_val", "A.T_Goal",
                          "A.Parent_T_Id", "A.parent_Goal", "A.trans_id"))
    dfs.append(df_nodemap)

df = reduce(DataFrame.union, dfs)
display(df)

My problem: this is a bad design. df_trans has millions of rows, and looping through the DataFrame takes forever. Can I do this without looping? I tried a couple of other methods but was not able to get the desired result.


asked Oct 28 '25 by sys

1 Answer

You definitely want to process the whole DataFrame in one batch rather than iterating over it row by row.

The key step is to "reverse" df_hrrchy, i.e. from the parent lineage obtain the list of children for every T_Id (the answer's snippets are in Scala; PySpark sketches follow each step):

val df_children = df_hrrchy.withColumn("children", slice($"Lineage", lit(1), size($"Lineage") - 1))
                           .withColumn("parents", slice($"Lineage", 2, 999999))
                           .select(explode(arrays_zip($"children", $"parents")).as("rels"))
                           .distinct
                           .groupBy($"rels.parents".as("T_Id"))
                           .agg(collect_set($"rels.children").as("children"))
df_children.show(false)

+-----+-----------------------------------+
|T_Id |children                           |
+-----+-----------------------------------+
|36091|[36976]                            |
|36465|[36448, 36380, 36326, 36879, 36683]|
|36976|[36465, 36908]                     |
|82   |[36091]                            |
|36908|[36949, 37349, 36121, 37026]       |
+-----+-----------------------------------+
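
Since the question is in PySpark, the same "reverse the lineage" step could be sketched roughly as follows. This is only a sketch, assuming df_hrrchy has the lefId and Lineage columns shown in the question and that each Lineage array lists the node first, followed by its ancestors:

from pyspark.sql import functions as F

# Sketch only: pair each Lineage element with the next one (its parent),
# then group by parent and collect the distinct children.
df_children = (
    df_hrrchy
    .withColumn("children", F.expr("slice(Lineage, 1, size(Lineage) - 1)"))
    .withColumn("parents", F.expr("slice(Lineage, 2, size(Lineage) - 1)"))
    .select(F.explode(F.arrays_zip("children", "parents")).alias("rels"))
    .distinct()
    .groupBy(F.col("rels.parents").alias("T_Id"))
    .agg(F.collect_set("rels.children").alias("children"))
)
df_children.show(truncate=False)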

then expand list of T_Ids in df_trans and also include all T_Ids from the hierarchy:

val df_trans_map = df_trans.withColumn("T_Id", explode($"T_Id"))
                           .join(df_hrrchy, array_contains($"Lineage", $"T_Id"))
                           .select($"tranID", explode($"Lineage").as("T_Id"))
                           .distinct
df_trans_map.show(false)

+-------+-----+
|tranID |T_Id |
+-------+-----+
|1000540|36976|
|1000540|82   |
|1000540|36091|
|1000540|36465|
|1000540|36326|
|1000540|36121|
|1000540|36908|
|1000540|36380|
|1000540|36448|
|1000540|36683|
|1000540|36949|
|1000540|37349|
|1000540|37026|
|1000540|36879|
+-------+-----+
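
A rough PySpark equivalent of this expansion, again just a sketch against the schemas shown in the question (the column names tranID, T_Id and Lineage are taken from there):

from pyspark.sql import functions as F

# Sketch only: explode the transaction's T_Id array, look up each id's lineage,
# and keep one row per (tranID, T_Id) pair across the whole hierarchy.
df_trans_map = (
    df_trans
    .withColumn("T_Id", F.explode("T_Id"))
    .join(df_hrrchy, F.expr("array_contains(Lineage, T_Id)"))
    .select("tranID", F.explode("Lineage").alias("T_Id"))
    .distinct()
)
df_trans_map.show(truncate=False)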

With this it is just a simple join to obtain final result:

df_trans_map.join(df_creds, Seq("T_Id"))
            .join(df_children, Seq("T_Id"), "left_outer")
            .show(false)

+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
|T_Id |tranID |T_val|T_Goal|Parent_T_Id|Parent_Val|parent_Goal|children                           |
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
|36976|1000540|300  |2     |36091      |500       |19         |[36465, 36908]                     |
|36091|1000540|500  |19    |82         |600       |4          |[36976]                            |
|36465|1000540|200  |1     |36976      |300       |2          |[36448, 36380, 36326, 36879, 36683]|
|36326|1000540|90   |1     |36465      |200       |1          |null                               |
|36121|1000540|90   |1     |36908      |200       |1          |null                               |
|36908|1000540|200  |1     |36976      |300       |2          |[36949, 37349, 36121, 37026]       |
|36380|1000540|90   |1     |36465      |200       |1          |null                               |
|36448|1000540|100  |1     |36465      |200       |1          |null                               |
|36683|1000540|90   |1     |36465      |200       |1          |null                               |
|36949|1000540|90   |1     |36908      |200       |1          |null                               |
|37349|1000540|100  |1     |36908      |200       |1          |null                               |
|37026|1000540|90   |1     |36908      |200       |1          |null                               |
|36879|1000540|90   |1     |36465      |200       |1          |null                               |
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
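
In PySpark the final step would be the same two joins (sketch, assuming T_Id has the same type in df_trans_map, df_creds and df_children):

# Sketch only: attach the credit attributes and the children list to every
# (tranID, T_Id) pair; leaf nodes get children = null from the left outer join.
df_result = (
    df_trans_map
    .join(df_creds, "T_Id")
    .join(df_children, "T_Id", "left_outer")
)
df_result.show(truncate=False)

Because everything is expressed as joins and aggregations, Spark can plan the whole thing as one batch job instead of launching a job per transaction row.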

answered Oct 29 '25 by Kombajn zbożowy