THE SCENARIO
I'm trying to write a Spark program that efficiently performs a left outer join between two RDDs. One caveat is that these RDDs can have duplicate keys, which apparently makes the whole program inefficient.
What I'm trying to achieve is simple:
- Given two RDDs rdd1 and rdd2 (both have the same structure: (k, v))
- From rdd1 and rdd2, generate another RDD rdd3 that has the structure: (k1, v1, List(v2..))
- k1 and v1 come from rdd1 (same values, so rdd1 and rdd3 have the same length)
- List(v2..) is a list whose values come from the values of rdd2
- To add rdd2's v to the list in rdd3's tuple, its k (the key from rdd2) must match the k from rdd1
MY ATTEMPT
My approach was to use a left outer join. So, I came up with something like this:
rdd1.leftOuterJoin(rdd2).map{case(k, (v1, v2)) => ((k, v1), Array(v2))}
.reduceByKey(_ ++ _)
This actually produces the result I'm trying to achieve. But when I run it on huge data, the program becomes very slow.
AN EXAMPLE
In case my idea is not clear yet, here is an example:
Given two RDDs that have the following data:
rdd1:
key | value
-----------
1 | a
1 | b
1 | c
2 | a
2 | b
3 | c
rdd2:
key | value
-----------
1 | v
1 | w
1 | x
1 | y
1 | z
2 | v
2 | w
2 | x
3 | y
4 | z
The resulting rdd3 should be
key | value | list
------------------------
1 | a | v,w,x,y,z
1 | b | v,w,x,y,z
1 | c | v,w,x,y,z
2 | a | v,w,x
2 | b | v,w,x
3 | c | y
THE ANSWER
First of all, don't use:
map { ... => (..., Array(...)) }.reduceByKey(_ ++ _)
That's pretty much as inefficient as it gets. To group values like this with RDDs, you should really go with groupByKey.
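If you only want to replace that step while keeping the join, a minimal sketch of the groupByKey variant (using the same rdd1 and rdd2 defined below) could look like this:
// Sketch only: same join as in the question, but grouping with groupByKey
// instead of concatenating a per-record Array for every element.
val grouped = rdd1
  .leftOuterJoin(rdd2)                          // (k, (v1, Option(v2)))
  .map { case (k, (v1, v2)) => ((k, v1), v2) }
  .groupByKey()
  .mapValues(_.flatten.toList)                  // drop the Nones from unmatched keys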
Even so, joining and then grouping by key afterwards is pretty wasteful: you are doing the same job (grouping by key) on the right-hand side twice. It makes more sense to use cogroup directly (that is how RDD joins work under the hood) and then flatMap:
val rdd1 = sc.parallelize(Seq(
  (1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"), (3, "c")
))
val rdd2 = sc.parallelize(Seq(
  (1, "v"), (1, "w"), (1, "x"), (1, "y"), (1, "z"), (2, "v"),
  (2, "w"), (2, "x"), (3, "y"), (4, "z")
))
val rdd = rdd1
  .cogroup(rdd2)                                                 // (k, (values from rdd1, values from rdd2))
  .flatMapValues { case (left, right) => left.map((_, right)) }  // one record per value of rdd1
  .map { case (k, (v, vs)) => ((k, v), vs) }                     // ((k, v1), Iterable of v2)
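A quick way to inspect the result (the order of values inside each list may vary):
// Materialize the Iterable as a List before collecting, for readability.
rdd.mapValues(_.toList).collect.foreach(println)
// e.g. ((1,a),List(v, w, x, y, z)), ((2,a),List(v, w, x)), ((3,c),List(y)), ...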
You can also use the Dataset API, which tends to be more efficient in such cases:
import org.apache.spark.sql.functions.collect_list
import spark.implicits._   // for rdd.toDF; assumes a SparkSession named spark

val df1 = rdd1.toDF("k", "v")
val df2 = rdd2.toDF("k", "v")

df2.groupBy("k")
  .agg(collect_list("v").as("list"))
  .join(df1, Seq("k"), "rightouter")
  .show
The result:
+---+---------------+---+
| k| list| v|
+---+---------------+---+
| 1|[v, w, x, y, z]| a|
| 1|[v, w, x, y, z]| b|
| 1|[v, w, x, y, z]| c|
| 3| [y]| c|
| 2| [v, w, x]| a|
| 2| [v, w, x]| b|
+---+---------------+---+
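Note that with the right outer join, any key that exists in rdd1 but not in rdd2 would end up with a null list. If the downstream code expects an empty list instead, one possible refinement (a sketch using coalesce and typedLit, not part of the original solution) is:
import org.apache.spark.sql.functions.{coalesce, typedLit}

// Optional: replace null lists (keys absent from df2) with an empty list.
df2.groupBy("k")
  .agg(collect_list("v").as("list"))
  .join(df1, Seq("k"), "rightouter")
  .withColumn("list", coalesce($"list", typedLit(Seq.empty[String])))
  .show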
If the intersection of the key sets is small, you can try to optimize the process by applying a filter first:
import org.apache.spark.sql.functions.udf

// Build a Bloom filter over the keys of df1 and keep only those rows of df2
// whose key might also be present in df1.
val should_keep = {
  val f = df1.stat.bloomFilter("k", df1.count, 0.005)
  udf((x: Any) => f.mightContain(x))
}

df2.where(should_keep($"k")).groupBy("k")
  .agg(collect_list("v").as("list"))
  .join(df1, Seq("k"), "rightouter")
  .show
+---+---------------+---+
| k| list| v|
+---+---------------+---+
| 1|[v, w, x, y, z]| a|
| 1|[v, w, x, y, z]| b|
| 1|[v, w, x, y, z]| c|
| 3| [y]| c|
| 2| [v, w, x]| a|
| 2| [v, w, x]| b|
+---+---------------+---+
When using the Dataset API, be sure to adjust spark.sql.shuffle.partitions to reflect the amount of data you process.
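For example (assuming a SparkSession named spark; the number here is illustrative only):
// Illustrative value: size this to your data volume and cluster.
spark.conf.set("spark.sql.shuffle.partitions", 400)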
Note:
None of that will help you if the number of duplicate keys in rdd2 is large. In that case the overall problem formulation is impossible to defend and you should try to reformulate it, taking into account the requirements of the downstream process.