I am trying to write a function in Spark/Scala that takes two RDDs and, for each item in the first, finds the items from the second that fall within the first item's date range. This is the code I wrote to express the problem (I've added the type annotations for clarity):
def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, RDD[PerfLog])] =
{
  durationLog.map((duration: PerfLog) => {
    val sizes = sizeLogs.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
    (duration, sizes)
  })
}
If I call .collect() on the map expression at the end of the function, I get this exception.
15/06/19 15:57:05 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
I've found that if I modify the above code so that both parameters are collected at the start and treated as arrays for the rest of the function, it operates fine.
def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): Array[(PerfLog, Array[PerfLog])] =
{
  val durationData = durationLog.collect()
  val sizeData = sizeLogs.collect()
  durationData.map((duration: PerfLog) => {
    val sizes = sizeData.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
    (duration, sizes)
  })
}
While this works, it obviously isn't the right answer, since the input RDDs could grow quite large.
Why does it work when treated as an Array, but not as an RDD?
You can't use one RDD inside a transformation on another RDD: RDD operations can only be invoked from the driver, not from code running on the executors, which is why the nested filter throws a NullPointerException. To get around this you don't need to collect both RDDs. A better solution is to collect only one of them (the smaller one, for better performance) and then combine the remaining RDD with the resulting Array to perform your n² matching operation.
def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, Array[PerfLog])] =
{
  val sizeData = sizeLogs.collect()
  durationLog.map((duration: PerfLog) => {
    val sizes = sizeData.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
    (duration, sizes)
  })
}
For even better performance, use a Spark broadcast variable. Broadcasting ships the collected data to every node once, instead of serializing it into the closure of each task:
def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, Array[PerfLog])] =
{
  // Broadcast the collected size log so every executor gets a single read-only copy.
  // sizeLogs.sparkContext is used here so the function doesn't rely on an external `sc`.
  val sizeData = sizeLogs.sparkContext.broadcast(sizeLogs.collect())
  durationLog.map((duration: PerfLog) => {
    val sizes = sizeData.value.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
    (duration, sizes)
  })
}
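For reference, here is a minimal, self-contained driver program that exercises the broadcast version. Since PerfLog isn't shown in the question, a simple case class with Long StartTime/EndTime fields is assumed here as a stand-in:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for the real PerfLog class from the question.
case class PerfLog(StartTime: Long, EndTime: Long)

object BuildRelationshipExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("perf-log-match").setMaster("local[*]"))

    val durations: RDD[PerfLog] = sc.parallelize(Seq(PerfLog(0L, 100L), PerfLog(100L, 200L)))
    val sizes: RDD[PerfLog]     = sc.parallelize(Seq(PerfLog(10L, 20L), PerfLog(150L, 160L), PerfLog(250L, 260L)))

    // Broadcast the (smaller) size log once, then match every duration against it on the executors.
    val sizeData = sc.broadcast(sizes.collect())
    val related: RDD[(PerfLog, Array[PerfLog])] = durations.map { duration =>
      val matching = sizeData.value.filter(s =>
        s.StartTime >= duration.StartTime && s.EndTime <= duration.EndTime)
      (duration, matching)
    }

    related.collect().foreach { case (d, ms) => println(s"$d -> ${ms.mkString(", ")}") }
    sc.stop()
  }
}

Note that this approach, like the earlier one, still requires the collected side to fit comfortably in memory, so it only pays off when one of the two RDDs is relatively small.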
Hopefully this helps.