
How to join/merge a list of dataframes with common keys in PySpark?

df1
     uid1  var1
0    John     3
1    Paul     4
2  George     5

df2
     uid1  var2
0    John    23
1    Paul    44
2  George    52

df3
     uid1  var3
0    John    31
1    Paul    45
2  George    53

df_lst = [df1, df2, df3]

How do I merge/join the 3 dataframes in the list based on common key uid1 ?

Edit: Expected output

df1
     uid1  var1  var2  var3
0    John     3    23    31
1    Paul     4    44    45
2  George     5    52    53
asked Jun 13 '17 by GeorgeOfTheRF

People also ask

How do I join two DataFrames in PySpark with the same column names?

Join syntax: the join function takes up to three parameters; the first is mandatory and the other two are optional. The first parameter specifies the other dataframe, i.e. the right side of the join. The second parameter can be a plain string naming the join column when both dataframes share a column with that name.
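
A minimal sketch of that call shape, assuming two toy dataframes that share a uid1 column (the frames and values here are illustrative, not from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df_left = spark.createDataFrame([("John", 3), ("Paul", 4)], ["uid1", "var1"])
df_right = spark.createDataFrame([("John", 23), ("Paul", 44)], ["uid1", "var2"])

# Passing a string column name keeps a single uid1 column in the result;
# the third parameter (join type) defaults to an inner join when omitted.
df_left.join(df_right, "uid1", "inner").show()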

How do I merge two DataFrames in pandas based on common column?

To merge two pandas DataFrames on a common column, use the merge() function and set the on parameter to the column name.
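
For example, a small sketch with made-up frames (the data is only for illustration):

import pandas as pd

df_a = pd.DataFrame({"uid1": ["John", "Paul"], "var1": [3, 4]})
df_b = pd.DataFrame({"uid1": ["John", "Paul"], "var2": [23, 44]})

# on= names the shared column; the default how="inner" keeps matching rows.
print(df_a.merge(df_b, on="uid1"))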

How do I join PySpark DataFrames on multiple columns?

PySpark's join() takes the right dataset as its first argument, with joinExprs and joinType as the second and third arguments; joinExprs supplies the join condition across multiple columns. Note that both joinExprs and joinType are optional.
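
A sketch of a multi-column condition, assuming both frames carry uid1 plus a second, hypothetical key column dept (invented here for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

left = spark.createDataFrame([("John", "sales", 3)], ["uid1", "dept", "var1"])
right = spark.createDataFrame([("John", "sales", 23)], ["uid1", "dept", "var2"])

# joinExprs combines per-column equalities with &; joinType is passed explicitly.
cond = (left["uid1"] == right["uid1"]) & (left["dept"] == right["dept"])
left.join(right, cond, "inner").show()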


2 Answers

You can join a list of dataframes by folding over it. Below is a simple example in Scala:

import spark.implicits._

val df1 = spark.sparkContext.parallelize(Seq(
  (0, "John", 3),
  (1, "Paul", 4),
  (2, "George", 5)
)).toDF("id", "uid1", "var1")

val df2 = spark.sparkContext.parallelize(Seq(
  (0, "John", 23),
  (1, "Paul", 44),
  (2, "George", 52)
)).toDF("id", "uid1", "var2")

val df3 = spark.sparkContext.parallelize(Seq(
  (0, "John", 31),
  (1, "Paul", 45),
  (2, "George", 53)
)).toDF("id", "uid1", "var3")

val dfList = List(df1, df2, df3)

// Fold the list, joining each dataframe on the shared keys.
dfList.reduce((a, b) => a.join(b, Seq("id", "uid1")))
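
Here reduce folds the list from left to right: the first two dataframes are joined, then each remaining dataframe is joined onto the accumulated result on the shared id and uid1 keys.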

Output:

+---+------+----+----+----+
| id|  uid1|var1|var2|var3|
+---+------+----+----+----+
|  1|  Paul|   4|  44|  45|
|  2|George|   5|  52|  53|
|  0|  John|   3|  23|  31|
+---+------+----+----+----+

Hope this helps!

answered Sep 18 '22 by koiralo


Let me suggest a Python answer:

from pyspark import SparkContext
from pyspark.sql import SQLContext

# Stop any already-running context before creating a fresh one.
if SparkContext._active_spark_context is not None:
    SparkContext._active_spark_context.stop()
sc = SparkContext()
sqlcontext = SQLContext(sc)

import pyspark.sql.types as t

# Build three single-partition RDDs of (name, value) pairs, one per variable column.
rdd_list = [sc.parallelize([('John', i + 1), ('Paul', i + 2), ('George', i + 3)], 1)
            for i in [100, 200, 300]]

df_list = []
for i, r in enumerate(rdd_list):
    schema = t.StructType().add('uid1', t.StringType()) \
                           .add('var{}'.format(i + 1), t.IntegerType())
    df_list.append(sqlcontext.createDataFrame(r, schema))
    df_list[-1].show()
+------+----+
|  uid1|var1|
+------+----+
|  John| 101|
|  Paul| 102|
|George| 103|
+------+----+

+------+----+
|  uid1|var2|
+------+----+
|  John| 201|
|  Paul| 202|
|George| 203|
+------+----+

+------+----+
|  uid1|var3|
+------+----+
|  John| 301|
|  Paul| 302|
|George| 303|
+------+----+
# Chain inner joins on uid1 across the list.
df_res = df_list[0]
for df_next in df_list[1:]:
    df_res = df_res.join(df_next, on='uid1', how='inner')
df_res.show()
+------+----+----+----+
|  uid1|var1|var2|var3|
+------+----+----+----+
|  John| 101| 201| 301|
|  Paul| 102| 202| 302|
|George| 103| 203| 303|
+------+----+----+----+

One more option:

from functools import reduce  # required on Python 3

def join_red(left, right):
    return left.join(right, on='uid1', how='inner')

res = reduce(join_red, df_list)
res.show()
+------+----+----+----+
|  uid1|var1|var2|var3|
+------+----+----+----+
|  John| 101| 201| 301|
|  Paul| 102| 202| 302|
|George| 103| 203| 303|
+------+----+----+----+
answered Sep 19 '22 by Fedo