 

Why does collecting dataset fail with org.apache.spark.shuffle.FetchFailedException?

I use Spark with YARN cluster manager.

I create a dataset from a Cassandra table that has around 700 rows and 5 columns, one of which holds data in JSON format. The amount of data is only in the MB range.

I run spark-shell with:

  • spark.executor.memory=4g
  • spark.driver.memory=2g
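
For reference, a typical way to pass those two settings when launching the shell looks like this (a sketch; the exact invocation is not shown here):

spark-shell \
  --conf spark.executor.memory=4g \
  --conf spark.driver.memory=2g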

I am getting this error:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to bosirahtaicas02.bscuat.local/172.17.0.1:53093
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)

while trying to collect data from my DataFrame.

I run the following piece of code directly in spark-shell (line by line):

import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

import com.datastax.spark.connector.cql._
import com.datastax.spark.connector._

import org.json._
import java.sql.Timestamp

val sqc = new SQLContext(sc)
val csc = new CassandraSQLContext(sc)

val allRecordsDF = sqc.read.format("org.apache.spark.sql.cassandra").
    option("table", "xyz").
    option("keyspace", 'pqr").load().cache()

val allRowsDF = allRecordsDF.select(allRecordsDF.col("record_type"), allRecordsDF.col("process_time"))

val allPatientsTS = allRowsDF.where(allRowsDF("record_type") === "patient").select("process_time").sort("process_time")

The error appears when I try to collect the allPatientsTS DataFrame.
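
The collect call itself isn't shown above; it is just something along these lines:

// The sort forces a shuffle; collecting triggers the job and the
// shuffle-block fetch that then fails.
allPatientsTS.collect()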

Asked Dec 06 '25 10:12 by Divas Nikhra

1 Answer

Quoting my notes about FetchFailedException:

A FetchFailedException may be thrown while a task runs, when ShuffleBlockFetcherIterator does not manage to fetch the shuffle blocks it needs.

The root cause of a FetchFailedException is usually that the executor (with the BlockManager hosting the shuffle blocks) has been lost, i.e. is no longer available, because:

  • An OutOfMemoryError was thrown (i.e. the executor was OOMed), or some other unhandled exception occurred.

  • The cluster manager that manages the workers hosting the executors of your Spark application, e.g. YARN (as in this case), enforced the container memory limits and eventually decided to kill the executor due to excessive memory usage.

You should review the logs of the Spark application using the web UI, the Spark History Server, or cluster-specific tools such as yarn logs -applicationId for Hadoop YARN, e.g. as shown below.
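
For a YARN deployment, the aggregated container logs can be fetched with something like this (the application ID is a placeholder for your own):

yarn logs -applicationId <application_id>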

A solution is usually to tune the memory of your Spark application.
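
As a rough sketch of what such tuning can look like when launching spark-shell (the values below are illustrative assumptions, not recommendations tailored to this workload):

# On older Spark/YARN versions the overhead setting is
# spark.yarn.executor.memoryOverhead instead of spark.executor.memoryOverhead.
spark-shell \
  --conf spark.executor.memory=6g \
  --conf spark.executor.memoryOverhead=1g \
  --conf spark.driver.memory=2g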

As you can see, it's hard to say exactly what caused the FetchFailedException without an extensive review of your dataset (most importantly its size and how it is partitioned in Spark) and of the logs in YARN.

Answered Dec 08 '25 00:12 by Jacek Laskowski

