 

How to compare values in a PySpark DataFrame column with values in another DataFrame

I have a PySpark DataFrame (df1) whose first row is as follows:

[Row(_c0='{"type":"Fi","values":[0.20100994408130646,1.172734797000885,0.06788740307092667,0.2314232587814331,0.2012220323085785]}', _c1='0')]
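The _c0 field is a plain JSON string, so the list under "values" can be read with Python's standard json module. This is a minimal sketch outside Spark, assuming every _c0 value has this same shape; extract_values is a hypothetical helper name, not part of PySpark:

```python
import json

def extract_values(c0):
    """Return the "values" list from a _c0 JSON string (hypothetical helper)."""
    return json.loads(c0)["values"]

c0 = ('{"type":"Fi","values":[0.20100994408130646,1.172734797000885,'
      '0.06788740307092667,0.2314232587814331,0.2012220323085785]}')
vals = extract_values(c0)
print(len(vals), vals[1])
```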

I want to compare the "values" list with the first column of the following DataFrame (df2):

0    0.57581    1.25461    0.68694    0.974580    1.54789    0.23646
1    0.98745    0.23655    2.58970    4.587580    0.89756    1.25678
2    0.45780    5.78940    0.65986    2.125400    0.98745    1.23658
3    2.56834    0.25698    4.26587    0.569872    0.36987    0.68975
4    0.25678    1.23654    5.68320    0.986230    0.87563    2.58975

df1 has many rows like this one. For each row, I need to find which elements of the "values" list are greater than the corresponding elements of the matching column in df2, collect the indices of those elements into a list, and store that list in a new column of df1.

For instance, 1.172735 > 0.98745, so its index, 1, qualifies. Hence df1 gets a new column named indices that contains 1, and every further index that satisfies the condition is appended to the same list.
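The rule for a single row can be written in plain Python: pair each element of the "values" list with the element at the same position in the df2 column and keep the positions where the df1 value is larger. A sketch using the first df1 row and the first data column of df2 from above; greater_indices is a hypothetical helper name:

```python
def greater_indices(values, column):
    """Indices i where values[i] > column[i]; zip stops at the shorter list."""
    return [i for i, (v, c) in enumerate(zip(values, column)) if v > c]

row_values = [0.20100994408130646, 1.172734797000885, 0.06788740307092667,
              0.2314232587814331, 0.2012220323085785]
first_column = [0.57581, 0.98745, 0.45780, 2.56834, 0.25678]
result = greater_indices(row_values, first_column)
print(result)  # [1]
```

Only position 1 qualifies, which matches the worked comparison 1.172735 > 0.98745.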

The comparison is between corresponding rows and columns: the df1 row shown above is the first row, so it has to be compared with the first column of df2, the second row with the second column, and so on.
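Because row i of df1 is matched against column i of df2, one way to express the whole task is to transpose df2 into a list of columns first. This is a plain-Python sketch under that reading, assuming the leading 0 to 4 labels in df2 are a row index rather than data (so data column 0 is the one starting with 0.57581). The names are hypothetical, and the second df1 row is the extra test row the answer uses:

```python
def indices_per_row(df1_values, df2_rows):
    """For each df1 row i, the indices where it exceeds df2 column i."""
    # Transpose df2 so that columns[j] holds the j-th data column
    columns = [list(col) for col in zip(*df2_rows)]
    result = []
    for i, vals in enumerate(df1_values):
        column = columns[i]  # row i of df1 is paired with column i of df2
        result.append([k for k, (v, c) in enumerate(zip(vals, column)) if v > c])
    return result

df2_rows = [
    [0.57581, 1.25461, 0.68694, 0.974580, 1.54789, 0.23646],
    [0.98745, 0.23655, 2.58970, 4.587580, 0.89756, 1.25678],
    [0.45780, 5.78940, 0.65986, 2.125400, 0.98745, 1.23658],
    [2.56834, 0.25698, 4.26587, 0.569872, 0.36987, 0.68975],
    [0.25678, 1.23654, 5.68320, 0.986230, 0.87563, 2.58975],
]
df1_values = [
    [0.20100994408130646, 1.172734797000885, 0.06788740307092667,
     0.2314232587814331, 0.2012220323085785],
    [0.6, 0.8, 0.5, 2.1, 0.4],
]
out = indices_per_row(df1_values, df2_rows)
print(out)  # [[1], [1, 3]]
```

With this pairing, df1 cannot have more rows than df2 has data columns; columns[i] raises an IndexError otherwise.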

If I have left out anything important, please let me know in the comments.

Fasty asked Dec 08 '25 09:12


1 Answer

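This code works with Python 2.7 and Spark 2.3.2. Note that it compares every row of df1 with the first data column of df2 (v1), not with column i for row i: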

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

# Create test dataframes
df1 = spark.createDataFrame([
        ['{"type":"Fi","values":[0.20100994408130646,1.172734797000885,0.06788740307092667,0.2314232587814331,0.2012220323085785]}', '0'],
        ['{"type":"Fi","values":[0.6, 0.8, 0.5, 2.1, 0.4]}', '0']
    ],['_c0','_c1'])
df2 = spark.createDataFrame([
        [0, 0.57581, 1.25461, 0.68694, 0.974580, 1.54789, 0.23646],
        [1, 0.98745, 0.23655, 2.58970, 4.587580, 0.89756, 1.25678],
        [2, 0.45780, 5.78940, 0.65986, 2.125400, 0.98745, 1.23658],
        [3, 2.56834, 0.25698, 4.26587, 0.569872, 0.36987, 0.68975],
        [4, 0.25678, 1.23654, 5.68320, 0.986230, 0.87563, 2.58975]
    ],['id','v1', 'v2', 'v3', 'v4', 'v5', 'v6'])

# Infer the JSON schema from the _c0 strings, then parse _c0 into a struct column
json_schema = spark.read.json(df1.rdd.map(lambda row: row._c0)).schema
df1 = df1.withColumn('json', F.from_json('_c0', json_schema))

# Collect the first data column (v1) of df2; sort by id so list positions match row order
values = [row['v1'] for row in df2.orderBy('id').select('v1').collect()]

# Define udf to compare values
def cmp_values(lst):
    list_cmp = map(lambda t: t[0] > t[1], zip(lst, values))  # Boolean list
    return [idx for idx, cond in enumerate(list_cmp) if cond]  # Indices of satisfying elements

udf_cmp_values = F.udf(cmp_values, ArrayType(IntegerType()))

# Apply udf on array
df1 = df1.withColumn('indices', udf_cmp_values(df1.json['values']))
df1.show()

+--------------------+---+--------------------+---------+
|                 _c0|_c1|                json|  indices|
+--------------------+---+--------------------+---------+
|{"type":"Fi","val...|  0|[Fi, [0.201009944...|      [1]|
|{"type":"Fi","val...|  0|[Fi, [0.6, 0.8, 0...|[0, 2, 4]|
+--------------------+---+--------------------+---------+
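Because cmp_values is ordinary Python, its logic can be checked without a Spark session. This sketch copies the function body and hard-codes values to df2's v1 column, which is what the answer collects. It reproduces the indices column of the output above. The body also runs on Python 3, where map returns a lazy iterator that enumerate simply consumes:

```python
# df2's v1 column, hard-coded here in place of the collect() call
values = [0.57581, 0.98745, 0.45780, 2.56834, 0.25678]

def cmp_values(lst):
    list_cmp = map(lambda t: t[0] > t[1], zip(lst, values))  # Booleans (lazy iterator on Python 3)
    return [idx for idx, cond in enumerate(list_cmp) if cond]  # Indices of satisfying elements

rows = [
    [0.20100994408130646, 1.172734797000885, 0.06788740307092667,
     0.2314232587814331, 0.2012220323085785],
    [0.6, 0.8, 0.5, 2.1, 0.4],
]
results = [cmp_values(r) for r in rows]
print(results)  # [[1], [0, 2, 4]]
```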
Pierre Gourseaud answered Dec 10 '25 21:12


