diff --git a/documentation/src/main/docs/concepts/signatures.md b/documentation/src/main/docs/concepts/signatures.md index 816773e9e7..934f9b12f1 100644 --- a/documentation/src/main/docs/concepts/signatures.md +++ b/documentation/src/main/docs/concepts/signatures.md @@ -41,28 +41,41 @@ and available acknowledgement strategies (when applicable). ## Method signatures to process data -| Signature | Invocation time | Supported Acknowledgement Strategies | Metadata Propagation | -|---------------------------------------------------------------------------|--------------------------------------------------|-----------------------------------------|----------------------| -| `@Outgoing @Incoming Message method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual | -| `@Outgoing @Incoming O method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | -| `@Outgoing @Incoming CompletionStage> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual | -| `@Outgoing @Incoming CompletionStage method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | -| `@Outgoing @Incoming Uni> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual | -| `@Outgoing @Incoming Uni method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | -| `@Outgoing @Incoming Processor, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Processor method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | -| `@Outgoing @Incoming Flow.Processor, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Flow.Processor method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | -| `@Outgoing @Incoming ProcessorBuilder, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming ProcessorBuilder method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | -| `@Outgoing @Incoming Publisher> method(Message msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Publisher method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic | -| `@Outgoing @Incoming Multi> method(Message msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Multi method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic | -| `@Outgoing @Incoming Flow.Publisher> method(Message msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming Flow.Publisher method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic | -| `@Outgoing @Incoming PublisherBuilder> method(Message msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | -| `@Outgoing @Incoming PublisherBuilder method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic | +| Signature | Invocation time | Supported Acknowledgement Strategies | Metadata Propagation | +|---------------------------------------------------------------------------|--------------------------------------------------|-----------------------------------------------------------------|----------------------| +| `@Outgoing @Incoming Message method(Message msg)` | Called for every incoming message (sequentially) | POST_PROCESSING (Smallrye only), *MANUAL*, NONE, PRE_PROCESSING | manual | +| `@Outgoing @Incoming Message method(I payload)` | Called for every incoming message (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming O method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming CompletionStage> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual | +| `@Outgoing @Incoming CompletionStage method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming CompletionStage> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming Uni> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual | +| `@Outgoing @Incoming Uni> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming Uni method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic | +| `@Outgoing @Incoming Processor, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Processor method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | +| `@Outgoing @Incoming Flow.Processor, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Flow.Processor method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | +| `@Outgoing @Incoming ProcessorBuilder, Message> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming ProcessorBuilder method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported | +| `@Outgoing @Incoming Publisher> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Publisher method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | +| `@Outgoing @Incoming Multi> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Multi method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | +| `@Outgoing @Incoming Flow.Publisher> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming Flow.Publisher method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | +| `@Outgoing @Incoming PublisherBuilder> method(Message msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual | +| `@Outgoing @Incoming PublisherBuilder method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic | + +Note that in additional to the MicroProfile Reactive Messaging specification, +SmallRye Reactive Messaging supports the post-processing acknowledgment handling with automatic metadata propagation for the following signatures: + +- `@Outgoing @Incoming Message method(I payload)` +- `@Outgoing @Incoming CompletionStage> method(I payload)` +- `@Outgoing @Incoming Uni> method(I payload)` +- `@Outgoing @Incoming Message method(Message payload)` : For this signature, the post-processing acknowledgment handling is limited. + It covers cases for nacking incoming messages on caught exceptions at the method body, acking incoming messages when outgoing message is skipped by returning `null`, and chaining acknowlegment from outgoing message to the incoming. + However, if the incoming message has already been (n)acked, you will experience duplicate (n)acks. ## Method signatures to manipulate streams diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MediatorConfigurationSupport.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MediatorConfigurationSupport.java index 5c1b724148..732104246c 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MediatorConfigurationSupport.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/MediatorConfigurationSupport.java @@ -422,7 +422,10 @@ private ValidationOutput validateProcessor(Acknowledgment.Strategy acknowledgmen if (production == MediatorConfiguration.Production.INDIVIDUAL_MESSAGE && acknowledgment == Acknowledgment.Strategy.POST_PROCESSING) { - throw ex.illegalStateForValidateProcessor(methodAsString); + // relax here the validation for the post-processing acknowledgment + if (consumption == MediatorConfiguration.Consumption.MESSAGE) { + log.postProcessingNotFullySupported(methodAsString); + } } return new ValidationOutput(production, consumption, useBuilderTypes, useReactiveStreams, payloadType); diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java index 561d7ba98e..34d02575be 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java @@ -26,6 +26,7 @@ import io.smallrye.reactive.messaging.providers.helpers.AcknowledgementCoordinator; import io.smallrye.reactive.messaging.providers.helpers.ClassUtils; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; +import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; import mutiny.zero.flow.adapters.AdaptersToFlow; @SuppressWarnings("ReactiveStreamsUnusedPublisher") @@ -147,19 +148,19 @@ public void initialize(Object bean) { processMethodReturningIndividualPayloadAndConsumingIndividualItem(); break; case COMPLETION_STAGE_OF_MESSAGE: - // Case 11 + // Case 12 processMethodReturningACompletionStageOfMessageAndConsumingIndividualItem(); break; case COMPLETION_STAGE_OF_PAYLOAD: - // Case 12 + // Case 11 processMethodReturningACompletionStageOfPayloadAndConsumingIndividualItem(); break; case UNI_OF_MESSAGE: - // Case 11 - Uni variant + // Case 12 - Uni variant processMethodReturningAUniOfMessageAndConsumingIndividualItem(); break; case UNI_OF_PAYLOAD: - // Case 12 - Uni variant + // Case 11 - Uni variant processMethodReturningAUniOfPayloadAndConsumingIndividualItem(); break; default: @@ -167,6 +168,9 @@ public void initialize(Object bean) { } } + /** + * {@code PublisherBuilder> method(Message msg)} + */ @SuppressWarnings("unchecked") private void processMethodReturningAPublisherBuilderOfMessageAndConsumingMessages() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) @@ -174,6 +178,9 @@ private void processMethodReturningAPublisherBuilderOfMessageAndConsumingMessage msg -> AdaptersToFlow.publisher(((PublisherBuilder>) invoke(msg)).buildRs())); } + /** + * {@code PublisherBuilder> method(Message msg)} + */ @SuppressWarnings("unchecked") private void processMethodReturningAReactiveStreamsPublisherOfMessageAndConsumingMessages() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) @@ -181,12 +188,18 @@ private void processMethodReturningAReactiveStreamsPublisherOfMessageAndConsumin msg -> AdaptersToFlow.publisher((Publisher>) invoke(msg))); } + /** + * {@code Flow.Publisher> method(Message msg)} + */ @SuppressWarnings("unchecked") private void processMethodReturningAPublisherOfMessageAndConsumingMessages() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate(msg -> (Flow.Publisher>) invoke(msg)); } + /** + * {@code ProcessorBuilder, Message> method()} + */ private void processMethodReturningAProcessorBuilderOfMessages() { ProcessorBuilder, Message> builder = Objects.requireNonNull(invoke(), msg.methodReturnedNull(configuration.methodAsString())); @@ -197,6 +210,9 @@ private void processMethodReturningAProcessorBuilderOfMessages() { }; } + /** + * {@code Processor, Message> method()} + */ private void processMethodReturningAReactiveStreamsProcessorOfMessages() { Processor, Message> result = Objects.requireNonNull(invoke(), msg.methodReturnedNull(configuration.methodAsString())); @@ -207,6 +223,9 @@ private void processMethodReturningAReactiveStreamsProcessorOfMessages() { }; } + /** + * {@code Flow.Processor, Message> method()} + */ private void processMethodReturningAProcessorOfMessages() { Flow.Processor, Message> result = Objects.requireNonNull(invoke(), msg.methodReturnedNull(configuration.methodAsString())); @@ -217,6 +236,9 @@ private void processMethodReturningAProcessorOfMessages() { }; } + /** + * {@code ProcessorBuilder method()} + */ @SuppressWarnings({ "unchecked", "rawtypes" }) private void processMethodReturningAProcessorBuilderOfPayloads() { ProcessorBuilder returnedProcessorBuilder = invoke(); @@ -230,6 +252,9 @@ private void processMethodReturningAProcessorBuilderOfPayloads() { }; } + /** + * {@code Processor method()} + */ @SuppressWarnings({ "unchecked", "rawtypes" }) private void processMethodReturningAReactiveStreamsProcessorOfPayloads() { Processor returnedProcessor = invoke(); @@ -242,6 +267,9 @@ private void processMethodReturningAReactiveStreamsProcessorOfPayloads() { }; } + /** + * {@code Flow.Processor method()} + */ @SuppressWarnings({ "unchecked", "rawtypes" }) private void processMethodReturningAProcessorOfPayloads() { Flow.Processor returnedProcessor = invoke(); @@ -254,17 +282,26 @@ private void processMethodReturningAProcessorOfPayloads() { }; } + /** + * {@code PublisherBuilder method(I payload)} + */ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads() { this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi.onItem().transformToMultiAndConcatenate(message -> { - PublisherBuilder pb = invoke(getArguments(message)); - if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) { - // POST_PROCESSING must not be used when returning an infinite stream - AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); - return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs())) - .onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + if (isPostAck()) { + try { + PublisherBuilder pb = invoke(getArguments(message)); + // POST_PROCESSING must not be used when returning an infinite stream + AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); + return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs())) + .onItem() + .transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + } catch (Throwable t) { + return handlePostInvocation(message, t); + } } else { + PublisherBuilder pb = invoke(getArguments(message)); return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs())) .onItem().transform(payload -> payloadToMessage(payload, message.getMetadata())); } @@ -272,17 +309,26 @@ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloa }; } + /** + * {@code Publisher method(I payload)} + */ private void processMethodReturningAReactiveStreamsPublisherOfPayloadsAndConsumingPayloads() { this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi.onItem().transformToMultiAndConcatenate(message -> { - Publisher pub = invoke(getArguments(message)); - if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) { - // POST_PROCESSING must not be used when returning an infinite stream - AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); - return MultiUtils.publisher(AdaptersToFlow.publisher(pub)) - .onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + if (isPostAck()) { + try { + Publisher pub = invoke(getArguments(message)); + // POST_PROCESSING must not be used when returning an infinite stream + AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); + return MultiUtils.publisher(AdaptersToFlow.publisher(pub)) + .onItem() + .transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + } catch (Throwable t) { + return handlePostInvocation(message, t); + } } else { + Publisher pub = invoke(getArguments(message)); return MultiUtils.publisher(AdaptersToFlow.publisher(pub)) .onItem().transform(payload -> payloadToMessage(payload, message.getMetadata())); } @@ -290,17 +336,27 @@ private void processMethodReturningAReactiveStreamsPublisherOfPayloadsAndConsumi }; } + /** + * {@code Flow.Publisher method(I payload)} + */ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() { this.mapper = upstream -> { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi.onItem().transformToMultiAndConcatenate(message -> { - Flow.Publisher pub = invoke(getArguments(message)); - if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) { - // POST_PROCESSING must not be used when returning an infinite stream - AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); - return MultiUtils.publisher(pub) - .onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + if (isPostAck()) { + try { + Flow.Publisher pub = invoke(getArguments(message)); + // POST_PROCESSING must not be used when returning an infinite stream + AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message); + return MultiUtils.publisher(pub) + .onItem() + .transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata()))); + + } catch (Throwable t) { + return handlePostInvocation(message, t); + } } else { + Flow.Publisher pub = invoke(getArguments(message)); return MultiUtils.publisher(pub) .onItem().transform(payload -> payloadToMessage(payload, message.getMetadata())); } @@ -308,6 +364,10 @@ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() { }; } + /** + * {@code Message method(Message msg)} + * {@code Message method(I payload)} + */ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() { // Item can be a message or a payload if (configuration.isBlocking()) { @@ -318,7 +378,7 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() .onItem() .transformToMultiAndConcatenate(message -> invokeBlocking(message, getArguments(message)) .onItemOrFailure() - .transformToUni((o, t) -> this.handlePostInvocationWithMessage((Message) o, t)) + .transformToUni((o, t) -> handlePostInvocationWithMessage(message, (Message) o, t)) .onItem().transformToMulti(this::handleSkip)); }; } else { @@ -327,7 +387,7 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() return multi .onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message)) .onItemOrFailure() - .transformToUni((o, t) -> this.handlePostInvocationWithMessage((Message) o, t)) + .transformToUni((o, t) -> handlePostInvocationWithMessage(message, (Message) o, t)) .onItem().transformToMulti(this::handleSkip)) .merge(maxConcurrency()); }; @@ -340,16 +400,16 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem() .onItem().transformToMultiAndConcatenate( message -> invokeOnMessageContext(message, getArguments(message)) .onItem().transform(o -> (Message) o) - .onItemOrFailure().transformToUni(this::handlePostInvocationWithMessage) + .onItemOrFailure() + .transformToUni((r, f) -> handlePostInvocationWithMessage(message, r, f)) .onItem().transformToMulti(this::handleSkip)); }; } } - private boolean isPostAck() { - return configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING; - } - + /** + * {@code O method(I payload)} + */ private void processMethodReturningIndividualPayloadAndConsumingIndividualItem() { // Item can be message or payload. if (configuration.isBlocking()) { @@ -384,6 +444,10 @@ private Flow.Publisher> handleSkip(Message m) } } + private Multi> handlePostInvocation(Message message, Throwable fail) { + return Uni.createFrom().completionStage(() -> message.nack(fail).thenApply(x -> (Message) null)).toMulti(); + } + private Uni> handlePostInvocation(Message message, Object res, Throwable fail) { if (fail != null) { if (isPostAck()) { @@ -418,36 +482,62 @@ private Uni> handlePostInvocation(Message message, } } - private Uni> handlePostInvocationWithMessage(Message res, - Throwable fail) { + private Uni> handlePostInvocationWithMessage(Message in, Message res, Throwable fail) { if (fail != null) { - throw ex.processingException(getMethodAsString(), fail); + if (isPostAck()) { + // Here we nack the incoming, but maybe the message has already been (n)acked + return Uni.createFrom() + .completionStage(in.nack(fail).thenApply(x -> null)); + } else { + throw ex.processingException(getMethodAsString(), fail); + } } else if (res != null) { + if (isPostAck()) { + // Here we chain the outgoing message to the incoming, but maybe the message has already been (n)acked + return Uni.createFrom().item((Message) res.withAckWithMetadata(m -> res.ack(m).thenCompose(x -> in.ack(m))) + .withNackWithMetadata((t, m) -> res.nack(t, m).thenCompose(x -> in.nack(t, m)))); + } + return Uni.createFrom().item((Message) res); } else { // the method returned null, the message is not forwarded + if (isPostAck()) { + // Here we ack the incoming message, but maybe the message has already been (n)acked + return Uni.createFrom().completionStage(in.ack().thenApply(x -> null)); + } return Uni.createFrom().nullItem(); } } + /** + * {@code CompletionStage> method(I payload)} + */ private void processMethodReturningACompletionStageOfMessageAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( message -> invokeOnMessageContext(message, getArguments(message)) .onItem().transformToUni(cs -> Uni.createFrom().completionStage((CompletionStage) cs)) - .onItemOrFailure().transformToUni((r, f) -> handlePostInvocationWithMessage((Message) r, f)) + .onItemOrFailure() + .transformToUni((r, f) -> handlePostInvocationWithMessage(message, (Message) r, f)) .onItem().transformToMulti(this::handleSkip)); } + /** + * {@code Uni> method(I payload)} + */ private void processMethodReturningAUniOfMessageAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( message -> invokeOnMessageContext(message, getArguments(message)) .onItem().transformToUni(u -> (Uni) u) - .onItemOrFailure().transformToUni((r, f) -> handlePostInvocationWithMessage((Message) r, f)) + .onItemOrFailure() + .transformToUni((r, f) -> handlePostInvocationWithMessage(message, (Message) r, f)) .onItem().transformToMulti(this::handleSkip)); } + /** + * {@code CompletionStage method(I payload)} + */ private void processMethodReturningACompletionStageOfPayloadAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( @@ -457,6 +547,9 @@ private void processMethodReturningACompletionStageOfPayloadAndConsumingIndividu .onItem().transformToMulti(this::handleSkip)); } + /** + * {@code Uni method(I payload)} + */ private void processMethodReturningAUniOfPayloadAndConsumingIndividualItem() { this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration) .onItem().transformToMultiAndConcatenate( @@ -479,4 +572,9 @@ private boolean isReturningAProcessorOrAReactiveStreamsProcessorOrAProcessorBuil || ClassUtils.isAssignable(returnType, Processor.class) || ClassUtils.isAssignable(returnType, ProcessorBuilder.class); } + + private boolean isPostAck() { + return configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING; + } + } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java index adf369b8e4..50e65195cb 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java @@ -143,4 +143,8 @@ public interface ProviderLogging extends BasicLogger { @LogMessage(level = Logger.Level.DEBUG) @Message(id = 242, value = "Resuming polling messages for channel %s, queue size %s <= %s") void resumingRequestingMessages(String channel, int size, int halfMaxQueueSize); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 243, value = "Processing method '%s' annotated with @Acknowledgement(POST_PROCESSING), but may not be compatible with post-processing acknowledgement management. You may experience duplicate (negative-)acknowledgement of messages.") + void postProcessingNotFullySupported(String methodAsString); } diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/ProcessorShapeReturningPublisherTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/ProcessorShapeReturningPublisherTest.java index a3bf604f67..1772228e1c 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/ProcessorShapeReturningPublisherTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/ProcessorShapeReturningPublisherTest.java @@ -2,12 +2,17 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import org.junit.jupiter.api.Test; import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage; import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload; +import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError; import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfMessagesAndConsumingIndividualMessage; import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayload; +import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError; public class ProcessorShapeReturningPublisherTest extends WeldTestBase { @@ -27,6 +32,17 @@ public void testBeanProducingAPublisherOfPayloadsAndConsumingIndividualPayload() assertThat(collector.payloads()).isEqualTo(EXPECTED); } + @Test + public void BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError() { + addBeanClass(BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.class); + initialize(); + MyCollector collector = container.select(MyCollector.class).get(); + assertThat(collector.payloads()).isEqualTo(IntStream.rangeClosed(1, 5) + .map(i -> i * 2) + .flatMap(i -> IntStream.of(i, i)).boxed() + .map(Object::toString).collect(Collectors.toList())); + } + @Test public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload() { addBeanClass(BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload.class); @@ -35,6 +51,15 @@ public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPa assertThat(collector.payloads()).isEqualTo(EXPECTED); } + @Test + public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError() { + addBeanClass(BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.class); + initialize(); + MyCollector collector = container.select(MyCollector.class).get(); + assertThat(collector.payloads()).isEqualTo(IntStream.rangeClosed(1, 6).flatMap(i -> IntStream.of(i, i)).boxed() + .map(Object::toString).collect(Collectors.toList())); + } + @Test public void testBeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage() { addBeanClass(BeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage.class); diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java index 58e96ec94f..564b4f582a 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java @@ -42,6 +42,40 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayload() throws assertThat(nacked).hasSize(0); } + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadReturningCompletionStageOfMessage() throws InterruptedException { + addBeanClass(SuccessfulPayloadProcessorCompletionStageOfMessage.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(10); + assertThat(nacked).hasSize(0); + } + + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadReturningUniOfMessage() throws InterruptedException { + addBeanClass(SuccessfulPayloadProcessorUniOfMessage.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(10); + assertThat(nacked).hasSize(0); + } + @Test public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadUni() throws InterruptedException { addBeanClass(SuccessfulPayloadProcessorUni.class); @@ -161,6 +195,29 @@ public Uni process(String s) { } + + @ApplicationScoped + public static class SuccessfulPayloadProcessorCompletionStageOfMessage { + + @Incoming("data") + @Outgoing("out") + public CompletionStage> process(String s) { + return CompletableFuture.supplyAsync(() -> Message.of(s.toUpperCase())); + } + + } + + @ApplicationScoped + public static class SuccessfulPayloadProcessorUniOfMessage { + + @Incoming("data") + @Outgoing("out") + public Uni> process(String s) { + return Uni.createFrom().completionStage(CompletableFuture.supplyAsync(() -> Message.of(s.toUpperCase()))); + } + + } + @ApplicationScoped public static class FailingPayloadProcessor { diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java index e405eefb84..b991a4c528 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java @@ -4,6 +4,7 @@ import static org.awaitility.Awaitility.await; import java.lang.reflect.InvocationTargetException; +import java.util.Collection; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -48,6 +49,23 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayload() throws assertThat(nacked).hasSize(0); } + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadReturningMessage() throws InterruptedException { + addBeanClass(SuccessfulPayloadToMessageProcessor.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(10); + assertThat(nacked).hasSize(0); + } + @Test public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessage() throws InterruptedException { addBeanClass(SuccessfulMessageToPayloadProcessor.class); @@ -65,6 +83,79 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessage() throws assertThat(nacked).hasSize(0); } + + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessage() throws InterruptedException { + addBeanClass(SuccessfulMessageToMessageProcessor.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(10); + assertThat(nacked).hasSize(0); + } + + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessing() throws InterruptedException { + addBeanClass(SuccessfulMessageToMessageProcessorPostProcessing.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(10); + assertThat(nacked).hasSize(0); + } + + + @Test + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessingDuplicate() throws InterruptedException { + addBeanClass(SuccessfulMessageToMessageProcessorPostProcessingDuplicate.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + List acked = new CopyOnWriteArrayList<>(); + List nacked = new CopyOnWriteArrayList<>(); + + run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 10); + assertThat(acked).hasSize(20); + assertThat(nacked).hasSize(0); + } + + + @Test + public void testThatMessagesAreNackedAfterFailingProcessingOfMessageReturningMessage() throws InterruptedException { + addBeanClass(FailingMessageToMessageProcessor.class); + initialize(); + Emitter emitter = get(EmitterBean.class).emitter(); + Sink sink = get(Sink.class); + + Set acked = new ConcurrentHashSet<>(); + Set nacked = new ConcurrentHashSet<>(); + + List throwables = run(acked, nacked, emitter); + + await().until(() -> sink.list().size() == 8); + assertThat(acked).hasSize(9); + assertThat(nacked).hasSize(1); + assertThat(throwables).hasSize(1).allSatisfy(t -> assertThat(t).isInstanceOf(ProcessingException.class) + .hasCauseInstanceOf(InvocationTargetException.class).hasStackTraceContaining("b")); + } + @Test public void testThatMessagesAreNackedAfterFailingProcessingOfPayload() throws InterruptedException { addBeanClass(FailingPayloadProcessor.class); @@ -175,7 +266,7 @@ public void testThatMessagesAreNackedAfterFailingBlockingProcessingOfMessage() t .hasCauseInstanceOf(InvocationTargetException.class).hasStackTraceContaining("b")); } - private List run(Set acked, Set nacked, Emitter emitter) + private List run(Collection acked, Collection nacked, Emitter emitter) throws InterruptedException { List reasons = new CopyOnWriteArrayList<>(); CountDownLatch done = new CountDownLatch(1); @@ -230,6 +321,17 @@ public String process(String s) { } + public static class SuccessfulPayloadToMessageProcessor { + + @Incoming("data") + @Outgoing("out") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public Message process(String s) { + return Message.of(s.toUpperCase()); + } + + } + @ApplicationScoped public static class SuccessfulMessageToPayloadProcessor { @@ -242,6 +344,38 @@ public String process(Message s) { } + public static class SuccessfulMessageToMessageProcessor { + + @Incoming("data") + @Outgoing("out") + public Message process(Message s) { + return s.withPayload(s.getPayload().toUpperCase()); + } + + } + + public static class SuccessfulMessageToMessageProcessorPostProcessingDuplicate { + + @Incoming("data") + @Outgoing("out") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public Message process(Message s) { + return s.withPayload(s.getPayload().toUpperCase()); + } + + } + + public static class SuccessfulMessageToMessageProcessorPostProcessing { + + @Incoming("data") + @Outgoing("out") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public Message process(Message s) { + return Message.of(s.getPayload().toUpperCase()); + } + + } + @ApplicationScoped public static class FailingPayloadProcessor { @@ -286,6 +420,29 @@ public String process(Message m) { } + @ApplicationScoped + public static class FailingMessageToMessageProcessor { + + @Incoming("data") + @Outgoing("out") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public Message process(Message m) { + String s = m.getPayload(); + if (s.equalsIgnoreCase("b")) { + // nacked. + throw new IllegalArgumentException("b"); + } + + if (s.equalsIgnoreCase("e")) { + // acked but not forwarded + return null; + } + + return m.withPayload(s.toUpperCase()); + } + + } + @ApplicationScoped public static class SuccessfulBlockingPayloadProcessor { diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java new file mode 100644 index 0000000000..3670a92cb9 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java @@ -0,0 +1,29 @@ +package io.smallrye.reactive.messaging.beans; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; + +import io.reactivex.Flowable; + +@ApplicationScoped +public class BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError { + + @Incoming("count") + @Outgoing("sink") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public PublisherBuilder process(Integer payload) { + if (payload > 5) { + throw new IllegalArgumentException("boom"); + } + return ReactiveStreams.of(payload) + .map(i -> i + 1) + .flatMapRsPublisher(i -> Flowable.just(i, i)) + .map(i -> Integer.toString(i)); + } + +} diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java new file mode 100644 index 0000000000..aa7e8bf583 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/beans/BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.java @@ -0,0 +1,30 @@ +package io.smallrye.reactive.messaging.beans; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; +import org.reactivestreams.Publisher; + +import io.reactivex.Flowable; + +@ApplicationScoped +public class BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError { + + @Incoming("count") + @Outgoing("sink") + @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING) + public Publisher process(Integer payload) { + if (payload % 2 == 0) { + throw new IllegalArgumentException("boom"); + } + return ReactiveStreams.of(payload) + .map(i -> i + 1) + .flatMapRsPublisher(i -> Flowable.just(i, i)) + .map(i -> Integer.toString(i)) + .buildRs(); + } + +}