-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-3012: Non-blocking retries support @KafkaListener on class level #3105
GH-3012: Non-blocking retries support @KafkaListener on class level #3105
Conversation
@Wzy19930507 This PR is very large with many changes, making the review harder on our side. Can you split the PR into multiple ones? That way, it is easy for us to review; maybe you can isolate the components needed as a prerequisite for the feature into a separate one. And when that is merged, perform the feature changes in another one. However, if it is difficult for you to split the PR, please let us know, and we will try our best to review this as is. |
Good advice, let me try! And please allow me keep this PR, until complete prerequisite feature. |
* Support process `@RetryableTopic` from Class in `RetryTopicConfigurationProvider`. * Support process `@DltHandler` when `@RetryableTopic` annotated on the Class in `RetryableTopicAnnotationProcessor`. part of spring-projects#3105
* Support process `@RetryableTopic` from Class in `RetryTopicConfigurationProvider`. * Support process `@DltHandler` when `@RetryableTopic` annotated on the Class in `RetryableTopicAnnotationProcessor`. part of spring-projects#3105
* `@RetryableTopic` support annotated on Class. * Support process `@RetryableTopic` from Class in `RetryTopicConfigurationProvider`. * Support process `@DltHandler` when `@RetryableTopic` annotated on the Class in `RetryableTopicAnnotationProcessor`. * add java doc * add a snippet in what-new.adoc and retry-config.adoc part of #3105 and this contributes to fixing #3105 eventually
* `EndpointCustomizerFactory` adaptor `MultiMethodKafkaListenerEndpoint`. * `RetryTopicConfigurer.processAndRegisterEndpoint` support `@KafkaListener` on a class. * Add new class `EndpointHandlerMultiMethod` to handler multi method for retrying endpoints. part 2 of spring-projects#3105
* `EndpointCustomizerFactory` adaptor `MultiMethodKafkaListenerEndpoint`. * `RetryTopicConfigurer.processAndRegisterEndpoint` support `@KafkaListener` on a class. * Add new class `EndpointHandlerMultiMethod` to handler multi method for retrying endpoints. part2 of spring-projects#3105 and this contributes to fixing spring-projects#3105 eventually
* @RetryableTopic support for KafkaListener annotation on class part 2 * `EndpointCustomizerFactory` adaptor `MultiMethodKafkaListenerEndpoint`. * `RetryTopicConfigurer.processAndRegisterEndpoint` support `@KafkaListener` on a class. * Add new class `EndpointHandlerMultiMethod` to handler multi method for retrying endpoints. * Deprecated `EndpointCustomizerFactory.addSuffixesAndMethod`. * Document public API changes in `whats-new.adoc`(or javadoc). part2 of #3105 and this contributes to fixing #3105 eventually
18115b4
to
854962a
Compare
@@ -7,6 +7,8 @@ Version 2.9 changed the mechanism to bootstrap infrastructure beans; see xref:re | |||
Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners. | |||
Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTopic` annotation and `RetryTopicConfiguration` class to simplify that bootstrapping. | |||
|
|||
Since 3.2 Non-Blocking Retries support xref:kafka/receiving-messages/class-level-kafkalistener.adoc[@KafkaListener on a Class]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rewrite to something like - Since 3.2, Spring for Apache Kafka supports non-blocking retries...
(with the link).
@@ -28,6 +28,21 @@ public void processMessage(MyPojo message) { | |||
} | |||
---- | |||
|
|||
Since 3.2, `@RetryableTopic` support @KafkaListener on a class would be: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@RetryableTopic
support for
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass, | ||
(ReflectionUtils.MethodFilter) method -> | ||
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); | ||
final List<Method> multiMethods = new ArrayList<>(methodsWithHandler); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to use final
in local variables.
@@ -165,6 +165,19 @@ | |||
* }</code> | |||
* }</code> | |||
*</pre> | |||
* <p> Since 3.2 , {@link org.springframework.kafka.annotation.RetryableTopic} annotation support |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
supports.
ExistingRetryTopicClassLevelIntegrationTests.MAIN_TOPIC_WITH_PARTITION_INFO, | ||
ExistingRetryTopicClassLevelIntegrationTests.RETRY_TOPIC_WITH_PARTITION_INFO}, partitions = 4) | ||
@TestPropertySource(properties = "two.attempts=2") | ||
public class ExistingRetryTopicClassLevelIntegrationTests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can leave the access level at default (package-protected).
|
||
} | ||
|
||
public static class ShouldRetryOnlyByTopicException extends RuntimeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public?
} | ||
} | ||
|
||
public static class ShouldSkipBothRetriesException extends RuntimeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public? same for other similar classes in this test class.
RetryTopicClassLevelIntegrationTests.TWO_LISTENERS_TOPIC, | ||
RetryTopicClassLevelIntegrationTests.MANUAL_TOPIC }) | ||
@TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"}) | ||
public class RetryTopicClassLevelIntegrationTests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default access.
} | ||
|
||
@SuppressWarnings("serial") | ||
public static class MyRetryException extends RuntimeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public?
} | ||
|
||
@Configuration | ||
public static class KafkaProducerConfig { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your review, fix it.
…n class level Fixes: spring-projects#3012 * Non-blocking retries support `@KafkaListener` on class level, include `@RetryableTopic`, `@DltHandler`, `RetryTopicConfiguration`, `RetryTopicConfigurationSupport`. * Doc and test Non-blocking retries support @KafkaListener on class level.
854962a
to
fe24063
Compare
Thanks, @Wzy19930507, for this excellent new feature. Just a reminder: limit the commit message titles to only 50 or fewer characters. Commit message descriptions can be as long and descriptive as you want. |
Fixes: #3012
@KafkaListener
on class level, include@RetryableTopic
,@DltHandler
,RetryTopicConfiguration
,RetryTopicConfigurationSupport
.@KafkaListener
on class level.Part 1: Support process annotation
@RetryableTopic
and@DltHandler
when@RetryableTopic
annotated on class, see #3107.Part 2:
RetryTopicConfigurer#processAndRegisterEndpoint
supportMultiMethodKafkaListenerEndpoint
, see #3112.Part 3:
@EnableKafka
integration feature that non-blocking retries support@KafkaListener
on class level, see #3105.