diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java index 23af6eb92..6c7fba3ca 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.Set; +import io.smallrye.mutiny.Multi; import org.apache.kafka.common.TopicPartition; import org.eclipse.microprofile.reactive.messaging.Message; @@ -120,6 +121,22 @@ public interface KafkaRequestReply extends EmitterType { */ Uni> request(Message request); + /** + * Sends a request and receives responses. + * + * @param request the request object to be sent + * @return a Multi object representing the results of the send and receive operation + */ + Multi requestMulti(Req request); + + /** + * Sends a request and receives responses. + * + * @param request the request object to be sent + * @return a Multi object representing the results of the send and receive operation + */ + Multi> requestMulti(Message request); + /** * Blocks until the consumer has been assigned all partitions for consumption. * If a {@code reply.partition} is provided, waits only for the assignment of that particular partition. diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java index 240fef87f..64b75579c 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java @@ -31,6 +31,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.subscription.MultiEmitter; import io.smallrye.mutiny.subscription.MultiSubscriber; import io.smallrye.mutiny.subscription.UniEmitter; import io.smallrye.reactive.messaging.ClientCustomizer; @@ -185,12 +186,22 @@ private void grace(Duration duration) { @Override public Uni request(Req request) { - return request(ContextAwareMessage.of(request)) - .map(Message::getPayload); + return requestMulti(request).toUni(); } @Override public Uni> request(Message request) { + return requestMulti(request).toUni(); + } + + @Override + public Multi requestMulti(Req request) { + return requestMulti(ContextAwareMessage.of(request)) + .map(Message::getPayload); + } + + @Override + public Multi> requestMulti(Message request) { var builder = request.getMetadata(OutgoingKafkaRecordMetadata.class) .map(metadata -> OutgoingKafkaRecordMetadata.from(metadata)) .orElseGet(OutgoingKafkaRecordMetadata::builder); @@ -204,16 +215,26 @@ public Uni> request(Message request) { OutgoingMessageMetadata outMetadata = new OutgoingMessageMetadata<>(); return sendMessage(request.addMetadata(builder.build()).addMetadata(outMetadata)) .invoke(() -> subscription.get().request(1)) - .chain(unused -> Uni.createFrom().> emitter(emitter -> pendingReplies.put(correlationId, - new PendingReplyImpl<>(outMetadata.getResult(), replyTopic, replyPartition, - (UniEmitter>) emitter))) - .ifNoItem().after(replyTimeout).fail()) - .onItemOrFailure().invoke(() -> pendingReplies.remove(correlationId)) - .plug(uni -> replyFailureHandler != null ? uni.onItem().transformToUni(f -> { - Throwable failure = replyFailureHandler.handleReply((KafkaRecord) f); - return failure != null ? Uni.createFrom().failure(failure) : Uni.createFrom().item(f); - }) : uni) - .plug(uni -> replyConverter != null ? uni.map(f -> replyConverter.apply(f)) : uni); + .onItem() + .transformToMulti(unused -> Multi.createFrom().>emitter(emitter -> { + pendingReplies.put(correlationId, + new PendingReplyImpl<>(outMetadata.getResult(), + replyTopic, + replyPartition, + (MultiEmitter>) emitter)); + }) + .ifNoItem().after(replyTimeout).fail() + ).onTermination().invoke(() -> pendingReplies.remove(correlationId)) + .onItem().transformToUniAndMerge(m -> { + if (replyFailureHandler != null) { + Throwable failure = replyFailureHandler.handleReply((KafkaRecord) m); + if (failure != null) { + return Uni.createFrom().failure(failure); + } + } + return Uni.createFrom().item(m); + }) + .plug(multi -> replyConverter != null ? multi.map(f -> replyConverter.apply(f)) : multi); } @Override @@ -262,13 +283,10 @@ public void onItem(KafkaRecord record) { // If reply topic header is NOT null, it is considered a request not a reply if (header != null && record.getHeaders().lastHeader(replyTopicHeader) == null) { CorrelationId correlationId = correlationIdHandler.parse(header.value()); - PendingReplyImpl reply = pendingReplies.remove(correlationId); - if (reply != null) { - reply.getEmitter().complete(record); - return; - } else { - log.requestReplyRecordIgnored(channel, record.getTopic(), correlationId.toString()); - } + pendingReplies.computeIfPresent(correlationId, (id, reply) -> { + reply.getEmitter().emit(record); + return reply; + }); } // request more subscription.get().request(1); @@ -289,10 +307,10 @@ public static class PendingReplyImpl implements PendingReply { private final RecordMetadata metadata; private final String replyTopic; private final int replyPartition; - private final UniEmitter> emitter; + private final MultiEmitter> emitter; public PendingReplyImpl(RecordMetadata metadata, String replyTopic, int replyPartition, - UniEmitter> emitter) { + MultiEmitter> emitter) { this.replyTopic = replyTopic; this.replyPartition = replyPartition; this.metadata = metadata; @@ -314,7 +332,7 @@ public RecordMetadata recordMetadata() { return metadata; } - public UniEmitter> getEmitter() { + public MultiEmitter> getEmitter() { return emitter; } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java index 209b033b6..8c4c9798e 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java @@ -5,11 +5,14 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.time.Duration; +import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import io.smallrye.mutiny.Multi; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -139,6 +142,58 @@ void testReplyMessage() { .containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); } + @Test + void testReplyMessageMulti() { + addBeans(ReplyServerMultipleReplies.class); + topic = companion.topics().createAndWait(topic, 3); + String replyTopic = topic + "-replies"; + companion.topics().createAndWait(replyTopic, 3); + + List replies = new CopyOnWriteArrayList<>(); + + RequestReplyProducer app = runApplication(config(), RequestReplyProducer.class); + List expected = new ArrayList<>(); + int sent = 5; + for (int i = 0; i < sent; i++) { + app.requestReply().requestMulti(i) + .subscribe() + .with(replies::add); + for (int j = 0; j < ReplyServerMultipleReplies.REPLIES; j++) { + expected.add(i + ": " + j); + } + } + await().untilAsserted(() -> assertThat(replies).hasSize(ReplyServerMultipleReplies.REPLIES * sent)); + assertThat(replies) + .containsAll(expected); + + assertThat(companion.consumeStrings().fromTopics(replyTopic, ReplyServerMultipleReplies.REPLIES * sent).awaitCompletion()) + .extracting(ConsumerRecord::value) + .containsAll(expected); + } + + @Test + void testReplyMessageMultiLimit() { + addBeans(ReplyServerMultipleReplies.class); + topic = companion.topics().createAndWait(topic, 3); + String replyTopic = topic + "-replies"; + companion.topics().createAndWait(replyTopic, 3); + + List replies = new CopyOnWriteArrayList<>(); + + RequestReplyProducer app = runApplication(config(), RequestReplyProducer.class); + app.requestReply().requestMulti(0) + .capDemandsTo(5) + .subscribe() + .with(replies::add); + await().untilAsserted(() -> assertThat(replies).hasSize(5)); + assertThat(replies) + .containsExactlyInAnyOrder("0: 0", "0: 1", "0: 2", "0: 3", "0: 4"); + + assertThat(companion.consumeStrings().fromTopics(replyTopic, 5).awaitCompletion()) + .extracting(ConsumerRecord::value) + .containsExactlyInAnyOrder("0: 0", "0: 1", "0: 2", "0: 3", "0: 4"); + } + @Test void testReplyWithReplyTopic() { addBeans(ReplyServer.class); @@ -610,6 +665,26 @@ String process(Integer payload) { } } + @ApplicationScoped + public static class ReplyServerMultipleReplies { + + public static final int REPLIES = 10; + + @Incoming("req") + @Outgoing("rep") + Multi process(Integer payload) { + if (payload == null) { + return null; + } + return Multi.createFrom().emitter(multiEmitter -> { + for (int i = 0; i < REPLIES; i++) { + multiEmitter.emit(payload + ": " + i); + } + multiEmitter.complete(); + }); + } + } + @ApplicationScoped public static class ReplyServerSlow {