How can I find the top-level hierarchy of an employee in an organization and assign the reporting levels using PySpark?
We have already solved this with Spark GraphX and Scala. We would like to do it in Python, but without GraphFrames (DataFrames are the first preference). Is it possible using Spark DataFrames? If not, we will go with GraphFrames.
There are two DataFrames, employee_df and required_hierarchy_df.
Please refer to the example below:
required_hierarchy_df:
employee_id | designation | supervisor_id | supervisor_designation
10 | Developer | 05 | Technical Lead
employee_df :
employee_id | designation | supervisor_id | supervisor_designation
10 | Developer | 05 | Technical Lead
05 | Technical Lead | 04 | Manager
04 | Manager | 03 | Sr. Director
03 | Sr. Director | 02 | Chairman
02 | Chairman | 01 | CEO
01 | CEO | null | null
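For reference, a minimal snippet to reproduce this example data (the column types are an assumption, and the ids are taken as plain integers here, dropping the leading zeros):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

schema = ('employee_id int, designation string, '
          'supervisor_id int, supervisor_designation string')
employee_df = spark.createDataFrame([
    (10, 'Developer',      5,    'Technical Lead'),
    (5,  'Technical Lead', 4,    'Manager'),
    (4,  'Manager',        3,    'Sr. Director'),
    (3,  'Sr. Director',   2,    'Chairman'),
    (2,  'Chairman',       1,    'CEO'),
    (1,  'CEO',            None, None),
], schema=schema)
required_hierarchy_df = employee_df.where('employee_id = 10')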
Expected outputs:
Reporting levels of an employee:
report_level_df:
employee_id | level_1_id | level_2_id | level_3_id | level_4_id | level_5_id
10 | 05 | 04 | 03 | 02 | 01
Top-level hierarchy information in the organization:
top_level_df:
employee_id | designation | top_level_id | top_level_designation
10 | Developer | 01 | CEO
Consider not using Spark, as it's only 2 million rows; a dict-, graph-, or tree-like data structure makes this very simple. I would recommend not doing this with Spark DataFrames.
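A minimal sketch of that approach, assuming the employee-supervisor pairs fit in driver memory and the hierarchy is acyclic (pairs here is a stand-in for employee_df.select('employee_id', 'supervisor_id').collect()):

# Plain-Python sketch; `pairs` stands in for the collected edge list
pairs = [(10, 5), (5, 4), (4, 3), (3, 2), (2, 1), (1, None)]
supervisor_of = dict(pairs)

def chain(employee_id):
    """Supervisor ids from the employee up to the top of the hierarchy."""
    levels = []
    current = supervisor_of.get(employee_id)
    while current is not None:            # assumes no cycles
        levels.append(current)
        current = supervisor_of.get(current)
    return levels

print(chain(10))      # [5, 4, 3, 2, 1] -> level_1_id .. level_5_id
print(chain(10)[-1])  # 1 -> top_level_id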
That said, using Spark DataFrames you could solve this with a recursive join, building the report_level_df DataFrame. It is not a nice or efficient solution.
We are interested in the employee-supervisor relationships:
edges = employee_df.select('employee_id', 'supervisor_id')
Taking a single step up the ladder, so to speak, requires a single join:
level_0 = edges \
    .withColumnRenamed('employee_id', 'level_0') \
    .withColumnRenamed('supervisor_id', 'level_1')

level_1 = edges \
    .withColumnRenamed('employee_id', 'level_1') \
    .withColumnRenamed('supervisor_id', 'level_2')

# Join on the shared level_1 column, sort columns and show
level_0 \
    .join(level_1, on='level_1') \
    .select('level_0', 'level_1', 'level_2') \
    .show()
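On the example data this should give one row per employee whose supervisor also appears as an employee_id, e.g. (10, 5, 4) for the developer; the CEO's row drops out of the inner join because its null supervisor_id never matches a join key.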
And we want to traverse the relationships up the chain, recursively:
total = edges \
    .withColumnRenamed('employee_id', 'level_0') \
    .withColumnRenamed('supervisor_id', 'level_1')

levels = 10
for i in range(1, levels):
    # Rename the edge columns so they line up with the level to join on
    level_i = edges \
        .withColumnRenamed('employee_id', 'level_{}'.format(i)) \
        .withColumnRenamed('supervisor_id', 'level_{}'.format(i + 1))
    total = total \
        .join(level_i, on='level_{}'.format(i), how='left')

# Sort columns and show
total \
    .select(['level_{}'.format(i) for i in range(levels)]) \
    .show()
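With the example data and levels = 10, the columns beyond level_5 should come out entirely null; the left joins pad shorter chains with nulls rather than dropping them. The guess of 10 levels is the problem the next version removes.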
Except that we don't want to guess the number of levels, so we check after every join whether we are done yet. That check requires a pass over all the data, and is therefore slow.
from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()

schema = 'employee_id int, supervisor_id int'
edges = spark.createDataFrame(
    [[10, 5], [5, 4], [4, 3], [3, 2], [2, 1], [1, None]], schema=schema)

total = edges \
    .withColumnRenamed('employee_id', 'level_0') \
    .withColumnRenamed('supervisor_id', 'level_1')

i = 1
while True:
    this_level = 'level_{}'.format(i)
    next_level = 'level_{}'.format(i + 1)
    level_i = edges \
        .withColumnRenamed('employee_id', this_level) \
        .withColumnRenamed('supervisor_id', next_level)
    total = total \
        .join(level_i, on=this_level, how='left')
    # Done when the newly added level is null for every employee;
    # this count() is a full pass over the data on every iteration
    if total.where(f.col(next_level).isNotNull()).count() == 0:
        break
    else:
        i += 1

# Sort columns and show; i + 2 columns exist: level_0 .. level_{i+1}
total \
    .select(['level_{}'.format(j) for j in range(i + 2)]) \
    .show()
Result:
+-------+-------+-------+-------+-------+-------+-------+
|level_0|level_1|level_2|level_3|level_4|level_5|level_6|
+-------+-------+-------+-------+-------+-------+-------+
|      1|   null|   null|   null|   null|   null|   null|
|      2|      1|   null|   null|   null|   null|   null|
|      3|      2|      1|   null|   null|   null|   null|
|      4|      3|      2|      1|   null|   null|   null|
|      5|      4|      3|      2|      1|   null|   null|
|     10|      5|      4|      3|      2|      1|   null|
+-------+-------+-------+-------+-------+-------+-------+
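From total, the two requested outputs can then be derived. A hedged sketch, reusing total, i and f from above and assuming employee_df has the columns shown in the question (this part is not in the original answer):

# report_level_df: rename level_0 to employee_id and the rest to level_j_id;
# the last level is all null on this data and could be dropped
report_level_df = total.select(
    f.col('level_0').alias('employee_id'),
    *[f.col('level_{}'.format(j)).alias('level_{}_id'.format(j))
      for j in range(1, i + 2)])

# top_level_df: the top of the hierarchy is the highest non-null level;
# coalesce scans from the top level down, so an employee with no
# supervisor becomes their own top level (level_0 is the fallback)
top_level_df = total.select(
    f.col('level_0').alias('employee_id'),
    f.coalesce(*[f.col('level_{}'.format(j))
                 for j in range(i + 1, -1, -1)]).alias('top_level_id'))

# Join designations back in from employee_df
top_level_df = top_level_df \
    .join(employee_df.select('employee_id', 'designation'),
          on='employee_id', how='left') \
    .join(employee_df.select(f.col('employee_id').alias('top_level_id'),
                             f.col('designation').alias('top_level_designation')),
          on='top_level_id', how='left') \
    .select('employee_id', 'designation', 'top_level_id', 'top_level_designation')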