I have large DataFrames: A (200 GB), B (20 MB), C (15 MB), D (10 MB), E (12 MB). I want to join them with Spark SQL in the same SparkSession: A join B, and C join D and E. Like this:
// A join B:
spark.sql("select * from A a inner join B b on a.id = b.id").write.csv("/path/for/ab")
// C join D join E:
spark.sql("select * from C c inner join D d on c.id = d.id inner join E e on c.id = e.id").write.csv("/path/for/cde")
Problem:
When I use the default spark.sql.autoBroadcastJoinThreshold=10m, B (20 MB), C (15 MB), and E (12 MB) are above the threshold, so they are not broadcast.
When I set spark.sql.autoBroadcastJoinThreshold=20m, the setting applies to the whole SparkSession, so I cannot choose per join which DataFrames get broadcast.
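For context, this threshold is a single session-wide value, set in bytes; a minimal sketch of changing it (the 20 MB value mirrors the question):

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 20 * 1024 * 1024) // 20 MB; applies to every join planned in this session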
Instead of changing spark.sql.autoBroadcastJoinThreshold, you can mark the DataFrames to be broadcast explicitly. That makes it easy to decide, per join, which DataFrames should be broadcast and which should not.
In Scala it can look like this:
import org.apache.spark.sql.functions.broadcast

// Mark B for broadcasting, then register the hinted DataFrame as a table.
val B2 = broadcast(B)
B2.createOrReplaceTempView("B")
Here DataFrame B has been marked for broadcasting and registered as a table so it can be used with Spark SQL.
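Applied to the question, both queries can then run unchanged while only the small sides are broadcast. A sketch, assuming A through E are already loaded as DataFrames and the output paths are those from the question:

import org.apache.spark.sql.functions.broadcast

// Register the large/left tables as-is.
A.createOrReplaceTempView("A")
C.createOrReplaceTempView("C")

// Mark only the small join sides for broadcasting before registering them.
broadcast(B).createOrReplaceTempView("B")
broadcast(D).createOrReplaceTempView("D")
broadcast(E).createOrReplaceTempView("E")

spark.sql("select * from A a inner join B b on a.id = b.id").write.csv("/path/for/ab")
spark.sql("select * from C c inner join D d on c.id = d.id inner join E e on c.id = e.id").write.csv("/path/for/cde")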
Alternatively, this can be done directly with the DataFrame API; the first join can then be written as:
A.join(broadcast(B), Seq("id"), "inner")
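Under the same assumptions, the second query becomes:

C.join(broadcast(D), Seq("id"), "inner")
  .join(broadcast(E), Seq("id"), "inner")
  .write.csv("/path/for/cde")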