From c7e5e093e0de74a970090e8ca7a0cb8c1055e19e Mon Sep 17 00:00:00 2001
From: chickenchickenlove
Date: Mon, 28 Apr 2025 08:33:02 +0900
Subject: [PATCH 1/4] spring-projectsGH-3067: Spring Kafka support multiple headers with same key.

Signed-off-by: chickenchickenlove
---
 .../support/DefaultKafkaHeaderMapper.java     |  17 +-
 .../support/MultiValueKafkaHeaderMapper.java  | 129 ++++++
 .../converter/MessagingMessageConverter.java  |  22 +
 .../MultiValueKafkaHeaderMapperTest.java      | 432 ++++++++++++++++++
 4 files changed, 598 insertions(+), 2 deletions(-)
 create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapper.java
 create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapperTest.java

diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java
index 45e6c1eed2..4d751b5f29 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2025 the original author or authors.
+ * Copyright 2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@
  * @author Gary Russell
  * @author Artem Bilan
  * @author Soby Chacko
+ * @author Sanghyeok An
  *
  * @since 1.3
  *
@@ -324,12 +325,24 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
 					populateJsonValueHeader(header, requestedType, headers);
 				}
 				else {
-					headers.put(headerName, headerValueToAddIn(header));
+					handleHeader(headerName, header, headers);
 				}
 			}
 		});
 	}
 
+	/**
+	 * Handle non-reserved headers in {@link DefaultKafkaHeaderMapper}.
+	 * @param headerName the header name.
+	 * @param header the header instance.
+	 * @param headers the target headers.
+	 * @since 4.0.0
+	 */
+
+	protected void handleHeader(String headerName, Header header, final Map<String, Object> headers) {
+		headers.put(headerName, headerValueToAddIn(header));
+	}
+
 	private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {
 		Class<?> type = Object.class;
 		boolean trusted = false;
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapper.java
new file mode 100644
index 0000000000..ae58c44b30
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapper.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2017-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.support;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.header.Header;
+
+import org.springframework.kafka.retrytopic.RetryTopicHeaders;
+
+/**
+ * Extended header mapper based on {@link DefaultKafkaHeaderMapper}.
+ * This header mapper manages header values as a list,
+ * except for certain reserved headers.
+ * Other behaviors are identical to {@link DefaultKafkaHeaderMapper}.
+ *
+ * @author Sanghyeok An
+ *
+ * @since 4.0.0
+ *
+ */
+public class MultiValueKafkaHeaderMapper extends DefaultKafkaHeaderMapper {
+
+	private final List<String> defaultSingleValueHeaderList = List.of(
+			KafkaHeaders.PREFIX,
+			KafkaHeaders.RECEIVED,
+			KafkaHeaders.TOPIC,
+			KafkaHeaders.KEY,
+			KafkaHeaders.PARTITION,
+			KafkaHeaders.OFFSET,
+			KafkaHeaders.RAW_DATA,
+			KafkaHeaders.RECORD_METADATA,
+			KafkaHeaders.ACKNOWLEDGMENT,
+			KafkaHeaders.CONSUMER,
+			KafkaHeaders.RECEIVED_TOPIC,
+			KafkaHeaders.RECEIVED_KEY,
+			KafkaHeaders.RECEIVED_PARTITION,
+			KafkaHeaders.TIMESTAMP_TYPE,
+			KafkaHeaders.TIMESTAMP,
+			KafkaHeaders.RECEIVED_TIMESTAMP,
+			KafkaHeaders.NATIVE_HEADERS,
+			KafkaHeaders.BATCH_CONVERTED_HEADERS,
+			KafkaHeaders.CORRELATION_ID,
+			KafkaHeaders.REPLY_TOPIC,
+			KafkaHeaders.REPLY_PARTITION,
+			KafkaHeaders.DLT_EXCEPTION_FQCN,
+			KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN,
+			KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
+			KafkaHeaders.DLT_EXCEPTION_MESSAGE,
+			KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE,
+			KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE,
+			KafkaHeaders.DLT_KEY_EXCEPTION_FQCN,
+			KafkaHeaders.DLT_ORIGINAL_TOPIC,
+			KafkaHeaders.DLT_ORIGINAL_PARTITION,
+			KafkaHeaders.DLT_ORIGINAL_OFFSET,
+			KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP,
+			KafkaHeaders.DLT_ORIGINAL_TIMESTAMP,
+			KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE,
+			KafkaHeaders.GROUP_ID,
+			KafkaHeaders.DELIVERY_ATTEMPT,
+			KafkaHeaders.EXCEPTION_FQCN,
+			KafkaHeaders.EXCEPTION_CAUSE_FQCN,
+			KafkaHeaders.EXCEPTION_STACKTRACE,
+			KafkaHeaders.EXCEPTION_MESSAGE,
+			KafkaHeaders.KEY_EXCEPTION_STACKTRACE,
+			KafkaHeaders.KEY_EXCEPTION_MESSAGE,
+			KafkaHeaders.KEY_EXCEPTION_FQCN,
+			KafkaHeaders.ORIGINAL_TOPIC,
+			KafkaHeaders.ORIGINAL_PARTITION,
+			KafkaHeaders.ORIGINAL_OFFSET,
+			KafkaHeaders.ORIGINAL_TIMESTAMP,
+			KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE,
+			KafkaHeaders.CONVERSION_FAILURES,
+			KafkaHeaders.LISTENER_INFO,
+			RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS,
+			RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP,
+			RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP);
+
+	private final Set<String> singleValueHeaders = new HashSet<>(this.defaultSingleValueHeaderList);
+
+	/**
+	 * Adds headers that the {@link MultiValueKafkaHeaderMapper} should handle as single values.
+	 * @param headerName the header name.
+	 */
+	public void addSingleValueHeader(String headerName) {
+		this.singleValueHeaders.add(headerName);
+	}
+
+	@Override
+	protected void handleHeader(String headerName, Header header, Map<String, Object> headers) {
+		if (this.singleValueHeaders.contains(headerName)) {
+			headers.put(headerName, headerValueToAddIn(header));
+		}
+		else {
+			Object values = headers.getOrDefault(headerName, new ArrayList<>());
+
+			if (values instanceof List) {
+				@SuppressWarnings("unchecked")
+				List<Object> castedValues = (List<Object>) values;
+				castedValues.add(headerValueToAddIn(header));
+				headers.put(headerName, castedValues);
+			}
+			else {
+				headers.put(headerName, headerValueToAddIn(header));
+			}
+
+		}
+
+	}
+
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java
index 52be73d64d..05c58e8761 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java
@@ -58,6 +58,7 @@
  * @author Gary Russell
  * @author Dariusz Szablinski
  * @author Biju Kunjummen
+ * @author Sanghyeok An
  */
 public class MessagingMessageConverter implements RecordMessageConverter {
 
@@ -83,6 +84,15 @@ public MessagingMessageConverter() {
 		this(msg -> msg.getHeaders().get(KafkaHeaders.PARTITION, Integer.class));
 	}
 
+	/**
+	 * Construct an instance that uses the given header mapper.
+	 * @param headerMapper the header mapper.
+	 * @since 4.0.0
+	 */
+	public MessagingMessageConverter(KafkaHeaderMapper headerMapper) {
+		this(msg -> msg.getHeaders().get(KafkaHeaders.PARTITION, Integer.class), headerMapper);
+	}
+
 	/**
 	 * Construct an instance that uses the supplied partition provider function. The
 	 * function can return null to delegate the partition selection to the kafka client.
@@ -100,6 +110,18 @@ public MessagingMessageConverter(Function<Message<?>, @Nullable Integer> partiti
 		this.partitionProvider = partitionProvider;
 	}
 
+	/**
+	 * Construct an instance that uses the supplied partition provider function and the given header mapper.
+	 * @param partitionProvider the provider.
+	 * @param headerMapper the header mapper.
+	 * @since 4.0.0
+	 */
+	public MessagingMessageConverter(Function<Message<?>, @Nullable Integer> partitionProvider, KafkaHeaderMapper headerMapper) {
+		Assert.notNull(partitionProvider, "'partitionProvider' cannot be null");
+		this.headerMapper = headerMapper;
+		this.partitionProvider = partitionProvider;
+	}
+
 	/**
 	 * Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
 	 * will try to use a default value. By default set to {@code false}.
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapperTest.java b/spring-kafka/src/test/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapperTest.java
new file mode 100644
index 0000000000..d158f81683
--- /dev/null
+++ b/spring-kafka/src/test/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapperTest.java
@@ -0,0 +1,432 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerConfigUtils; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.retrytopic.RetryTopicConfiguration; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport; +import org.springframework.kafka.retrytopic.RetryTopicHeaders; +import org.springframework.kafka.support.MultiValueKafkaHeaderMapperTest.Config.MultiValueTestListener; +import org.springframework.kafka.support.MultiValueKafkaHeaderMapperTest.RetryTopicConfigurations.FirstTopicListener; +import org.springframework.kafka.support.MultiValueKafkaHeaderMapperTest.RetryTopicConfigurations.MyCustomDltProcessor; +import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Headers; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * + * @author Sanghyeok An + * + * @since 4.0.0 + */ + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(partitions = 1, topics = { + MultiValueKafkaHeaderMapperTest.TEST_TOPIC, + MultiValueKafkaHeaderMapperTest.RETRY_TOPIC +}) +class MultiValueKafkaHeaderMapperTest { + + public final static String TEST_TOPIC = "multi-value.tests"; + + public final static String RETRY_TOPIC = "multi-value-retry.tests"; + + static final String 
MULTI_VALUE_HEADER1 = "test-multi-value1"; + + static final String MULTI_VALUE_HEADER2 = "test-multi-value2"; + + static final String SINGLE_VALUE_HEADER = "test-single-value"; + + static final List SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY = List.of(KafkaHeaders.OFFSET, + KafkaHeaders.CONSUMER, + KafkaHeaders.TIMESTAMP_TYPE, + KafkaHeaders.RECEIVED_PARTITION, + KafkaHeaders.RECEIVED_KEY, + KafkaHeaders.RECEIVED_TOPIC, + KafkaHeaders.RECEIVED_TIMESTAMP, + KafkaHeaders.GROUP_ID); + + static final List SHOULD_BE_SINGLE_VALUE_IN_RETRY = List.of(KafkaHeaders.ORIGINAL_OFFSET, + KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE, + KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP, + KafkaHeaders.ORIGINAL_TOPIC, + KafkaHeaders.ORIGINAL_TIMESTAMP, + KafkaHeaders.ORIGINAL_PARTITION, + RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, + KafkaHeaders.EXCEPTION_CAUSE_FQCN, + KafkaHeaders.EXCEPTION_STACKTRACE, + KafkaHeaders.EXCEPTION_FQCN, + KafkaHeaders.EXCEPTION_MESSAGE, + RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, + RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP); + + @Autowired + private KafkaTemplate template; + + @Autowired + private MultiValueTestListener multiValueTestListener; + + @Autowired + private FirstTopicListener firstTopicListener; + + @Autowired + private MyCustomDltProcessor myCustomDltProcessor; + + @Test + void testForCommonCase() throws InterruptedException { + + // GIVEN + Iterable recordHeaders = List.of( + new RecordHeader(MULTI_VALUE_HEADER1, "value1".getBytes(StandardCharsets.UTF_8)), + new RecordHeader(MULTI_VALUE_HEADER1, "value2".getBytes(StandardCharsets.UTF_8)), + new RecordHeader(MULTI_VALUE_HEADER2, "value3".getBytes(StandardCharsets.UTF_8)), + new RecordHeader(SINGLE_VALUE_HEADER, "value3".getBytes(StandardCharsets.UTF_8)) + ); + + ProducerRecord record = new ProducerRecord<>( + TEST_TOPIC, 0, 1, "hello-value", recordHeaders); + + // WHEN + this.template.send(record); + + // THEN + assertThat(this.multiValueTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiValueTestListener.latchForHeader1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiValueTestListener.latchForHeader2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiValueTestListener.latchForCustomSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiValueTestListener.latchForSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); + + } + + @Test + void testForDltAndRetryCase() throws InterruptedException { + + // GIVEN + Iterable recordHeaders = List.of( + new RecordHeader(MULTI_VALUE_HEADER1, "value1".getBytes(StandardCharsets.UTF_8)), + new RecordHeader(MULTI_VALUE_HEADER1, "value2".getBytes(StandardCharsets.UTF_8)), + new RecordHeader(MULTI_VALUE_HEADER2, "value3".getBytes(StandardCharsets.UTF_8)), + new RecordHeader(SINGLE_VALUE_HEADER, "value3".getBytes(StandardCharsets.UTF_8)) + ); + + ProducerRecord record = new ProducerRecord<>( + RETRY_TOPIC, 0, 1, "hello-value", recordHeaders); + + // WHEN + this.template.send(record); + + // THEN + assertThat(this.firstTopicListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.firstTopicListener.latchForHeader1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.firstTopicListener.latchForHeader2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.firstTopicListener.latchForSingleValueHeadersInFirstTry.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.firstTopicListener.latchForCustomSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); + 
assertThat(this.firstTopicListener.latchForSingleValueHeadersInRetry.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(this.myCustomDltProcessor.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForHeader1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForHeader2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForSingleValueHeadersInFirstTry.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForCustomSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForSingleValueHeadersInRetry.await(10, TimeUnit.SECONDS)).isTrue(); + } + + static boolean isHeadersExpected(Map headers, String headerName) { + Object value = headers.get(headerName); + if (value == null) { + return false; + } + + if (value instanceof Iterable) { + return false; + } + return true; + } + + @Configuration + @EnableKafka + public static class Config { + + @Autowired + private EmbeddedKafkaBroker broker; + + final CountDownLatch latch = new CountDownLatch(1); + + @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) + public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() { + return new KafkaListenerEndpointRegistry(); + } + + @Bean + public KafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + MultiValueKafkaHeaderMapper headerMapper = new MultiValueKafkaHeaderMapper(); + + // Add Test + headerMapper.addSingleValueHeader(SINGLE_VALUE_HEADER); + MessagingMessageConverter converter = new MessagingMessageConverter(headerMapper); + + factory.setConsumerFactory(consumerFactory()); + factory.setRecordMessageConverter(converter); + return factory; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map consumerProps = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "mutliValueGroup", "false"); + return consumerProps; + } + + @Bean + public KafkaTemplate template() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public Map producerConfigs() { + return KafkaTestUtils.producerProps(this.broker.getBrokersAsString()); + } + + @Component + @KafkaListener(topics = TEST_TOPIC) + public static class MultiValueTestListener { + + private final CountDownLatch latch = new CountDownLatch(1); + + private final CountDownLatch latchForHeader1 = new CountDownLatch(1); + + private final CountDownLatch latchForHeader2 = new CountDownLatch(1); + + private final CountDownLatch latchForSingleValueHeaders = new CountDownLatch(8); + + private final CountDownLatch latchForCustomSingleValueHeaders = new CountDownLatch(1); + + @KafkaHandler + public void listen1(@Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Header(SINGLE_VALUE_HEADER) String header3, + @Headers Map headers, String message) { + this.latch.countDown(); + if (!header1.isEmpty()) { + this.latchForHeader1.countDown(); + } + + if (!header2.isEmpty()) { + this.latchForHeader2.countDown(); + } + + if (header3 != null) { + this.latchForCustomSingleValueHeaders.countDown(); + } + + for (String headerName : 
SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeaders.countDown(); + } + } + } + + } + + } + + @Configuration + static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { + + private static final String DLT_METHOD_NAME = "processDltMessage"; + + @Bean + RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .maxAttempts(5) + .concurrency(1) + .useSingleTopicForSameIntervals() + .includeTopic(RETRY_TOPIC) + .doNotRetryOnDltFailure() + .dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME) + .create(template); + } + + @Bean + FirstTopicListener firstTopicListener() { + return new FirstTopicListener(); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor() { + return new MyCustomDltProcessor(); + } + + @Bean + TaskScheduler sched() { + return new ThreadPoolTaskScheduler(); + } + + @KafkaListener(id = "firstTopicId", topics = RETRY_TOPIC, concurrency = "2") + static class FirstTopicListener { + + private final CountDownLatch latch = new CountDownLatch(4); + + private final CountDownLatch latchForHeader1 = new CountDownLatch(4); + + private final CountDownLatch latchForHeader2 = new CountDownLatch(4); + + private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(32); + + private final CountDownLatch latchForCustomSingleValueHeaders = new CountDownLatch(4); + + private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(52); + + @KafkaHandler + public void listen( + @Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Header(SINGLE_VALUE_HEADER) String header3, + @Headers Map headers, String message) { + this.latch.countDown(); + if (!header1.isEmpty()) { + this.latchForHeader1.countDown(); + } + + if (!header2.isEmpty()) { + this.latchForHeader2.countDown(); + } + + if (header3 != null) { + this.latchForCustomSingleValueHeaders.countDown(); + } + + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInFirstTry.countDown(); + } + } + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInRetry.countDown(); + } + } + throw new RuntimeException("Woooops... 
in topic " + message); + } + + } + + static class MyCustomDltProcessor { + + private final CountDownLatch latch = new CountDownLatch(1); + + private final CountDownLatch latchForHeader1 = new CountDownLatch(1); + + private final CountDownLatch latchForHeader2 = new CountDownLatch(1); + + private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(8); + + private final CountDownLatch latchForCustomSingleValueHeaders = new CountDownLatch(1); + + private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(13); + + public void processDltMessage( + Object message, + @Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Header(SINGLE_VALUE_HEADER) String header3, + @Headers Map headers) { + + this.latch.countDown(); + if (!header1.isEmpty()) { + this.latchForHeader1.countDown(); + } + + if (!header2.isEmpty()) { + this.latchForHeader2.countDown(); + } + + if (header3 != null) { + this.latchForCustomSingleValueHeaders.countDown(); + } + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInFirstTry.countDown(); + } + } + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInRetry.countDown(); + } + } + throw new RuntimeException("Woooops... in topic " + message); + } + + } + + } + +} From 42010c69fe00069894a64f6b80a9bed7eb77f862 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sat, 3 May 2025 00:05:27 +0900 Subject: [PATCH 2/4] Draft candidate Signed-off-by: chickenchickenlove --- .../support/AbstractKafkaHeaderMapper.java | 52 ++++++++++++++++ .../support/DefaultKafkaHeaderMapper.java | 60 ++++++++++++++++++- .../support/SimpleKafkaHeaderMapper.java | 26 +++++++- ...ltKafkaHeaderMapperForMultiValueTest.java} | 18 +++--- 4 files changed, 140 insertions(+), 16 deletions(-) rename spring-kafka/src/test/java/org/springframework/kafka/support/{MultiValueKafkaHeaderMapperTest.java => DefaultKafkaHeaderMapperForMultiValueTest.java} (95%) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index ddad12080e..5cdd76445b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -79,6 +80,12 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { private Charset charset = StandardCharsets.UTF_8; + private final List matchersForListValue = new ArrayList<>(); + + private final Set cachedHeadersForListValue = new LinkedHashSet<>(); + + private final Set cachedHeadersForSingleValue = new LinkedHashSet<>(); + /** * Construct a mapper that will match the supplied patterns (outbound) and all headers * (inbound). For outbound mapping, certain internal framework headers are never @@ -97,6 +104,18 @@ public AbstractKafkaHeaderMapper(String... patterns) { * @param patterns the patterns. */ protected AbstractKafkaHeaderMapper(boolean outbound, String... 
patterns) { + this(outbound, new ArrayList<>(), patterns); + } + + /** + * Construct a mapper that will match the supplied patterns (outbound) and all headers + * (inbound). For outbound mapping, certain internal framework headers are never + * mapped. + * @param outbound true for an outbound mapper. + * @param patternsForListValue the patterns for multiple values at the same key. + * @param patterns the patterns. + */ + protected AbstractKafkaHeaderMapper(boolean outbound, List patternsForListValue, String... patterns) { Assert.notNull(patterns, "'patterns' must not be null"); this.outbound = outbound; if (outbound) { @@ -123,6 +142,11 @@ protected AbstractKafkaHeaderMapper(boolean outbound, String... patterns) { for (String pattern : patterns) { this.matchers.add(new SimplePatternBasedHeaderMatcher(pattern)); } + + for (String patternForListValue : patternsForListValue) { + this.matchersForListValue.add(new SimplePatternBasedHeaderMatcher(patternForListValue)); + } + } /** @@ -287,6 +311,34 @@ private String mapRawIn(String header, byte[] value) { return null; } + /** + * Check whether the header value should be mapped to multiple values. + * @param headerName the header name. + * @return True for multiple values at the same key. + */ + protected boolean isHeaderForListValue(String headerName) { + if (this.matchersForListValue.isEmpty()) { + return false; + } + + if (this.cachedHeadersForSingleValue.contains(headerName)) { + return false; + } + + if (this.cachedHeadersForListValue.contains(headerName)) { + return true; + } + + for (HeaderMatcher headerMatcher : this.matchersForListValue) { + if (headerMatcher.matchHeader(headerName)) { + this.cachedHeadersForListValue.add(headerName); + return true; + } + } + this.cachedHeadersForSingleValue.add(headerName); + return false; + } + /** * A matcher for headers. * @since 2.3 diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index 4d751b5f29..5ada1996d9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -103,6 +104,23 @@ public DefaultKafkaHeaderMapper() { this(JacksonUtils.enhancedObjectMapper()); } + /** + * Construct an instance with the default object mapper and default header patterns + * for outbound headers; all inbound headers are mapped. The default pattern list is + * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in + * {@link KafkaHeaders} are never mapped as headers since they represent data in + * consumer/producer records. + * @see #DefaultKafkaHeaderMapper(ObjectMapper) + * @param patternsForListValue + */ + public DefaultKafkaHeaderMapper(List patternsForListValue) { + this(JacksonUtils.enhancedObjectMapper(), + patternsForListValue, + "!" + MessageHeaders.ID, + "!" + MessageHeaders.TIMESTAMP, + "*"); + } + /** * Construct an instance with the provided object mapper and default header patterns * for outbound headers; all inbound headers are mapped. The patterns are applied in @@ -149,11 +167,32 @@ public DefaultKafkaHeaderMapper(String... 
patterns) { * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { - this(true, objectMapper, patterns); + this(true, objectMapper, new ArrayList<>(), patterns); + } + + /** + * Construct an instance with the provided object mapper and the provided header + * patterns for outbound headers; all inbound headers are mapped. The patterns are + * applied in order, stopping on the first match (positive or negative). Patterns are + * negated by preceding them with "!". The patterns will replace the default patterns; + * you generally should not map the {@code "id" and "timestamp"} headers. Note: most + * of the headers in {@link KafkaHeaders} are never mapped as headers since they + * represent data in consumer/producer records. + * @param objectMapper the object mapper. + * @param patternsForListValue the patterns for multiple values at the same key. + * @param patterns the patterns. + * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) + */ + public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, List patternsForListValue, String... patterns) { + this(true, objectMapper, patternsForListValue, patterns); } private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, String... patterns) { - super(outbound, patterns); + this(outbound, objectMapper, new ArrayList<>(), patterns); + } + + private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, List patternsForListValue, String... patterns) { + super(outbound, patternsForListValue, patterns); Assert.notNull(objectMapper, "'objectMapper' must not be null"); Assert.noNullElements(patterns, "'patterns' must not have null elements"); this.objectMapper = objectMapper; @@ -340,7 +379,22 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { */ protected void handleHeader(String headerName, Header header, final Map headers) { - headers.put(headerName, headerValueToAddIn(header)); + if (!this.isHeaderForListValue(headerName)) { + headers.put(headerName, headerValueToAddIn(header)); + } + else { + Object values = headers.getOrDefault(headerName, new ArrayList<>()); + + if (values instanceof List) { + @SuppressWarnings("unchecked") + List castedValues = (List) values; + castedValues.add(headerValueToAddIn(header)); + headers.put(headerName, castedValues); + } + else { + headers.put(headerName, headerValueToAddIn(header)); + } + } } private void populateJsonValueHeader(Header header, String requestedType, Map headers) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java index 30effa38e0..f54b63477a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,9 @@ package org.springframework.kafka.support; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -77,7 +79,11 @@ public SimpleKafkaHeaderMapper(String... 
patterns) { } private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) { - super(outbound, patterns); + this(outbound, new ArrayList<>(), patterns); + } + + private SimpleKafkaHeaderMapper(boolean outbound, List patternsForListValue, String... patterns) { + super(outbound, patternsForListValue, patterns); } /** @@ -111,7 +117,21 @@ public void toHeaders(Headers source, Map target) { target.put(headerName, ByteBuffer.wrap(header.value()).getInt()); } else { - target.put(headerName, headerValueToAddIn(header)); + if (!this.isHeaderForListValue(headerName)) { + target.put(headerName, headerValueToAddIn(header)); + } + else { + Object values = target.getOrDefault(headerName, new ArrayList<>()); + if (values instanceof List) { + @SuppressWarnings("unchecked") + List castedValues = (List) values; + castedValues.add(headerValueToAddIn(header)); + target.put(headerName, castedValues); + } + else { + target.put(headerName, headerValueToAddIn(header)); + } + } } } }); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapperTest.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTest.java similarity index 95% rename from spring-kafka/src/test/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapperTest.java rename to spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTest.java index d158f81683..61d1f73131 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapperTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTest.java @@ -44,9 +44,9 @@ import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport; import org.springframework.kafka.retrytopic.RetryTopicHeaders; -import org.springframework.kafka.support.MultiValueKafkaHeaderMapperTest.Config.MultiValueTestListener; -import org.springframework.kafka.support.MultiValueKafkaHeaderMapperTest.RetryTopicConfigurations.FirstTopicListener; -import org.springframework.kafka.support.MultiValueKafkaHeaderMapperTest.RetryTopicConfigurations.MyCustomDltProcessor; +import org.springframework.kafka.support.DefaultKafkaHeaderMapperForMultiValueTest.Config.MultiValueTestListener; +import org.springframework.kafka.support.DefaultKafkaHeaderMapperForMultiValueTest.RetryTopicConfigurations.FirstTopicListener; +import org.springframework.kafka.support.DefaultKafkaHeaderMapperForMultiValueTest.RetryTopicConfigurations.MyCustomDltProcessor; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; @@ -71,10 +71,10 @@ @SpringJUnitConfig @DirtiesContext @EmbeddedKafka(partitions = 1, topics = { - MultiValueKafkaHeaderMapperTest.TEST_TOPIC, - MultiValueKafkaHeaderMapperTest.RETRY_TOPIC + DefaultKafkaHeaderMapperForMultiValueTest.TEST_TOPIC, + DefaultKafkaHeaderMapperForMultiValueTest.RETRY_TOPIC }) -class MultiValueKafkaHeaderMapperTest { +class DefaultKafkaHeaderMapperForMultiValueTest { public final static String TEST_TOPIC = "multi-value.tests"; @@ -211,10 +211,8 @@ public KafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - MultiValueKafkaHeaderMapper headerMapper = 
new MultiValueKafkaHeaderMapper(); - - // Add Test - headerMapper.addSingleValueHeader(SINGLE_VALUE_HEADER); + // For Test + DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(List.of(MULTI_VALUE_HEADER1, MULTI_VALUE_HEADER2)); MessagingMessageConverter converter = new MessagingMessageConverter(headerMapper); factory.setConsumerFactory(consumerFactory()); From dfdfb8d650d78dba8543ef3404058769e3bf23ad Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sat, 3 May 2025 10:19:15 +0900 Subject: [PATCH 3/4] Add java docs and test case for SimpleKafkaHeaderMapper. Signed-off-by: chickenchickenlove --- .../support/AbstractKafkaHeaderMapper.java | 4 +- .../support/DefaultKafkaHeaderMapper.java | 16 +- .../support/SimpleKafkaHeaderMapper.java | 21 + ...ultKafkaHeaderMapperForMultiValueTest.java | 160 ++++--- ...HeaderMapperMultiValueIntegrationTest.java | 446 ++++++++++++++++++ 5 files changed, 568 insertions(+), 79 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperMultiValueIntegrationTest.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index 5cdd76445b..43cb085520 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -110,7 +110,9 @@ protected AbstractKafkaHeaderMapper(boolean outbound, String... patterns) { /** * Construct a mapper that will match the supplied patterns (outbound) and all headers * (inbound). For outbound mapping, certain internal framework headers are never - * mapped. + * mapped. For inbound mapping, Headers that match the pattern specified in + * {@code patternsForListValue} will be appended to the values under the same key. + * * @param outbound true for an outbound mapper. * @param patternsForListValue the patterns for multiple values at the same key. * @param patterns the patterns. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index 5ada1996d9..109ddb83e0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -106,19 +106,23 @@ public DefaultKafkaHeaderMapper() { /** * Construct an instance with the default object mapper and default header patterns - * for outbound headers; all inbound headers are mapped. The default pattern list is + * for outbound headers and default header patterns for inbound multi-value headers; + * all inbound headers are mapped. The default pattern list is * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in * {@link KafkaHeaders} are never mapped as headers since they represent data in * consumer/producer records. + * Headers that match the pattern specified in {@code patternsForListValue} will be + * appended to the values under the same key. + * + * @param patternsForListValue the patterns for multiple values at the same key. * @see #DefaultKafkaHeaderMapper(ObjectMapper) - * @param patternsForListValue */ public DefaultKafkaHeaderMapper(List patternsForListValue) { this(JacksonUtils.enhancedObjectMapper(), - patternsForListValue, - "!" 
+ MessageHeaders.ID, - "!" + MessageHeaders.TIMESTAMP, - "*"); + patternsForListValue, + "!" + MessageHeaders.ID, + "!" + MessageHeaders.TIMESTAMP, + "*"); } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java index f54b63477a..3abfeeec8b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java @@ -37,6 +37,8 @@ * The exceptions are correlation and reply headers for request/reply * * @author Gary Russell + * @author Sanghyeok An + * * @since 2.1.3 * */ @@ -63,6 +65,25 @@ public SimpleKafkaHeaderMapper() { "*"); } + /** + * Construct an instance with the default object mapper and default header patterns + * for outbound headers default header patterns for inbound multi-value headers; + * all inbound headers are mapped. The default pattern list is + * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in + * {@link KafkaHeaders} are never mapped as headers since they represent data in + * consumer/producer records. + * Headers that match the pattern specified in {@code patternsForListValue} will be + * appended to the values under the same key. + * + * @param patternsForMultiValue the patterns for multiple values at the same key. + */ + public SimpleKafkaHeaderMapper(List patternsForMultiValue) { + this(true, patternsForMultiValue, + "!" + MessageHeaders.ID, + "!" + MessageHeaders.TIMESTAMP, + "*"); + } + /** * Construct an instance with a default object mapper and the provided header patterns * for outbound headers; all inbound headers are mapped. The patterns are applied in diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTest.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTest.java index 61d1f73131..3d3acb9854 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTest.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTest.java @@ -17,6 +17,7 @@ package org.springframework.kafka.support; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -62,6 +63,8 @@ import static org.assertj.core.api.Assertions.assertThat; /** + * This test class demonstrates that the DefaultKafkaHeaderMapper can handle + * multi-value headers for both input and output. 
* * @author Sanghyeok An * @@ -125,11 +128,16 @@ class DefaultKafkaHeaderMapperForMultiValueTest { void testForCommonCase() throws InterruptedException { // GIVEN + byte[] multiHeader1Value1 = "value1".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader1Value2 = "value2".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader2Value1 = "value3".getBytes(StandardCharsets.UTF_8); + String singleHeaderValue = "value4"; + Iterable recordHeaders = List.of( - new RecordHeader(MULTI_VALUE_HEADER1, "value1".getBytes(StandardCharsets.UTF_8)), - new RecordHeader(MULTI_VALUE_HEADER1, "value2".getBytes(StandardCharsets.UTF_8)), - new RecordHeader(MULTI_VALUE_HEADER2, "value3".getBytes(StandardCharsets.UTF_8)), - new RecordHeader(SINGLE_VALUE_HEADER, "value3".getBytes(StandardCharsets.UTF_8)) + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value1), + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value2), + new RecordHeader(MULTI_VALUE_HEADER2, multiHeader2Value1), + new RecordHeader(SINGLE_VALUE_HEADER, singleHeaderValue.getBytes(StandardCharsets.UTF_8)) ); ProducerRecord record = new ProducerRecord<>( @@ -140,22 +148,31 @@ void testForCommonCase() throws InterruptedException { // THEN assertThat(this.multiValueTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.multiValueTestListener.latchForHeader1.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.multiValueTestListener.latchForHeader2.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.multiValueTestListener.latchForCustomSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.multiValueTestListener.latchForSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); + // the multi-value header cannot be converted to String type. + assertThat(multiValueTestListener.multiValueHeaders1.get(0)).isEqualTo(multiHeader1Value1); + assertThat(multiValueTestListener.multiValueHeaders1.get(1)).isEqualTo(multiHeader1Value2); + assertThat(multiValueTestListener.multiValueHeaders2.get(0)).isEqualTo(multiHeader2Value1); + + // the single-value header can be converted to String type. 
+ assertThat(multiValueTestListener.singleValueHeaders.get(0)).isEqualTo(singleHeaderValue); } @Test void testForDltAndRetryCase() throws InterruptedException { // GIVEN + byte[] multiHeader1Value1 = "value1".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader1Value2 = "value2".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader2Value1 = "value3".getBytes(StandardCharsets.UTF_8); + String singleHeaderValue = "value4"; + Iterable recordHeaders = List.of( - new RecordHeader(MULTI_VALUE_HEADER1, "value1".getBytes(StandardCharsets.UTF_8)), - new RecordHeader(MULTI_VALUE_HEADER1, "value2".getBytes(StandardCharsets.UTF_8)), - new RecordHeader(MULTI_VALUE_HEADER2, "value3".getBytes(StandardCharsets.UTF_8)), - new RecordHeader(SINGLE_VALUE_HEADER, "value3".getBytes(StandardCharsets.UTF_8)) + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value1), + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value2), + new RecordHeader(MULTI_VALUE_HEADER2, multiHeader2Value1), + new RecordHeader(SINGLE_VALUE_HEADER, singleHeaderValue.getBytes(StandardCharsets.UTF_8)) ); ProducerRecord record = new ProducerRecord<>( @@ -166,18 +183,41 @@ void testForDltAndRetryCase() throws InterruptedException { // THEN assertThat(this.firstTopicListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.firstTopicListener.latchForHeader1.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.firstTopicListener.latchForHeader2.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.firstTopicListener.latchForSingleValueHeadersInFirstTry.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.firstTopicListener.latchForCustomSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.firstTopicListener.latchForSingleValueHeadersInRetry.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.myCustomDltProcessor.latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.myCustomDltProcessor.latchForHeader1.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.myCustomDltProcessor.latchForHeader2.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.myCustomDltProcessor.latchForSingleValueHeadersInFirstTry.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.myCustomDltProcessor.latchForCustomSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.myCustomDltProcessor.latchForSingleValueHeadersInRetry.await(10, TimeUnit.SECONDS)).isTrue(); + + // the multi-value header cannot be converted to String type. 
+ assertThat(firstTopicListener.multiValueHeaders1.get(0)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(1)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(2)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(3)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(4)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(5)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(6)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(7)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(8)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(9)).containsExactly(multiHeader1Value2); + + assertThat(firstTopicListener.multiValueHeaders2.get(0)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(1)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(2)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(3)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(4)).containsExactly(multiHeader2Value1); + + assertThat(myCustomDltProcessor.multiValueHeaders1.get(0)).containsExactly(multiHeader1Value1); + assertThat(myCustomDltProcessor.multiValueHeaders1.get(1)).containsExactly(multiHeader1Value2); + assertThat(myCustomDltProcessor.multiValueHeaders2.get(0)).containsExactly(multiHeader2Value1); + + // the single-value header can be converted to String type. 
+ assertThat(firstTopicListener.singleValueHeaders.get(0)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(1)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(2)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(3)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(4)).isEqualTo(singleHeaderValue); } static boolean isHeadersExpected(Map headers, String headerName) { @@ -253,31 +293,26 @@ public static class MultiValueTestListener { private final CountDownLatch latch = new CountDownLatch(1); - private final CountDownLatch latchForHeader1 = new CountDownLatch(1); - - private final CountDownLatch latchForHeader2 = new CountDownLatch(1); - private final CountDownLatch latchForSingleValueHeaders = new CountDownLatch(8); private final CountDownLatch latchForCustomSingleValueHeaders = new CountDownLatch(1); + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + + final List singleValueHeaders = new ArrayList<>(); + @KafkaHandler - public void listen1(@Header(MULTI_VALUE_HEADER1) List header1, - @Header(MULTI_VALUE_HEADER2) List header2, + public void listen1(@Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, @Header(SINGLE_VALUE_HEADER) String header3, @Headers Map headers, String message) { this.latch.countDown(); - if (!header1.isEmpty()) { - this.latchForHeader1.countDown(); - } - - if (!header2.isEmpty()) { - this.latchForHeader2.countDown(); - } - if (header3 != null) { - this.latchForCustomSingleValueHeaders.countDown(); - } + header1.forEach(value -> multiValueHeaders1.add(value)); + header2.forEach(value -> multiValueHeaders2.add(value)); + this.singleValueHeaders.add(header3); for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { if (isHeadersExpected(headers, headerName)) { @@ -329,35 +364,28 @@ static class FirstTopicListener { private final CountDownLatch latch = new CountDownLatch(4); - private final CountDownLatch latchForHeader1 = new CountDownLatch(4); + private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(32); - private final CountDownLatch latchForHeader2 = new CountDownLatch(4); + private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(52); - private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(32); + final List multiValueHeaders1 = new ArrayList<>(); - private final CountDownLatch latchForCustomSingleValueHeaders = new CountDownLatch(4); + final List multiValueHeaders2 = new ArrayList<>(); - private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(52); + final List singleValueHeaders = new ArrayList<>(); @KafkaHandler public void listen( - @Header(MULTI_VALUE_HEADER1) List header1, - @Header(MULTI_VALUE_HEADER2) List header2, + @Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, @Header(SINGLE_VALUE_HEADER) String header3, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, @Headers Map headers, String message) { this.latch.countDown(); - if (!header1.isEmpty()) { - this.latchForHeader1.countDown(); - } - - if (!header2.isEmpty()) { - this.latchForHeader2.countDown(); - } - - if (header3 != null) { - this.latchForCustomSingleValueHeaders.countDown(); - } + header1.forEach(multiValueHeaders1::add); + header2.forEach(multiValueHeaders2::add); + 
this.singleValueHeaders.add(header3); for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { if (isHeadersExpected(headers, headerName)) { @@ -379,35 +407,23 @@ static class MyCustomDltProcessor { private final CountDownLatch latch = new CountDownLatch(1); - private final CountDownLatch latchForHeader1 = new CountDownLatch(1); - - private final CountDownLatch latchForHeader2 = new CountDownLatch(1); - private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(8); - private final CountDownLatch latchForCustomSingleValueHeaders = new CountDownLatch(1); - private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(13); + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + public void processDltMessage( - Object message, - @Header(MULTI_VALUE_HEADER1) List header1, - @Header(MULTI_VALUE_HEADER2) List header2, - @Header(SINGLE_VALUE_HEADER) String header3, - @Headers Map headers) { + @Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Headers Map headers, String message) { this.latch.countDown(); - if (!header1.isEmpty()) { - this.latchForHeader1.countDown(); - } - - if (!header2.isEmpty()) { - this.latchForHeader2.countDown(); - } - if (header3 != null) { - this.latchForCustomSingleValueHeaders.countDown(); - } + header1.forEach(multiValueHeaders1::add); + header2.forEach(multiValueHeaders2::add); for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { if (isHeadersExpected(headers, headerName)) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperMultiValueIntegrationTest.java b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperMultiValueIntegrationTest.java new file mode 100644 index 0000000000..791a05babf --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperMultiValueIntegrationTest.java @@ -0,0 +1,446 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.support; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerConfigUtils; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.retrytopic.RetryTopicConfiguration; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport; +import org.springframework.kafka.retrytopic.RetryTopicHeaders; +import org.springframework.kafka.support.SimpleKafkaHeaderMapperMultiValueIntegrationTest.Config.MultiValueTestListener; +import org.springframework.kafka.support.SimpleKafkaHeaderMapperMultiValueIntegrationTest.RetryTopicConfigurations.FirstTopicListener; +import org.springframework.kafka.support.SimpleKafkaHeaderMapperMultiValueIntegrationTest.RetryTopicConfigurations.MyCustomDltProcessor; +import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Headers; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * This test class demonstrates that the SimpleKafkaHeaderMapper can handle + * multi-value headers for both input and output. 
+ * + * @author Sanghyeok An + * + * @since 4.0.0 + */ + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(partitions = 1, topics = { + SimpleKafkaHeaderMapperMultiValueIntegrationTest.TEST_TOPIC, + SimpleKafkaHeaderMapperMultiValueIntegrationTest.RETRY_TOPIC +}) +class SimpleKafkaHeaderMapperMultiValueIntegrationTest { + + public final static String TEST_TOPIC = "multi-value.tests"; + + public final static String RETRY_TOPIC = "multi-value-retry.tests"; + + static final String MULTI_VALUE_HEADER1 = "test-multi-value1"; + + static final String MULTI_VALUE_HEADER2 = "test-multi-value2"; + + static final String SINGLE_VALUE_HEADER = "test-single-value"; + + static final List SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY = List.of(KafkaHeaders.OFFSET, + KafkaHeaders.CONSUMER, + KafkaHeaders.TIMESTAMP_TYPE, + KafkaHeaders.RECEIVED_PARTITION, + KafkaHeaders.RECEIVED_KEY, + KafkaHeaders.RECEIVED_TOPIC, + KafkaHeaders.RECEIVED_TIMESTAMP, + KafkaHeaders.GROUP_ID); + + static final List SHOULD_BE_SINGLE_VALUE_IN_RETRY = List.of(KafkaHeaders.ORIGINAL_OFFSET, + KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE, + KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP, + KafkaHeaders.ORIGINAL_TOPIC, + KafkaHeaders.ORIGINAL_TIMESTAMP, + KafkaHeaders.ORIGINAL_PARTITION, + RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, + KafkaHeaders.EXCEPTION_CAUSE_FQCN, + KafkaHeaders.EXCEPTION_STACKTRACE, + KafkaHeaders.EXCEPTION_FQCN, + KafkaHeaders.EXCEPTION_MESSAGE, + RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, + RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP); + + @Autowired + private KafkaTemplate template; + + @Autowired + private MultiValueTestListener multiValueTestListener; + + @Autowired + private FirstTopicListener firstTopicListener; + + @Autowired + private MyCustomDltProcessor myCustomDltProcessor; + + @Test + void testForCommonCase() throws InterruptedException { + + // GIVEN + byte[] multiHeader1Value1 = "value1".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader1Value2 = "value2".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader2Value1 = "value3".getBytes(StandardCharsets.UTF_8); + String singleHeaderValue = "value4"; + + Iterable recordHeaders = List.of( + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value1), + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value2), + new RecordHeader(MULTI_VALUE_HEADER2, multiHeader2Value1), + new RecordHeader(SINGLE_VALUE_HEADER, singleHeaderValue.getBytes(StandardCharsets.UTF_8)) + ); + + ProducerRecord record = new ProducerRecord<>( + TEST_TOPIC, 0, 1, "hello-value", recordHeaders); + + // WHEN + this.template.send(record); + + // THEN + assertThat(this.multiValueTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiValueTestListener.latchForSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); + + // the multi-value header cannot be converted to String type. + assertThat(multiValueTestListener.multiValueHeaders1.get(0)).isEqualTo(multiHeader1Value1); + assertThat(multiValueTestListener.multiValueHeaders1.get(1)).isEqualTo(multiHeader1Value2); + assertThat(multiValueTestListener.multiValueHeaders2.get(0)).isEqualTo(multiHeader2Value1); + + // the single-value header can be converted to String type. 
+ assertThat(multiValueTestListener.singleValueHeaders.get(0)).isEqualTo(singleHeaderValue); + } + + @Test + void testForDltAndRetryCase() throws InterruptedException { + + // GIVEN + byte[] multiHeader1Value1 = "value1".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader1Value2 = "value2".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader2Value1 = "value3".getBytes(StandardCharsets.UTF_8); + String singleHeaderValue = "value4"; + + Iterable recordHeaders = List.of( + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value1), + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value2), + new RecordHeader(MULTI_VALUE_HEADER2, multiHeader2Value1), + new RecordHeader(SINGLE_VALUE_HEADER, singleHeaderValue.getBytes(StandardCharsets.UTF_8)) + ); + + ProducerRecord record = new ProducerRecord<>( + RETRY_TOPIC, 0, 1, "hello-value", recordHeaders); + + // WHEN + this.template.send(record); + + // THEN + assertThat(this.firstTopicListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.firstTopicListener.latchForSingleValueHeadersInFirstTry.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.firstTopicListener.latchForSingleValueHeadersInRetry.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(this.myCustomDltProcessor.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForSingleValueHeadersInFirstTry.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForSingleValueHeadersInRetry.await(10, TimeUnit.SECONDS)).isTrue(); + + // the multi-value header cannot be converted to String type. + assertThat(firstTopicListener.multiValueHeaders1.get(0)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(1)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(2)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(3)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(4)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(5)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(6)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(7)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(8)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(9)).containsExactly(multiHeader1Value2); + + assertThat(firstTopicListener.multiValueHeaders2.get(0)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(1)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(2)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(3)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(4)).containsExactly(multiHeader2Value1); + + assertThat(myCustomDltProcessor.multiValueHeaders1.get(0)).containsExactly(multiHeader1Value1); + assertThat(myCustomDltProcessor.multiValueHeaders1.get(1)).containsExactly(multiHeader1Value2); + assertThat(myCustomDltProcessor.multiValueHeaders2.get(0)).containsExactly(multiHeader2Value1); + + // the single-value header can be converted to String type. 
+ assertThat(firstTopicListener.singleValueHeaders.get(0)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(1)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(2)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(3)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(4)).isEqualTo(singleHeaderValue); + } + + static boolean isHeadersExpected(Map headers, String headerName) { + Object value = headers.get(headerName); + if (value == null) { + return false; + } + + if (value instanceof Iterable) { + return false; + } + return true; + } + + @Configuration + @EnableKafka + public static class Config { + + @Autowired + private EmbeddedKafkaBroker broker; + + final CountDownLatch latch = new CountDownLatch(1); + + @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) + public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() { + return new KafkaListenerEndpointRegistry(); + } + + @Bean + public KafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + // For Test + SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper(List.of(MULTI_VALUE_HEADER1, MULTI_VALUE_HEADER2)); + MessagingMessageConverter converter = new MessagingMessageConverter(headerMapper); + + factory.setConsumerFactory(consumerFactory()); + factory.setRecordMessageConverter(converter); + return factory; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map consumerProps = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "mutliValueGroup", "false"); + return consumerProps; + } + + @Bean + public KafkaTemplate template() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public Map producerConfigs() { + return KafkaTestUtils.producerProps(this.broker.getBrokersAsString()); + } + + @Component + @KafkaListener(topics = TEST_TOPIC) + public static class MultiValueTestListener { + + private final CountDownLatch latch = new CountDownLatch(1); + + private final CountDownLatch latchForSingleValueHeaders = new CountDownLatch(8); + + private final CountDownLatch latchForCustomSingleValueHeaders = new CountDownLatch(1); + + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + + final List singleValueHeaders = new ArrayList<>(); + + @KafkaHandler + public void listen1(@Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Header(SINGLE_VALUE_HEADER) String header3, + @Headers Map headers, String message) { + this.latch.countDown(); + + header1.forEach(value -> multiValueHeaders1.add(value)); + header2.forEach(value -> multiValueHeaders2.add(value)); + this.singleValueHeaders.add(header3); + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeaders.countDown(); + } + } + } + + } + + } + + @Configuration + static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { + + private static final String DLT_METHOD_NAME = "processDltMessage"; + + @Bean + RetryTopicConfiguration 
myRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .maxAttempts(5) + .concurrency(1) + .useSingleTopicForSameIntervals() + .includeTopic(RETRY_TOPIC) + .doNotRetryOnDltFailure() + .dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME) + .create(template); + } + + @Bean + FirstTopicListener firstTopicListener() { + return new FirstTopicListener(); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor() { + return new MyCustomDltProcessor(); + } + + @Bean + TaskScheduler sched() { + return new ThreadPoolTaskScheduler(); + } + + @KafkaListener(id = "firstTopicId", topics = RETRY_TOPIC, concurrency = "2") + static class FirstTopicListener { + + private final CountDownLatch latch = new CountDownLatch(4); + + private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(32); + + private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(52); + + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + + final List singleValueHeaders = new ArrayList<>(); + + @KafkaHandler + public void listen( + @Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Header(SINGLE_VALUE_HEADER) String header3, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Headers Map headers, String message) { + this.latch.countDown(); + + header1.forEach(multiValueHeaders1::add); + header2.forEach(multiValueHeaders2::add); + this.singleValueHeaders.add(header3); + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInFirstTry.countDown(); + } + } + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInRetry.countDown(); + } + } + throw new RuntimeException("Woooops... in topic " + message); + } + + } + + static class MyCustomDltProcessor { + + private final CountDownLatch latch = new CountDownLatch(1); + + private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(8); + + private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(13); + + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + + public void processDltMessage( + @Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Headers Map headers, String message) { + + this.latch.countDown(); + + header1.forEach(multiValueHeaders1::add); + header2.forEach(multiValueHeaders2::add); + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInFirstTry.countDown(); + } + } + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInRetry.countDown(); + } + } + throw new RuntimeException("Woooops... 
in topic " + message); + } + + } + + } + +} From 865b26aaf1bff175a4599759c640c20f72c13486 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sat, 3 May 2025 10:24:10 +0900 Subject: [PATCH 4/4] Fixes typo Signed-off-by: chickenchickenlove --- .../support/DefaultKafkaHeaderMapper.java | 4 +- .../support/MultiValueKafkaHeaderMapper.java | 129 ------------------ 2 files changed, 2 insertions(+), 131 deletions(-) delete mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapper.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index 109ddb83e0..527ea2fbdb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2025 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,7 +49,7 @@ * @author Gary Russell * @author Artem Bilan * @author Soby Chacko - * @author Sanghyoek An + * @author Sanghyeok An * * @since 1.3 * diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapper.java deleted file mode 100644 index ae58c44b30..0000000000 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapper.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright 2017-2025 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.support; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.kafka.common.header.Header; - -import org.springframework.kafka.retrytopic.RetryTopicHeaders; - -/** - * Extended Header Mapper based on {@link DefaultKafkaHeaderMapper}. - * This Header Mapper manages header values as a list, - * except for certain reserved headers. - * Other behaviors are identical to {@link DefaultKafkaHeaderMapper}. 
- * - * @author Sanghyeok An - * - * @since 4.0.0 - * - */ -public class MultiValueKafkaHeaderMapper extends DefaultKafkaHeaderMapper { - - private final List defaultSingleValueHeaderList = List.of( - KafkaHeaders.PREFIX, - KafkaHeaders.RECEIVED, - KafkaHeaders.TOPIC, - KafkaHeaders.KEY, - KafkaHeaders.PARTITION, - KafkaHeaders.OFFSET, - KafkaHeaders.RAW_DATA, - KafkaHeaders.RECORD_METADATA, - KafkaHeaders.ACKNOWLEDGMENT, - KafkaHeaders.CONSUMER, - KafkaHeaders.RECEIVED_TOPIC, - KafkaHeaders.RECEIVED_KEY, - KafkaHeaders.RECEIVED_PARTITION, - KafkaHeaders.TIMESTAMP_TYPE, - KafkaHeaders.TIMESTAMP, - KafkaHeaders.RECEIVED_TIMESTAMP, - KafkaHeaders.NATIVE_HEADERS, - KafkaHeaders.BATCH_CONVERTED_HEADERS, - KafkaHeaders.CORRELATION_ID, - KafkaHeaders.REPLY_TOPIC, - KafkaHeaders.REPLY_PARTITION, - KafkaHeaders.DLT_EXCEPTION_FQCN, - KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN, - KafkaHeaders.DLT_EXCEPTION_STACKTRACE, - KafkaHeaders.DLT_EXCEPTION_MESSAGE, - KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE, - KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE, - KafkaHeaders.DLT_KEY_EXCEPTION_FQCN, - KafkaHeaders.DLT_ORIGINAL_TOPIC, - KafkaHeaders.DLT_ORIGINAL_PARTITION, - KafkaHeaders.DLT_ORIGINAL_OFFSET, - KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP, - KafkaHeaders.DLT_ORIGINAL_TIMESTAMP, - KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE, - KafkaHeaders.GROUP_ID, - KafkaHeaders.DELIVERY_ATTEMPT, - KafkaHeaders.EXCEPTION_FQCN, - KafkaHeaders.EXCEPTION_CAUSE_FQCN, - KafkaHeaders.EXCEPTION_STACKTRACE, - KafkaHeaders.EXCEPTION_MESSAGE, - KafkaHeaders.KEY_EXCEPTION_STACKTRACE, - KafkaHeaders.KEY_EXCEPTION_MESSAGE, - KafkaHeaders.KEY_EXCEPTION_FQCN, - KafkaHeaders.ORIGINAL_TOPIC, - KafkaHeaders.ORIGINAL_PARTITION, - KafkaHeaders.ORIGINAL_OFFSET, - KafkaHeaders.ORIGINAL_TIMESTAMP, - KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE, - KafkaHeaders.CONVERSION_FAILURES, - KafkaHeaders.LISTENER_INFO, - RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, - RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, - RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP); - - private final Set singleValueHeaders = new HashSet<>(this.defaultSingleValueHeaderList); - - /** - * Adds headers that the {@link MultiValueKafkaHeaderMapper} should handle as single values. - * @param headerName the header name. - */ - public void addSingleValueHeader(String headerName) { - this.singleValueHeaders.add(headerName); - } - - @Override - protected void handleHeader(String headerName, Header header, Map headers) { - if (this.singleValueHeaders.contains(headerName)) { - headers.put(headerName, headerValueToAddIn(header)); - } - else { - Object values = headers.getOrDefault(headerName, new ArrayList<>()); - - if (values instanceof List) { - @SuppressWarnings("unchecked") - List castedValues = (List) values; - castedValues.add(headerValueToAddIn(header)); - headers.put(headerName, castedValues); - } - else { - headers.put(headerName, headerValueToAddIn(header)); - } - - } - - } - -}
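Editor's note: for readers of this patch series, the sketch below condenses how the multi-value header mapping exercised by the integration test above would be wired in an application. It is a minimal illustration, not the project's documented API: it assumes the SimpleKafkaHeaderMapper(List<String>) constructor and the MessagingMessageConverter(KafkaHeaderMapper) constructor used by the test code in this patch, and the topic name "orders", the factory bean name, and the listener class are hypothetical.

	import java.util.List;

	import org.springframework.context.annotation.Bean;
	import org.springframework.kafka.annotation.KafkaListener;
	import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
	import org.springframework.kafka.core.ConsumerFactory;
	import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
	import org.springframework.kafka.support.converter.MessagingMessageConverter;
	import org.springframework.messaging.handler.annotation.Header;

	public class MultiValueHeaderUsageSketch {

		// Container factory whose record converter uses a SimpleKafkaHeaderMapper that
		// treats the two custom headers as multi-valued (constructor taken from this patch).
		@Bean
		ConcurrentKafkaListenerContainerFactory<String, String> multiValueFactory(
				ConsumerFactory<String, String> consumerFactory) {

			SimpleKafkaHeaderMapper mapper =
					new SimpleKafkaHeaderMapper(List.of("test-multi-value1", "test-multi-value2"));

			ConcurrentKafkaListenerContainerFactory<String, String> factory =
					new ConcurrentKafkaListenerContainerFactory<>();
			factory.setConsumerFactory(consumerFactory);
			factory.setRecordMessageConverter(new MessagingMessageConverter(mapper));
			return factory;
		}

		// Repeated occurrences of a multi-value header arrive as List<byte[]>;
		// single-value headers keep their usual scalar representation.
		@KafkaListener(topics = "orders", containerFactory = "multiValueFactory")
		public void listen(String payload,
				@Header("test-multi-value1") List<byte[]> multiValues,
				@Header("test-single-value") String singleValue) {
			// process the payload together with the repeated header values
		}
	}

Only the header names handed to the mapper are collected into a List when the same key occurs more than once on a record; framework-reserved headers (offsets, timestamps, retry and DLT metadata, and so on) remain single-valued, which is what the SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY and SHOULD_BE_SINGLE_VALUE_IN_RETRY checks in the tests assert.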