Apache Kafka documentation states:
The internal Kafka Streams consumer max.poll.interval.ms default value was changed from 300000 to Integer.MAX_VALUE
Since this value is used to detect when the processing time for a batch of records exceeds a given threshold, is there a reason for such an "unlimited" value?
Does it allow applications to become unresponsive? Or does Kafka Streams have a different way to leave the consumer group when processing is taking too long?
Kafka Streams leverages the heartbeat functionality of the Kafka consumer client in this context, and thus decouples heartbeats ("Is this app instance still alive?") from calls to poll(). The two main parameters are session.timeout.ms (for the heartbeat thread) and max.poll.interval.ms (for the processing thread), and their difference is described in more detail at https://stackoverflow.com/a/39759329/1743580.
Heartbeating was introduced so that an application instance can spend a lot of time processing a record without being considered "not making progress" and thus "dead". For example, your app can do a lot of crunching on a single record for a minute, while still heartbeating to Kafka "Hey, I'm still alive, and I am making progress. But I'm simply not done with the processing yet. Stay tuned."
Of course you can change max.poll.interval.ms from its default (Integer.MAX_VALUE) to a lower setting if, for example, you actually do want your app instance to be considered "dead" if it takes longer than X seconds in-between polling records, and thus if it takes longer than X seconds to process the latest round of records.  It depends on your specific use case whether or not such a configuration makes sense -- in most cases, the default setting is a safe bet.
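As a minimal sketch of what lowering the setting could look like: the snippet below only builds a java.util.Properties object using the plain config key names, so it runs without the Kafka libraries. The class name, method name, application id, and broker address are placeholders made up for illustration; in a real application you would pass the resulting properties to your Kafka Streams instance.

```java
import java.util.Properties;

public class StreamsTimeoutConfig {

    // Builds a configuration that overrides the two liveness-related settings.
    // The application id and bootstrap server below are hypothetical values.
    static Properties buildConfig(int maxPollIntervalMs, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app");      // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
        // Processing thread: maximum allowed time between two poll() calls.
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        // Heartbeat thread: how long the broker waits for a heartbeat.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Consider the instance dead if one round of records takes more than 60 s.
        Properties props = buildConfig(60_000, 10_000);
        System.out.println(props.getProperty("max.poll.interval.ms"));
        System.out.println(props.getProperty("session.timeout.ms"));
    }
}
```

Whether 60 seconds is a sensible bound depends entirely on how long your per-record processing can legitimately take.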
session.timeout.ms: The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.
max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
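To make the difference between the two timeouts concrete, here is a toy model of the two checks described in the definitions above. It is not how Kafka implements them internally; the class, enum, and method names are invented for illustration, and time is passed in as plain millisecond values.

```java
public class LivenessModel {

    enum Status { ALIVE, SESSION_TIMED_OUT, POLL_INTERVAL_EXCEEDED }

    // Toy check: a member fails if heartbeats stop for longer than the session
    // timeout, or if poll() has not been called within the max poll interval.
    static Status check(long nowMs, long lastHeartbeatMs, long lastPollMs,
                        long sessionTimeoutMs, long maxPollIntervalMs) {
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Status.SESSION_TIMED_OUT;
        }
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Status.POLL_INTERVAL_EXCEEDED;
        }
        return Status.ALIVE;
    }

    public static void main(String[] args) {
        // One minute of processing since the last poll(), heartbeats still flowing.
        System.out.println(check(60_000, 58_000, 0, 10_000, Integer.MAX_VALUE));
        // Same situation, but with max.poll.interval.ms lowered to 30 s.
        System.out.println(check(60_000, 58_000, 0, 10_000, 30_000));
        // Heartbeats stopped 20 s ago, for example because the process crashed.
        System.out.println(check(60_000, 40_000, 0, 10_000, Integer.MAX_VALUE));
    }
}
```

With the default of Integer.MAX_VALUE the second check effectively never fires, so in this model only missing heartbeats remove a slow instance from the group.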