What is the "offset was changed from X to 0" error with a KafkaSource in Spark Structured Streaming?

I'm getting the error "offset was changed from X to 0, some data may have been missed" from a KafkaSource in a Spark Structured Streaming application with checkpointing, but it doesn't seem to actually cause any problems. I'm trying to figure out what the error actually means.

My setup is as follows.

  • I have Kafka (0.10.1.0) running in a Docker container, with a named volume mounted at /tmp/kafka-logs so that the logs are kept between restarts.

  • I have a Spark Structured Streaming (2.1.1) application in another Docker container. The streams consume data from Kafka. They also use checkpointing, with checkpoint locations mounted in a named volume, to make sure the metadata is kept between restarts.

  • I use a custom sink that implements the ForeachWriter interface, which means I have to implement my own log of processed versions so that, when everything restarts, I can tell Spark Streaming not to reprocess what has already been processed (a sketch of such a sink follows this list).

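To illustrate, here is a minimal sketch of what that sink looks like; the processed-versions log calls (alreadyProcessed and the commit in close) are placeholders for my own bookkeeping, not Spark APIs:

import org.apache.spark.sql.ForeachWriter

class VersionedSink extends ForeachWriter[String] {
  // Spark calls open() once per partition per trigger; `version` is the
  // batch/epoch id, which is what the processed-versions log is keyed on.
  override def open(partitionId: Long, version: Long): Boolean = {
    // Returning false tells Spark to skip this (partition, version) pair,
    // which is how already-processed batches are avoided after a restart.
    !alreadyProcessed(partitionId, version)
  }

  override def process(value: String): Unit = {
    // Write the record to the external system here.
  }

  override def close(errorOrNull: Throwable): Unit = {
    // On success, record the (partition, version) pair in the log.
  }

  // Placeholder: look the pair up in the processed-versions log.
  private def alreadyProcessed(partitionId: Long, version: Long): Boolean = false
}
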
All of this works well: the data is correctly consumed from Kafka and my custom sink processes it as expected.

Now if I kill the Spark Streaming application, let data in Kafka pile up, and then restart Spark Streaming, it throws the following error, indicating that some data isn't available in Kafka any more:

ERROR StreamExecution: Query [id = cd2b69e1-2f24-439a-bebc-89e343df83a8, runId = d4b3ae65-8cfa-4713-912c-404623710048] terminated with error

java.lang.IllegalStateException: Partition input.clientes-0's offset was changed from 908 to 0, some data may have been missed.

Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you don't want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "false".

at org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:329)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:283)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:281)
at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:452)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:448)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:448)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:448)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:447)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)

But after the error is thrown, I see my streams start normally. Spark Streaming correctly pushes the data that had piled up in Kafka to my custom sink, with the expected version, and my sink then processes the new data correctly.

So the error indicates that some data isn't available in Kafka any more, but it still manages to get consumed correctly by Spark Streaming.

If I restart the Spark Streaming application even when no data gets pushed to Kafka, I get the same error again. And if I start pushing new data to Kafka, it will continue to be processed correctly by the system.
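For what it's worth, one way to interpret the numbers in the error is to compare the checkpointed offset (908 above) with the earliest and latest offsets Kafka still retains for the partition. A minimal sketch using the plain Kafka consumer API (the broker address is an assumption):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "kafka:9092") // assumed broker address
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("input.clientes", 0)
// Earliest and latest offsets Kafka still has for this partition.
val earliest = consumer.beginningOffsets(Seq(tp).asJava).get(tp)
val latest = consumer.endOffsets(Seq(tp).asJava).get(tp)
println(s"earliest=$earliest latest=$latest") // compare with the checkpointed offset
consumer.close()

If the earliest offset Kafka reports is lower than or equal to the checkpointed one, nothing was actually aged out, which would fit the behaviour described above.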

Does anyone know what could be going on here? Am I interpreting the error incorrectly?


1 Answer

/tmp/kafka-logs is Kafka's log directory, where all the topic data and offset information is stored. If it is corrupted, or some of the data in it has been deleted (for example aged out by retention), the offsets recorded in Spark's checkpoint can point at data Kafka no longer has. If you don't want the query to fail in that case, set the option failOnDataLoss to false in the Kafka source options and restart the Spark job.

Option  : failOnDataLoss
Value   : true or false
Default : true

Meaning : Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected.
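As an illustration, the option is set on the Kafka source when the stream is defined; a minimal sketch (the broker address is an assumption, and spark is the active SparkSession):

// failOnDataLoss is a source option on the Kafka reader.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092") // assumed broker address
  .option("subscribe", "input.clientes")
  .option("failOnDataLoss", "false") // don't fail the query on possible data loss
  .load()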


