Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial timeout property for KafkaRequestReply when using latest offset #2748

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion documentation/src/main/docs/kafka/request-reply.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@ A snapshot of the list of pending replies is available through the `KafkaRequest
The requestor can be found in a position where a request is sent, and it's reply is already published to the reply topic,
before the requestor starts and polls the consumer.
In case the reply consumer is configured with `auto.offset.reset=latest`, which is the default value, this can lead to the requestor missing replies.

If `auto.offset.reset` is `latest`, at wiring time, before any request can take place, the `KafkaRequestReply`
finds the partitions that the consumer needs to subscribe to and waits for their assignment to the consumer.
On other occasions the `KafkaRequestReply#waitForAssignments` method can be used.
The timeout of the initial subscription can be adjusted with `reply.initial-assignment-timeout` which defaults to the `reply.timeout`.
If this timeout is exceeded, the `KafkaRequestReply` will enter an invalid state and will need to be restarted.
If set to `-1`, the `KafkaRequestReply` will not wait for the initial assignment of the reply consumer before sending requests.

On other occasions the `KafkaRequestReply#waitForAssignments` method can be used.

## Correlation Ids

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ public interface KafkaRequestReply<Req, Rep> extends EmitterType {
*/
String REPLY_TIMEOUT_KEY = "reply.timeout";

/**
 * The config key for the initial assignment timeout, in milliseconds.
 * This timeout is used at start when the {@code auto.offset.reset} is set to {@code latest}:
 * the requestor waits for the reply consumer's partition assignment before sending requests.
 * If not set, the value of {@code reply.timeout} is used as the default.
 * The value {@code -1} disables waiting for initial assignment.
 */
String REPLY_INITIAL_ASSIGNMENT_TIMEOUT_KEY = "reply.initial-assignment-timeout";

/**
* The config key for the correlation ID handler identifier.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class KafkaRequestReplyImpl<Req, Rep> extends MutinyEmitterImpl<Req>
private final KafkaSource<Object, Rep> replySource;
private final Set<TopicPartition> waitForPartitions;
private final boolean gracefulShutdown;
private final Duration initialAssignmentTimeout;
private Function<Message<Rep>, Message<Rep>> replyConverter;

public KafkaRequestReplyImpl(EmitterConfiguration config,
Expand Down Expand Up @@ -103,6 +105,10 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
this.replyTopic = consumerConfig.getTopic().orElse(null);
this.replyPartition = connectorConfig.getOptionalValue(REPLY_PARTITION_KEY, Integer.class).orElse(-1);
this.replyTimeout = Duration.ofMillis(connectorConfig.getOptionalValue(REPLY_TIMEOUT_KEY, Integer.class).orElse(5000));
int initialAssignmentTimeoutMillis = connectorConfig
.getOptionalValue(REPLY_INITIAL_ASSIGNMENT_TIMEOUT_KEY, Integer.class)
.orElse((int) replyTimeout.toMillis());
this.initialAssignmentTimeout = initialAssignmentTimeoutMillis < 0 ? null : Duration.ofMillis(initialAssignmentTimeoutMillis);

this.autoOffsetReset = consumerConfig.getAutoOffsetReset();
this.replyCorrelationIdHeader = connectorConfig.getOptionalValue(REPLY_CORRELATION_ID_HEADER_KEY, String.class)
Expand Down Expand Up @@ -151,8 +157,11 @@ private Set<TopicPartition> getWaitForPartitions(KafkaConnectorIncomingConfigura
@Override
public Flow.Publisher<Message<? extends Req>> getPublisher() {
return this.publisher
.plug(m -> "latest".equals(autoOffsetReset)
? m.onSubscription().call(() -> waitForAssignments().ifNoItem().after(replyTimeout).fail())
.plug(m -> initialAssignmentTimeout != null && "latest".equals(autoOffsetReset)
? m.onSubscription().call(() -> waitForAssignments()
.ifNoItem()
.after(initialAssignmentTimeout)
.fail())
: m)
.onTermination().invoke(this::complete);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,30 @@ void testReplyOffsetResetEarliest() {
.extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
}

/**
 * Verifies that requests still succeed when the initial partition-assignment
 * wait is disabled via {@code reply.initial-assignment-timeout=-1}: all ten
 * replies are received by the requestor and all reply records land on the
 * reply topic.
 */
@Test
void testReplyWithoutWaitForAssignment() {
    addBeans(ReplyServer.class);
    // A 3-partition request topic plus a matching 3-partition reply topic.
    topic = companion.topics().createAndWait(topic, 3);
    String replyTopic = topic + "-replies";
    companion.topics().createAndWait(replyTopic, 3);

    List<String> received = new CopyOnWriteArrayList<>();

    // -1 disables waiting for the reply consumer's initial assignment.
    RequestReplyProducer app = runApplication(config()
            .withPrefix("mp.messaging.outgoing.request-reply")
            .with("reply.initial-assignment-timeout", "-1"), RequestReplyProducer.class);

    int requestCount = 10;
    for (int i = 0; i < requestCount; i++) {
        Message<Integer> request = Message.of(i, Metadata.of(OutgoingKafkaRecordMetadata.builder()
                .withKey("" + i).build()));
        app.requestReply().request(request).subscribe().with(reply -> received.add(reply.getPayload()));
    }

    await().untilAsserted(() -> assertThat(received).hasSize(10));
    assertThat(received).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");

    assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion())
            .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
}

@Test
void testReplyAssignAndSeekOffset() {
addBeans(ReplyServer.class);
Expand Down
Loading