I run the "join" operation on the Apache Spark and see that there is no weak scalability. It will be grateful if anyone can explain this.
I create two DataFrames, ("a", "b") and ("a", "c"), and join them on the first column. I generate the values so the join is one-to-one. I also use the same partitioner for both DataFrames to avoid a shuffle.
The number of rows in each DataFrame is 1024 * 1024 * 16 * cores_total (cores_total is the total number of cores the program runs on). Column "a" consists of random Int values, all values of column "b" equal 1, and all values of column "c" equal 2.
Theoretically, if the data size and the number of cores both increase by a factor of 64, the execution time should remain the same, but in practice it grows slightly. I obtain the following execution times:

Apache Spark version: 2.1.0. We use 8 cluster nodes connected by 1 Gbit Ethernet; each node has 2x Intel Xeon E5-2630 and 64 GB RAM.
/* join perf */
import scala.io.Source
import scala.math._
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.util.control.Breaks._
import scala.collection.mutable._
import org.apache.spark.rdd._
import org.apache.spark.sql._
import scala.util.Random
import org.apache.spark.util.SizeEstimator
import org.apache.spark.HashPartitioner
object joinPerf {
  // Build an array of n random Ints.
  def get_array(n: Int): Array[Int] = {
    var res = Array[Int]()
    for (x <- 1 to n) {
      res :+= Random.nextInt
    }
    res
  }

  def main(args: Array[String]) {
    val start_time = System.nanoTime
    val conf = new SparkConf().setAppName("joinPerf")
    val sc = new SparkContext(conf)
    val cores_total = sc.getConf.get("spark.cores.max").toInt
    val partitions_total = sc.getConf.get("spark.default.parallelism").toInt
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._
    import sqlContext.implicits._
    println("start")
    val elems_total = 1024 * 1024 * 16 * cores_total
    val start_cnt = 1024 * 1024
    Random.setSeed(785354)

    // Seed RDD of start_cnt random Ints, then expand it to elems_total rows.
    var vals = Vector[Int]()
    for (x <- 1 to start_cnt) {
      vals :+= Random.nextInt
    }
    var test_rdd = sc.parallelize(vals)
    println(test_rdd.count)
    test_rdd = test_rdd.flatMap(x => get_array(elems_total / start_cnt)).distinct
    println("test_rdd count = " + test_rdd.count)
    println("partitions count = " + test_rdd.getNumPartitions)

    // Two DataFrames sharing column "a", both repartitioned by "a" and cached.
    var test_rdd1 = test_rdd.map(x => (x, 1)).toDF("a", "b").repartition(partitions_total, $"a").cache
    var test_rdd2 = test_rdd.map(x => (x, 2)).toDF("a", "c").repartition(partitions_total, $"a").cache
    println("test_rdd1 count = " + test_rdd1.count)
    println("test_rdd2 count = " + test_rdd2.count)

    // Time only the join itself.
    var start_test_time = System.nanoTime
    var test_res = test_rdd1.join(test_rdd2, test_rdd1("a") === test_rdd2("a"))
    println(test_res.count)
    print("join time = ")
    println((System.nanoTime - start_test_time) / 1e9d + " sec. ")
    print("all time = ")
    println((System.nanoTime - start_time) / 1e9d + " sec. ")
    sc.stop()
  }
}
Config parameters:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 1024
spark.kryo.unsafe true
spark.kryo.referenceTracking false
spark.driver.memory 22g
spark.executor.memory 22g
spark.driver.maxResultSize 22g
spark.rpc.message.maxSize 2047
spark.memory.fraction 0.8
spark.memory.storageFraction 0.5
spark.executor.extraJavaOptions "-XX:+UseParallelGC"
Partitions per core - 4.
Example of launching the program:
./bin/spark-submit --class "joinPerf" --conf spark.executor.cores=8 --conf spark.cores.max=64 --conf spark.default.parallelism=256 ./joinPerf.jar
Theoretically, if the data size and the number of cores both increase by a factor of 64, the execution time should remain the same, but in practice it grows slightly.
It shouldn't. While one could expect linear scalability when performing strictly local operations on uniformly distributed data (assuming no IO bottlenecks), this is no longer the case once transformations require data exchange (RDD shuffles, Dataset Exchange). Among wide transformations, joins belong to the most expensive category (next to groupByKey-like operations), due to their non-reducing nature and their use of large, local supporting collections.
Shuffles not only have higher-than-linear complexity (at least O(N log N) for sorting-based methods), but they can also induce a non-uniform distribution of data and require significant disk and network IO.
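To see why a slight growth is consistent with that bound, here is a back-of-the-envelope sketch (my own illustration, taking the O(N log N) figure at face value, assuming the work spreads evenly over the cores and a single-core baseline, as the 64x comparison in the question suggests):

// Rough sketch: if total shuffle cost ~ N log N and it spreads evenly over k times
// as many cores, scaling the data by the same k leaves per-core work at
//   (k * N * log(k * N)) / (k * N * log N) = log(k * N) / log N
// times the baseline, i.e. slightly above 1.
val baselineRows = 1024L * 1024 * 16   // rows in the single-core baseline run
val k = 64.0                           // scale factor for both data and cores
val growth = math.log(k * baselineRows) / math.log(baselineRows)
println(f"expected per-core work growth: $growth%.2fx")   // roughly 1.25x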
This is even more severe in the case of your code, which shuffles the data twice: once to repartition the RDDs and once to join the Datasets (HashPartitioner for RDDs is not compatible with Dataset partitioning).
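One way to check how many shuffles actually happen (a debugging suggestion on my part, not part of the original answer) is to print the physical plan of the join and look for Exchange nodes, each of which marks a shuffle boundary:

// Sketch: test_res is the joined DataFrame from the question's code.
// Exchange nodes in the printed physical plan correspond to shuffles.
test_res.explain(true)   // extended = true also prints the logical plans

The SQL tab of the Spark UI shows the same exchanges together with shuffle read/write sizes.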
Finally, increasing the cluster size has its own performance impact, related to increased communication and synchronization overhead and decreased data locality.
Overall you'll rarely see truly linear scalability, and even if you do, you can expect the slope to be < 1.
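If you want to put a number on that slope, one common way (my framing, not something the answer uses) is weak-scaling efficiency: the baseline time divided by the time of the proportionally scaled-up run, where 1.0 means perfect weak scaling.

// Sketch: weak-scaling efficiency; values below 1.0 mean per-core work grew
// as the cluster grew. The argument names below are hypothetical measurements.
def weakScalingEfficiency(baselineSec: Double, scaledSec: Double): Double =
  baselineSec / scaledSec

// e.g. weakScalingEfficiency(joinTimeOn1Core, joinTimeOn64Cores)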
On a side note, I wouldn't depend on the cache-count idiom when working with Datasets. It is likely to be unreliable.
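A more defensive measurement pattern (my own habit; nothing in the answer prescribes it) is to repeat the timed action and discard the first run, so that lazy cache population and JIT warm-up do not end up inside the measurement:

// Sketch: time an action several times and treat the first run as warm-up.
def timeIt[T](body: => T): Double = {
  val t0 = System.nanoTime
  body
  (System.nanoTime - t0) / 1e9d
}

val runs = (1 to 5).map(_ => timeIt(test_res.count))
println("warm-up: " + runs.head + " sec, remaining: " + runs.tail.mkString(" sec, ") + " sec")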
See also: Spark: Inconsistent performance number in scaling number of cores