Skip to content

GH-3012: Non-blocking retries support @KafkaListener on class level #3105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Version 2.9 changed the mechanism to bootstrap infrastructure beans; see xref:re
Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners.
Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTopic` annotation and `RetryTopicConfiguration` class to simplify that bootstrapping.

Since 3.2, Spring for Apache Kafka supports non-blocking retries with xref:kafka/receiving-messages/class-level-kafkalistener.adoc[@KafkaListener on a Class].

IMPORTANT: Non-blocking retries are not supported with xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners].

IMPORTANT: Non-Blocking Retries cannot combine with xref:kafka/transactions.adoc#container-transaction-manager[Container Transactions].
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ public void processMessage(MyPojo message) {
}
----

Since 3.2, `@RetryableTopic` support for @KafkaListener on a class would be:
[source,java]
----
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}

}
----

You can specify a method in the same class to process the dlt messages by annotating it with the `@DltHandler` annotation.
If no DltHandler method is provided a default consumer is created which only logs the consumption.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Proc
Change `@RetryableTopic` property `SameIntervalTopicReuseStrategy` default value to `SINGLE_TOPIC`.
See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topic for maxInterval Exponential Delay].

=== Non-blocking retries support class level @KafkaListener
Non-blocking retries support xref:kafka/receiving-messages/class-level-kafkalistener.adoc[@KafkaListener on a Class].
See xref:retrytopic.adoc[Non-Blocking Retries].

=== Support process @RetryableTopic on a class in RetryTopicConfigurationProvider.
Provides a new public API to find `RetryTopicConfiguration`.
See xref:retrytopic/retry-config.adoc#find-retry-topic-config[Find RetryTopicConfiguration]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>

private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

@Nullable
private ApplicationContext applicationContext;

private BeanFactory beanFactory;
Expand All @@ -197,6 +198,7 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>

private AnnotationEnhancer enhancer;

@Nullable
private RetryTopicConfigurer retryTopicConfigurer;

@Override
Expand Down Expand Up @@ -273,9 +275,11 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
public synchronized void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory clbf) {
this.resolver = clbf.getBeanExpressionResolver();
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
this.listenerScope);
BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver();
if (beanExpressionResolver != null) {
this.resolver = beanExpressionResolver;
}
this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope);
}
}

Expand Down Expand Up @@ -333,9 +337,11 @@ public void afterSingletonsInstantiated() {

// Actually register all listeners
this.registrar.afterPropertiesSet();
Map<String, ContainerGroupSequencer> sequencers =
this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
sequencers.values().forEach(ContainerGroupSequencer::initialize);
if (this.applicationContext != null) {
Map<String, ContainerGroupSequencer> sequencers =
this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
sequencers.values().forEach(ContainerGroupSequencer::initialize);
}
}

private void buildEnhancer() {
Expand Down Expand Up @@ -368,36 +374,36 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
final List<Method> multiMethods = new ArrayList<>();
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {
boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
boolean hasMethodLevelListeners = !annotatedMethods.isEmpty();
if (!hasMethodLevelListeners && !hasClassLevelListeners) {
this.nonAnnotatedClasses.add(bean.getClass());
this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
processKafkaListener(listener, method, bean, beanName);
if (hasMethodLevelListeners) {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
processKafkaListener(listener, method, bean, beanName);
}
}
this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
List<Method> multiMethods = new ArrayList<>(methodsWithHandler);
processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName);
}
this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
Expand Down Expand Up @@ -444,30 +450,25 @@ private KafkaListener enhance(AnnotatedElement element, KafkaListener ann) {
}

private synchronized void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners,
List<Method> multiMethods, Object bean, String beanName) {
List<Method> multiMethods, Class<?> clazz, Object bean, String beanName) {

List<Method> checkedMethods = new ArrayList<>();
Method defaultMethod = null;
for (Method method : multiMethods) {
Method checked = checkProxy(method, bean);
KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
if (annotation != null && annotation.isDefault()) {
final Method toAssert = defaultMethod;
Method toAssert = defaultMethod;
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
+ toAssert.toString() + " and " + method.toString());
+ toAssert.toString() + " and " + method);
defaultMethod = checked;
}
checkedMethods.add(checked);
}
for (KafkaListener classLevelListener : classLevelListeners) {
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
String beanRef = classLevelListener.beanRef();
this.listenerScope.addListener(beanRef, bean);
endpoint.setId(getEndpointId(classLevelListener));
processListener(endpoint, classLevelListener, bean, beanName, resolveTopics(classLevelListener),
resolveTopicPartitions(classLevelListener));
this.listenerScope.removeListener(beanRef);
processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz);
}
}

Expand All @@ -477,39 +478,34 @@ protected synchronized void processKafkaListener(KafkaListener kafkaListener, Me
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null);
}

private void processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
MethodKafkaListenerEndpoint<K, V> endpoint, @Nullable Method methodToUse, @Nullable Class<?> clazz) {

String beanRef = kafkaListener.beanRef();
this.listenerScope.addListener(beanRef, bean);
endpoint.setId(getEndpointId(kafkaListener));
String[] topics = resolveTopics(kafkaListener);
TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
if (!processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, topics, tps, methodToUse, clazz)) {
processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
}
this.listenerScope.removeListener(beanRef);
}

private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
Method methodToUse, MethodKafkaListenerEndpoint<K, V> endpoint, String[] topics,
TopicPartitionOffset[] tps) {

String[] retryableCandidates = topics;
if (retryableCandidates.length == 0 && tps.length > 0) {
retryableCandidates = Arrays.stream(tps)
.map(tp -> tp.getTopic())
.distinct()
.toList()
.toArray(new String[0]);
}
MethodKafkaListenerEndpoint<K, V> endpoint, String[] topics, TopicPartitionOffset[] tps,
@Nullable Method methodToUse, @Nullable Class<?> clazz) {

String[] retryableCandidates = getTopicsFromTopicPartitionOffset(topics, tps);
RetryTopicConfiguration retryTopicConfiguration = new RetryTopicConfigurationProvider(this.beanFactory,
this.resolver, this.expressionContext)
.findRetryConfigurationFor(retryableCandidates, methodToUse, bean);

.findRetryConfigurationFor(retryableCandidates, methodToUse, clazz, bean);
if (retryTopicConfiguration == null) {
String[] candidates = retryableCandidates;
this.logger.debug(() ->
"No retry topic configuration found for topics " + Arrays.toString(candidates));
"No retry topic configuration found for topics " + Arrays.toString(retryableCandidates));
return false;
}

Expand All @@ -525,6 +521,18 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
return true;
}

private String[] getTopicsFromTopicPartitionOffset(String[] topics, TopicPartitionOffset[] tps) {
String[] retryableCandidates = topics;
if (retryableCandidates.length == 0 && tps.length > 0) {
retryableCandidates = Arrays.stream(tps)
.map(TopicPartitionOffset::getTopic)
.distinct()
.toList()
.toArray(new String[0]);
}
return retryableCandidates;
}

private RetryTopicConfigurer getRetryTopicConfigurer() {
if (this.retryTopicConfigurer == null) {
try {
Expand Down Expand Up @@ -737,7 +745,7 @@ private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener k
private void resolveContainerPostProcessor(MethodKafkaListenerEndpoint<?, ?> endpoint,
KafkaListener kafkaListener) {

final String containerPostProcessor = kafkaListener.containerPostProcessor();
String containerPostProcessor = kafkaListener.containerPostProcessor();
if (StringUtils.hasText(containerPostProcessor)) {
endpoint.setContainerPostProcessor(this.beanFactory.getBean(containerPostProcessor,
ContainerPostProcessor.class));
Expand Down Expand Up @@ -804,7 +812,8 @@ private String getEndpointId(KafkaListener kafkaListener) {
}
}

private String getEndpointGroupId(KafkaListener kafkaListener, String id) {
@Nullable
private String getEndpointGroupId(KafkaListener kafkaListener, @Nullable String id) {
String groupId = null;
if (StringUtils.hasText(kafkaListener.groupId())) {
groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
Expand Down Expand Up @@ -1086,8 +1095,7 @@ private void addFormatters(FormatterRegistry registry) {

private <T> Collection<T> getBeansOfType(Class<T> type) {
if (KafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory lbf) {
return lbf.getBeansOfType(type)
.values();
return lbf.getBeansOfType(type).values();
}
else {
return Collections.emptySet();
Expand Down Expand Up @@ -1241,7 +1249,7 @@ public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, Anno

}

private final class BytesToNumberConverter implements ConditionalGenericConverter {
private static final class BytesToNumberConverter implements ConditionalGenericConverter {

BytesToNumberConverter() {
}
Expand All @@ -1265,6 +1273,9 @@ public Set<ConvertiblePair> getConvertibleTypes() {
@Nullable
public Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
byte[] bytes = (byte[]) source;
if (bytes == null) {
return null;
}
if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {
Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long"); // NOSONAR
return ByteBuffer.wrap(bytes).getLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.EndpointHandlerMultiMethod;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -274,7 +275,17 @@ public MessageListenerContainer unregisterListenerContainer(String id) {
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {

if (endpoint instanceof MethodKafkaListenerEndpoint<?, ?> mkle) {
if (endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> mmkle) {
Object bean = mmkle.getBean();
if (bean instanceof EndpointHandlerMultiMethod ehmm) {
ehmm = new EndpointHandlerMultiMethod(ehmm.resolveBean(this.applicationContext),
ehmm.getDefaultMethod(), ehmm.getMethods());
mmkle.setBean(ehmm.resolveBean(this.applicationContext));
mmkle.setDefaultMethod(ehmm.getDefaultMethod());
mmkle.setMethods(ehmm.getMethods());
}
}
else if (endpoint instanceof MethodKafkaListenerEndpoint<?, ?> mkle) {
Object bean = mkle.getBean();
if (bean instanceof EndpointHandlerMethod ehm) {
ehm = new EndpointHandlerMethod(ehm.resolveBean(this.applicationContext), ehm.getMethodName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@
* }</code>
* }</code>
*</pre>
* <p> Since 3.2, {@link org.springframework.kafka.annotation.RetryableTopic} annotation supports
* {@link org.springframework.kafka.annotation.KafkaListener} annotated class, such as:
* <pre>
* <code>@RetryableTopic(attempts = 3,
* backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>
* <code>@KafkaListener(topics = "my-annotated-topic")
* static class ListenerBean {
* <code> @KafkaHandler
* public void processMessage(MyPojo message) {
* // ... message processing
* }</code>
* }</code>
*</pre>
* <p> Or through meta-annotations, such as:
* <pre>
* <code>@RetryableTopic(backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>
Expand Down
Loading