Topic partition is processing on 2 consumers after consumer group rebalance #345

Open
ajax-osadchuk-r opened this issue May 19, 2023 · 12 comments
Labels
for/user-attention This issue needs user attention (feedback, rework, etc...)

Comments

@ajax-osadchuk-r

ajax-osadchuk-r commented May 19, 2023

Hello, I have a Kafka topic with 4 partitions and 2 instances of my application, each of which has established 1 Kafka receiver flux. So there are 2 consumers in the Kafka consumer group, and each of them polls events from 2 topic partitions. I use default receiver options with manual acknowledgment and auto-commit enabled.

Let's say consumer1 is assigned partition0 and partition1, and consumer2 is assigned partition2 and partition3.

I scaled up my application by deploying 2 more instances, which triggered a consumer group rebalance.

Expected Behavior

After the rebalance there should be 4 Kafka consumers, and each of them should process events from only one topic partition.

Actual Behavior

There are indeed 4 Kafka consumers in the consumer group, and each of them has only one partition assigned.
But after the rebalance, partition0 was assigned to the new consumer3. Since partition0 had previously been assigned to consumer1, consumer1 had already prefetched a batch of events from it. After the rebalance was performed, these "pre-fetched" events were processed on BOTH consumers.

Note: this is reproducible under high load on the Kafka topic. In my case there was an offset lag of 6000 on each partition at the moment of the rebalance.

Could you please clarify whether this is expected behavior for reactive kafka?

I have found a possible workaround for this issue: https://medium.com/thinkspecial/handling-partition-rebalancing-3e4f3cb192b0. It seems it can help, but I have not tested it yet.

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label May 19, 2023
@garyrussell
Contributor

garyrussell commented May 22, 2023

Which version are you using? This was fixed in 1.3.12; the current version is 1.3.18.

#281

@garyrussell garyrussell added for/user-attention This issue needs user attention (feedback, rework, etc...) and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels May 22, 2023
@ajax-osadchuk-r
Author

ajax-osadchuk-r commented May 22, 2023

@garyrussell I'm using io.projectreactor.kafka:reactor-kafka:1.3.17

@garyrussell
Contributor

If you are getting forced rebalances and getting dups, then it means your app is taking too long to process the records returned from a poll.

Try reducing max.poll.records or increasing the maxDelayRebalance receiver property (default is 60 seconds).
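For readers following along, here is a minimal sketch of how those two settings might be applied with `ReceiverOptions`. The bootstrap address, group id, topic name, and the chosen values (100 records, 120 seconds) are assumptions for illustration only and are not taken from this issue.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.kafka.receiver.ReceiverOptions;

public class RebalanceTuning {

    static ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> props = new HashMap<>();
        // Hypothetical connection settings; replace with your own.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Fewer records per poll means fewer records to drain before a rebalance.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        return ReceiverOptions.<String, String>create(props)
                // Upper bound on how long a rebalance is delayed while
                // in-flight records are processed (illustrative value).
                .maxDelayRebalance(Duration.ofSeconds(120))
                .subscription(Collections.singleton("example-topic"));
    }
}
```

Both knobs trade off against each other: a smaller poll size shortens the drain time, while a longer delay gives slow handlers more room before partitions are revoked.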

@ajax-osadchuk-r
Author

ajax-osadchuk-r commented Jun 7, 2023

@garyrussell I have tried with max.poll.records=5 and maxDelayRebalance=60s. As far as I understand, there should be at most 5 events on the old consumer when a new one joins the group.
But I actually observed the following log message from reactor.kafka.receiver.internals.ConsumerEventLoop: Rebalancing; waiting for 289 records in pipeline. As a result, the issue was reproduced again.
Do you have any thoughts on what might be wrong?

UPD: reducing fetch.max.bytes and max.partition.fetch.bytes does not help either.

UPD: with the default max.poll.records=500 I observe the following log message: Rebalancing; waiting for 842 records in pipeline. It seems that reducing max.poll.records helps a bit, but it still does not work as expected.

@garyrussell
Contributor

Those messages imply that your app is failing to acknowledge/commit some (many) offsets.

@ajax-osadchuk-r
Author

ajax-osadchuk-r commented Jun 13, 2023

That's actually it: the issue in this ticket is only reproducible during a consumer group rebalance when there is high load on the Kafka topic (offset lag is ~24k events for the topic, 6k for each partition). In that case you want to scale up your service instances, which leads to a consumer group rebalance. But then some events from the same partitions are processed on 2 consumers, because they were already in the pipeline on the old consumer and were not handled within maxDelayRebalance; the number of such events in the pipeline is ~800 due to the high load on the topic. And I cannot increase maxDelayRebalance, because I need to scale consumers ASAP to reduce the offset lag. Scaling is useless if I have to wait several minutes before the rebalance happens.

@garyrussell
Contributor

You either have to decrease the depth of the pipeline, or increase the time, so it can be processed in that time.

There are no other choices.

@ajax-osadchuk-r
Author

ajax-osadchuk-r commented Jun 13, 2023

As I described here, reducing max.poll.records even to 1 does not help; there are still 289 records in the pipeline.

Is there any other option to decrease the depth of the pipeline?

@garyrussell
Contributor

It doesn't make any sense to me, unless you are using a high prefetch in the receive() operation instead of its default (1).

By default, there will be 2 polls worth of records in the pipeline and the consumer is paused when it is full; turning on DEBUG logging will show the back pressure in action (pause after the second poll, resume when the contents of the first poll are processed). If you are not seeing back pressure then something else is wrong.

If you can't figure it out, provide an MCRE that exhibits the behavior.

@ajax-osadchuk-r
Author

Hey @garyrussell, I have created a minimal reproducible example for you. Please take a look at: https://github.com/ajax-osadchuk-r/kafka-receiver-back-pressure-issue. There are steps to reproduce in the readme.

BTW, I have noticed that receiver back pressure does not work when there is a Mono#retry or Mono#delay in the event-handling reactive chain.

@garyrussell
Contributor

@ajax-osadchuk-r Sorry for the delay in getting back to you; the problem is your groupBy is using the default prefetch (256).

Back pressure is applied properly if you reduce its prefetch:

.groupBy(ReceiverRecord::partition, 5)
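The effect of the prefetch argument can be observed without a broker. The sketch below uses only reactor-core and substitutes `Flux.range` for the receiver flux and `i % 4` for `ReceiverRecord::partition`; both substitutions, and the class and method names, are assumptions made for illustration. It records the first demand that `groupBy` signals upstream.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;

public class GroupByPrefetchDemo {

    /** Returns the size of the first request that the grouping operator sends upstream. */
    static long firstUpstreamRequest(
            Function<Flux<Integer>, Flux<GroupedFlux<Integer, Integer>>> grouping) {
        AtomicLong first = new AtomicLong(-1);
        Flux<Integer> source = Flux.range(0, 1_000)
                // Record only the first demand signal that reaches the source.
                .doOnRequest(n -> first.compareAndSet(-1, n));
        grouping.apply(source)
                // Drain every group so the pipeline runs to completion.
                .flatMap(group -> group.count())
                .blockLast();
        return first.get();
    }

    public static void main(String[] args) {
        // i % 4 stands in for ReceiverRecord::partition on a 4-partition topic.
        long withDefault = firstUpstreamRequest(f -> f.groupBy(i -> i % 4));
        long withSmall = firstUpstreamRequest(f -> f.groupBy(i -> i % 4, 5));
        System.out.println("first request, default prefetch: " + withDefault);
        System.out.println("first request, prefetch = 5:     " + withSmall);
    }
}
```

With Reactor's default buffer settings, the first line should report 256 and the second 5, which is why a default `groupBy` can pull far more records into the pipeline than `max.poll.records` alone would suggest.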

@garyrussell
Contributor

Also note that, since you are committing offsets out of order, you should set maxDeferredCommits on the receiver options.
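A minimal sketch of that setting, assuming an existing `ReceiverOptions` instance; the helper name and the limit of 100 are hypothetical and should be sized to your own pipeline.

```java
import reactor.kafka.receiver.ReceiverOptions;

public class DeferredCommits {

    /** Returns a copy of the given options with out-of-order commit handling enabled. */
    static <K, V> ReceiverOptions<K, V> withDeferredCommits(ReceiverOptions<K, V> base) {
        // A value greater than 0 lets the receiver defer commits until gaps
        // left by out-of-order acknowledgments are filled; 100 is illustrative.
        return base.maxDeferredCommits(100);
    }
}
```

As I understand the reactor-kafka reference guide, the consumer is paused once the number of deferred commits exceeds this limit, so the value also acts as a bound on unacknowledged work.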
