I use Spark with the YARN cluster manager.
I create a dataset from a Cassandra table with around 700 rows and 5 columns, one of which holds data in JSON format. The total amount of data is only a few MB.
I run spark-shell with:
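(The exact command is not included above; a typical invocation for this setup might look like the following sketch, where the connector coordinates, version, and Cassandra host are assumptions rather than what was actually used.)
# Illustrative only: connector version and host are assumptions, not the real command.
spark-shell --master yarn \
  --packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.0 \
  --conf spark.cassandra.connection.host=<cassandra-host>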
I get the following error while trying to collect data from my DataFrame:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to bosirahtaicas02.bscuat.local/172.17.0.1:53093
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
I run the following piece of code directly in spark-shell (line by line):
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector._
import org.json._
import java.sql.Timestamp
val sqc = new SQLContext(sc)
val csc = new CassandraSQLContext(sc)
val allRecordsDF = sqc.read.format("org.apache.spark.sql.cassandra").
option("table", "xyz").
option("keyspace", 'pqr").load().cache()
val allRowsDF = allRecordsDF.select(allRecordsDF.col("record_type"), allRecordsDF.col("process_time"))
val allPatientsTS = allRowsDF.where(allRowsDF("record_type") === "patient").select("process_time").sort("process_time")
It is when I try to collect the allPatientsTS DataFrame that the error is thrown.
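(The exact call isn't shown; presumably the failing step is something along these lines, where the .collect() is an assumption based on the description.)
// Assumed triggering step (not shown in the question): the sort requires a
// shuffle, and collect() forces it to execute; the shuffle-block fetch then fails.
val processTimes = allPatientsTS.collect()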
Quoting my notes about FetchFailedException:
A FetchFailedException may be thrown when a task runs and ShuffleBlockFetcherIterator did not manage to fetch shuffle blocks.
The root cause of a FetchFailedException is usually that the executor (with the BlockManager serving the shuffle blocks) was lost, i.e. is no longer available, because:
- an OutOfMemoryError (aka OOM) or some other unhandled exception was thrown, or
- the cluster manager that manages the workers running your Spark application's executors, e.g. YARN (as is the case here), enforced the container memory limits and eventually killed the executor for excessive memory usage.
You should review the logs of the Spark application using the web UI, the Spark History Server, or cluster-specific tools such as yarn logs -applicationId for Hadoop YARN.
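For example (the application id below is a placeholder, not yours; you can find the real one in the YARN ResourceManager UI or via yarn application -list):
# Placeholder application id; replace with your own.
yarn logs -applicationId application_1500000000000_0042 | grep -iE "killed|outofmemory|exceeding"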
A solution is usually to tune the memory of your Spark application.
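A sketch of what that tuning could look like when launching spark-shell; the values are illustrative, not a recommendation for this particular workload:
# Illustrative values only; adjust to your cluster and dataset.
spark-shell --master yarn \
  --executor-memory 4g \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --conf spark.sql.shuffle.partitions=50
Given how small the dataset is, lowering spark.sql.shuffle.partitions from its default of 200 may also reduce shuffle overhead.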
As you can see, it's hard to say exactly what caused the FetchFailedException without an extensive review of your dataset (most importantly its size and how it is partitioned in Spark) and the logs in YARN.