
Spark reuse broadcast DF

Tags:

apache-spark

I would like to reuse my DataFrame (rather than falling back to doing this with a "map" function over an RDD/Dataset), which I mark as broadcast-able, but Spark seems to broadcast it again and again.

Given a table "bank" (a test table), I run the following:

  val cachedDf = spark.sql("select * from bank").cache
  cachedDf.count

  val dfBroadcasted = broadcast(cachedDf)

  val dfNormal = spark.sql("select * from bank")

  dfNormal.join(dfBroadcasted, Seq("age"))
    .join(dfBroadcasted, Seq("age"))
    .count

I'm caching beforehand in case it makes a difference, but it's the same with or without.

If I execute the above code, I see the following SQL plan:

[Image: SQL plan for the code]

As you can see, my broadcast DF gets broadcast TWICE, with different timings (and if I add more actions afterwards, they broadcast it again too).

I care about this because I actually have a long-running program with a "big" DataFrame that I use to filter out HUGE DataFrames, and I would like that "big" DataFrame to be reused.

Is there a way to force reuse? Not only within the same action, but across actions too (though I could live with reuse within the same action).
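For reference, the duplicated broadcast can be spotted without the UI by asking Spark for the physical plan. This is a minimal sketch of the same setup, assuming an existing `spark` session and a "bank" table with an "age" column:

```scala
// Sketch: reproduce the question's pipeline and print its physical plan.
// Assumes `spark` (SparkSession) and a table "bank" with an "age" column.
import org.apache.spark.sql.functions.broadcast

val cachedDf = spark.sql("select * from bank").cache()
cachedDf.count()                       // materialize the cache

val dfBroadcasted = broadcast(cachedDf) // broadcast *hint*, not a one-off broadcast
val dfNormal     = spark.sql("select * from bank")

// explain() prints the physical plan; with plain inner joins, each join
// carries its own BroadcastExchange node, i.e. one broadcast per join.
dfNormal.join(dfBroadcasted, Seq("age"))
  .join(dfBroadcasted, Seq("age"))
  .explain()
```

Note that `broadcast(df)` only attaches a planner hint to `df`; it does not perform (or pin) a broadcast by itself, which is consistent with the repeated exchanges seen in the plan.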

Thanks!

asked Dec 14 '25 by BiS

1 Answer

OK, updating the question.

Summarising: INSIDE the same action, left_semi joins reuse the broadcast while normal/left joins won't. I'm not sure whether this is because Spark already knows the broadcast side's columns can't affect the output of a semi join, so it can safely reuse it, or whether it's simply an optimization Spark is missing for the other join types.

My problem is mostly solved, although it would be great if someone knew how to keep the broadcast across actions.
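One possible workaround for reuse across actions, sketched under the assumption that the distinct filter keys fit in driver memory (all names here are illustrative): collect the join keys once, ship them in an explicit broadcast variable, and filter with `isin`, which is semantically close to a left_semi join on that key.

```scala
// Sketch: reuse a small key set across actions via an explicit broadcast
// variable instead of a broadcast join. Assumes `spark`, `cachedDf` and
// `dfNormal` from the question, and that the distinct "age" values are small.
import org.apache.spark.sql.functions.col

val ages: Array[Int] = cachedDf
  .select("age").distinct()
  .collect()
  .map(_.getInt(0))                  // pull the key set to the driver once

val agesBc = spark.sparkContext.broadcast(ages)

// The broadcast variable is shipped to executors once and then reused by
// every subsequent action on `filtered`.
val filtered = dfNormal.filter(col("age").isin(agesBc.value: _*))
filtered.count()   // first action
filtered.collect() // second action reuses the same broadcast variable
```

The trade-off is an extra `collect` on the driver, so this only pays off when the key set is genuinely small relative to the DataFrames being filtered.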

If I use left_semi (the join I'm going to use in my real app), the broadcast is only performed once.

With:

    dfNormal.join(dfBroadcasted, Seq("age"), "left_semi")
      .join(dfBroadcasted, Seq("age"), "left_semi")
      .count

The plan becomes (I also changed the data size so it matches my real one, but this made no difference):

[Image: SQL plan with left_semi joins]

The total wall time is also much better than with the normal joins (I set one executor so nothing gets parallelized; I just wanted to check whether the work was really being done twice):

[Image: job timings with left_semi]

Even though my collect takes 10 seconds, this speeds up the table reads + groupBys, which were taking around 6-7 minutes.

answered Dec 19 '25 by BiS


