I am looking for a way to make a Google Dataflow job stop ingesting from Pub/Sub when a specific exception happens.
The events from Pub/Sub are JSON, read via PubsubIO.Read.Bound<TableRow> using TableRowJsonCoder and streamed directly to BigQuery with BigQueryIO.Write.Bound.
(There is a ParDo in between that changes the contents of one field, plus some custom partitioning by day, but that should be irrelevant for this purpose.)
When the events/rows ingested from Pub/Sub contain fields that are not columns in the destination BigQuery table, the Dataflow job logs IOExceptions at run time saying it could not insert the rows, but it seems to acknowledge these messages and continues running.
What I want instead is to stop ingesting messages from Pub/Sub and/or make the Dataflow job crash, so that alerting can be based on the age of the oldest unacknowledged message. At the very least I want to make sure that Pub/Sub messages that failed to be inserted into BigQuery are not acked, so that I can fix the problem, restart the Dataflow job and consume those messages again.
I know that one suggested solution for handling faulty input is described here: https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow
I am also aware of this PR on Apache Beam that would allow inserting the rows without the offending fields: https://github.com/apache/beam/pull/1778
However, in my case I don't really want to guard against faulty input but rather against programmer errors, i.e. new fields were added to the JSON messages pushed to Pub/Sub, but the corresponding Dataflow job was not updated. So I don't really have faulty data; I simply want to crash when a programmer makes the mistake of changing the message format without first deploying a new Dataflow job.
I assume it would be possible (analogous to the blog post solution) to create a custom ParDo that validates each row and throws an exception that isn't caught and leads to a crash.
But ideally, I would just like some configuration that, instead of handling the insert error and logging it, crashes the job or at least stops ingestion.
You could have a ParDo with a DoFn that sits before the BQ write. The DoFn would be responsible for fetching the output table schema every X minutes and validating that each record to be written matches the expected output schema (throwing an exception if it doesn't).
Old Pipeline:
PubSub -> Some Transforms -> BQ Sink
New Pipeline:
PubSub -> Some Transforms -> ParDo(BQ Sink Validator) -> BQ Sink
This has the advantage that once someone fixes the output table schema, the pipeline will recover. You'll want to throw an exception with a good error message stating what's wrong with the incoming Pub/Sub message.
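A minimal sketch of the validation logic such a DoFn could delegate to, written in plain Java so it runs without the Dataflow SDK. The class name BqSinkValidator, the schemaFetcher supplier and the refreshMillis parameter are hypothetical names for illustration; the supplier stands in for whatever call you use to look up the table's column names. It only checks top-level field names, not types or nested records.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical helper: checks rows against a periodically refreshed column set. */
class BqSinkValidator {
    // Assumption: returns the current top-level column names of the destination table.
    private final Supplier<Set<String>> schemaFetcher;
    private final long refreshMillis;
    private Set<String> columns;   // cached schema, null until first fetch
    private long lastFetchMillis;

    BqSinkValidator(Supplier<Set<String>> schemaFetcher, long refreshMillis) {
        this.schemaFetcher = schemaFetcher;
        this.refreshMillis = refreshMillis;
    }

    /** Throws if the row has a field the table does not know about. */
    void validate(Map<String, ?> row, long nowMillis) {
        // Re-fetch the schema on first use and whenever the cache is too old,
        // so that fixing the table lets the pipeline recover without a redeploy.
        if (columns == null || nowMillis - lastFetchMillis >= refreshMillis) {
            columns = schemaFetcher.get();
            lastFetchMillis = nowMillis;
        }
        for (String field : row.keySet()) {
            if (!columns.contains(field)) {
                throw new IllegalStateException(
                    "Field '" + field + "' is not a column of the destination table; "
                    + "known columns: " + columns);
            }
        }
    }
}
```

Inside the DoFn you would call validate(row, System.currentTimeMillis()) before emitting the row; a TableRow can be passed directly since it is a Map of field names to values. Letting the exception propagate out of the DoFn is what keeps the failing messages from being treated as successfully processed.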
Alternatively, you could have the BQ Sink Validator output the offending messages to a Pub/Sub dead-letter queue (DLQ) instead, and monitor its size. Operationally you would have to update the table and then re-ingest the DLQ as an input. This has the advantage that only the bad messages are held back, while valid messages keep flowing through the pipeline.
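The DLQ variant boils down to routing instead of throwing. The following is a rough sketch in plain Java, with two in-memory lists standing in for the main output and the dead-letter output; DeadLetterRouter and its fields are hypothetical names, and in a real pipeline the two lists would be two outputs of the DoFn, one feeding the BQ sink and one feeding a Pub/Sub topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: splits rows into "writable" and "dead-letter" outputs. */
class DeadLetterRouter {
    // Stand-ins for the main output (to BigQuery) and the DLQ output (to Pub/Sub).
    final List<Map<String, ?>> valid = new ArrayList<>();
    final List<Map<String, ?>> deadLetter = new ArrayList<>();

    /** Sends the row to the DLQ if it has any field the table lacks. */
    void route(Map<String, ?> row, Set<String> columns) {
        if (columns.containsAll(row.keySet())) {
            valid.add(row);
        } else {
            deadLetter.add(row);
        }
    }
}
```

With this shape, alerting would be based on the DLQ backlog rather than on the age of the oldest unacknowledged message in the source subscription.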