I have a Spark Scala program that fetches data from a REST API batch by batch, and once all the data is retrieved I operate on it.
Current program:

- For each batch, create an RDD and merge it with the RDD built from the previous API calls: `rdd = rdd.union(currentRdd)`.
- Operate on the final RDD.
A simple program to reproduce the issue:
```scala
import org.apache.spark.{SparkConf, SparkContext}

object UnionTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Union test").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val limit = 1000
    var rdd = sc.emptyRDD[Int]
    for (x <- 1 to limit) {
      val currentRdd = sc.parallelize(x to x + 3)
      rdd = rdd.union(currentRdd)
    }
    println(rdd.sum())
  }
}
```
Problem:
- When the number of batches is high, the program throws a StackOverflowError:

```
Exception in thread "main" java.lang.StackOverflowError
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply
```

I assume that as the number of batches grows, the RDD dependency (lineage) graph becomes so long that traversing it overflows the stack, which produces this error.
What is the best way to resolve this problem?
There is already SparkContext.union that knows how to properly compute a union of multiple RDDs:
```scala
val rdds = List.tabulate(limit)(x => sc.parallelize(x + 1 to x + 4))
val rdd = sc.union(rdds)
```
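Applied to the batch-by-batch retrieval from the question, the idea is to collect each batch's RDD in a buffer and call `sc.union` once at the end. A rough sketch, where `fetchBatch` is a hypothetical stand-in for the REST API call and is not part of the original code:

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for the REST call that returns one batch of records.
def fetchBatch(batch: Int): Seq[Int] = batch to batch + 3

val batches = ArrayBuffer.empty[RDD[Int]]
for (x <- 1 to limit) {
  // Create an RDD per batch, but do NOT union it here.
  batches += sc.parallelize(fetchBatch(x))
}
// One flat union over all batches keeps the lineage shallow.
val rdd = sc.union(batches.toSeq)
println(rdd.sum())
```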
Alternatively, you could use a `balancedReduce` helper function to avoid creating a long chain of unions (a sketch of such a helper appears after the explanation below):
```scala
val rdds = List.tabulate(limit)(x => sc.parallelize(x + 1 to x + 4))
val rdd = balancedReduce(rdds)(_ union _)
```
The reason why it should work is essentially the same as in the linked answer: an O(n) chain of unions blows the stack, while an O(log n)-deep binary tree of unions doesn't.
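`balancedReduce` itself is not defined in this excerpt; a minimal sketch of what such a helper could look like (an assumption, not necessarily the linked answer's exact code):

```scala
// Reduces a sequence pairwise, halving its length on each pass, so the
// resulting tree of `op` applications is O(log n) deep rather than O(n).
def balancedReduce[A](xs: Seq[A])(op: (A, A) => A): A = {
  require(xs.nonEmpty, "cannot reduce an empty sequence")
  if (xs.size == 1) xs.head
  else {
    val halved = xs.grouped(2).map {
      case Seq(a, b) => op(a, b)
      case Seq(a)    => a
    }.toSeq
    balancedReduce(halved)(op)
  }
}
```

For 1000 RDDs this builds a union tree roughly 10 levels deep instead of a 1000-long chain, which stays well within the default stack size.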