
Broadcast join in spark not working for left outer

I have a small table (2k records) and a big table (5 million records). I need to fetch all data from the small table and only the matching data from the large table, so I executed the query below:

select /*+ broadcast(small) */ small.* from small left outer join large

The query returns the correct result, but when I check the query plan it shows a sort-merge join instead of a broadcast hash join. Is there a limitation that the left table of a left outer join can't be broadcast, and if so, what is the way out?

S Ganguly, asked Oct 29 '25 08:10

2 Answers

Since you want the entire dataset from the small table rather than the large one, Spark won't apply a broadcast join here. However, if you change the join order or convert the query to an inner join, Spark will happily use a broadcast join.

Eg:

  1. Big-Table left outer join Small-Table -- Broadcast Enabled
  2. Small-Table left outer join Big-Table -- Broadcast Disabled

Reason: Spark ships the small (broadcast) table to every executor that holds a partition of the big table. In your case you need all rows from the small table but only the matching rows from the big table. Because each executor sees only its own partition of the big table, it cannot tell whether a small-table row that found no local match was matched on another executor or was not matched at all; unioning the per-partition results would therefore produce duplicate or spurious NULL rows for the preserved side. As a result, Spark won't use a broadcast join in this scenario.
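To see why the preserved side can't be the broadcast side, here is a plain-Python simulation (not Spark code; the partitioning is a made-up illustration) of what would happen if the small table were broadcast to two partitions of the big table and a left outer join were computed per partition:

```python
# Plain-Python sketch (not Spark) of why broadcasting the preserved
# (left) side of a left outer join is ambiguous.
small = [(1, 'a'), (2, 'b')]          # broadcast side; ALL rows must survive
big_partitions = [                    # big table split across two "executors"
    [(1, 'x')],                       # partition 0 holds the match for id 1
    [(2, 'y')],                       # partition 1 holds the match for id 2
]

def partition_left_join(small_rows, big_part):
    """Each executor joins the full broadcast table against its own partition only."""
    out = []
    for sid, sval in small_rows:
        matches = [bval for bid, bval in big_part if bid == sid]
        if matches:
            out.extend((sid, sval, m) for m in matches)
        else:
            # Locally unmatched -- but the match may exist on ANOTHER
            # partition, so emitting (sid, sval, None) here is wrong.
            out.append((sid, sval, None))
    return out

results = [partition_left_join(small, p) for p in big_partitions]
merged = [row for part in results for row in part]
# Naively unioning the partition outputs yields 4 rows instead of the
# correct 2: each id picks up a spurious NULL row from the partition
# where its match did not live.
print(merged)
```

No single executor has enough information to decide whether a small-table row should be padded with NULLs, which is why Spark falls back to a shuffle-based sort-merge join instead.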

Krishna G, answered Oct 31 '25 13:10


Change the order of the tables: you are doing a left join while broadcasting the left table, so either move the broadcast table to the right side or change the join type to right outer. (Note that both rewrites below preserve the rows of the large table rather than the small one, so the result differs from the original query, which keeps every small-table row.)

select /*+ broadcast(small) */ small.* from small right outer join large
select /*+ broadcast(small) */ small.* from large left outer join small

Example:

from pyspark.sql.functions import broadcast

df = spark.createDataFrame([(1, 'a')], ['id', 'name'])
df1 = spark.createDataFrame([(1, 'a')], ['id', 'name'])

# Broadcasting df1 on the right side and performing a left join
# gives a BroadcastHashJoin:
df.join(broadcast(df1), ['id'], 'left').explain()
#== Physical Plan ==
#*(2) Project [id#0L, name#1, name#5]
#+- *(2) BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
#   :- Scan ExistingRDD[id#0L,name#1]
#   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
#      +- *(1) Filter isnotnull(id#4L)
#         +- Scan ExistingRDD[id#4L,name#5]


# Broadcasting df1 (the right side) with a right join falls back to a
# SortMergeJoin, because the broadcast side would be the preserved side:
df.join(broadcast(df1), ['id'], 'right').explain()
#== Physical Plan ==
#*(4) Project [id#4L, name#1, name#5]
#+- SortMergeJoin [id#0L], [id#4L], RightOuter
#   :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
#   :  +- Exchange hashpartitioning(id#0L, 200)
#   :     +- *(1) Filter isnotnull(id#0L)
#   :        +- Scan ExistingRDD[id#0L,name#1]
#   +- *(3) Sort [id#4L ASC NULLS FIRST], false, 0
#      +- Exchange hashpartitioning(id#4L, 200)
#         +- Scan ExistingRDD[id#4L,name#5]
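The pattern in the two plans above can be summarized as a rule of thumb: Spark only builds the broadcast hash table on the side whose unmatched rows may be dropped. A simplified plain-Python paraphrase of that rule (an assumption based on observed planner behavior, not Spark's actual API):

```python
# Simplified sketch (assumption, paraphrasing Spark's join-selection
# behavior): which side of a join may be the broadcast (build) side.
CAN_BUILD_RIGHT = {'inner', 'cross', 'left', 'left_semi', 'left_anti'}
CAN_BUILD_LEFT = {'inner', 'cross', 'right'}

def broadcast_allowed(join_type, broadcast_side):
    """Return True if a BroadcastHashJoin can build on broadcast_side."""
    if broadcast_side == 'right':
        return join_type in CAN_BUILD_RIGHT
    return join_type in CAN_BUILD_LEFT

print(broadcast_allowed('left', 'right'))   # True  -> BroadcastHashJoin
print(broadcast_allowed('right', 'right'))  # False -> SortMergeJoin fallback
print(broadcast_allowed('right', 'left'))   # True  -> BroadcastHashJoin
```

This matches the two plans shown: a left join building on the right succeeds, while a right join attempting to broadcast the right (preserved) side silently falls back to a sort-merge join.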
notNull, answered Oct 31 '25 13:10


