df_hrrchy
|lefId  |Lineage                               |
|-------|--------------------------------------|
|36326  |["36326","36465","36976","36091","82"]|
|36121  |["36121","36908","36976","36091","82"]|
|36380  |["36380","36465","36976","36091","82"]|
|36448  |["36448","36465","36976","36091","82"]|
|36683  |["36683","36465","36976","36091","82"]|
|36949  |["36949","36908","36976","36091","82"]|
|37349  |["37349","36908","36976","36091","82"]|
|37026  |["37026","36908","36976","36091","82"]|
|36879  |["36879","36465","36976","36091","82"]|
df_trans
|tranID     |   T_Id                                                                  |
|-----------|-------------------------------------------------------------------------|
|1000540    |["36121","36326","37349","36949","36380","37026","36448","36683","36879"]|
df_creds
|T_Id   |T_val  |T_Goal |Parent_T_Id    |Parent_Val      |parent_Goal|
|-------|-------|-------|---------------|----------------|-----------|
|36448  |100    |1      |36465          |200             |1          |
|36465  |200    |1      |36976          |300             |2          |
|36326  |90     |1      |36465          |200             |1          |
|36091  |500    |19     |82             |600             |4          |
|36121  |90     |1      |36908          |200             |1          |
|36683  |90     |1      |36465          |200             |1          |
|36908  |200    |1      |36976          |300             |2          |
|36949  |90     |1      |36908          |200             |1          |
|36976  |300    |2      |36091          |500             |19         |
|37026  |90     |1      |36908          |200             |1          |
|37349  |100    |1      |36908          |200             |1          |
|36879  |90     |1      |36465          |200             |1          |
|36380  |90     |1      |36465          |200             |1          |
Desired result:
| T_id | children | T_Val | T_Goal | parent_T_id | parent_Goal | trans_id | 
|---|---|---|---|---|---|---|
| 36091 | ["36976"] | 500 | 19 | 82 | 4 | 1000540 | 
| 36465 | ["36448","36326","36683","36879","36380"] | 200 | 1 | 36976 | 2 | 1000540 | 
| 36908 | ["36121","36949","37026","37349"] | 200 | 1 | 36976 | 2 | 1000540 | 
| 36976 | ["36465","36908"] | 300 | 2 | 36091 | 19 | 1000540 | 
| 36683 | null | 90 | 1 | 36465 | 1 | 1000540 | 
| 37026 | null | 90 | 1 | 36908 | 1 | 1000540 | 
| 36448 | null | 100 | 1 | 36465 | 1 | 1000540 | 
| 36949 | null | 90 | 1 | 36908 | 1 | 1000540 | 
| 36326 | null | 90 | 1 | 36465 | 1 | 1000540 | 
| 36380 | null | 90 | 1 | 36465 | 1 | 1000540 | 
| 36879 | null | 90 | 1 | 36465 | 1 | 1000540 | 
| 36121 | null | 90 | 1 | 36908 | 1 | 1000540 | 
| 37349 | null | 100 | 1 | 36908 | 1 | 1000540 | 
Code tried:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.functions import explode, collect_set, expr, col, collect_list,array_contains, lit
from functools import reduce
# NOTE(review): this iterates over df_trans on the driver and launches several
# Spark jobs per transaction, so it cannot scale to millions of rows. It is a
# corrected version of the per-row attempt, not a recommended design.
dfs = []  # one node-map DataFrame per transaction, unioned after the loop
for row in df_trans.rdd.toLocalIterator():
    # Every node id (leaf and all ancestors) reachable from this transaction's
    # leaf ids, gathered into a single array column.
    df_hy_set = (
        df_hrrchy.filter(df_hrrchy.lefId.isin(row["T_Id"]))
        .select(explode("Lineage").alias("Terrs"))
        .agg(collect_set(col("Terrs")).alias("hierarchy_list"))
        .select(lit(row["tranID"]).alias("trans_id"), "hierarchy_list")
    )

    # Credit rows for every node in this transaction's hierarchy. The lineage
    # ids are quoted strings in the sample data, so T_id is cast to string to
    # keep array_contains type-compatible whatever type df_creds uses.
    df_filter_creds = (
        df_creds.join(
            df_hy_set,
            expr("array_contains(hierarchy_list, CAST(T_id AS STRING))"),
        )
        .select("T_id", "T_val", "T_Goal", "parent_T_id", "parent_Goal", "trans_id")
    )

    # Children of each parent, restricted to the same hierarchy (reuses the
    # join above instead of recomputing it).
    df_childrens = (
        df_filter_creds
        .groupBy("parent_T_id")
        .agg(collect_list("T_id").alias("children"))
    )

    # Attach the child list to each node; leaves get children = null.
    df_nodemap = (
        df_filter_creds.alias("A")
        .join(df_childrens.alias("B"), col("A.T_id") == col("B.parent_T_id"), "left")
        .select("A.T_id", "B.children", "A.T_val", "A.T_Goal",
                "A.parent_T_id", "A.parent_Goal", "A.trans_id")
    )
    dfs.append(df_nodemap)

# reduce() without an initial value raises on an empty list, so only union and
# display when at least one transaction was processed.
if dfs:
    df = reduce(DataFrame.union, dfs)
    display(df)
My problem: it's a bad design. df_trans has millions of rows, and looping through the DataFrame takes forever. Can I do this without looping? I tried a couple of other methods but was not able to get the desired result.
You need to process the entire DataFrame in batch rather than iterating over it row by row.
The key step is to "reverse" df_hrrchy, i.e., derive from the lineage arrays the list of children for every T_Id:
// Each Lineage array lists a node followed by its ancestors (e.g.
// ["36326","36465","36976","36091","82"]), so element i is the child of
// element i + 1. Two slices of length size - 1, offset by one position, line
// up those (child, parent) pairs; arrays_zip + explode turns them into rows.
// collect_set already de-duplicates the children of each parent, so no
// separate .distinct pass (and extra shuffle) is needed.
val df_children = df_hrrchy.withColumn("children", slice($"Lineage", lit(1), size($"Lineage") - 1))
                           .withColumn("parents", slice($"Lineage", lit(2), size($"Lineage") - 1))
                           .select(explode(arrays_zip($"children", $"parents")).as("rels"))
                           .groupBy($"rels.parents".as("T_Id"))
                           .agg(collect_set($"rels.children").as("children"))
df_children.show(false)
+-----+-----------------------------------+
|T_Id |children                           |
+-----+-----------------------------------+
|36091|[36976]                            |
|36465|[36448, 36380, 36326, 36879, 36683]|
|36976|[36465, 36908]                     |
|82   |[36091]                            |
|36908|[36949, 37349, 36121, 37026]       |
+-----+-----------------------------------+
Then explode the T_Id list in df_trans and add every T_Id from the matching hierarchy lineage:
// Explode each transaction's leaf ids, look up each leaf's lineage, and emit
// one (tranID, T_Id) row per node appearing in those lineages.
// The lookup is an equi-join on lefId, the same key the per-row attempt
// filters on. An equality key lets Spark plan a hash or sort-merge join,
// whereas an array_contains condition is a non-equi join and forces a
// nested-loop / cartesian plan.
// NOTE(review): assumes every id in df_trans.T_Id is a leaf (lefId); a
// non-leaf id would no longer match any lineage — confirm against the data.
val df_trans_map = df_trans.withColumn("T_Id", explode($"T_Id"))
                           .join(df_hrrchy, $"T_Id" === $"lefId")
                           .select($"tranID", explode($"Lineage").as("T_Id"))
                           .distinct
df_trans_map.show(false)
+-------+-----+
|tranID |T_Id |
+-------+-----+
|1000540|36976|
|1000540|82   |
|1000540|36091|
|1000540|36465|
|1000540|36326|
|1000540|36121|
|1000540|36908|
|1000540|36380|
|1000540|36448|
|1000540|36683|
|1000540|36949|
|1000540|37349|
|1000540|37026|
|1000540|36879|
+-------+-----+
With these two DataFrames, the final result is just a simple join:
// Attach credit attributes to every (tranID, T_Id) pair, then each node's
// child list. In the sample data node 82 ends every lineage but has no row in
// df_creds, so the inner join drops it. The left_outer join keeps leaf nodes,
// which have no entry in df_children, with children = null.
val df_result = df_trans_map
  .join(df_creds, Seq("T_Id"))
  .join(df_children, Seq("T_Id"), "left_outer")
df_result.show(false)
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
|T_Id |tranID |T_val|T_Goal|Parent_T_Id|Parent_Val|parent_Goal|children                           |
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
|36976|1000540|300  |2     |36091      |500       |19         |[36465, 36908]                     |
|36091|1000540|500  |19    |82         |600       |4          |[36976]                            |
|36465|1000540|200  |1     |36976      |300       |2          |[36448, 36380, 36326, 36879, 36683]|
|36326|1000540|90   |1     |36465      |200       |1          |null                               |
|36121|1000540|90   |1     |36908      |200       |1          |null                               |
|36908|1000540|200  |1     |36976      |300       |2          |[36949, 37349, 36121, 37026]       |
|36380|1000540|90   |1     |36465      |200       |1          |null                               |
|36448|1000540|100  |1     |36465      |200       |1          |null                               |
|36683|1000540|90   |1     |36465      |200       |1          |null                               |
|36949|1000540|90   |1     |36908      |200       |1          |null                               |
|37349|1000540|100  |1     |36908      |200       |1          |null                               |
|37026|1000540|90   |1     |36908      |200       |1          |null                               |
|36879|1000540|90   |1     |36465      |200       |1          |null                               |
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
If you love us, you can donate to us via PayPal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With