I would like to reuse my DataFrame (without falling back to doing this with the "map" function on an RDD/Dataset), which I am marking as broadcastable, but it seems Spark keeps broadcasting it again and again.
I have a table "bank" (a test table) and perform the following:
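import org.apache.spark.sql.functions.broadcast
// Cache the table and materialize the cache with an action
val cachedDf = spark.sql("select * from bank").cache
cachedDf.count
// Mark the cached DataFrame with a broadcast hint
val dfBroadcasted = broadcast(cachedDf)
val dfNormal = spark.sql("select * from bank")
// Join against the same hinted DataFrame twice within a single action
dfNormal.join(dfBroadcasted, List("age"))
  .join(dfBroadcasted, List("age")).count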
I'm caching beforehand just in case it makes a difference, but the result is the same with or without it.
If I execute the above code, I see the following SQL plan:

As you can see, my broadcasted DF is broadcast TWICE, with different timings each time (if I add more actions afterwards, each of them broadcasts it again too).
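For anyone who wants to check this without the Spark UI, here is a minimal sketch that prints the physical plan so the broadcast-related nodes can be inspected as text. The local SparkSession and the tiny stand-in for the "bank" table (including its "job" column) are assumptions made purely for illustration; only the "age" column comes from my real example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Assumption: a throwaway local session, not my real cluster setup
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("broadcast-plan-check")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the "bank" test table
Seq((30, "admin"), (40, "services"), (50, "retired"))
  .toDF("age", "job")
  .createOrReplaceTempView("bank")

val dfBroadcasted = broadcast(spark.sql("select * from bank"))
val dfNormal = spark.sql("select * from bank")

// Same shape as the query above: two joins against the same hinted DataFrame
val joined = dfNormal
  .join(dfBroadcasted, List("age"))
  .join(dfBroadcasted, List("age"))

// Print the physical plan; look for the broadcast exchange nodes in the output
joined.explain()

// The same plan as a string, in case it needs to be searched or logged
val planText = joined.queryExecution.executedPlan.toString
println(planText)
```

How the plan is rendered depends on the Spark version and on whether adaptive query execution is enabled, so the text is something to read by eye rather than a stable format to parse.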
I care about this because I have a long-running program with a "big" DataFrame that I use to filter HUGE DataFrames, and I would like that "big" DataFrame to be reused.
Is there a way to force reuse? (Not only within the same action but also across actions, although I could live with reuse within a single action.)
Thanks!
Ok, updating the question.
Summarising: INSIDE the same action, left_semi joins reuse the broadcast, while normal/left joins don't. I'm not sure whether this is because Spark already knows the columns of that DF won't affect the output at all and can therefore reuse it, or whether it's just an optimization Spark is missing.
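To make the "columns won't affect the output" point concrete, here is a small sketch comparing a normal join with a left_semi join. It only shows the difference in output, not why Spark reuses the broadcast in one case and not the other. The local SparkSession and the toy data are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session with made-up data
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("left-semi-sketch")
  .getOrCreate()
import spark.implicits._

val left = Seq((30, "a"), (40, "b"), (50, "c")).toDF("age", "name")
val right = Seq((30, "x"), (30, "y"), (40, "z")).toDF("age", "job")

// Normal (inner) join: right-side columns are appended and matches multiply rows
val inner = left.join(right, Seq("age"))
println(inner.columns.mkString(", ")) // age, name, job
println(inner.count())                // 3 (age 30 matches twice, age 40 once)

// left_semi join: only left-side columns, each matching left row kept once
val semi = left.join(right, Seq("age"), "left_semi")
println(semi.columns.mkString(", "))  // age, name
println(semi.count())                 // 2 (ages 30 and 40)
```

In other words, a left_semi join behaves like a filter on the left side, which is exactly what I need for my use case.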
My problem seems mostly solved, although it would be great if someone knew how to keep the broadcast across actions.
If I use left_semi (which is the join I'm going to use in my real app), the broadcast is only performed once.
With:
dfNormalxx.join(dfBroadcasted, Seq("age"),"left_semi")
.join(dfBroadcasted, Seq("age"),"left_semi").count
The plan becomes (I also changed the size so it matches my real one, but this made no difference):

Also, the total wall time is much better when using "left_semi" (I set 1 executor so the work doesn't get parallelized; I just wanted to check whether the job was really being done twice):

Even though my collect takes 10 seconds, this will speed up the table reads + groupBys, which are taking around 6-7 minutes.