Rebalancing always waits until maxDelayRebalance with AckMode.EXACTLY_ONCE #371

Open
damn1kk opened this issue Nov 13, 2023 · 0 comments · May be fixed by #381
damn1kk commented Nov 13, 2023

Hello. I have a topic with multiple partitions consumed by multiple application instances, using AckMode.EXACTLY_ONCE with a transactional sender.
Every rebalance waits for the full maxDelayRebalance.
While it waits, the log repeats this message: reactor.kafka.receiver.internals.ConsumerEventLoop - Rebalancing; waiting for N records in pipeline
This is similar to the issue discussed here in the comments, but in my case the ack mode is AckMode.EXACTLY_ONCE.

I think the problem is that inPipeline is incremented and decremented on different objects.

inPipeline is decremented on the object created here:

CommittableBatch offsetBatch = new CommittableBatch();
for (ConsumerRecord<K, V> r : consumerRecords) {
    offsetBatch.updateOffset(new TopicPartition(r.topic(), r.partition()), r.offset());
}

but it is incremented on a different object here:

As a result, the inPipeline value read during rebalancing is always positive, so the loop below always waits until maxDelayRebalance elapses:

int inPipeline = commitEvent.commitBatch.getInPipeline();
if (inPipeline > 0 || this.awaitingTransaction.get()) {
    long end = maxDelayRebalance + System.currentTimeMillis();
    do {
        try {
            log.debug("Rebalancing; waiting for {} records in pipeline", inPipeline);
            Thread.sleep(interval);
            commitEvent.runIfRequired(true);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
        inPipeline = commitEvent.commitBatch.getInPipeline();
    } while (isActive.get() && (inPipeline > 0 || this.awaitingTransaction.get())
            && System.currentTimeMillis() < end);
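
The suspected mismatch can be illustrated outside the library with a minimal, self-contained sketch. Everything in it is hypothetical: the class `PipelineCounterMismatch`, the nested `Batch` type, and the method names are invented for illustration and are not reactor-kafka APIs. It only assumes that the wait loop reads a counter from one object while the decrements land on another.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PipelineCounterMismatch {

    /** Hypothetical stand-in for a batch that counts records still in the pipeline. */
    static final class Batch {
        private final AtomicInteger inPipeline = new AtomicInteger();

        void recordEmitted() { inPipeline.incrementAndGet(); }
        void recordCompleted() { inPipeline.decrementAndGet(); }
        int getInPipeline() { return inPipeline.get(); }
    }

    /**
     * Polls the counter until it reaches zero or maxDelayMillis elapses.
     * Returns true if records were still counted when the wait ended.
     */
    static boolean waitForPipeline(Batch batch, long maxDelayMillis, long intervalMillis) {
        long end = System.currentTimeMillis() + maxDelayMillis;
        while (batch.getInPipeline() > 0 && System.currentTimeMillis() < end) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return batch.getInPipeline() > 0;
    }

    public static void main(String[] args) {
        // Mismatched case: increments and decrements hit different objects.
        Batch loopBatch = new Batch();    // the object the wait loop inspects
        Batch perPollBatch = new Batch(); // a separate object created per poll
        for (int i = 0; i < 3; i++) {
            loopBatch.recordEmitted();
            perPollBatch.recordCompleted();
        }
        boolean stuck = waitForPipeline(loopBatch, 200, 20);
        System.out.println("mismatched: counter=" + loopBatch.getInPipeline() + ", stuck=" + stuck);

        // Matched case: the same object sees both updates, so the wait ends at once.
        Batch shared = new Batch();
        for (int i = 0; i < 3; i++) {
            shared.recordEmitted();
            shared.recordCompleted();
        }
        boolean stuckShared = waitForPipeline(shared, 200, 20);
        System.out.println("matched: counter=" + shared.getInPipeline() + ", stuck=" + stuckShared);
    }
}
```

In the mismatched case the inspected counter stays at 3 and the wait runs to its deadline; in the matched case it returns immediately, which corresponds to the behavior expected during rebalancing.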

Expected Behavior

Rebalancing should complete after all read messages have been processed and the transaction has committed.

Actual Behavior

Rebalancing always waits until maxDelayRebalance.

Example to Reproduce

https://github.com/damn1kk/kafka-rebalance-issue/blob/master/src/test/java/KafkaRebalanceTest.java

        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(
                        Map.of(
                                ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
                                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
                                ConsumerConfig.GROUP_ID_CONFIG, "consumer-group",
                                ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10,
                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
                                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
                        )
                )
                .addAssignListener(onAssign -> log.info("For {} assigned: {}", name, onAssign))
                .addRevokeListener(onRevoke -> log.info("For {} revoked: {}", name, onRevoke))
                .maxDeferredCommits(100)
                .maxDelayRebalance(REBALANCE_LIMIT)
                .subscription(List.of(TOPIC_NAME));


        KafkaReceiver.create(receiverOptions)
                .receiveExactlyOnce(innerSender.transactionManager())
                .concatMap(r -> r.groupBy(ConsumerRecord::partition, 1)
                                .flatMap(c -> innerSender.send(
                                        c.map(cr -> {
                                                    try {
                                                        log.info("Handle message by {} with key: {}", name, Integer.parseInt(cr.key()));
                                                        latch.countDown();
                                                        Thread.sleep(1000);
                                                    } catch (InterruptedException e) {
                                                        throw new RuntimeException(e);
                                                    }
                                                    if (Integer.parseInt(cr.key()) % 2 == 0) {
                                                        return new ProducerRecord<>(TOPIC_RESULT1, cr.key(), cr.value());
                                                    } else {
                                                        return new ProducerRecord<>(TOPIC_RESULT2, cr.key(), cr.value());
                                                    }
                                                })
                                                .map(pr -> SenderRecord.create(pr, pr.key()))

                                ))
                                .then(innerSender.transactionManager().commit()),
                        0
                )
                .subscribe();

Environment

  • reactor-kafka 1.3.21
  • kafka-clients 2.8.2
  • reactor-core 3.5.7
reactorbot added the ❓need-triage label (This issue needs triage, hasn't been looked at by a team member yet) on Nov 13, 2023
damn1kk pushed a commit to damn1kk/reactor-kafka that referenced this issue Jan 21, 2024
…rebalancing (reactor#371)

CommitBatch is not used for AckMode.ExactlyOnce, but increasing the number of uncommitted messages in it causes an infinite loop during rebalancing since they never decrease.

Fixes reactor#371.
damn1kk pushed a commit to damn1kk/reactor-kafka that referenced this issue Jan 21, 2024
Don't increment uncommitted messages in CommitBatch for AckMode.ExactlyOnce.
CommitBatch is not used for AckMode.ExactlyOnce, but increasing the number of uncommitted messages in it causes an infinite loop during rebalancing since they never decrease.

Fixes reactor#371.
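
One way to read the commit message above is as a guard on the counter update. The sketch below is not the actual patch: `GuardedPipelineCounter`, its `Mode` enum, and the method names are hypothetical and exist only to show the idea of skipping the uncommitted-record count when exactly-once mode does not use it.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedPipelineCounter {

    /** Hypothetical ack modes for this sketch; not the library's enum. */
    enum Mode { MANUAL_ACK, EXACTLY_ONCE }

    private final AtomicInteger inPipeline = new AtomicInteger();
    private final Mode mode;

    GuardedPipelineCounter(Mode mode) {
        this.mode = mode;
    }

    /** Counts an emitted record unless the mode never decrements the counter. */
    void onRecordEmitted() {
        if (mode != Mode.EXACTLY_ONCE) {
            inPipeline.incrementAndGet();
        }
    }

    /** Counts a completed record under the same guard, keeping the two in step. */
    void onRecordCompleted() {
        if (mode != Mode.EXACTLY_ONCE) {
            inPipeline.decrementAndGet();
        }
    }

    int getInPipeline() {
        return inPipeline.get();
    }

    public static void main(String[] args) {
        GuardedPipelineCounter exactlyOnce = new GuardedPipelineCounter(Mode.EXACTLY_ONCE);
        exactlyOnce.onRecordEmitted();
        System.out.println("EXACTLY_ONCE counter: " + exactlyOnce.getInPipeline());

        GuardedPipelineCounter manual = new GuardedPipelineCounter(Mode.MANUAL_ACK);
        manual.onRecordEmitted();
        System.out.println("MANUAL_ACK counter: " + manual.getInPipeline());
    }
}
```

With the guard in place, a wait loop keyed on this counter would have nothing left to wait for in exactly-once mode beyond the pending transaction.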