Skip to content

Commit

Permalink
Handle post-processing for payload consuming stream returning process…
Browse files Browse the repository at this point in the history
…or methods

Closes #2732 and #2733
  • Loading branch information
ozangunalp committed Aug 29, 2024
1 parent 88148be commit 6bc9865
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 26 deletions.
16 changes: 8 additions & 8 deletions documentation/src/main/docs/concepts/signatures.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ and available acknowledgement strategies (when applicable).
| `@Outgoing @Incoming Flow.Processor<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming ProcessorBuilder<Message<I>, Message<O>> method()` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming ProcessorBuilder<I, O> method()` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | not supported |
| `@Outgoing @Incoming Publisher<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Publisher<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming Multi<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Multi<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming Flow.Publisher<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Publisher<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming PublisherBuilder<Message<O>> method(Message<I> msg)` | Called once at *assembly* time | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming PublisherBuilder<O> method(I payload)` | Called once at *assembly* time | *PRE_PROCESSING*, NONE | automatic |
| `@Outgoing @Incoming Publisher<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Publisher<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming Multi<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Multi<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming Flow.Publisher<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming Flow.Publisher<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |
| `@Outgoing @Incoming PublisherBuilder<Message<O>> method(Message<I> msg)` | Called for every incoming message (sequentially) | *MANUAL*, PRE_PROCESSING, NONE | manual |
| `@Outgoing @Incoming PublisherBuilder<O> method(I payload)` | Called for every incoming payload (sequentially) | *PRE_PROCESSING*, POST_PROCESSING, NONE | automatic |

## Method signatures to manipulate streams

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,19 @@ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloa
this.mapper = upstream -> {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
PublisherBuilder<?> pb = invoke(getArguments(message));
if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs()))
.onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
if (isPostAck()) {
try {
PublisherBuilder<?> pb = invoke(getArguments(message));
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs()))
.onItem()
.transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
} catch (Throwable t) {
return handlePostInvocation(message, t);
}
} else {
PublisherBuilder<?> pb = invoke(getArguments(message));
return MultiUtils.publisher(AdaptersToFlow.publisher(pb.buildRs()))
.onItem().transform(payload -> payloadToMessage(payload, message.getMetadata()));
}
Expand All @@ -276,13 +282,19 @@ private void processMethodReturningAReactiveStreamsPublisherOfPayloadsAndConsumi
this.mapper = upstream -> {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
Publisher<?> pub = invoke(getArguments(message));
if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(AdaptersToFlow.publisher(pub))
.onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
if (isPostAck()) {
try {
Publisher<?> pub = invoke(getArguments(message));
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(AdaptersToFlow.publisher(pub))
.onItem()
.transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
} catch (Throwable t) {
return handlePostInvocation(message, t);
}
} else {
Publisher<?> pub = invoke(getArguments(message));
return MultiUtils.publisher(AdaptersToFlow.publisher(pub))
.onItem().transform(payload -> payloadToMessage(payload, message.getMetadata()));
}
Expand All @@ -294,13 +306,20 @@ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() {
this.mapper = upstream -> {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
Flow.Publisher<?> pub = invoke(getArguments(message));
if (configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(pub)
.onItem().transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));
if (isPostAck()) {
try {
Flow.Publisher<?> pub = invoke(getArguments(message));
// POST_PROCESSING must not be used when returning an infinite stream
AcknowledgementCoordinator coordinator = new AcknowledgementCoordinator(message);
return MultiUtils.publisher(pub)
.onItem()
.transform(payload -> coordinator.track(payloadToMessage(payload, message.getMetadata())));

} catch (Throwable t) {
return handlePostInvocation(message, t);
}
} else {
Flow.Publisher<?> pub = invoke(getArguments(message));
return MultiUtils.publisher(pub)
.onItem().transform(payload -> payloadToMessage(payload, message.getMetadata()));
}
Expand Down Expand Up @@ -384,6 +403,10 @@ private Flow.Publisher<? extends Message<Object>> handleSkip(Message<Object> m)
}
}

private Multi<? extends Message<?>> handlePostInvocation(Message<?> message, Throwable fail) {
return Uni.createFrom().completionStage(() -> message.nack(fail).thenApply(x -> (Message<?>) null)).toMulti();
}

private Uni<? extends Message<Object>> handlePostInvocation(Message<?> message, Object res, Throwable fail) {
if (fail != null) {
if (isPostAck()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage;
import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload;
import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError;
import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfMessagesAndConsumingIndividualMessage;
import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayload;
import io.smallrye.reactive.messaging.beans.BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError;

public class ProcessorShapeReturningPublisherTest extends WeldTestBase {

Expand All @@ -27,6 +32,17 @@ public void testBeanProducingAPublisherOfPayloadsAndConsumingIndividualPayload()
assertThat(collector.payloads()).isEqualTo(EXPECTED);
}

@Test
public void BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError() {
addBeanClass(BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError.class);
initialize();
MyCollector collector = container.select(MyCollector.class).get();
assertThat(collector.payloads()).isEqualTo(IntStream.rangeClosed(1, 5)
.map(i -> i * 2)
.flatMap(i -> IntStream.of(i, i)).boxed()
.map(Object::toString).collect(Collectors.toList()));
}

@Test
public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload() {
addBeanClass(BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayload.class);
Expand All @@ -35,6 +51,15 @@ public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPa
assertThat(collector.payloads()).isEqualTo(EXPECTED);
}

@Test
public void testBeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError() {
addBeanClass(BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError.class);
initialize();
MyCollector collector = container.select(MyCollector.class).get();
assertThat(collector.payloads()).isEqualTo(IntStream.rangeClosed(1, 6).flatMap(i -> IntStream.of(i, i)).boxed()
.map(Object::toString).collect(Collectors.toList()));
}

@Test
public void testBeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage() {
addBeanClass(BeanProducingAPublisherBuilderOfMessagesAndConsumingIndividualMessage.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.smallrye.reactive.messaging.beans;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

import io.reactivex.Flowable;

@ApplicationScoped
public class BeanProducingAPublisherBuilderOfPayloadsAndConsumingIndividualPayloadWithProcessingError {

@Incoming("count")
@Outgoing("sink")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
public PublisherBuilder<String> process(Integer payload) {
if (payload > 5) {
throw new IllegalArgumentException("boom");
}
return ReactiveStreams.of(payload)
.map(i -> i + 1)
.flatMapRsPublisher(i -> Flowable.just(i, i))
.map(i -> Integer.toString(i));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.smallrye.reactive.messaging.beans;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

import io.reactivex.Flowable;

@ApplicationScoped
public class BeanProducingAPublisherOfPayloadsAndConsumingIndividualPayloadWithProcessingError {

@Incoming("count")
@Outgoing("sink")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
public Publisher<String> process(Integer payload) {
if (payload % 2 == 0) {
throw new IllegalArgumentException("boom");
}
return ReactiveStreams.of(payload)
.map(i -> i + 1)
.flatMapRsPublisher(i -> Flowable.just(i, i))
.map(i -> Integer.toString(i))
.buildRs();
}

}

0 comments on commit 6bc9865

Please sign in to comment.