
Spark: NullPointerException when RDD isn't collected before map

I am trying to write a function in Spark/Scala that takes two RDDs and, for each item in the first, finds the items from the second whose date range falls within it. This is the code I wrote to express the problem (I've added the type annotations for clarity):

def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : RDD[(PerfLog, RDD[PerfLog])] =
{
    durationLog.map((duration: PerfLog) => {
        val sizes = sizeLogs.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
        (duration, sizes)
    })
}

If I call .collect() on the RDD returned by this function, I get this exception:

15/06/19 15:57:05 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)

I've found that if I modify the code so that both parameters are collected at the start and treated as arrays for the rest of the function, it works fine.

def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : Array[(PerfLog, Array[PerfLog])] =
{
    val durationData = durationLog.collect()
    val sizeData = sizeLogs.collect()

    durationData.map((duration: PerfLog) => {
        val sizes = sizeData.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
        (duration, sizes)
    })
}

While this works, it obviously isn't the correct answer, as the inputs could grow quite large.

Why does it work when treated as an Array, but not as an RDD?

asked Nov 29 '25 by one-t
1 Answer

You can't use one RDD inside a transformation on another: RDDs only exist on the driver, so when the task runs on an executor the nested sizeLogs.filter call hits a null reference, which is the NullPointerException you see. To overcome this you don't need to collect both RDDs. A better solution is to collect just one (the smaller one, for better performance) and then combine the two data structures (an RDD and an Array) to get your n^2 operation.

def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, Array[PerfLog])] =
{
    // Collect the smaller dataset to the driver; the resulting Array is
    // serialized into each task, so it is safe to use inside the map.
    val sizeData = sizeLogs.collect()

    durationLog.map((duration: PerfLog) => {
        val sizes = sizeData.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
        (duration, sizes)
    })
}

For even better performance, use a Spark broadcast variable. It ships the collected data to each node once and caches it there, instead of serializing it with every task:

def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog: RDD[PerfLog]): RDD[(PerfLog, Array[PerfLog])] =
{
    // sc is the SparkContext; broadcasting distributes the collected
    // array to every node once, for reuse across all tasks.
    val sizeData = sc.broadcast(sizeLogs.collect())

    durationLog.map((duration: PerfLog) => {
        // Read the broadcast value inside the task via .value
        val sizes = sizeData.value.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
        (duration, sizes)
    })
}
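
For illustration, here is a minimal sketch of how the function might be called. The PerfLog definition and the sample data are assumptions for this sketch (the original post doesn't show them), and sc is taken to be an existing SparkContext:

// Hypothetical PerfLog definition, assumed for this sketch.
case class PerfLog(StartTime: Long, EndTime: Long)

// Sample data: one duration window and two size entries, only the
// first of which falls inside the window.
val durationLog = sc.parallelize(Seq(PerfLog(0L, 100L)))
val sizeLogs    = sc.parallelize(Seq(PerfLog(10L, 20L), PerfLog(150L, 160L)))

buildRelationShip(sizeLogs, durationLog)
  .collect()                               // safe now: no nested RDDs
  .foreach { case (duration, sizes) =>
    println(s"$duration -> ${sizes.mkString(", ")}")
  }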

Hopefully this helps.

answered Dec 01 '25 by Zia Kiyani