I'm deploying a Spark data processing job on an EC2 cluster. The job is small for the cluster (16 cores with 120G RAM in total): the largest RDD has only 76k+ rows. However, the data is heavily skewed in the middle of the job (and thus requires repartitioning), and each row holds around 100k of data after serialization. The job always gets stuck in repartitioning: it constantly hits the following errors and retries:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer
org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/spark-...
I've tried to identify the problem, but both memory and disk consumption on the machines throwing these errors seem to be below 50%. I've also tried different configurations, including:
let driver/executor memory use 60% of total memory.
let Netty prioritize JVM shuffle buffers.
increase the shuffle streaming buffer to 128m.
use KryoSerializer and max out all buffers.
increase the shuffle memoryFraction to 0.4.
But none of them works. The small job always triggers the same series of errors and maxes out its retries (up to 1000 times). How do I troubleshoot this in such a situation?
Thanks a lot if you have any clue.
Transformations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
Shuffle data is the intermediate output produced by the map side of a shuffle. Spark buffers this intermediate output in memory and, when there is not enough space, stores it on local disk.
One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor.
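The idea behind such a broadcast join can be sketched in plain Python, without Spark. This is only an illustration under assumptions: `broadcast_hash_join` is a hypothetical helper name, the datasets are in-memory lists of `(key, value)` pairs, and the dictionary stands in for the hash table that Spark would ship to each executor.

```python
from typing import Dict, Hashable, Iterable, Iterator, List, Tuple

Pair = Tuple[Hashable, object]


def broadcast_hash_join(small: Iterable[Pair], large: Iterable[Pair]) -> Iterator[Tuple[Hashable, Tuple[object, object]]]:
    """Inner-join `large` against `small` without regrouping `large` by key.

    Hypothetical helper: `small` plays the role of the broadcast dataset.
    """
    # Build the hash table once from the small dataset (the "driver" step).
    lookup: Dict[Hashable, List[object]] = {}
    for key, value in small:
        lookup.setdefault(key, []).append(value)

    # Stream the large dataset and probe the table locally (the "executor" step).
    # Rows of `large` never need to move, so no shuffle is required.
    for key, value in large:
        for small_value in lookup.get(key, []):
            yield key, (value, small_value)
```

The trade-off is memory: the whole small dataset has to fit in memory on every executor, which is why this only applies when one side of the join is small.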
Shuffling is the process of exchanging data between partitions. As a result, data rows can move between worker nodes when their source partition and the target partition reside on a different machine. Spark doesn't move data between nodes randomly.
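A rough sketch of how a key decides its target partition, and why a hot key leads to a skewed partition, might look like the following. It is plain Python rather than Spark: `target_partition` and `partition_sizes` are hypothetical names, and `zlib.crc32` is used here only as a deterministic stand-in for the hash function that a hash partitioner would apply to the key.

```python
import zlib
from collections import Counter
from typing import Iterable, List


def target_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition index: hash of the key modulo the partition count."""
    # crc32 is an assumption for illustration; it is deterministic across runs.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_sizes(keys: Iterable[str], num_partitions: int) -> List[int]:
    """Count how many rows land in each partition after a key-based shuffle."""
    counts = Counter(target_partition(key, num_partitions) for key in keys)
    return [counts.get(index, 0) for index in range(num_partitions)]
```

Because every row with the same key goes to the same partition, a dataset where one key dominates produces one oversized partition no matter how many partitions are requested.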
Check your logs for an error similar to this:
ERROR 2015-05-12 17:29:16,984 Logging.scala:75 - Lost executor 13 on node-xzy: remote Akka client disassociated
Every time you get this error, it is because you lost an executor. Why you lost the executor is another story; again, check your logs for clues.
One possibility is that YARN killed your job because it thinks you are using "too much memory".
Check for something like this:
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl  - Container [<edited>] is running beyond physical memory limits. Current usage: 18.0 GB of 18 GB physical memory used; 19.4 GB of 37.8 GB virtual memory used. Killing container.
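Searching the logs for these messages by hand gets tedious on a cluster, so a small scanner can help. The sketch below is only a starting point: `scan_log` and the pattern names are hypothetical, and the regular expressions are derived from the sample messages quoted in this thread, so the exact wording may differ in your Spark or YARN version.

```python
import re
from typing import Iterable, List, Tuple

# Patterns derived from the log messages quoted in this thread (assumed wording).
PATTERNS = {
    "lost_executor": re.compile(r"Lost executor (\d+) on (\S+?):"),
    "yarn_memory_kill": re.compile(r"running beyond physical memory limits"),
    "container_killed": re.compile(r"Container killed on request\. Exit code is (\d+)"),
}


def scan_log(lines: Iterable[str]) -> List[Tuple[int, str]]:
    """Return (line_number, pattern_name) for every line matching a known pattern."""
    hits = []
    for line_number, line in enumerate(lines, start=1):
        for name, pattern in PATTERNS.items():
            if pattern.search(line):
                hits.append((line_number, name))
    return hits
```

To use it on a real file, pass an open file handle, for example `scan_log(open(path))`, where `path` points at a driver or NodeManager log.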
Also see: http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
The current state of the art is to increase spark.yarn.executor.memoryOverhead until the job stops failing. We do have plans to try to automatically scale this based on the amount of memory requested, but it will still just be a heuristic.
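To see why raising the overhead helps, it can be useful to work out how large a container YARN is asked for per executor: roughly the executor heap plus the overhead. The sketch below is an assumption-laden illustration: `container_request_mb` is a hypothetical helper, and the default factor (10%) and minimum (384 MB) are assumed values that have changed between Spark versions, so check the documentation for your release.

```python
from typing import Optional


def container_request_mb(
    executor_memory_mb: int,
    overhead_mb: Optional[int] = None,
    overhead_factor: float = 0.10,  # assumed default; version dependent
    min_overhead_mb: int = 384,     # assumed default; version dependent
) -> int:
    """Estimate the per-executor container size: heap plus off-heap overhead."""
    if overhead_mb is None:
        # No explicit overhead configured: fall back to the assumed default rule.
        overhead_mb = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    return executor_memory_mb + overhead_mb
```

If the process uses more physical memory than this total, YARN kills the container, which matches the "running beyond physical memory limits" message even when the JVM heap itself is not full.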
I was also getting the error
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
and looking further in log I found
Container killed on request. Exit code is 143
After searching for the exit code, I realized that it is mainly related to memory allocation. So I checked the amount of memory I had configured for the executors and found that, by mistake, I had configured 7g for the driver and only 1g for the executors. After increasing the executor memory, my Spark job ran successfully.
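One way to avoid this kind of mix-up is to pass both memory settings explicitly when submitting the job. The sketch below only assembles the command line: `spark_submit_args` is a hypothetical helper, and the file name and memory values in the usage note are placeholders, not recommendations. The `--driver-memory`, `--executor-memory`, and `--conf` options are standard `spark-submit` flags.

```python
from typing import Dict, List, Optional


def spark_submit_args(
    app: str,
    driver_memory: str,
    executor_memory: str,
    extra_conf: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Build a spark-submit argument list with explicit memory settings."""
    args = [
        "spark-submit",
        "--driver-memory", driver_memory,
        "--executor-memory", executor_memory,
    ]
    # Sort the keys so the generated command is stable and easy to diff.
    for key, value in sorted((extra_conf or {}).items()):
        args += ["--conf", f"{key}={value}"]
    args.append(app)
    return args
```

For example, `spark_submit_args("job.py", "2g", "6g", {"spark.yarn.executor.memoryOverhead": "1024"})` yields a list that can be handed to `subprocess.run`, and printing it before launch makes a swapped driver/executor value easy to spot.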