Skip to content

Commit

Permalink
GH-3011: Support enforced consumer rebalance
Browse files Browse the repository at this point in the history
Fixes: #3011

Kafka consumer API supports an enforced rebalance.
Provide an option via the message listener container to trigger this operation.
  • Loading branch information
sobychacko authored and artembilan committed Feb 26, 2024
1 parent 20bd1fe commit ef8c040
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[[enforced-rebalance]]
= Enforcing Consumer Rebalance

Kafka clients now support an option to trigger an https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer[enforced rebalance].
Starting with version `3.1.2`, Spring for Apache Kafka provides an option to invoke this API on the Kafka consumer via the message listener container.
When calling this API, it is simply alerting the Kafka consumer to trigger an enforced rebalance; the actual rebalance will only occur as part of the next `poll()` operation.
If there is already a rebalance in progress, calling an enforced rebalance is a NO-OP.
The caller must wait for the current rebalance to complete before invoking another one.
See the javadocs for `enfroceRebalance` for more details.

The following code snippet shows the essence of enforcing a rebalance using the message listener container.

[source, java]
----
@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
System.out.println("From KafkaListener: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
return args -> {
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
System.out.println("Enforcing a rebalance");
Thread.sleep(5_000);
listenerContainer.enforceRebalance();
Thread.sleep(5_000);
};
}
----

As the code above shows, the application uses the `KafkaListenerEndpointRegistry` to gain access to the message listener container and then calling the `enforceRebalnce` API on it.
When calling the `enforceRebalance` on the listener container, it delegates the call to the underlying Kafka consumer.
The Kafka consumer will trigger a rebalance as part of the next `poll()` operation.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -66,6 +67,7 @@
* @author Marius Bogoevici
* @author Artem Bilan
* @author Tomaz Fernandes
* @author Soby Chacko
*/
public abstract class AbstractMessageListenerContainer<K, V>
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
Expand All @@ -90,6 +92,8 @@ public abstract class AbstractMessageListenerContainer<K, V>

protected final Object lifecycleMonitor = new Object(); // NOSONAR

protected final AtomicBoolean enforceRebalanceRequested = new AtomicBoolean();

private final Set<TopicPartition> pauseRequestedPartitions = ConcurrentHashMap.newKeySet();

@NonNull
Expand Down Expand Up @@ -138,6 +142,7 @@ public abstract class AbstractMessageListenerContainer<K, V>
@Nullable
private KafkaAdmin kafkaAdmin;


/**
* Construct an instance with the provided factory and properties.
* @param consumerFactory the factory.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2023 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -369,6 +369,16 @@ && getContainerProperties().isRestartAfterAuthExceptions()
}
}

@Override
public void enforceRebalance() {
synchronized (this.lifecycleMonitor) {
// Since the rebalance is for the whole consumer group, there is no need to
// initiate this operation for every single container in the group.
final KafkaMessageListenerContainer<K, V> listenerContainer = this.containers.get(0);
listenerContainer.enforceRebalance();
}
}

@Override
public void pause() {
synchronized (this.lifecycleMonitor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,15 @@ public boolean isInExpectedState() {
return isRunning() || isStoppedNormally();
}

@Override
public void enforceRebalance() {
this.thisOrParentContainer.enforceRebalanceRequested.set(true);
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
if (consumer != null) {
consumer.wakeIfNecessary();
}
}

@Override
public void pause() {
super.pause();
Expand Down Expand Up @@ -1443,6 +1452,7 @@ protected void pollAndInvoke() {
if (!this.seeks.isEmpty()) {
processSeeks();
}
enforceRebalanceIfNecessary();
pauseConsumerIfNecessary();
pausePartitionsIfNecessary();
this.lastPoll = System.currentTimeMillis();
Expand Down Expand Up @@ -1758,6 +1768,20 @@ private void sleepFor(Duration duration) {
}
}

private void enforceRebalanceIfNecessary() {
try {
if (KafkaMessageListenerContainer.this.thisOrParentContainer.enforceRebalanceRequested.get()) {
String enforcedRebalanceReason = String.format("Enforced rebalance requested for container: %s",
KafkaMessageListenerContainer.this.getListenerId());
this.logger.info(enforcedRebalanceReason);
this.consumer.enforceRebalance(enforcedRebalanceReason);
}
}
finally {
KafkaMessageListenerContainer.this.thisOrParentContainer.enforceRebalanceRequested.set(false);
}
}

private void pauseConsumerIfNecessary() {
if (this.offsetsInThisBatch != null) {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
* @author Vladimir Tsanev
* @author Tomaz Fernandes
* @author Francois Rosiere
* @author Soby Chacko
*/
public interface MessageListenerContainer extends SmartLifecycle, DisposableBean {

Expand Down Expand Up @@ -85,6 +86,16 @@ default Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
}

/**
* Alerting the consumer to trigger an enforced rebalance. The actual enforce will happen
* when the next poll() operation is invoked.
* @since 3.1.2
* @see org.apache.kafka.clients.consumer.KafkaConsumer#enforceRebalance()
*/
default void enforceRebalance() {
throw new UnsupportedOperationException("This container doesn't support enforced rebalance");
}

/**
* Pause this container before the next poll(). This is a thread-safe operation, the
* actual pause is processed by the consumer thread.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Soby Chacko
* @since 3.1.2
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = "enforce-rebalance-topic")
public class ContainerEnforceRebalanceTests {

@Test
void enforceRebalance(@Autowired Config config, @Autowired KafkaTemplate<Integer, String> template,
@Autowired KafkaListenerEndpointRegistry registry) throws InterruptedException {
template.send("enforce-rebalance-topic", "my-data");
final MessageListenerContainer listenerContainer = registry.getListenerContainer("enforce-rebalance-grp");
assertThat(config.listenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listenerContainer).isNotNull();
listenerContainer.enforceRebalance();
assertThat(((ConcurrentMessageListenerContainer<?, ?>) listenerContainer).enforceRebalanceRequested).isTrue();
// The test is expecting partition revoke once and assign twice.
assertThat(config.partitionRevokedLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(config.partitionAssignedLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(((ConcurrentMessageListenerContainer<?, ?>) listenerContainer).enforceRebalanceRequested).isFalse();
listenerContainer.pause();
await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(listenerContainer.isPauseRequested()).isTrue());
await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(listenerContainer.isContainerPaused()).isTrue());
// resetting the latches
config.partitionRevokedLatch = new CountDownLatch(1);
config.partitionAssignedLatch = new CountDownLatch(1);
listenerContainer.enforceRebalance();
assertThat(config.partitionRevokedLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(config.partitionAssignedLatch.await(10, TimeUnit.SECONDS)).isTrue();
// Although the rebalance causes the consumer to resume again, since the container is paused,
// it will pause the rebalanced consumers again.
assertThat(listenerContainer.isPauseRequested()).isTrue();
assertThat(listenerContainer.isContainerPaused()).isTrue();
}

@Configuration
@EnableKafka
public static class Config {

@Autowired
EmbeddedKafkaBroker broker;

CountDownLatch partitionRevokedLatch = new CountDownLatch(1);

CountDownLatch partitionAssignedLatch = new CountDownLatch(2);

CountDownLatch listenerLatch = new CountDownLatch(1);

@KafkaListener(id = "enforce-rebalance-grp", topics = "enforce-rebalance-topic")
void listen(ConsumerRecord<Integer, String> ignored) {
listenerLatch.countDown();
}

@Bean
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
return new KafkaTemplate<>(pf);
}

@Bean
ProducerFactory<Integer, String> pf() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(this.broker));
}

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
ConsumerFactory<Integer, String> cf) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf);
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
partitionAssignedLatch.countDown();
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitionRevokedLatch.countDown();
}
});
return factory;
}

@Bean
ConsumerFactory<Integer, String> cf() {
return new DefaultKafkaConsumerFactory<>(
KafkaTestUtils.consumerProps("enforce-rebalance-topic", "false", this.broker));
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -2538,6 +2538,35 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.info("Stop rebalance after failed record");
}

@Test
@SuppressWarnings("unchecked")
void enforceRabalanceOnTheConsumer() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
ContainerProperties containerProps = new ContainerProperties("enforce-rebalance-test-topic");
containerProps.setGroupId("grp");
containerProps.setAckMode(AckMode.RECORD);
containerProps.setClientId("clientId");
containerProps.setIdleBetweenPolls(10000L);

Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);

CountDownLatch enforceRebalanceLatch = new CountDownLatch(1);
containerProps.setMessageListener((MessageListener<Object, Object>) data -> {
});
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
willAnswer(i -> {
enforceRebalanceLatch.countDown();
container.stop();
return null;
}).given(consumer).enforceRebalance(any());

container.start();
container.enforceRebalance();
assertThat(enforceRebalanceLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@SuppressWarnings({ "unchecked" })
@Test
public void testPauseResumeAndConsumerSeekAware() throws Exception {
Expand Down

0 comments on commit ef8c040

Please sign in to comment.