
When using Spark Structured Streaming, how can I get just the aggregation result of the current batch, as in Spark Streaming?

One big difference between Spark Structured Streaming (SSS) and Spark Streaming (SS) is that SSS can leverage the state store. It keeps the aggregation results of previous batches and merges the current batch's result with them, so it can produce the true aggregation result from the very beginning of the input stream.

But in our case, we don't want the final result merged with the previous values in the state store. We just want to output the aggregation result of the current batch. And due to platform and framework constraints, we can't roll back to SS.

So my question is: is it still doable in SSS to get the aggregation result of only the current batch, as in SS?

Take the word count application given in the Spark Structured Streaming programming guide as an example: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

When "cat cat" comes in one batch, my expected output is cat|2.

And when "cat" comes in the next batch, my expected output is cat|1, not the accumulated cat|3.

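For reference, the stock word count from that guide looks roughly like this. Its groupBy("value").count() is a stateful aggregation, so after the two batches above, the second batch would print cat|3, not cat|1:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("StructuredNetworkWordCount")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Socket source, as in the programming guide.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Stateful aggregation: counts are merged with the state store,
// so they accumulate across batches.
val wordCounts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()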
asked Oct 15 '25 by DeepNightTwo


2 Answers

is it still doable in SSS to get the aggregation result of only the current batch, as in SS?

One way to achieve what you want is to manage the state yourself using mapGroupsWithState, treating it as a degenerate store that never actually keeps anything. For example:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, OutputMode}

val spark =
  SparkSession.builder().appName("bla").master("local[*]").getOrCreate()

import spark.implicits._

val socketDF = spark.readStream
  .format("socket")
  .option("host", "127.0.0.1")
  .option("port", 9999)
  .load()

socketDF
  .as[String]
  .map { str =>
    // Input lines look like "key;value".
    val Array(key, value) = str.split(';')
    (key, value)
  }
  .groupByKey { case (key, _) => key }
  // The GroupState is never read or updated, so every batch starts
  // from a clean slate: only the current batch's rows are counted.
  .mapGroupsWithState((key: String,
                       tuples: Iterator[(String, String)],
                       state: GroupState[Int]) => {
    (key, tuples.size)
  })
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()
  .awaitTermination()

Assuming I have a stream of values in the format key;value, this just uses mapGroupsWithState as a pass-through and never accumulates anything in the state. That way, for each batch, you get a clean state with no previously aggregated data.
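For example, if cat;a and cat;b arrive in one batch, the console sink prints (cat, 2); if a lone cat;c arrives in the next batch, it prints (cat, 1), with nothing carried over from the first batch.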

answered Oct 18 '25 by Yuval Itzchakov


In Spark 2.4 there seems to be an easier way to achieve this: the foreachBatch operation, as you can read in the Spark documentation.
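A minimal sketch of that approach (Spark 2.4+), reusing the socket source from the answer above: inside foreachBatch, the micro-batch is handed over as a plain Dataset, so the groupBy here only ever sees the rows of the current batch and no state store is involved.

import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder()
  .appName("PerBatchWordCount")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "127.0.0.1")
  .option("port", 9999)
  .load()

lines.as[String]
  .flatMap(_.split(" "))
  .writeStream
  // Each micro-batch arrives as a regular Dataset; aggregating it
  // here is a plain batch job with no streaming state behind it.
  .foreachBatch { (batch: Dataset[String], batchId: Long) =>
    batch.groupBy("value").count().show()
  }
  .start()
  .awaitTermination()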

However, I am using Spark 2.3 and have not managed to solve this problem there.

answered Oct 18 '25 by messenjah00