diff --git a/documentation/src/main/docs/kafka/request-reply.md b/documentation/src/main/docs/kafka/request-reply.md index 22ba07c8ae..2ef09781df 100644 --- a/documentation/src/main/docs/kafka/request-reply.md +++ b/documentation/src/main/docs/kafka/request-reply.md @@ -89,9 +89,14 @@ A snapshot of the list of pending replies is available through the `KafkaRequest The requestor can be found in a position where a request is sent, and it's reply is already published to the reply topic, before the requestor starts and polls the consumer. In case the reply consumer is configured with `auto.offset.reset=latest`, which is the default value, this can lead to the requestor missing replies. + If `auto.offset.reset` is `latest`, at wiring time, before any request can take place, the `KafkaRequestReply` finds partitions that the consumer needs to subscribe and waits for their assignment to the consumer. -On other occasons the `KafkaRequestReply#waitForAssignments` method can be used. +The timeout of the initial subscription can be adjusted with `reply.initial-assignment-timeout` which defaults to the `reply.timeout`. +If this timeout fails, `KafkaRequestReply` will enter an invalid state which will require it to be restarted. +If set to `-1`, the `KafkaRequestReply` will not wait for the initial assignment of the reply consumer to sent requests. + +On other occasions the `KafkaRequestReply#waitForAssignments` method can be used. ## Correlation Ids diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java index 23af6eb92e..0bc4ec68c0 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java @@ -73,6 +73,13 @@ public interface KafkaRequestReply extends EmitterType { */ String REPLY_TIMEOUT_KEY = "reply.timeout"; + /** + * The config key for the initial assignment timeout. + * This timeout is used at start when the {@code auto.offset.reset} is set to {@code latest}. + * The value {@code -1} disables waiting for initial assignment. + */ + String REPLY_INITIAL_ASSIGNMENT_TIMEOUT_KEY = "reply.initial-assignment-timeout"; + /** * The config key for the correlation ID handler identifier. *

diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java index 240fef87f8..161ccf6a29 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java @@ -7,6 +7,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -75,6 +76,7 @@ public class KafkaRequestReplyImpl extends MutinyEmitterImpl private final KafkaSource replySource; private final Set waitForPartitions; private final boolean gracefulShutdown; + private final Duration initialAssignmentTimeout; private Function, Message> replyConverter; public KafkaRequestReplyImpl(EmitterConfiguration config, @@ -103,6 +105,10 @@ public KafkaRequestReplyImpl(EmitterConfiguration config, this.replyTopic = consumerConfig.getTopic().orElse(null); this.replyPartition = connectorConfig.getOptionalValue(REPLY_PARTITION_KEY, Integer.class).orElse(-1); this.replyTimeout = Duration.ofMillis(connectorConfig.getOptionalValue(REPLY_TIMEOUT_KEY, Integer.class).orElse(5000)); + int initialAssignmentTimeoutMillis = connectorConfig + .getOptionalValue(REPLY_INITIAL_ASSIGNMENT_TIMEOUT_KEY, Integer.class) + .orElse((int) replyTimeout.toMillis()); + this.initialAssignmentTimeout = initialAssignmentTimeoutMillis < 0 ? null : Duration.ofMillis(initialAssignmentTimeoutMillis); this.autoOffsetReset = consumerConfig.getAutoOffsetReset(); this.replyCorrelationIdHeader = connectorConfig.getOptionalValue(REPLY_CORRELATION_ID_HEADER_KEY, String.class) @@ -151,8 +157,11 @@ private Set getWaitForPartitions(KafkaConnectorIncomingConfigura @Override public Flow.Publisher> getPublisher() { return this.publisher - .plug(m -> "latest".equals(autoOffsetReset) - ? m.onSubscription().call(() -> waitForAssignments().ifNoItem().after(replyTimeout).fail()) + .plug(m -> initialAssignmentTimeout != null && "latest".equals(autoOffsetReset) + ? m.onSubscription().call(() -> waitForAssignments() + .ifNoItem() + .after(initialAssignmentTimeout) + .fail()) : m) .onTermination().invoke(this::complete); } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java index 209b033b65..b3145ba8ac 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java @@ -479,6 +479,30 @@ void testReplyOffsetResetEarliest() { .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); } + @Test + void testReplyWithoutWaitForAssignment() { + addBeans(ReplyServer.class); + topic = companion.topics().createAndWait(topic, 3); + String replyTopic = topic + "-replies"; + companion.topics().createAndWait(replyTopic, 3); + + List replies = new CopyOnWriteArrayList<>(); + + RequestReplyProducer app = runApplication(config() + .withPrefix("mp.messaging.outgoing.request-reply") + .with("reply.initial-assignment-timeout", "-1"), RequestReplyProducer.class); + + for (int i = 0; i < 10; i++) { + app.requestReply().request(Message.of(i, Metadata.of(OutgoingKafkaRecordMetadata.builder() + .withKey("" + i).build()))).subscribe().with(r -> replies.add(r.getPayload())); + } + await().untilAsserted(() -> assertThat(replies).hasSize(10)); + assertThat(replies).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + + assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) + .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + } + @Test void testReplyAssignAndSeekOffset() { addBeans(ReplyServer.class);