I'm running Spark 2 and am trying to shuffle around 5 terabytes of JSON. I'm running into very long garbage collection pauses while shuffling a Dataset:
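import spark.implicits._ // provides the Encoder required by .as[MyClass]
val operations = spark.read.json(inPath).as[MyClass]
// partitions: target number of output partitions (defined elsewhere)
operations.repartition(partitions, operations("id")).write.parquet("s3a://foo")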
Are there any obvious configuration tweaks to deal with this issue? My configuration is as follows:
spark.driver.maxResultSize 6G
spark.driver.memory 10G
spark.executor.extraJavaOptions -XX:+UseG1GC -XX:MaxPermSize=1G -XX:+HeapDumpOnOutOfMemoryError
spark.executor.memory   32G
spark.hadoop.fs.s3a.buffer.dir  /raid0/spark
spark.hadoop.fs.s3n.buffer.dir  /raid0/spark
spark.hadoop.fs.s3n.multipart.uploads.enabled   true
spark.hadoop.parquet.block.size 2147483648
spark.hadoop.parquet.enable.summary-metadata    false
spark.local.dir /raid0/spark
spark.memory.fraction 0.8
spark.mesos.coarse  true
spark.mesos.constraints  priority:1
spark.mesos.executor.memoryOverhead 16000
spark.network.timeout   600
spark.rpc.message.maxSize    1000
spark.speculation   false
spark.sql.parquet.mergeSchema   false
spark.sql.planner.externalSort  true
spark.submit.deployMode client
spark.task.cpus 1
The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation. Stages, tasks, and shuffle reads and writes are concrete concepts that can be monitored in the Spark web UI, which the Spark shell also starts.
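Parallelising the shuffle effectively is key to good performance in Spark jobs. A shuffle redistributes a DataFrame into a new set of partitions, and the number of partitions after the shuffle can differ from the number in the original DataFrame: for DataFrames it defaults to spark.sql.shuffle.partitions (200) unless a count is passed explicitly, as in the repartition(partitions, ...) call above.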
During a shuffle, data is written to disk and transferred across the network. Performance therefore improves by reducing the number of shuffle operations or, failing that, the amount of data being shuffled. By default, Spark’s shuffle uses hash partitioning to decide which machine each key-value pair is sent to.
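To make hash partitioning concrete, here is a minimal sketch in plain Scala (no Spark dependency) that mirrors the partition assignment of Spark's RDD HashPartitioner: a key goes to partition hashCode mod numPartitions, with negative remainders shifted back into range and null keys sent to partition 0. The object and method names (HashPartitioningSketch, partitionFor) are hypothetical. Note that Dataset.repartition(n, col) in Spark SQL hashes the column values with a Murmur3-based hash instead, so the actual partition ids differ; the principle that equal keys always land in the same partition is the same.

```scala
// Illustration only: re-implements the idea behind Spark's RDD HashPartitioner.
object HashPartitioningSketch {

  // Java/Scala % can return a negative remainder for negative hash codes,
  // so shift the result back into the range [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Destination partition for a key: null keys go to partition 0,
  // every other key is placed by its hashCode modulo numPartitions.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    key match {
      case null => 0
      case k    => nonNegativeMod(k.hashCode, numPartitions)
    }
  }

  def main(args: Array[String]): Unit = {
    val records = Seq("a" -> 1, "b" -> 2, "a" -> 3, "c" -> 4)
    val numPartitions = 4
    // Group records by destination partition; records with equal keys end up together.
    val byPartition = records.groupBy { case (k, _) => partitionFor(k, numPartitions) }
    byPartition.toSeq.sortBy(_._1).foreach { case (p, rs) =>
      println(s"partition $p -> ${rs.mkString(", ")}")
    }
  }
}
```

Because the assignment depends only on the key, a heavily repeated id value sends all of its rows to a single partition, which is one way a repartition by key can produce oversized tasks.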
A high garbage collection rate also increases GC pause time. Optimising the application to create fewer objects is therefore the most effective strategy for reducing long GC pauses. It can be a time-consuming exercise, but it is well worth doing.
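As a generic JVM illustration of "create fewer objects" (not something taken from the question or answer), the sketch below counts the comma-separated fields of a line in two ways. The split-based version allocates an array and one String per field for every line, all of which becomes garbage immediately; the scanning version allocates nothing per line. The names FewerObjectsSketch, countFieldsWithSplit and countFieldsByScanning are hypothetical. The same idea applies to per-record code inside Spark map functions, where millions of short-lived objects add up to GC pressure.

```scala
// Illustration only: same result, very different allocation behaviour.
object FewerObjectsSketch {

  // Allocation-heavy: split() builds an array plus one String per field,
  // all discarded as soon as .length has been read.
  // The limit of -1 keeps trailing empty fields, e.g. "a,b," has 3 fields.
  def countFieldsWithSplit(line: String): Int =
    line.split(",", -1).length

  // Allocation-light: scan the characters and count separators;
  // no intermediate objects are created per line.
  def countFieldsByScanning(line: String): Int = {
    var commas = 0
    var i = 0
    while (i < line.length) {
      if (line.charAt(i) == ',') commas += 1
      i += 1
    }
    commas + 1
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("a,b,c", "x,,y,", "single")
    lines.foreach { l =>
      println(s"$l -> ${countFieldsWithSplit(l)} / ${countFieldsByScanning(l)}")
    }
  }
}
```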
Adding the following flags got rid of the GC pauses.
spark.executor.extraJavaOptions -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12
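The same flags can be set programmatically. The sketch below is a config fragment, assuming a standalone application launched with spark-submit (which supplies the master); the object and app names are hypothetical. Executor JVM options only take effect when executors start, so they are placed on the SparkConf before the session is created; in an already-running spark-shell, getOrCreate would return the existing session and the options would not apply. The values 35 and 12 are the ones from the answer above, not general recommendations.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object G1GcConfSketch {
  def main(args: Array[String]): Unit = {
    val gcOptions = Seq(
      "-XX:+UseG1GC",                          // use the G1 garbage collector
      "-XX:InitiatingHeapOccupancyPercent=35", // start concurrent marking at 35% heap occupancy (G1 default: 45)
      "-XX:ConcGCThreads=12"                   // threads used for G1's concurrent marking
    ).mkString(" ")

    // Must be set before executors launch, i.e. before the session exists.
    val conf = new SparkConf()
      .setAppName("g1-gc-sketch")
      .set("spark.executor.extraJavaOptions", gcOptions)

    val spark = SparkSession.builder().config(conf).getOrCreate()
    try {
      // ... read / repartition / write job goes here ...
    } finally {
      spark.stop()
    }
  }
}
```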
I think it does take a fair amount of tweaking, though. This Databricks post was very helpful.