I am trying to write a Spark SQL query that joins three tables, but the query output is null. It works fine for a single table. My join query is correct, as I have already executed it in an Oracle database. What do I need to correct here? The Spark version is 2.0.0.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
lines = sc.textFile("/Users/Hadoop_IPFile/purchase")
lines2 = sc.textFile("/Users/Hadoop_IPFile/customer")
lines3 = sc.textFile("/Users/Hadoop_IPFile/book")
parts = lines.map(lambda l: l.split("\t"))
purchase = parts.map(lambda p: Row(year=p[0],cid=p[1],isbn=p[2],seller=p[3],price=int(p[4])))
schemapurchase = sqlContext.createDataFrame(purchase)
schemapurchase.registerTempTable("purchase")
parts2 = lines.map(lambda l: l.split("\t"))
customer = parts2.map(lambda p: Row(cid=p[0],name=p[1],age=p[2],city=p[3],sex=p[4]))
schemacustomer = sqlContext.createDataFrame(customer)
schemacustomer.registerTempTable("customer")
parts3 = lines.map(lambda l: l.split("\t"))
book = parts3.map(lambda p: Row(isbn=p[0],name=p[1]))
schemabook = sqlContext.createDataFrame(book)
schemabook.registerTempTable("book")
result_purchase = sqlContext.sql("""SELECT DISTINCT customer.name AS name FROM purchase JOIN book ON purchase.isbn = book.isbn JOIN customer ON customer.cid = purchase.cid WHERE customer.name != 'Harry Smith' AND purchase.isbn IN (SELECT purchase.isbn FROM customer JOIN purchase ON customer.cid = purchase.cid WHERE customer.name = 'Harry Smith')""")
result = result_purchase.rdd.map(lambda p: "name: " + p.name).collect()
for name in result:
    print(name)
DataSet
---------
Purchase
1999    C1  B1  Amazon  90
2001    C1  B2  Amazon  20
2008    C2  B2  Barnes Noble    30
2008    C3  B3  Amazon  28
2009    C2  B1  Borders 90
2010    C4  B3  Barnes Noble    26
Customer
C1  Jackie Chan 50  Dayton  M
C2  Harry Smith 30  Beavercreek M
C3  Ellen Smith 28  Beavercreek F
C4  John Chan   20  Dayton  M
Book
B1  Novel
B2  Drama
B3  Poem
I found the instruction below on a webpage, but it is still not working:
schemapurchase.join(schemabook, schemapurchase.isbn == schemabook.isbn) 
schemapurchase.join(schemacustomer, schemapurchase.cid == schemacustomer.cid)
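One detail in the code above may be worth checking: parts2 and parts3 are both built from lines (the purchase file) rather than from lines2 and lines3, so the customer and book tables would be parsed from purchase rows. The plain-Python sketch below needs no Spark and uses a few rows copied from the data set above to illustrate why the join keys could then never match. The helper name parse_customer is hypothetical.

```python
# Sample rows copied from the question's data set (tab-separated).
purchase_lines = [
    "1999\tC1\tB1\tAmazon\t90",
    "2008\tC2\tB2\tBarnes Noble\t30",
]
customer_lines = [
    "C1\tJackie Chan\t50\tDayton\tM",
    "C2\tHarry Smith\t30\tBeavercreek\tM",
]

def parse_customer(line):
    """Hypothetical helper mirroring the Row(cid=p[0], name=p[1], ...) mapping."""
    p = line.split("\t")
    return {"cid": p[0], "name": p[1], "age": p[2], "city": p[3], "sex": p[4]}

# Join keys on the purchase side (second field of each purchase row).
purchase_cids = {line.split("\t")[1] for line in purchase_lines}

# As written in the question: the customer table is parsed from the purchase file.
wrong_cids = {parse_customer(line)["cid"] for line in purchase_lines}
# Reading from the customer file instead (lines2 in the question).
right_cids = {parse_customer(line)["cid"] for line in customer_lines}

print(wrong_cids & purchase_cids)  # no common keys, so the join finds nothing
print(right_cids & purchase_cids)  # keys match once the customer file is parsed
```

If this is the cause, mapping parts2 over lines2 and parts3 over lines3 would be the first thing to try.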
Given input DataFrames like the ones in your example (sorry if some column names are wrong, I guessed them):
purchase:
+----+---+----+------------+-----+
|year|cid|isbn|        shop|price|
+----+---+----+------------+-----+
|1999| C1|  B1|      Amazon|   90|
|2001| C1|  B2|      Amazon|   20|
|2008| C2|  B2|Barnes Noble|   30|
|2008| C3|  B3|      Amazon|   28|
|2009| C2|  B1|     Borders|   90|
|2010| C4|  B3|Barnes Noble|   26|
+----+---+----+------------+-----+
customer:
+---+-----------+---+-----------+-----+
|cid|       name|age|       city|genre|
+---+-----------+---+-----------+-----+
| C1|Jackie Chan| 50|     Dayton|    M|
| C2|Harry Smith| 30|Beavercreek|    M|
| C3|Ellen Smith| 28|Beavercreek|    F|
| C4|  John Chan| 20|     Dayton|    M|
+---+-----------+---+-----------+-----+
book:
+----+-----+
|isbn|genre|
+----+-----+
|  B1|Novel|
|  B2|Drama|
|  B3| Poem|
+----+-----+
You can translate that SQL query using DataFrame functions, as follows:
val result = purchase.join(book, purchase("isbn")===book("isbn"))
                     .join(customer, customer("cid")===purchase("cid"))
                     .where(customer("name") =!= "Harry Smith")  // =!= replaces !==, which is deprecated since Spark 2.0
                     .join(temp, purchase("isbn")===temp("purchase_isbn"))
                     .select(customer("name").as("NAME")).distinct()
where "temp" is the result of the "IN (SELECT ...)" subquery, which can be treated as the result of another join:
val temp = customer.join(purchase, customer("cid")===purchase("cid") )
                   .where(customer("name")==="Harry Smith")
                   .select(purchase("isbn").as("purchase_isbn"))    
+-------------+
|purchase_isbn|
+-------------+
|           B2|
|           B1|
+-------------+
So the final result is:
+-----------+
|       NAME|
+-----------+
|Jackie Chan|
+-----------+
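As a sanity check, the same logic can be reproduced in plain Python on the sample data, without Spark. This is only a sketch for verifying the expected answer; the variable names are illustrative and not part of any Spark API, and the join with book is left out because every purchased ISBN exists in book, so it does not filter anything here.

```python
# Sample data from the question: (year, cid, isbn, seller, price) and cid -> name.
purchases = [
    ("1999", "C1", "B1", "Amazon", 90),
    ("2001", "C1", "B2", "Amazon", 20),
    ("2008", "C2", "B2", "Barnes Noble", 30),
    ("2008", "C3", "B3", "Amazon", 28),
    ("2009", "C2", "B1", "Borders", 90),
    ("2010", "C4", "B3", "Barnes Noble", 26),
]
customers = {"C1": "Jackie Chan", "C2": "Harry Smith",
             "C3": "Ellen Smith", "C4": "John Chan"}

target = "Harry Smith"

# Subquery: ISBNs of the books bought by the target customer ("temp" above).
target_isbns = {isbn for _, cid, isbn, _, _ in purchases
                if customers[cid] == target}

# Outer query: distinct names of other customers who bought any of those books.
names = {customers[cid] for _, cid, isbn, _, _ in purchases
         if isbn in target_isbns and customers[cid] != target}

print(sorted(target_isbns))  # ['B1', 'B2']
print(sorted(names))         # ['Jackie Chan']
```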
Treat this answer as a starting point (too many joins can hurt performance, for example).
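Since the question uses PySpark, here is a hedged sketch of the same idea in Python. It swaps the inner join against "temp" for a left semi join, which only filters the left side and adds no columns or duplicate rows. The function name other_buyers_of_same_books is hypothetical; the sketch assumes purchase and customer are PySpark DataFrames with the cid, isbn and name columns shown above, and it has not been run against the asker's data.

```python
def other_buyers_of_same_books(purchase, customer, target_name):
    """Hypothetical helper: names of customers who bought a book that
    target_name also bought. Expects PySpark DataFrames as inputs."""
    # ISBNs bought by the target customer (the IN subquery of the SQL version).
    target_isbns = (customer.join(purchase, "cid")
                            .where(customer["name"] == target_name)
                            .select("isbn")
                            .distinct())

    # A left semi join keeps purchase rows with a matching ISBN and adds no columns.
    return (purchase.join(target_isbns, "isbn", "leftsemi")
                    .join(customer, "cid")
                    .where(customer["name"] != target_name)
                    .select(customer["name"].alias("NAME"))
                    .distinct())
```

With the DataFrames from the question, this could be called as other_buyers_of_same_books(schemapurchase, schemacustomer, "Harry Smith").show(). The join with book is omitted because it does not change which names are returned for this data.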