
Spark: break partition iterator for better memory management?

I'm trying to develop a heavy mathematical calculation in Spark, heavy both in time and in memory (up to O(n^2) for both). I've found that having the partition expose an Iterator is not really adequate for big calculations, since it forces you to instantiate (though lazily, since it's an Iterator) one object per line. In the simplest scenario one would hold one vector per line, for instance. But this is harmful both for memory, given the JVM overhead per object and all the pressure it puts on the GC, and for speed, since I could really improve performance by moving my linear algebra operations up to BLAS level-3 (matrix-by-matrix instead of the matrix-by-vector I'm stuck with in this paradigm). Very schematically, here's what I want to achieve:

while (???) { // loop over some condition, doesn't really matter what
    val matrix = ??? // an instance of a matrix
    val broadMatrix = sparkContext.broadcast(matrix)
    // rdd is an instance of RDD[Vector] that is already cached
    rdd.mapPartitions {
        iter =>
            val matrixValue = broadMatrix.value
            iter.map(vector => matrixValue * vector)
    }
    // a bunch of other things relying on that result
}

Here are my thoughts:

  1. as my rdd in the code above is cached, having an Iterator is useless, isn't it? The only advantage of an Iterator is not holding all the lines in memory at the same time, but here the data has already been computed and cached, so all the lines are held in memory anyway... Of course one could argue that Spark may have an intelligent cache that serializes and compresses the data (which I doubt when the storage level is MEMORY_ONLY, though...).

  2. if 1. is true, then the only thing the Iterator produces is a huge memory overhead, as I have as many JVM objects as there are rows in my rdd, whereas I could lower that to a single JVM object per partition. I could even lower it to a single object per executor, using a Scala object acting as shared memory for all the partitions living on the same executor (a rough sketch of such a per-executor object follows this list). I fear this might be hard to handle, though, as I want to keep Spark's resilience: if a partition is removed for any reason and re-appears on another executor, I don't want to handle that myself but rather let Spark move all the related objects by itself...
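
For completeness, here is a rough sketch of what such a per-executor Scala object could look like (all names here are mine and purely illustrative): a singleton is initialized at most once per executor JVM, so every partition running there can reuse the same storage, but resilience becomes entirely manual.

import java.util.concurrent.ConcurrentHashMap

// One instance per executor JVM; every task running on that executor sees the same map.
object ExecutorScratch {
  // keyed by partition id; the value type is illustrative only
  val blocks = new ConcurrentHashMap[Int, Array[Double]]()
}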

My idea would hence be to transform this RDD of vectors into one containing matrices, something like:

while (???) { // loop over some condition, doesn't really matter what
    val matrix = ??? // an instance of a matrix
    val broadMatrix = sparkContext.broadcast(matrix)
    // rdd has been transformed into an RDD holding one stacked matrix per partition, already cached
    rdd.mapPartitions {
        iter =>
            val matrixValue = broadMatrix.value
            // iter contains a single element: the matrix obtained by stacking all of the partition's vectors
            // so here we get a BLAS-3 operation
            iter.map(matrix => matrixValue * matrix)
    }
    // a bunch of other things relying on that result
}
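
For reference, here is a minimal sketch of the transformation the loop above assumes. I use Breeze as the matrix library and a made-up vector dimension d, both of which are assumptions on my part: each partition's vectors are stacked as the columns of a single DenseMatrix, so the whole partition becomes one JVM object and the product inside the loop is a genuine matrix-by-matrix (BLAS-3) call.

import breeze.linalg.{DenseMatrix, DenseVector}
import org.apache.spark.storage.StorageLevel

val d = 100 // assumed vector dimension, known up front

// Stack each partition into a single d x (number of rows) matrix.
val stacked = rdd.mapPartitions { iter =>
    val rows = iter.toArray
    val block = DenseMatrix.zeros[Double](d, rows.length)
    for (j <- rows.indices) block(::, j) := DenseVector(rows(j).toArray)
    Iterator.single(block)
}.persist(StorageLevel.MEMORY_ONLY)

// Inside the loop the map then becomes one BLAS-3 product per partition:
// stacked.mapPartitions(it => it.map(block => broadMatrix.value * block))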

Has anyone already faced this dilemma? Do you have experience with advanced memory management like this?

asked Sep 14 '25 by Vince.Bdn

1 Answer

since I could really improve performance by moving my linear algebra operations up to BLAS level-3 (matrix-by-matrix instead of the matrix-by-vector I'm stuck with in this paradigm)

Using Iterators doesn't force you in any way to use Vectors, or even to keep more than one element per partition. You can easily create a single Matrix object for each split if you want.
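
As a trivial illustration of that point (just a sketch, independent of any particular matrix library), mapPartitions is free to emit exactly one element per split:

// One single object per split; a real implementation would stack the
// array into whatever Matrix type it uses before emitting it.
val onePerSplit = rdd.mapPartitions(rows => Iterator.single(rows.toArray))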

harmful for memory, given the JVM overhead per object and all the pressure it puts on the GC

I'd argue that it is more complicated than that. The reason for using Iterators is to be able to handle partitions that are larger than memory. With lazy Iterators and small objects, Spark can spill partial results to disk and make them available for garbage collection. This cannot happen when you use a single large object. In my experience, Spark is much more susceptible to GC problems when dealing with large objects.

Based on the description, I suspect it would make sense to avoid caching the data explicitly and to initialize the objects explicitly in off-heap memory instead. This should keep the GC at bay and let you handle large objects, but it is way above my pay grade.
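
To make that suggestion a bit more concrete, here is one way the off-heap idea could look. This is only a sketch under my own assumptions (a known vector dimension d, and a native routine left as a placeholder), not a recommendation of a specific API:

import java.nio.{ByteBuffer, ByteOrder}

val d = 100 // assumed vector dimension

val results = rdd.mapPartitions { rows =>
    val data = rows.toArray
    // One direct (off-heap) buffer per partition: the doubles live outside
    // the Java heap and therefore put no pressure on the GC.
    val buf = ByteBuffer
      .allocateDirect(data.length * d * java.lang.Double.BYTES)
      .order(ByteOrder.nativeOrder())
    data.foreach(v => v.toArray.foreach(x => buf.putDouble(x)))
    buf.flip()
    // ... hand buf to a native BLAS routine here and emit its result;
    // as a placeholder we just report how many doubles were stored.
    Iterator.single(buf.remaining() / java.lang.Double.BYTES)
}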

answered Sep 17 '25 by Alper t. Turker