feat: add global consume timeout #1061
base: master
…consumenum worker

the consumenum worker consumes up to a certain # of messages, and only applies the configured timeout to a per-message consumption, breaking out of the consume loop when:

1. the end of the partition is reached (if partition eof messages are enabled)
2. a consume times out
3. the batch size is filled

this leaves the worker susceptible to slow-loris type issues, where messages come in at a cadence *just* under the timeout (e.g. every 900 milliseconds) -- with a batch size of 1000, that means that `consumer.consume(1000, 1000, cb)` could take as many as 900 seconds to complete, which is less than ideal.

introduce a configuration in the consumenum worker that allows callers to specify a "total" timeout that gets applied to the entire operation -- when exceeded, we break out of the loop and return the batch, no matter how many messages were read.
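The loop described in this commit message can be modelled roughly as follows. This is a simplified JavaScript sketch, not the actual C++ worker: `consumeBatch`, `pollOne`, and the simulated clock are hypothetical names introduced only for illustration, and the real implementation may measure elapsed time differently.

```javascript
// Simplified, hypothetical model of the batch consume loop (not node-rdkafka code).
// pollOne(timeoutMs) stands in for a single-message consume and returns one of:
//   { message, elapsedMs }, { timedOut: true, elapsedMs }, { eof: true, elapsedMs }
function consumeBatch(pollOne, num, timeoutMs, totalTimeoutMs) {
  const batch = [];
  let elapsedMs = 0; // simulated clock, advanced by each poll

  while (batch.length < num) {
    // New check: give up once the whole operation has run too long.
    // `undefined` disables the check, which keeps the old behavior.
    if (totalTimeoutMs !== undefined && elapsedMs >= totalTimeoutMs) break;

    const result = pollOne(timeoutMs);
    elapsedMs += result.elapsedMs;

    if (result.eof || result.timedOut) break; // existing exit conditions
    batch.push(result.message);
  }
  return { batch, elapsedMs };
}

// A "slow loris" source: one message every 900 ms, just under a 1000 ms timeout.
const slowPoll = (timeoutMs) =>
  timeoutMs > 900
    ? { message: 'msg', elapsedMs: 900 }
    : { timedOut: true, elapsedMs: timeoutMs };

const withoutTotal = consumeBatch(slowPoll, 1000, 1000, undefined);
const withTotal = consumeBatch(slowPoll, 1000, 1000, 5000);

console.log(withoutTotal.batch.length, withoutTotal.elapsedMs); // 1000 900000
console.log(withTotal.batch.length, withTotal.elapsedMs);       // 6 5400
```

Without a total timeout the simulated call runs for the full 900 seconds mentioned above. With an assumed 5000 ms total timeout it returns a partial batch shortly after the limit is crossed, because in this sketch the check only happens between polls.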
…++-land

this builds on the previous commit by:

1. implementing the total consume timeout parameter on the consume method's binding
2. cleaning up the argument conversions a little bit to make them cleaner to read -- instead of deciding whether we're in the two-argument consume(timeout, cb) variant or the newly-4-argument consume(timeout, totalTimeout, num, cb) variant based on the type of the 2nd argument, it instead looks at the # of arguments
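The argument-count dispatch described in the second point might look something like the sketch below. The real change lives in the C++ binding; `parseConsumeArgs` and the field names it returns are hypothetical and written in JavaScript purely to illustrate the idea.

```javascript
// Hypothetical helper illustrating dispatch on argument count rather than
// on the type of the second argument. Not the actual binding code.
function parseConsumeArgs(...args) {
  const cb = args[args.length - 1];
  if (typeof cb !== 'function') {
    throw new TypeError('last argument must be a callback');
  }

  if (args.length === 2) {
    // consume(timeout, cb): single-message variant
    const [timeoutMs] = args;
    return { variant: 'single', timeoutMs, cb };
  }

  if (args.length === 4) {
    // consume(timeout, totalTimeout, num, cb): batch variant
    const [timeoutMs, totalTimeoutMs, num] = args;
    return { variant: 'batch', timeoutMs, totalTimeoutMs, num, cb };
  }

  throw new TypeError(`expected 2 or 4 arguments, got ${args.length}`);
}

const noop = () => {};
console.log(parseConsumeArgs(1000, noop).variant);                // single
console.log(parseConsumeArgs(1000, undefined, 10, noop).variant); // batch
```

One consequence in this sketch is that an explicitly passed `undefined` total timeout still counts as an argument, so the batch variant is selected even when the new timeout is left unset.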
builds on top of the previous two commits and threads the total consume timeout value into javascript-land. the default is `undefined`, as that will result in no behavioral changes for consumers. the new behavior is thus entirely opt-in.
e2e is failing, and it appears to be related to the argument checking. I suspect I either didn’t rebuild the integration fully or messed up a check somewhere…
Fixed. In the process, I ended up cleaning up C++-land a bit and killing some dead code, plus some code that I'm not sure I understand how it ever worked. :)
@GaryWilber @iradul It might be good to get one of you to look at both this and #1053 and decide which approach to take, merge one, and do some testing before cutting a release. I'm fairly confident in mine, but it could probably use more testing regardless…
This solves a similar problem to #1052, but does it with an approach that I think is a little cleaner and doesn't change existing consumer behavior — even if the existing consumer behavior is arguably wrong, all behavioral changes are susceptible to the spacebar heating problem.
Rather than making the specified timeout both the single-message consume timeout and the batch timeout, it introduces a new value called the total consume timeout, which controls how long we can spend trying to fill a batch of `n` messages with the `consume(num, cb)` variant, and checks it on every iteration of the consume loop.

I did borrow the tests from that PR though, as they provided a pretty elegant framework to check the approach.