I have been reading a lot about the differences between map and mapPartitions, but I still have some doubts.
After reading, I decided to replace the map calls in my code with mapPartitions, because apparently mapPartitions is faster than map.
My question is whether that decision is right in scenarios like the following (comments show the previous code):
val reducedRdd = rdd.mapPartitions(partition => partition.map(r => (r.id, r)))
//val reducedRdd = rdd.map(r => (r.id, r))
.reduceByKey((r1, r2) => r1.combineElem(r2))
// .map(e => e._2)
.mapPartitions(partition => partition.map(e => e._2))
Am I thinking about this correctly? Thanks!
In your case, mapPartitions should not make any difference.
mapPartitions vs map
mapPartitions is useful when there is some common computation that we want to do once for each partition. Example -
rdd.mapPartitions { partition =>
  val complicatedRowConverter = <SOME-COSTLY-COMPUTATION>
  partition.map { row =>
    (row.id, complicatedRowConverter(row))
  }
}
In the above example, we create a complicatedRowConverter function which is derived from some costly computation. This function is the same for the entire RDD partition, so we don't need to recreate it again and again. The other way to do the same thing is -
rdd.map { row =>
  val complicatedRowConverter = <SOME-COSTLY-COMPUTATION>
  (row.id, complicatedRowConverter(row))
}
This will be slow because we unnecessarily run this statement for every row: val complicatedRowConverter = <SOME-COSTLY-COMPUTATION>.
In your case, you don't have any precomputation or other per-partition setup. Inside mapPartitions, you are simply iterating over each row and mapping it to (row.id, row).
So mapPartitions brings no benefit here, and you can use a simple map.
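To make the per-row versus per-partition setup cost concrete, here is a minimal sketch that does not use Spark at all. It models partitions as plain Scala collections and counts how often the setup runs. The names `Row`, `buildConverter`, `SetupCostDemo` and the sample data are hypothetical, chosen only for illustration; `buildConverter` stands in for <SOME-COSTLY-COMPUTATION>.

```scala
// Hypothetical stand-ins for illustration only; no Spark is involved.
final case class Row(id: Int, value: String)

object SetupCostDemo {
  // Counts how many times the "costly" setup runs.
  var setupCalls = 0

  // Stands in for <SOME-COSTLY-COMPUTATION>.
  def buildConverter(): Row => String = {
    setupCalls += 1
    row => row.value.toUpperCase
  }

  // Two "partitions" holding five rows in total.
  val partitions: Seq[Seq[Row]] = Seq(
    Seq(Row(1, "a"), Row(2, "b")),
    Seq(Row(3, "c"), Row(4, "d"), Row(5, "e"))
  )

  // map style: the setup sits inside the per-row function.
  def perRow(): Seq[(Int, String)] =
    partitions.flatMap { part =>
      part.iterator.map { row =>
        val convert = buildConverter()
        (row.id, convert(row))
      }
    }

  // mapPartitions style: the setup runs once per partition.
  def perPartition(): Seq[(Int, String)] =
    partitions.flatMap { part =>
      val convert = buildConverter()
      part.iterator.map(row => (row.id, convert(row)))
    }
}
```

Both methods return the same pairs; only the number of setup calls differs (once per row versus once per partition). When there is no setup step at all, as in the question, the two styles do the same work per row.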
tl;dr mapPartitions will be fast in this case.
Consider a function
def someFunc(row: Row): Row = {
  // do some processing on row
  row // placeholder: return the new row
}
Say we are processing 1 million records.
With map, we end up calling someFunc 1 million times.
That is basically 1 million virtual function calls, plus other kernel data structures created for the processing.
With mapPartitions, we would write this as
rdd.mapPartitions { partIter =>
  partIter.map { row =>
    // do some processing on row
    row // placeholder: return the new row
  }
}
No virtual function calls or context switches here.
Hence mapPartitions will be faster.
Also, as mentioned in @moriarity007's answer, we need to factor in the object creation overhead involved with the operation when deciding which operator to use.
I would also recommend using DataFrame transformations and actions for the processing, where the computation can be even faster, since Spark's Catalyst optimizer optimizes your code and also takes advantage of code generation.