Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-1189: Asynchronous server-side processing in a request/reply scenario #2963

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ ext {
junit4Version = '4.13.2'
junitJupiterVersion = '5.10.1'
kafkaVersion = '3.6.1'
kotlinCoroutinesVersion = '1.7.3'
log4jVersion = '2.21.1'
micrometerDocsVersion = '1.0.2'
micrometerVersion = '1.12.1'
Expand Down Expand Up @@ -276,6 +277,7 @@ project ('spring-kafka') {
}
api "org.apache.kafka:kafka-clients:$kafkaVersion"
optionalApi "org.apache.kafka:kafka-streams:$kafkaVersion"
optionalApi "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion"
optionalApi 'com.fasterxml.jackson.core:jackson-core'
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[[async-returns]]
= Asynchronous `@KafkaListener` Return Types

`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types `CompletableFuture<?>` and `Mono<?>`, letting the reply be sent asynchronously.

[source, java]
----
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
...
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("done");
return future;
}
----

[source, java]
----
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
...
return Mono.empty();
}
----

IMPORTANT: The `AckMode` will be automatically set the `MANUAL` and enable out-of-order commits when async return types are detected; instead, the asynchronous completion will ack when the async operation completes.
When the async result is completed with an error, whether the message is recover or not depends on the container error handler.
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover.

If a `KafkaListenerErrorHandler` is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure.
See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose.
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.

When manually assigning partitions, with a `null` consumer `group.id`, the `AckMode` is now automatically coerced to `MANUAL`.
See xref:tips.adoc#tip-assign-all-parts[Manually Assigning All Partitions] for more information.

[[x31-async-return]]
=== Async @KafkaListener Return

`@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types `CompletableFuture<?>` and `Mono<?>`.
See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

/**
* No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}.
* <p>
* This class is similar to
* {@link org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver}
* but for regular {@link HandlerMethodArgumentResolver} contract.
*
* @author Wang Zhiyang
*
* @since 3.1
*
* @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver
*/
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;

import reactor.core.publisher.Mono;

public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver {

@Override
public boolean supportsParameter(MethodParameter parameter) {
return "kotlin.coroutines.Continuation".equals(parameter.getParameterType().getName());
}

@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return Mono.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,7 @@ private MessageHandlerMethodFactory getHandlerMethodFactory() {
}

private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
DefaultMessageHandlerMethodFactory defaultFactory = new KafkaMessageHandlerMethodFactory();
Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
if (validator != null) {
defaultFactory.setValidator(validator);
Expand All @@ -1170,8 +1170,6 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {

List<HandlerMethodArgumentResolver> customArgumentsResolver =
new ArrayList<>(KafkaListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers());
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
customArgumentsResolver.add(new KafkaNullAwarePayloadArgumentResolver(messageConverter, validator));
defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);

defaultFactory.afterPropertiesSet();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.lang.reflect.Method;
import java.util.List;

import org.springframework.core.KotlinDetector;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.validation.Validator;

/**
* Extension of the {@link DefaultMessageHandlerMethodFactory} for Spring Kafka requirements.
*
* @author Wang Zhiyang
*
* @since 3.1
*/
public class KafkaMessageHandlerMethodFactory extends DefaultMessageHandlerMethodFactory {

private final HandlerMethodArgumentResolverComposite argumentResolvers =
new HandlerMethodArgumentResolverComposite();

private MessageConverter messageConverter;

private Validator validator;

@Override
public void setMessageConverter(MessageConverter messageConverter) {
super.setMessageConverter(messageConverter);
this.messageConverter = messageConverter;
}

@Override
public void setValidator(Validator validator) {
super.setValidator(validator);
this.validator = validator;
}

@Override
protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {
List<HandlerMethodArgumentResolver> resolvers = super.initArgumentResolvers();
if (KotlinDetector.isKotlinPresent()) {
// Insert before PayloadMethodArgumentResolver
resolvers.add(resolvers.size() - 1, new ContinuationHandlerMethodArgumentResolver());
}
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
resolvers.add(resolvers.size() - 1, new KafkaNullAwarePayloadArgumentResolver(this.messageConverter, this.validator));
this.argumentResolvers.addResolvers(resolvers);
return resolvers;
}

@Override
public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
InvocableHandlerMethod handlerMethod = new KotlinAwareInvocableHandlerMethod(bean, method);
handlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
return handlerMethod;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.lang.reflect.Method;

import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

/**
* An {@link InvocableHandlerMethod} extension for supporting Kotlin {@code suspend} function.
*
* @author Wang Zhiyang
*
* @since 3.1
*/
public class KotlinAwareInvocableHandlerMethod extends InvocableHandlerMethod {

public KotlinAwareInvocableHandlerMethod(Object bean, Method method) {
super(bean, method);
}

@Override
protected Object doInvoke(Object... args) throws Exception {
Method method = getBridgedMethod();
if (KotlinDetector.isSuspendingFunction(method)) {
return CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args);
}
else {
return super.doInvoke(args);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ private String getReplyTopic() {
}
String topic = destinations.length == 1 ? destinations[0] : "";
BeanFactory beanFactory = getBeanFactory();
if (beanFactory instanceof ConfigurableListableBeanFactory) {
topic = ((ConfigurableListableBeanFactory) beanFactory).resolveEmbeddedValue(topic);
if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) {
topic = configurableListableBeanFactory.resolveEmbeddedValue(topic);
if (topic != null) {
topic = resolve(topic);
}
Expand Down Expand Up @@ -213,16 +213,16 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(
if (batchToRecordAdapter != null) {
messageListener.setBatchToRecordAdapter(batchToRecordAdapter);
}
if (messageConverter instanceof BatchMessageConverter) {
messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
if (messageConverter instanceof BatchMessageConverter batchMessageConverter) {
messageListener.setBatchMessageConverter(batchMessageConverter);
}
listener = messageListener;
}
else {
RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<K, V>(
this.bean, this.method, this.errorHandler);
if (messageConverter instanceof RecordMessageConverter) {
messageListener.setMessageConverter((RecordMessageConverter) messageConverter);
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) {
messageListener.setMessageConverter(recordMessageConverter);
}
listener = messageListener;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,6 +72,7 @@ default Object handleError(Message<?> message, ListenerExecutionFailedException
* @return the return value is ignored unless the annotated method has a
* {@code @SendTo} annotation.
*/
@Nullable
default Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack) {

Expand Down
Loading