Fix: filtered records holding up pipeline with destination batching #1987
Conversation
Code looks good.
@raulb @maha-hajja Thanks for the review and the suggestions! I've updated the PR with a few more tests. I've also tested it with a few pipelines:
I've tested all of them with and without batching (batch size + batch delay).
nice work!
Co-authored-by: Maha Hajja <[email protected]>
…ub.com:ConduitIO/conduit into haris/fix-filtered-records-holding-up-pipeline
Description
In #1986 we discovered an issue where, if some records are filtered out and batching with a time delay is used, Conduit waits for the full batch delay as soon as it encounters a filtered record.
The underlying issue is the following: when a processor node in a pipeline filters out a record, it acknowledges it immediately. However, the acker node will not deliver that acknowledgment until all previous records have been acknowledged.
If batching is enabled and the batch is not full, Conduit waits for the batch to fill up before flushing it and acking its records. This creates a cyclic dependency: the filtered record is waiting for the previous records to be acked (i.e. for the batch to be flushed), while the previous records are waiting for the filtered record to be acked, so that new records can come in and fill up the batch.
This issue is solved by not acking the filtered records in the processor. Instead, we let them pass through the pipeline until they reach the final node, the destination, which simply acks them.
Fixes #1986.
Quick checks