There is a continuous stream of data; after all transformations it has the following schema:
root
|-- message_id: string (nullable = true)
|-- device_id: string (nullable = true)
|-- metric_id: long (nullable = true)
|-- value: long (nullable = true)
|-- timestamp: string (nullable = true)
There is also a set of rules, e.g.:
if metric_id = 4077 and value > 10 and value < 25
That means that if any row in the stream meets that condition, the row must be pushed into a different stream.
How can I identify messages that meet the alert criteria (there are several rules) and then push them to a different stream?
Spark Structured Streaming applications allow you to have multiple output streams using the same input stream.
That means, if for example df is your input streaming DataFrame, you can simply define a filter on it and use the resulting, filtered DataFrame for another output stream, as shown below:
df = spark.readStream.format(...).options(...).load().select(...)
# create a new DataFrame that only contains alerts
alertsDf = df.filter( (df.metric_id == 4077) & (df.value > 10) & (df.value < 25) )
# use both DataFrames for output streams
df.writeStream.format(...).options(...).start()
alertsDf.writeStream.format(...).options(...).start()
spark.streams.awaitAnyTermination()
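Since you mention several alert rules, one way to keep them manageable is to express each rule as a column expression and OR them together into a single filter. The sketch below assumes PySpark; only the metric_id = 4077 rule comes from your question, the second rule and its values are purely illustrative:

from functools import reduce
from pyspark.sql import functions as F

# Each rule is one column expression; a row matching ANY rule counts as an alert.
rules = [
    (F.col("metric_id") == 4077) & (F.col("value") > 10) & (F.col("value") < 25),
    (F.col("metric_id") == 5123) & (F.col("value") > 100),  # hypothetical second rule
]

# Combine the rules with a logical OR and filter the input stream once.
alertsDf = df.filter(reduce(lambda a, b: a | b, rules))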
For fault tolerance it is recommended to set the checkpointLocation option separately for each output stream.
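A minimal sketch of what that could look like, assuming a file (parquet) sink and placeholder output/checkpoint paths that you would replace with your own:

# Each query gets its own checkpoint directory so its progress is tracked independently.
df.writeStream \
    .format("parquet") \
    .option("path", "/output/all") \
    .option("checkpointLocation", "/checkpoints/all") \
    .start()

alertsDf.writeStream \
    .format("parquet") \
    .option("path", "/output/alerts") \
    .option("checkpointLocation", "/checkpoints/alerts") \
    .start()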