Skip to content

Commit

Permalink
Add multi request to KafkaRequestReply
Browse files Browse the repository at this point in the history
  • Loading branch information
Malandril committed Sep 22, 2024
1 parent fff37d8 commit 610d985
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.Set;

import io.smallrye.mutiny.Multi;
import org.apache.kafka.common.TopicPartition;
import org.eclipse.microprofile.reactive.messaging.Message;

Expand Down Expand Up @@ -120,6 +121,22 @@ public interface KafkaRequestReply<Req, Rep> extends EmitterType {
*/
Uni<Message<Rep>> request(Message<Req> request);

/**
* Sends a request and receives responses.
*
* @param request the request object to be sent
* @return a Multi object representing the results of the send and receive operation
*/
Multi<Rep> requestMulti(Req request);

/**
* Sends a request and receives responses.
*
* @param request the request object to be sent
* @return a Multi object representing the results of the send and receive operation
*/
Multi<Message<Rep>> requestMulti(Message<Req> request);

/**
* Blocks until the consumer has been assigned all partitions for consumption.
* If a {@code reply.partition} is provided, waits only for the assignment of that particular partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.reactive.messaging.ClientCustomizer;
Expand Down Expand Up @@ -185,12 +186,22 @@ private void grace(Duration duration) {

@Override
public Uni<Rep> request(Req request) {
return request(ContextAwareMessage.of(request))
.map(Message::getPayload);
return requestMulti(request).toUni();
}

@Override
public Uni<Message<Rep>> request(Message<Req> request) {
return requestMulti(request).toUni();
}

@Override
public Multi<Rep> requestMulti(Req request) {
return requestMulti(ContextAwareMessage.of(request))
.map(Message::getPayload);
}

@Override
public Multi<Message<Rep>> requestMulti(Message<Req> request) {
var builder = request.getMetadata(OutgoingKafkaRecordMetadata.class)
.map(metadata -> OutgoingKafkaRecordMetadata.from(metadata))
.orElseGet(OutgoingKafkaRecordMetadata::builder);
Expand All @@ -204,16 +215,26 @@ public Uni<Message<Rep>> request(Message<Req> request) {
OutgoingMessageMetadata<RecordMetadata> outMetadata = new OutgoingMessageMetadata<>();
return sendMessage(request.addMetadata(builder.build()).addMetadata(outMetadata))
.invoke(() -> subscription.get().request(1))
.chain(unused -> Uni.createFrom().<Message<Rep>> emitter(emitter -> pendingReplies.put(correlationId,
new PendingReplyImpl<>(outMetadata.getResult(), replyTopic, replyPartition,
(UniEmitter<Message<Rep>>) emitter)))
.ifNoItem().after(replyTimeout).fail())
.onItemOrFailure().invoke(() -> pendingReplies.remove(correlationId))
.plug(uni -> replyFailureHandler != null ? uni.onItem().transformToUni(f -> {
Throwable failure = replyFailureHandler.handleReply((KafkaRecord<?, ?>) f);
return failure != null ? Uni.createFrom().failure(failure) : Uni.createFrom().item(f);
}) : uni)
.plug(uni -> replyConverter != null ? uni.map(f -> replyConverter.apply(f)) : uni);
.onItem()
.transformToMulti(unused -> Multi.createFrom().<Message<Rep>>emitter(emitter -> {
pendingReplies.put(correlationId,
new PendingReplyImpl<>(outMetadata.getResult(),
replyTopic,
replyPartition,
(MultiEmitter<Message<Rep>>) emitter));
})
.ifNoItem().after(replyTimeout).fail()
).onTermination().invoke(() -> pendingReplies.remove(correlationId))
.onItem().transformToUniAndMerge(m -> {
if (replyFailureHandler != null) {
Throwable failure = replyFailureHandler.handleReply((KafkaRecord<?, ?>) m);
if (failure != null) {
return Uni.createFrom().failure(failure);
}
}
return Uni.createFrom().item(m);
})
.plug(multi -> replyConverter != null ? multi.map(f -> replyConverter.apply(f)) : multi);
}

@Override
Expand Down Expand Up @@ -262,13 +283,10 @@ public void onItem(KafkaRecord<?, Rep> record) {
// If reply topic header is NOT null, it is considered a request not a reply
if (header != null && record.getHeaders().lastHeader(replyTopicHeader) == null) {
CorrelationId correlationId = correlationIdHandler.parse(header.value());
PendingReplyImpl<Rep> reply = pendingReplies.remove(correlationId);
if (reply != null) {
reply.getEmitter().complete(record);
return;
} else {
log.requestReplyRecordIgnored(channel, record.getTopic(), correlationId.toString());
}
pendingReplies.computeIfPresent(correlationId, (id, reply) -> {
reply.getEmitter().emit(record);
return reply;
});
}
// request more
subscription.get().request(1);
Expand All @@ -289,10 +307,10 @@ public static class PendingReplyImpl<Rep> implements PendingReply {
private final RecordMetadata metadata;
private final String replyTopic;
private final int replyPartition;
private final UniEmitter<Message<Rep>> emitter;
private final MultiEmitter<Message<Rep>> emitter;

public PendingReplyImpl(RecordMetadata metadata, String replyTopic, int replyPartition,
UniEmitter<Message<Rep>> emitter) {
MultiEmitter<Message<Rep>> emitter) {
this.replyTopic = replyTopic;
this.replyPartition = replyPartition;
this.metadata = metadata;
Expand All @@ -314,7 +332,7 @@ public RecordMetadata recordMetadata() {
return metadata;
}

public UniEmitter<Message<Rep>> getEmitter() {
public MultiEmitter<Message<Rep>> getEmitter() {
return emitter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

Expand Down Expand Up @@ -139,6 +142,58 @@ void testReplyMessage() {
.containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
}

@Test
void testReplyMessageMulti() {
addBeans(ReplyServerMultipleReplies.class);
topic = companion.topics().createAndWait(topic, 3);
String replyTopic = topic + "-replies";
companion.topics().createAndWait(replyTopic, 3);

List<String> replies = new CopyOnWriteArrayList<>();

RequestReplyProducer app = runApplication(config(), RequestReplyProducer.class);
List<String> expected = new ArrayList<>();
int sent = 5;
for (int i = 0; i < sent; i++) {
app.requestReply().requestMulti(i)
.subscribe()
.with(replies::add);
for (int j = 0; j < ReplyServerMultipleReplies.REPLIES; j++) {
expected.add(i + ": " + j);
}
}
await().untilAsserted(() -> assertThat(replies).hasSize(ReplyServerMultipleReplies.REPLIES * sent));
assertThat(replies)
.containsAll(expected);

assertThat(companion.consumeStrings().fromTopics(replyTopic, ReplyServerMultipleReplies.REPLIES * sent).awaitCompletion())
.extracting(ConsumerRecord::value)
.containsAll(expected);
}

@Test
void testReplyMessageMultiLimit() {
addBeans(ReplyServerMultipleReplies.class);
topic = companion.topics().createAndWait(topic, 3);
String replyTopic = topic + "-replies";
companion.topics().createAndWait(replyTopic, 3);

List<String> replies = new CopyOnWriteArrayList<>();

RequestReplyProducer app = runApplication(config(), RequestReplyProducer.class);
app.requestReply().requestMulti(0)
.capDemandsTo(5)
.subscribe()
.with(replies::add);
await().untilAsserted(() -> assertThat(replies).hasSize(5));
assertThat(replies)
.containsExactlyInAnyOrder("0: 0", "0: 1", "0: 2", "0: 3", "0: 4");

assertThat(companion.consumeStrings().fromTopics(replyTopic, 5).awaitCompletion())
.extracting(ConsumerRecord::value)
.containsExactlyInAnyOrder("0: 0", "0: 1", "0: 2", "0: 3", "0: 4");
}

@Test
void testReplyWithReplyTopic() {
addBeans(ReplyServer.class);
Expand Down Expand Up @@ -610,6 +665,26 @@ String process(Integer payload) {
}
}

@ApplicationScoped
public static class ReplyServerMultipleReplies {

public static final int REPLIES = 10;

@Incoming("req")
@Outgoing("rep")
Multi<String> process(Integer payload) {
if (payload == null) {
return null;
}
return Multi.createFrom().emitter(multiEmitter -> {
for (int i = 0; i < REPLIES; i++) {
multiEmitter.emit(payload + ": " + i);
}
multiEmitter.complete();
});
}
}

@ApplicationScoped
public static class ReplyServerSlow {

Expand Down

0 comments on commit 610d985

Please sign in to comment.