Skip to content

Commit

Permalink
@RetryableTopic support KL annotated on class part 2
Browse files Browse the repository at this point in the history
* `EndpointCustomizerFactory` adaptor `MultiMethodKafkaListenerEndpoint`.
* `RetryTopicConfigurer.processAndRegisterEndpoint` support `@KafkaListener` on a class.
* Add new class `EndpointHandlerMultiMethod` to handler multi method for retrying endpoints.

part2 of spring-projects#3105 and this contributes to fixing spring-projects#3105 eventually
  • Loading branch information
Wzy19930507 committed Mar 10, 2024
1 parent a41c03a commit a1f818e
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 139 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,15 +35,16 @@
* @param <V> the value type.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @see org.springframework.kafka.annotation.KafkaHandler
* @see DelegatingInvocableHandler
*/
public class MultiMethodKafkaListenerEndpoint<K, V> extends MethodKafkaListenerEndpoint<K, V> {

private final List<Method> methods;
private List<Method> methods;

private final Method defaultMethod;
private Method defaultMethod;

private Validator validator;

Expand All @@ -60,6 +61,43 @@ public MultiMethodKafkaListenerEndpoint(List<Method> methods, @Nullable Method d
setBean(bean);
}


/**
* Get a method list.
* @return the method list.
* @since 3.2
*/
public List<Method> getMethods() {
return this.methods;
}

/**
* Set a method list.
* @param methods the methods.
* @since 3.2
*/
public void setMethods(List<Method> methods) {
this.methods = methods;
}

/**
* Get a default method.
* @return the default method.
* @since 3.2
*/
public Method getDefaultMethod() {
return this.defaultMethod;
}

/**
* Set a default method.
* @param defaultMethod the default method.
* @since 3.2
*/
public void setDefaultMethod(Method defaultMethod) {
this.defaultMethod = defaultMethod;
}

/**
* Set a payload validator.
* @param validator the validator.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,21 +24,25 @@
* Customizes main, retry and DLT endpoints in the Retry Topic functionality
* and returns the resulting topic names.
*
* @param <T> the listener endpoint type.
*
* @author Tomaz Fernandes
* @author Wang Zhiyang
*
* @since 2.7.2
*
* @see EndpointCustomizerFactory
*
*/
@FunctionalInterface
public interface EndpointCustomizer {
public interface EndpointCustomizer<T extends MethodKafkaListenerEndpoint<?, ?>> {

/**
* Customize the endpoint and return the topic names generated for this endpoint.
* @param listenerEndpoint The main, retry or DLT endpoint to be customized.
* @return A collection containing the topic names generated for this endpoint.
*/
Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint);
Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(T listenerEndpoint);

class TopicNamesHolder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package org.springframework.kafka.retrytopic;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Stream;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.EndpointHandlerMultiMethod;
import org.springframework.kafka.support.TopicPartitionOffset;

/**
Expand Down Expand Up @@ -63,41 +64,58 @@ public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperti
this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory;
}

public final EndpointCustomizer createEndpointCustomizer() {
return addSuffixesAndMethod(this.destinationProperties, this.beanMethod.resolveBean(this.beanFactory),
this.beanMethod.getMethod());
public final EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer() {
return addSuffixesAndMethod(this.destinationProperties);
}

protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean, Method method) {
protected EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> addSuffixesAndMethod(
DestinationTopic.Properties properties) {

RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider =
this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties);
return endpoint -> {
Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint);
endpoint.setId(namesProvider.getEndpointId(endpoint));
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider,
endpoint.getTopicPartitionsToAssign()));
Collection<EndpointCustomizer.TopicNamesHolder> topics =
customizeAndRegisterTopics(namesProvider, endpoint);
configurationEndpoint(endpoint, namesProvider, properties, this.beanMethod.resolveBean(this.beanFactory));
if (endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multiMethodEndpoint
&& this.beanMethod instanceof EndpointHandlerMultiMethod beanMultiMethod) {
multiMethodEndpoint.setDefaultMethod(beanMultiMethod.getDefaultMethod());
multiMethodEndpoint.setMethods(beanMultiMethod.getMethods());
}
else {
endpoint.setTopics(endpoint.getTopics().stream()
.map(namesProvider::getTopicName).toArray(String[]::new));
}
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint));
endpoint.setGroup(namesProvider.getGroup(endpoint));
endpoint.setBean(bean);
endpoint.setMethod(method);
Boolean autoStartDltHandler = properties.autoStartDltHandler();
if (autoStartDltHandler != null && properties.isDltTopic()) {
endpoint.setAutoStartup(autoStartDltHandler);
endpoint.setMethod(this.beanMethod.getMethod());
}
return topics;
};
}

private void configurationEndpoint(MethodKafkaListenerEndpoint<?, ?> endpoint,
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
DestinationTopic.Properties properties, Object bean) {

endpoint.setId(namesProvider.getEndpointId(endpoint));
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider,
endpoint.getTopicPartitionsToAssign()));
}
else {
endpoint.setTopics(endpoint.getTopics().stream()
.map(namesProvider::getTopicName).toArray(String[]::new));
}
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint));
endpoint.setGroup(namesProvider.getGroup(endpoint));
endpoint.setBean(bean);
Boolean autoStartDltHandler = properties.autoStartDltHandler();
if (autoStartDltHandler != null && properties.isDltTopic()) {
endpoint.setAutoStartup(autoStartDltHandler);
}
}

private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties,
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
TopicPartitionOffset[] topicPartitionOffsets) {

return Stream.of(topicPartitionOffsets)
.map(tpo -> properties.isMainEndpoint()
? getTPOForMainTopic(namesProvider, tpo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

import org.apache.commons.logging.LogFactory;
Expand All @@ -36,6 +37,7 @@
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.EndpointHandlerMultiMethod;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.TopicForRetryable;
import org.springframework.lang.NonNull;
Expand Down Expand Up @@ -150,6 +152,19 @@
* // ... message processing
* }</code>
*</pre>
* <p> Since 3.2 , {@link org.springframework.kafka.annotation.RetryableTopic} annotation support
* {@link org.springframework.kafka.annotation.KafkaListener} annotated class, such as:
* <pre>
* <code>@RetryableTopic(attempts = 3,
* backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>
* <code>@KafkaListener(topics = "my-annotated-topic")
* static class ListenerBean {
* <code> @KafkaHandler
* public void processMessage(MyPojo message) {
* // ... message processing
* }</code>
* }</code>
*</pre>
* <p> Or through meta-annotations, such as:
* <pre>
* <code>@RetryableTopic(backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>
Expand Down Expand Up @@ -281,7 +296,7 @@ public void processMainAndRetryListeners(EndpointProcessor endpointProcessor,
KafkaListenerEndpointRegistrar registrar,
@Nullable KafkaListenerContainerFactory<?> factory,
String defaultContainerFactoryBeanName) {
throwIfMultiMethodEndpoint(mainEndpoint);

String id = mainEndpoint.getId();
if (id == null) {
id = "no.id.provided";
Expand All @@ -300,6 +315,7 @@ private void configureEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
RetryTopicConfiguration configuration,
DestinationTopicProcessor.Context context,
String defaultContainerFactoryBeanName) {

this.destinationTopicProcessor
.processDestinationTopicProperties(destinationTopicProperties ->
processAndRegisterEndpoint(mainEndpoint,
Expand Down Expand Up @@ -330,7 +346,13 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
endpoint = mainEndpoint;
}
else {
endpoint = new MethodKafkaListenerEndpoint<>();
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multi) {
endpoint = new MultiMethodKafkaListenerEndpoint<>(multi.getMethods(), multi.getDefaultMethod(),
multi.getBean());
}
else {
endpoint = new MethodKafkaListenerEndpoint<>();
}
endpoint.setId(mainEndpoint.getId());
endpoint.setMainListenerId(mainEndpoint.getId());
}
Expand All @@ -345,12 +367,12 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
getEndpointHandlerMethod(mainEndpoint, configuration, destinationTopicProperties);

createEndpointCustomizer(endpointBeanMethod, destinationTopicProperties)
.customizeEndpointAndCollectTopics(endpoint)
.forEach(topicNamesHolder ->
this.destinationTopicProcessor
.registerDestinationTopic(topicNamesHolder.getMainTopic(),
topicNamesHolder.getCustomizedTopic(),
destinationTopicProperties, context));
.customizeEndpointAndCollectTopics(endpoint)
.forEach(topicNamesHolder ->
this.destinationTopicProcessor
.registerDestinationTopic(topicNamesHolder.getMainTopic(),
topicNamesHolder.getCustomizedTopic(),
destinationTopicProperties, context));

registrar.registerEndpoint(endpoint, resolvedFactory);
endpoint.setBeanFactory(this.beanFactory);
Expand All @@ -359,9 +381,10 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
protected EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
RetryTopicConfiguration configuration,
DestinationTopic.Properties props) {

EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod();
EndpointHandlerMethod retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
return props.isDltTopic() ? getDltEndpointHandlerMethodOrDefault(dltHandlerMethod) : retryBeanMethod;
return props.isDltTopic() ? getDltEndpointHandlerMethodOrDefault(mainEndpoint, dltHandlerMethod)
: getRetryEndpointHandlerMethod(mainEndpoint);
}

private Consumer<Collection<String>> getTopicCreationFunction(RetryTopicConfiguration config) {
Expand All @@ -383,7 +406,7 @@ protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfigur
);
}

protected EndpointCustomizer createEndpointCustomizer(
protected EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer(
EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) {

return new EndpointCustomizerFactory(destinationTopicProperties,
Expand All @@ -393,8 +416,28 @@ protected EndpointCustomizer createEndpointCustomizer(
.createEndpointCustomizer();
}

private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandlerMethod dltEndpointHandlerMethod) {
return dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
@Nullable EndpointHandlerMethod dltEndpointHandlerMethod) {

EndpointHandlerMethod dltHandlerMethod = dltEndpointHandlerMethod != null
? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
dltHandlerMethod = new EndpointHandlerMultiMethod(dltHandlerMethod.resolveBean(this.beanFactory),
dltHandlerMethod.getMethod(), List.of(dltHandlerMethod.getMethod()));
}
return dltHandlerMethod;
}

private EndpointHandlerMethod getRetryEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
EndpointHandlerMethod retryBeanMethod;
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multi) {
retryBeanMethod = new EndpointHandlerMultiMethod(multi.getBean(), multi.getDefaultMethod(),
multi.getMethods());
}
else {
retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
}
return retryBeanMethod;
}

private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoint(
Expand All @@ -419,12 +462,6 @@ private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForRetryEndpo
return this.listenerContainerFactoryConfigurer.decorateFactory(resolvedFactory);
}

private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
throw new IllegalArgumentException("Retry Topic is not compatible with " + MultiMethodKafkaListenerEndpoint.class);
}
}

public static EndpointHandlerMethod createHandlerMethodWith(Object beanOrClass, String methodName) {
return new EndpointHandlerMethod(beanOrClass, methodName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class EndpointHandlerMethod {

private final Object beanOrClass;

private final String methodName;
private String methodName;

private Object bean;

Expand All @@ -54,6 +54,12 @@ public EndpointHandlerMethod(Object beanOrClass, String methodName) {
this.methodName = methodName;
}

public EndpointHandlerMethod(Object bean) {
Assert.notNull(bean, () -> "No bean for destination provided!");
this.bean = bean;
this.beanOrClass = bean.getClass();
}

public EndpointHandlerMethod(Object bean, Method method) {
Assert.notNull(bean, () -> "No bean for destination provided!");
Assert.notNull(method, () -> "No method for destination bean class provided!");
Expand Down
Loading

0 comments on commit a1f818e

Please sign in to comment.