diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java index 11360b0dac0c9..fa4ff8905427a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java @@ -45,6 +45,11 @@ * states. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. * *

+ * However, each individual {@link org.apache.kafka.common.header.Header} instance + * is read thread-safe; that is, it is safe for multiple threads to read the same header's key or value concurrently + * as long as no thread modifies it. + * + *

* Refer to the {@link KafkaConsumer} documentation for more details on multi-threaded consumption and processing strategies. */ public class ConsumerRecord { diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java index e1ce6ad01a5bb..3eee4d27da50a 100644 --- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java @@ -25,9 +25,9 @@ public class RecordHeader implements Header { private ByteBuffer keyBuffer; - private String key; - private ByteBuffer valueBuffer; - private byte[] value; + private volatile String key; + private volatile ByteBuffer valueBuffer; + private volatile byte[] value; public RecordHeader(String key, byte[] value) { Objects.requireNonNull(key, "Null header keys are not permitted"); @@ -42,16 +42,24 @@ public RecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) { public String key() { if (key == null) { - key = Utils.utf8(keyBuffer, keyBuffer.remaining()); - keyBuffer = null; + synchronized (this) { + if (key == null) { + key = Utils.utf8(keyBuffer, keyBuffer.remaining()); + keyBuffer = null; + } + } } return key; } public byte[] value() { if (value == null && valueBuffer != null) { - value = Utils.toArray(valueBuffer); - valueBuffer = null; + synchronized (this) { + if (value == null && valueBuffer != null) { + value = Utils.toArray(valueBuffer); + valueBuffer = null; + } + } } return value; } diff --git a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java index 41104194991d9..605b5598e133d 100644 --- a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java +++ b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java @@ -19,10 +19,17 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -265,4 +272,41 @@ static void assertHeader(String key, String value, Header actual) { assertArrayEquals(value.getBytes(), actual.value()); } + private void assertRecordHeaderReadThreadSafe(RecordHeader header) { + int threadCount = 16; + CountDownLatch startLatch = new CountDownLatch(1); + + var futures = IntStream.range(0, threadCount) + .mapToObj(i -> CompletableFuture.runAsync(() -> { + try { + startLatch.await(); + header.key(); + header.value(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + })).collect(Collectors.toUnmodifiableList()); + + startLatch.countDown(); + futures.forEach(CompletableFuture::join); + } + + @RepeatedTest(100) + public void testRecordHeaderIsReadThreadSafe() throws Exception { + RecordHeader header = new RecordHeader( + ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8)) + ); + assertRecordHeaderReadThreadSafe(header); + } + + @RepeatedTest(100) + public void testRecordHeaderWithNullValueIsReadThreadSafe() throws Exception { + RecordHeader header = new RecordHeader( + ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)), + null + ); + assertRecordHeaderReadThreadSafe(header); + } } diff --git a/docs/upgrade.html b/docs/upgrade.html index 80343474d7051..a1b9bcaab16ca 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -25,6 +25,10 @@

Upgrading Servers to 4
Notable changes in 4.2.0