Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to post-processing ack management #2742

Merged
merged 2 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 35 additions & 22 deletions documentation/src/main/docs/concepts/signatures.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,41 @@ and available acknowledgement strategies (when applicable).

## Method signatures to process data

| Signature | Invocation time | Supported Acknowledgement Strategies | Metadata Propagation |
|---------------------------------------------------------------------------|--------------------------------------------------|-----------------------------------------|----------------------|
| `@Outgoing @Incoming Message<O> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming O method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming CompletionStage<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming CompletionStage<O> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming Uni<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming Uni<O> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming Processor<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Processor<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming Flow.Processor<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Processor<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming ProcessorBuilder<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming ProcessorBuilder<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming Publisher<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Publisher<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming Multi<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Multi<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming Flow.Publisher<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Publisher<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming PublisherBuilder<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming PublisherBuilder<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| Signature | Invocation time | Supported Acknowledgement Strategies | Metadata Propagation |
|---------------------------------------------------------------------------|--------------------------------------------------|-----------------------------------------------------------------|----------------------|
| `@Outgoing @Incoming Message<O> method(Message<I> msg)` | Called for every incoming message (sequentially) | POST_PROCESSING (Smallrye only), *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming Message<O> method(I payload)` | Called for every incoming message (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming O method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming CompletionStage<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming CompletionStage<O> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming CompletionStage<Message<O>> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming Uni<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, NONE, PRE_PROCESSING | manual |
| `@Outgoing @Incoming Uni<Message<O>> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING* (Smallrye only), NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming Uni<O> method(I payload)` | Called for every incoming payload (sequentially) | *POST_PROCESSING*, NONE, PRE_PROCESSING | automatic |
| `@Outgoing @Incoming Processor<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Processor<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming Flow.Processor<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Processor<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming ProcessorBuilder<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming ProcessorBuilder<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming Publisher<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Publisher<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming Multi<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Multi<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming Flow.Publisher<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Publisher<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming PublisherBuilder<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming PublisherBuilder<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |

Note that in additional to the MicroProfile Reactive Messaging specification,
SmallRye Reactive Messaging supports the post-processing acknowledgment handling with automatic metadata propagation for the following signatures:

- `@Outgoing @Incoming Message<O> method(I payload)`
- `@Outgoing @Incoming CompletionStage<Message<O>> method(I payload)`
- `@Outgoing @Incoming Uni<Message<O>> method(I payload)`
- `@Outgoing @Incoming Message<O> method(Message<I> payload)` : For this signature, the post-processing acknowledgment handling is limited.
It covers cases for nacking incoming messages on caught exceptions at the method body, acking incoming messages when outgoing message is skipped by returning `null`, and chaining acknowlegment from outgoing message to the incoming.
However, if the incoming message has already been (n)acked, you will experience duplicate (n)acks.

## Method signatures to manipulate streams

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,10 @@ private ValidationOutput validateProcessor(Acknowledgment.Strategy acknowledgmen

if (production == MediatorConfiguration.Production.INDIVIDUAL_MESSAGE
&& acknowledgment == Acknowledgment.Strategy.POST_PROCESSING) {
throw ex.illegalStateForValidateProcessor(methodAsString);
// relax here the validation for the post-processing acknowledgment
if (consumption == MediatorConfiguration.Consumption.MESSAGE) {
log.postProcessingNotFullySupported(methodAsString);
}
}

return new ValidationOutput(production, consumption, useBuilderTypes, useReactiveStreams, payloadType);
Expand Down
Loading