Skip to content

Commit

Permalink
Merge pull request #2203 from cescoffier/implement-post-processing-su…
Browse files Browse the repository at this point in the history
…pport-for-method-receiving-one-payload-and-producing-streams-of-payloads

Implement post-processing acknowledgment support for methods receiving a single payload and producing streams stream of payloads (Multi, RS publisher, and publisher builder)
  • Loading branch information
ozangunalp authored Jun 23, 2023
2 parents 5b77718 + b8d63b2 commit 4a40657
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 10 deletions.
9 changes: 9 additions & 0 deletions documentation/src/main/docs/concepts/acknowledgement.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ of the outcome).
{{ insert('ack/StreamAckExamples.java', 'payload') }}
```

For method receiving a single payload and producing a stream of payloads, it defaults to pre-processing acknowledgement.
However, in this case, post-processing is supported.
It waits for all the produced messages to be acknowledged before acknowledging the received one.
If one of the produced message is nacked, the received one is nacked immediately.

``` java
{{ insert('ack/StreamAckExamples.java', 'payload-to-multi') }}
```

## Controlling acknowledgement

The {{ javadoc('org.eclipse.microprofile.reactive.messaging.Acknowledgment') }}
Expand Down
11 changes: 11 additions & 0 deletions documentation/src/main/java/ack/StreamAckExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.Flow.Publisher;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
Expand All @@ -28,4 +29,14 @@ public Publisher<String> transformPayload(Multi<String> stream) {
.map(String::toUpperCase);
}
// </payload>

// <payload-to-multi>
@Incoming("in")
@Outgoing("out")
// Defaults to pre-processing, but post-processing is also supported
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
public Multi<String> transformPayload(String one) {
return Multi.createFrom().items(one, one);
}
// </payload-to-multi>
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.providers.helpers.AcknowledgementCoordinator;
import io.smallrye.reactive.messaging.providers.helpers.ClassUtils;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import mutiny.zero.flow.adapters.AdaptersToFlow;
Expand Down Expand Up @@ -256,9 +257,15 @@ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloa
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
PublisherBuilder<?> pb = invoke(getArguments(message));
return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs()))
.onItem().transform(payload -> Message.of(payload, message.getMetadata()));
// TODO We can handle post-acknowledgement here. -> onCompletion
if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs()))
.onItem().transform(payload -> coordinator.track(Message.of(payload, message.getMetadata())));
} else {
return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs()))
.onItem().transform(payload -> Message.of(payload, message.getMetadata()));
}
});
};
}
Expand All @@ -267,10 +274,16 @@ private void processMethodReturningAReactiveStreamsPublisherOfPayloadsAndConsumi
this.mapper = upstream -> {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
Publisher<?> pb = invoke(getArguments(message));
return MultiUtils.publisher(AdaptersToFlow.publisher(pb))
.onItem().transform(payload -> Message.of(payload, message.getMetadata()));
// TODO We can handle post-acknowledgement here. -> onCompletion
Publisher<?> pub = invoke(getArguments(message));
if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(AdaptersToFlow.publisher(pub))
.onItem().transform(payload -> coordinator.track(Message.of(payload, message.getMetadata())));
} else {
return MultiUtils.publisher(AdaptersToFlow.publisher(pub))
.onItem().transform(payload -> Message.of(payload, message.getMetadata()));
}
});
};
}
Expand All @@ -280,9 +293,15 @@ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
Flow.Publisher<?> pub = invoke(getArguments(message));
return MultiUtils.publisher(pub)
.onItem().transform(payload -> Message.of(payload, message.getMetadata()));
// TODO We can handle post-acknowledgement here. -> onCompletion
if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(pub)
.onItem().transform(payload -> coordinator.track(Message.of(payload, message.getMetadata())));
} else {
return MultiUtils.publisher(pub)
.onItem().transform(payload -> Message.of(payload, message.getMetadata()));
}
});
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.smallrye.reactive.messaging.providers.helpers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.eclipse.microprofile.reactive.messaging.Message;

/**
* A utility class than orchestrate the (negative-)acknowledgment of a specified message based on the ack/nack of a set of
* messages.
* <p>
* A coordinator is created with a specific input message.
* For each message that needs to be tracked, the {@code track} method is called, which returned a modified message.
* When all the added messages are acked, the coordinator acks the input message.
* When one of the added message is nacked, the coordinator nacks the input message with the same reason.
*/
public class AcknowledgementCoordinator {

private final Message<?> input;

private volatile boolean done;
private final List<Tracker> tracked = new ArrayList<>();

public AcknowledgementCoordinator(Message<?> input) {
this.input = input;
}

public synchronized Message<?> track(Message<?> msg) {
Tracker tracker = new Tracker();
tracked.add(tracker);
return msg
.withAck(() -> {
onAck(tracker);
return CompletableFuture.completedFuture(null);
})
.withNack(reason -> {
onNack(reason, tracker);
return CompletableFuture.completedFuture(null);
});
}

private synchronized void onAck(Tracker id) {
if (done) {
return;
}
if (tracked.remove(id)) {
if (tracked.isEmpty() && !done) {
// Done!
done = true;
input.ack();
}
// Otherwise not done yet.
}
// Already acked or nack.
}

private synchronized void onNack(Throwable reason, Tracker id) {
if (done) {
return;
}
if (tracked.remove(id)) {
done = true;
tracked.clear();
input.nack(reason);
}
}

static class Tracker {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package io.smallrye.reactive.messaging.acknowledgement;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.*;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.WeldTestBaseWithoutTails;

/**
* Verify post-acknowledgement (and negative-acknowledgment) for method consuming a payload and returning a Multi, Publisher
* or PublisherBuilder of payload.
* Under the hood it verifies the behavior of
* {@link io.smallrye.reactive.messaging.providers.helpers.AcknowledgementCoordinator}.
*/
public class PayloadToMultiAcknowledgmentTest extends WeldTestBaseWithoutTails {

@ParameterizedTest
@ValueSource(classes = { ProcessorUsingMulti.class, ProcessorUsingRSPublisher.class, ProcessorUsingFlowPublisher.class,
ProcessorUsingPublisherBuilder.class })
void testCoordinatedAcknowledgement(Class<?> clz) {
addBeanClass(InAndOut.class, clz);

initialize();

InAndOut bean = get(InAndOut.class);

AtomicBoolean ackM1 = new AtomicBoolean();
AtomicBoolean ackM2 = new AtomicBoolean();
AtomicInteger nack = new AtomicInteger();

bean.emitter().send(Message.of("a")
.withAck(() -> {
ackM1.set(true);
return CompletableFuture.completedFuture(null);
})
.withNack(t -> {
nack.incrementAndGet();
return CompletableFuture.completedFuture(null);
}));

bean.emitter().send(Message.of("b")
.withAck(() -> {
ackM2.set(true);
return CompletableFuture.completedFuture(null);
})
.withNack(t -> {
nack.incrementAndGet();
return CompletableFuture.completedFuture(null);
}));

assertThat(ackM1).isFalse();
assertThat(ackM2).isFalse();
assertThat(nack).hasValue(0);

assertThat(bean.list()).hasSize(4);

bean.list().get(0).ack();
bean.list().get(3).ack();

assertThat(ackM1).isFalse();
assertThat(ackM2).isFalse();

bean.list().get(1).ack();
assertThat(ackM1).isTrue();
assertThat(ackM2).isFalse();

bean.list().get(2).nack(new IOException("boom"));
assertThat(ackM1).isTrue();
assertThat(ackM2).isFalse();
assertThat(ackM1).isTrue();
assertThat(nack).hasValue(1);

// Ignored signals
bean.list().get(2).ack();
bean.list().get(1).ack();
bean.list().get(3).nack(new IOException("boom"));
assertThat(ackM1).isTrue();
assertThat(ackM2).isFalse();
assertThat(ackM1).isTrue();
assertThat(nack).hasValue(1);
}

@ApplicationScoped
static class InAndOut {
@Inject
@Channel("input")
Emitter<String> emitter;

public Emitter<String> emitter() {
return emitter;
}

private List<Message<String>> list = new CopyOnWriteArrayList<>();

@Incoming("output")
CompletionStage<Void> consume(Message<String> s) {
list.add(s);
return CompletableFuture.completedFuture(null);
}

List<Message<String>> list() {
return list;
}
}

@ApplicationScoped
static class ProcessorUsingMulti {

@Incoming("input")
@Outgoing("output")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
Multi<String> process(String i) {
return Multi.createFrom().items(i, i);
}
}

@ApplicationScoped
static class ProcessorUsingRSPublisher {

@Incoming("input")
@Outgoing("output")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
Publisher<String> process(String i) {
return FlowAdapters.toPublisher(Multi.createFrom().items(i, i));
}
}

@ApplicationScoped
static class ProcessorUsingFlowPublisher {

@Incoming("input")
@Outgoing("output")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
Flow.Publisher<String> process(String i) {
return Multi.createFrom().items(i, i);
}
}

@ApplicationScoped
static class ProcessorUsingPublisherBuilder {

@Incoming("input")
@Outgoing("output")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
PublisherBuilder<String> process(String i) {
return ReactiveStreams.of(i, i);
}
}

}

0 comments on commit 4a40657

Please sign in to comment.