I have a small table (2k records) and a big table (5 million records). I need to fetch all data from the small table and only the matching data from the large table, so I executed the query below:
select /*+ broadcast(small)*/ small.* From small left outer join large
The query returns the correct result, but when I check the query plan it shows a sort merge join instead of a broadcast hash join.
Is there a limitation that the small table can't be broadcast when it is the left table of a left join, and if so, what is the way out?
Since you're selecting the entire dataset from the small table (the outer, row-preserving side) rather than only matches, Spark won't honor the broadcast hint here. However, if you change the join order so the small table is on the build side, or convert the query to an inner join, Spark will happily use a broadcast join.
Reason: Spark ships the small (broadcast) table to every node that holds a partition of the big table. In your case you need all rows from the small table but only the matching rows from the big table. A node that finds no local match for a small-table row cannot tell whether that row matched on some other node or matched nowhere at all, so emitting all rows of the broadcast table would be ambiguous. As a result, Spark won't use a broadcast hash join when the broadcast side is the preserved (outer) side.
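To see the ambiguity concretely, here is a pure-Python sketch (not Spark code, and using made-up data) of a broadcast hash join where the broadcast table is also the preserved side. The big table is split into two partitions that live on different nodes; the small table is copied to both:

```python
small = [(1, 'a'), (2, 'b')]      # broadcast: a full copy on every node
big_partitions = [
    [(1, 'x')],                   # partition on node 1
    [(3, 'y')],                   # partition on node 2
]

results = []
for part in big_partitions:       # each node works independently
    big_keys = {k for k, _ in part}
    for k, v in small:
        if k in big_keys:                      # matched locally
            results.append((k, v, 'matched'))
        else:                                  # no local match, but this node
            results.append((k, v, None))       # can't see the other partitions

print(results)
# [(1, 'a', 'matched'), (2, 'b', None), (1, 'a', None), (2, 'b', None)]
```

Key 1 matches on node 1 but not on node 2, so it comes out both as matched and as unmatched, and key 2 is emitted as unmatched twice. No single node can decide the global outcome, which is why Spark refuses to broadcast the preserved side of an outer join.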
The way out is to change the order of the tables: since you are broadcasting the left table of a left join, either swap the tables so the broadcast table is on the right, or change the join type to a right outer join. (Note that both rewrites preserve the large table's rows rather than the small table's, so the result is no longer equivalent to your original left join.)
select /*+ broadcast(small)*/ small.* From small right outer join large
select /*+ broadcast(small)*/ small.* From large left outer join small
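In both rewrites the small table is the build side and the large table is streamed, so each node can decide correctness on its own. A pure-Python sketch of the second query (`large left outer join small` with the small table broadcast), again with made-up data:

```python
small = {1: 'a', 2: 'b'}          # broadcast hash table, copied to every node
big_partitions = [
    [(1, 'x')],                   # partition on node 1
    [(3, 'y')],                   # partition on node 2
]

results = []
for part in big_partitions:
    for k, v in part:             # stream the big table's local rows
        # Each streamed row is probed against the *complete* small table,
        # so the preserved (big) side is handled correctly per node.
        results.append((k, v, small.get(k)))

print(results)
# [(1, 'x', 'a'), (3, 'y', None)]
```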
Example:
from pyspark.sql.functions import broadcast

df = spark.createDataFrame([(1, 'a')], ['id', 'name'])
df1 = spark.createDataFrame([(1, 'a')], ['id', 'name'])

# broadcasting df1 on the right side of a left join -> BroadcastHashJoin
df.join(broadcast(df1), ['id'], 'left').explain()
#== Physical Plan ==
#*(2) Project [id#0L, name#1, name#5]
#+- *(2) BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
# :- Scan ExistingRDD[id#0L,name#1]
# +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
# +- *(1) Filter isnotnull(id#4L)
# +- Scan ExistingRDD[id#4L,name#5]
# broadcasting df1, the preserved side of a right join, falls back to SortMergeJoin
df.join(broadcast(df1), ['id'], 'right').explain()
#== Physical Plan ==
#*(4) Project [id#4L, name#1, name#5]
#+- SortMergeJoin [id#0L], [id#4L], RightOuter
# :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
# : +- Exchange hashpartitioning(id#0L, 200)
# : +- *(1) Filter isnotnull(id#0L)
# : +- Scan ExistingRDD[id#0L,name#1]
# +- *(3) Sort [id#4L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#4L, 200)
# +- Scan ExistingRDD[id#4L,name#5]