Skip to content

Commit

Permalink
GH-3395: Use Original Key in Reply Record
Browse files Browse the repository at this point in the history
Fixes: #3395

When using `ReplyingKafkaTemplate,` include the original key from the request record if such a key exists.
  • Loading branch information
sobychacko authored Aug 5, 2024
1 parent 2567b20 commit b003dff
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,12 @@ public Message<?> messageReturn(String in) {
}
----

=== Original Record Key in Reply

Starting with version 3.3, the Kafka record key from the incoming request (if it exists) will be preserved in the reply record.
This is only applicable for single record request/reply scenario.
When the listener is batch or when the return type is a collection, it is up to the application to specify which keys to use by wrapping the reply record in a `Message` type.

[[aggregating-request-reply]]
== Aggregating Multiple Replies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ For more details, see xref:kafka/receiving-messages/filtering.adoc[Message recei

The `ConcurentContainerMessageListenerContainer` emits now a `ConcurrentContainerStoppedEvent` when all of its child containers are stopped.
For more details, see xref:kafka/events.adoc[Application Events] and `ConcurrentContainerStoppedEvent` Javadocs.

[[x33-original-record-key-in-reply]]
=== Original Record Key in Reply

When using `ReplyingKafkaTemplate`, if the original record from the request contains a key, then that same key will be part of the reply as well.
For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section of the reference docs.
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ private void sendReplyForMessageSource(Object result, String topic, Message<?> s
builder.setHeader(this.correlationHeaderName, correlationId);
}
setPartition(builder, source);
setKey(builder, source);
this.replyTemplate.send(builder.build());
}

Expand All @@ -662,7 +663,6 @@ protected void acknowledge(@Nullable Acknowledgment acknowledgment) {

protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Throwable t, Message<?> source) {

try {
handleException(request, acknowledgment, consumer, source,
new ListenerExecutionFailedException(createMessagingErrorMessage(
Expand All @@ -676,7 +676,6 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm

protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Message<?> message, ListenerExecutionFailedException e) {

if (this.errorHandler != null) {
try {
if (NULL_MESSAGE.equals(message)) {
Expand Down Expand Up @@ -719,6 +718,14 @@ private void setPartition(MessageBuilder<?> builder, Message<?> source) {
}
}

private void setKey(MessageBuilder<?> builder, Message<?> source) {
Object key = source.getHeaders().get(KafkaHeaders.RECEIVED_KEY);
// Set the reply record key only for non-batch requests
if (key != null && !(key instanceof List)) {
builder.setHeader(KafkaHeaders.KEY, key);
}
}

@Nullable
private byte[] getReplyPartition(Message<?> source) {
return source.getHeaders().get(KafkaHeaders.REPLY_PARTITION, byte[].class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -101,6 +101,7 @@
/**
* @author Gary Russell
* @author Nathan Xu
* @author Soby Chacko
* @since 2.1.3
*
*/
Expand Down Expand Up @@ -196,11 +197,12 @@ public void testGood() throws Exception {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
Headers headers = new RecordHeaders();
headers.add("baz", "buz".getBytes());
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, null, null, null, "foo", headers);
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, null, null, 1, "foo", headers);
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value()).isEqualTo("FOO");
assertThat(consumerRecord.key()).isEqualTo(1);
Map<String, Object> receivedHeaders = new HashMap<>();
new DefaultKafkaHeaderMapper().toHeaders(consumerRecord.headers(), receivedHeaders);
assertThat(receivedHeaders).containsKey("baz");
Expand Down

0 comments on commit b003dff

Please sign in to comment.