diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index ddad12080e..b760a9c75a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -65,6 +65,8 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { private final List matchers = new ArrayList<>(); + private final List headerMatchersForMultiValue = new ArrayList<>(); + private final Map rawMappedHeaders = new HashMap<>(); { @@ -191,6 +193,16 @@ public void addRawMappedHeader(String name, boolean toString) { this.rawMappedHeaders.put(name, toString); } + /** + * Add patterns for matching multi-value headers under the same key. + * @param patterns the patterns for header. + */ + public void addHeaderPatternsForMultiValue(String ... patterns) { + for (String pattern : patterns) { + this.headerMatchersForMultiValue.add(new SimplePatternBasedHeaderMatcher(pattern)); + } + } + protected boolean matches(String header, Object value) { if (matches(header)) { if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL)) @@ -251,6 +263,20 @@ protected Object headerValueToAddOut(String key, Object value) { return valueToAdd; } + /** + * Check whether the header value should be mapped to multiple values. + * @param headerName the header name. + * @return True for multiple values at the same key. + */ + protected boolean doesMatchMultiValueHeader(String headerName) { + for (HeaderMatcher headerMatcher : this.headerMatchersForMultiValue) { + if (headerMatcher.matchHeader(headerName)) { + return true; + } + } + return false; + } + @SuppressWarnings("NullAway") // Dataflow analysis limitation @Nullable private byte[] mapRawOut(String header, Object value) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index 45e6c1eed2..055b934895 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -48,6 +49,7 @@ * @author Gary Russell * @author Artem Bilan * @author Soby Chacko + * @author Sanghyoek An * * @since 1.3 * @@ -324,12 +326,37 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { populateJsonValueHeader(header, requestedType, headers); } else { - headers.put(headerName, headerValueToAddIn(header)); + handleHeader(headerName, header, headers); } } }); } + /** + * Handle non-reserved headers in {@link DefaultKafkaHeaderMapper}. + * @param headerName the header name. + * @param header the header instance. + * @param headers the target headers. + * @since 4.0.0 + */ + protected void handleHeader(String headerName, Header header, final Map headers) { + if (!this.doesMatchMultiValueHeader(headerName)) { + headers.put(headerName, headerValueToAddIn(header)); + } + else { + Object values = headers.getOrDefault(headerName, new ArrayList<>()); + if (values instanceof List) { + @SuppressWarnings("unchecked") + List castedValues = (List) values; + castedValues.add(headerValueToAddIn(header)); + headers.put(headerName, castedValues); + } + else { + headers.put(headerName, headerValueToAddIn(header)); + } + } + } + private void populateJsonValueHeader(Header header, String requestedType, Map headers) { Class type = Object.class; boolean trusted = false; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java index 30effa38e0..5d030c23ed 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,9 @@ package org.springframework.kafka.support; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -35,6 +37,7 @@ * The exceptions are correlation and reply headers for request/reply * * @author Gary Russell + * @author Sanghyeok An * @since 2.1.3 * */ @@ -111,7 +114,21 @@ public void toHeaders(Headers source, Map target) { target.put(headerName, ByteBuffer.wrap(header.value()).getInt()); } else { - target.put(headerName, headerValueToAddIn(header)); + if (!this.doesMatchMultiValueHeader(headerName)) { + target.put(headerName, headerValueToAddIn(header)); + } + else { + Object values = target.getOrDefault(headerName, new ArrayList<>()); + if (values instanceof List) { + @SuppressWarnings("unchecked") + List castedValues = (List) values; + castedValues.add(headerValueToAddIn(header)); + target.put(headerName, castedValues); + } + else { + target.put(headerName, headerValueToAddIn(header)); + } + } } } }); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTests.java new file mode 100644 index 0000000000..0feb1d0da2 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTests.java @@ -0,0 +1,449 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerConfigUtils; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.retrytopic.RetryTopicConfiguration; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport; +import org.springframework.kafka.retrytopic.RetryTopicHeaders; +import org.springframework.kafka.support.DefaultKafkaHeaderMapperForMultiValueTests.Config.MultiValueTestListener; +import org.springframework.kafka.support.DefaultKafkaHeaderMapperForMultiValueTests.RetryTopicConfigurations.FirstTopicListener; +import org.springframework.kafka.support.DefaultKafkaHeaderMapperForMultiValueTests.RetryTopicConfigurations.MyCustomDltProcessor; +import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Headers; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * This test class demonstrates that the DefaultKafkaHeaderMapper can handle + * multi-value headers for both input and output. + * + * @author Sanghyeok An + * + * @since 4.0.0 + */ + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(partitions = 1, topics = { + DefaultKafkaHeaderMapperForMultiValueTests.TEST_TOPIC, + DefaultKafkaHeaderMapperForMultiValueTests.RETRY_TOPIC +}) +class DefaultKafkaHeaderMapperForMultiValueTests { + + public final static String TEST_TOPIC = "multi-value.tests"; + + public final static String RETRY_TOPIC = "multi-value-retry.tests"; + + static final String MULTI_VALUE_HEADER1 = "test-multi-value1"; + + static final String MULTI_VALUE_HEADER2 = "test-multi-value2"; + + static final String SINGLE_VALUE_HEADER = "test-single-value"; + + static final List SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY = List.of(KafkaHeaders.OFFSET, + KafkaHeaders.CONSUMER, + KafkaHeaders.TIMESTAMP_TYPE, + KafkaHeaders.RECEIVED_PARTITION, + KafkaHeaders.RECEIVED_KEY, + KafkaHeaders.RECEIVED_TOPIC, + KafkaHeaders.RECEIVED_TIMESTAMP, + KafkaHeaders.GROUP_ID); + + static final List SHOULD_BE_SINGLE_VALUE_IN_RETRY = List.of(KafkaHeaders.ORIGINAL_OFFSET, + KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE, + KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP, + KafkaHeaders.ORIGINAL_TOPIC, + KafkaHeaders.ORIGINAL_TIMESTAMP, + KafkaHeaders.ORIGINAL_PARTITION, + RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, + KafkaHeaders.EXCEPTION_CAUSE_FQCN, + KafkaHeaders.EXCEPTION_STACKTRACE, + KafkaHeaders.EXCEPTION_FQCN, + KafkaHeaders.EXCEPTION_MESSAGE, + RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, + RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP); + + @Autowired + private KafkaTemplate template; + + @Autowired + private MultiValueTestListener multiValueTestListener; + + @Autowired + private FirstTopicListener firstTopicListener; + + @Autowired + private MyCustomDltProcessor myCustomDltProcessor; + + @Test + void testForCommonCase() throws InterruptedException { + + // GIVEN + byte[] multiHeader1Value1 = "value1".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader1Value2 = "value2".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader2Value1 = "value3".getBytes(StandardCharsets.UTF_8); + String singleHeaderValue = "value4"; + + Iterable recordHeaders = List.of( + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value1), + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value2), + new RecordHeader(MULTI_VALUE_HEADER2, multiHeader2Value1), + new RecordHeader(SINGLE_VALUE_HEADER, singleHeaderValue.getBytes(StandardCharsets.UTF_8)) + ); + + ProducerRecord record = new ProducerRecord<>( + TEST_TOPIC, 0, 1, "hello-value", recordHeaders); + + // WHEN + this.template.send(record); + + // THEN + assertThat(this.multiValueTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiValueTestListener.latchForSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); + + // the multi-value header cannot be converted to String type. + assertThat(multiValueTestListener.multiValueHeaders1.get(0)).isEqualTo(multiHeader1Value1); + assertThat(multiValueTestListener.multiValueHeaders1.get(1)).isEqualTo(multiHeader1Value2); + assertThat(multiValueTestListener.multiValueHeaders2.get(0)).isEqualTo(multiHeader2Value1); + + // the single-value header can be converted to String type. + assertThat(multiValueTestListener.singleValueHeaders.get(0)).isEqualTo(singleHeaderValue); + } + + @Test + void testForDltAndRetryCase() throws InterruptedException { + + // GIVEN + byte[] multiHeader1Value1 = "value1".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader1Value2 = "value2".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader2Value1 = "value3".getBytes(StandardCharsets.UTF_8); + String singleHeaderValue = "value4"; + + Iterable recordHeaders = List.of( + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value1), + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value2), + new RecordHeader(MULTI_VALUE_HEADER2, multiHeader2Value1), + new RecordHeader(SINGLE_VALUE_HEADER, singleHeaderValue.getBytes(StandardCharsets.UTF_8)) + ); + + ProducerRecord record = new ProducerRecord<>( + RETRY_TOPIC, 0, 1, "hello-value", recordHeaders); + + // WHEN + this.template.send(record); + + // THEN + assertThat(this.firstTopicListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.firstTopicListener.latchForSingleValueHeadersInFirstTry.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.firstTopicListener.latchForSingleValueHeadersInRetry.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(this.myCustomDltProcessor.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForSingleValueHeadersInFirstTry.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForSingleValueHeadersInRetry.await(10, TimeUnit.SECONDS)).isTrue(); + + // the multi-value header cannot be converted to String type. + assertThat(firstTopicListener.multiValueHeaders1.get(0)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(1)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(2)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(3)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(4)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(5)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(6)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(7)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(8)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(9)).containsExactly(multiHeader1Value2); + + assertThat(firstTopicListener.multiValueHeaders2.get(0)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(1)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(2)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(3)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(4)).containsExactly(multiHeader2Value1); + + assertThat(myCustomDltProcessor.multiValueHeaders1.get(0)).containsExactly(multiHeader1Value1); + assertThat(myCustomDltProcessor.multiValueHeaders1.get(1)).containsExactly(multiHeader1Value2); + assertThat(myCustomDltProcessor.multiValueHeaders2.get(0)).containsExactly(multiHeader2Value1); + + // the single-value header can be converted to String type. + assertThat(firstTopicListener.singleValueHeaders.get(0)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(1)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(2)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(3)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(4)).isEqualTo(singleHeaderValue); + } + + static boolean isHeadersExpected(Map headers, String headerName) { + Object value = headers.get(headerName); + if (value == null) { + return false; + } + + if (value instanceof Iterable) { + return false; + } + return true; + } + + @Configuration + @EnableKafka + public static class Config { + + @Autowired + private EmbeddedKafkaBroker broker; + + final CountDownLatch latch = new CountDownLatch(1); + + @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) + public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() { + return new KafkaListenerEndpointRegistry(); + } + + @Bean + public KafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + // For Test + DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(); + headerMapper.addHeaderPatternsForMultiValue(MULTI_VALUE_HEADER1, MULTI_VALUE_HEADER2); + + MessagingMessageConverter converter = new MessagingMessageConverter(); + converter.setHeaderMapper(headerMapper); + + factory.setConsumerFactory(consumerFactory()); + factory.setRecordMessageConverter(converter); + return factory; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map consumerProps = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "mutliValueGroup", "false"); + return consumerProps; + } + + @Bean + public KafkaTemplate template() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public Map producerConfigs() { + return KafkaTestUtils.producerProps(this.broker.getBrokersAsString()); + } + + @Component + @KafkaListener(topics = TEST_TOPIC) + public static class MultiValueTestListener { + + private final CountDownLatch latch = new CountDownLatch(1); + + private final CountDownLatch latchForSingleValueHeaders = new CountDownLatch(8); + + private final CountDownLatch latchForCustomSingleValueHeaders = new CountDownLatch(1); + + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + + final List singleValueHeaders = new ArrayList<>(); + + @KafkaHandler + public void listen1(@Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Header(SINGLE_VALUE_HEADER) String header3, + @Headers Map headers, String message) { + this.latch.countDown(); + + header1.forEach(value -> multiValueHeaders1.add(value)); + header2.forEach(value -> multiValueHeaders2.add(value)); + this.singleValueHeaders.add(header3); + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeaders.countDown(); + } + } + } + + } + + } + + @Configuration + static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { + + private static final String DLT_METHOD_NAME = "processDltMessage"; + + @Bean + RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .maxAttempts(5) + .concurrency(1) + .useSingleTopicForSameIntervals() + .includeTopic(RETRY_TOPIC) + .doNotRetryOnDltFailure() + .dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME) + .create(template); + } + + @Bean + FirstTopicListener firstTopicListener() { + return new FirstTopicListener(); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor() { + return new MyCustomDltProcessor(); + } + + @Bean + TaskScheduler sched() { + return new ThreadPoolTaskScheduler(); + } + + @KafkaListener(id = "firstTopicId", topics = RETRY_TOPIC, concurrency = "2") + static class FirstTopicListener { + + private final CountDownLatch latch = new CountDownLatch(4); + + private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(32); + + private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(52); + + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + + final List singleValueHeaders = new ArrayList<>(); + + @KafkaHandler + public void listen( + @Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Header(SINGLE_VALUE_HEADER) String header3, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Headers Map headers, String message) { + this.latch.countDown(); + + header1.forEach(multiValueHeaders1::add); + header2.forEach(multiValueHeaders2::add); + this.singleValueHeaders.add(header3); + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInFirstTry.countDown(); + } + } + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInRetry.countDown(); + } + } + throw new RuntimeException("Woooops... in topic " + message); + } + + } + + static class MyCustomDltProcessor { + + private final CountDownLatch latch = new CountDownLatch(1); + + private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(8); + + private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(13); + + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + + public void processDltMessage( + @Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Headers Map headers, String message) { + + this.latch.countDown(); + + header1.forEach(multiValueHeaders1::add); + header2.forEach(multiValueHeaders2::add); + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInFirstTry.countDown(); + } + } + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInRetry.countDown(); + } + } + throw new RuntimeException("Woooops... in topic " + message); + } + + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperMultiValueIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperMultiValueIntegrationTests.java new file mode 100644 index 0000000000..6280f57f36 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperMultiValueIntegrationTests.java @@ -0,0 +1,448 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerConfigUtils; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.retrytopic.RetryTopicConfiguration; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder; +import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport; +import org.springframework.kafka.retrytopic.RetryTopicHeaders; +import org.springframework.kafka.support.SimpleKafkaHeaderMapperMultiValueIntegrationTests.Config.MultiValueTestListener; +import org.springframework.kafka.support.SimpleKafkaHeaderMapperMultiValueIntegrationTests.RetryTopicConfigurations.FirstTopicListener; +import org.springframework.kafka.support.SimpleKafkaHeaderMapperMultiValueIntegrationTests.RetryTopicConfigurations.MyCustomDltProcessor; +import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Headers; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * This test class demonstrates that the SimpleKafkaHeaderMapper can handle + * multi-value headers for both input and output. + * + * @author Sanghyeok An + * + * @since 4.0.0 + */ + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(partitions = 1, topics = { + SimpleKafkaHeaderMapperMultiValueIntegrationTests.TEST_TOPIC, + SimpleKafkaHeaderMapperMultiValueIntegrationTests.RETRY_TOPIC +}) +class SimpleKafkaHeaderMapperMultiValueIntegrationTests { + + public final static String TEST_TOPIC = "multi-value.tests"; + + public final static String RETRY_TOPIC = "multi-value-retry.tests"; + + static final String MULTI_VALUE_HEADER1 = "test-multi-value1"; + + static final String MULTI_VALUE_HEADER2 = "test-multi-value2"; + + static final String SINGLE_VALUE_HEADER = "test-single-value"; + + static final List SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY = List.of(KafkaHeaders.OFFSET, + KafkaHeaders.CONSUMER, + KafkaHeaders.TIMESTAMP_TYPE, + KafkaHeaders.RECEIVED_PARTITION, + KafkaHeaders.RECEIVED_KEY, + KafkaHeaders.RECEIVED_TOPIC, + KafkaHeaders.RECEIVED_TIMESTAMP, + KafkaHeaders.GROUP_ID); + + static final List SHOULD_BE_SINGLE_VALUE_IN_RETRY = List.of(KafkaHeaders.ORIGINAL_OFFSET, + KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE, + KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP, + KafkaHeaders.ORIGINAL_TOPIC, + KafkaHeaders.ORIGINAL_TIMESTAMP, + KafkaHeaders.ORIGINAL_PARTITION, + RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, + KafkaHeaders.EXCEPTION_CAUSE_FQCN, + KafkaHeaders.EXCEPTION_STACKTRACE, + KafkaHeaders.EXCEPTION_FQCN, + KafkaHeaders.EXCEPTION_MESSAGE, + RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, + RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP); + + @Autowired + private KafkaTemplate template; + + @Autowired + private MultiValueTestListener multiValueTestListener; + + @Autowired + private FirstTopicListener firstTopicListener; + + @Autowired + private MyCustomDltProcessor myCustomDltProcessor; + + @Test + void testForCommonCase() throws InterruptedException { + + // GIVEN + byte[] multiHeader1Value1 = "value1".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader1Value2 = "value2".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader2Value1 = "value3".getBytes(StandardCharsets.UTF_8); + String singleHeaderValue = "value4"; + + Iterable recordHeaders = List.of( + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value1), + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value2), + new RecordHeader(MULTI_VALUE_HEADER2, multiHeader2Value1), + new RecordHeader(SINGLE_VALUE_HEADER, singleHeaderValue.getBytes(StandardCharsets.UTF_8)) + ); + + ProducerRecord record = new ProducerRecord<>( + TEST_TOPIC, 0, 1, "hello-value", recordHeaders); + + // WHEN + this.template.send(record); + + // THEN + assertThat(this.multiValueTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiValueTestListener.latchForSingleValueHeaders.await(10, TimeUnit.SECONDS)).isTrue(); + + // the multi-value header cannot be converted to String type. + assertThat(multiValueTestListener.multiValueHeaders1.get(0)).isEqualTo(multiHeader1Value1); + assertThat(multiValueTestListener.multiValueHeaders1.get(1)).isEqualTo(multiHeader1Value2); + assertThat(multiValueTestListener.multiValueHeaders2.get(0)).isEqualTo(multiHeader2Value1); + + // the single-value header can be converted to String type. + assertThat(multiValueTestListener.singleValueHeaders.get(0)).isEqualTo(singleHeaderValue); + } + + @Test + void testForDltAndRetryCase() throws InterruptedException { + + // GIVEN + byte[] multiHeader1Value1 = "value1".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader1Value2 = "value2".getBytes(StandardCharsets.UTF_8); + byte[] multiHeader2Value1 = "value3".getBytes(StandardCharsets.UTF_8); + String singleHeaderValue = "value4"; + + Iterable recordHeaders = List.of( + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value1), + new RecordHeader(MULTI_VALUE_HEADER1, multiHeader1Value2), + new RecordHeader(MULTI_VALUE_HEADER2, multiHeader2Value1), + new RecordHeader(SINGLE_VALUE_HEADER, singleHeaderValue.getBytes(StandardCharsets.UTF_8)) + ); + + ProducerRecord record = new ProducerRecord<>( + RETRY_TOPIC, 0, 1, "hello-value", recordHeaders); + + // WHEN + this.template.send(record); + + // THEN + assertThat(this.firstTopicListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.firstTopicListener.latchForSingleValueHeadersInFirstTry.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.firstTopicListener.latchForSingleValueHeadersInRetry.await(10, TimeUnit.SECONDS)).isTrue(); + + assertThat(this.myCustomDltProcessor.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForSingleValueHeadersInFirstTry.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.myCustomDltProcessor.latchForSingleValueHeadersInRetry.await(10, TimeUnit.SECONDS)).isTrue(); + + // the multi-value header cannot be converted to String type. + assertThat(firstTopicListener.multiValueHeaders1.get(0)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(1)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(2)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(3)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(4)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(5)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(6)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(7)).containsExactly(multiHeader1Value2); + assertThat(firstTopicListener.multiValueHeaders1.get(8)).containsExactly(multiHeader1Value1); + assertThat(firstTopicListener.multiValueHeaders1.get(9)).containsExactly(multiHeader1Value2); + + assertThat(firstTopicListener.multiValueHeaders2.get(0)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(1)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(2)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(3)).containsExactly(multiHeader2Value1); + assertThat(firstTopicListener.multiValueHeaders2.get(4)).containsExactly(multiHeader2Value1); + + assertThat(myCustomDltProcessor.multiValueHeaders1.get(0)).containsExactly(multiHeader1Value1); + assertThat(myCustomDltProcessor.multiValueHeaders1.get(1)).containsExactly(multiHeader1Value2); + assertThat(myCustomDltProcessor.multiValueHeaders2.get(0)).containsExactly(multiHeader2Value1); + + // the single-value header can be converted to String type. + assertThat(firstTopicListener.singleValueHeaders.get(0)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(1)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(2)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(3)).isEqualTo(singleHeaderValue); + assertThat(firstTopicListener.singleValueHeaders.get(4)).isEqualTo(singleHeaderValue); + } + + static boolean isHeadersExpected(Map headers, String headerName) { + Object value = headers.get(headerName); + if (value == null) { + return false; + } + + if (value instanceof Iterable) { + return false; + } + return true; + } + + @Configuration + @EnableKafka + public static class Config { + + @Autowired + private EmbeddedKafkaBroker broker; + + final CountDownLatch latch = new CountDownLatch(1); + + @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) + public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() { + return new KafkaListenerEndpointRegistry(); + } + + @Bean + public KafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + // For Test + SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper(); + headerMapper.addHeaderPatternsForMultiValue(MULTI_VALUE_HEADER1, MULTI_VALUE_HEADER2); + MessagingMessageConverter converter = new MessagingMessageConverter(); + converter.setHeaderMapper(headerMapper); + + factory.setConsumerFactory(consumerFactory()); + factory.setRecordMessageConverter(converter); + return factory; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map consumerProps = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), "mutliValueGroup", "false"); + return consumerProps; + } + + @Bean + public KafkaTemplate template() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public Map producerConfigs() { + return KafkaTestUtils.producerProps(this.broker.getBrokersAsString()); + } + + @Component + @KafkaListener(topics = TEST_TOPIC) + public static class MultiValueTestListener { + + private final CountDownLatch latch = new CountDownLatch(1); + + private final CountDownLatch latchForSingleValueHeaders = new CountDownLatch(8); + + private final CountDownLatch latchForCustomSingleValueHeaders = new CountDownLatch(1); + + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + + final List singleValueHeaders = new ArrayList<>(); + + @KafkaHandler + public void listen1(@Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Header(SINGLE_VALUE_HEADER) String header3, + @Headers Map headers, String message) { + this.latch.countDown(); + + header1.forEach(value -> multiValueHeaders1.add(value)); + header2.forEach(value -> multiValueHeaders2.add(value)); + this.singleValueHeaders.add(header3); + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeaders.countDown(); + } + } + } + + } + + } + + @Configuration + static class RetryTopicConfigurations extends RetryTopicConfigurationSupport { + + private static final String DLT_METHOD_NAME = "processDltMessage"; + + @Bean + RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { + return RetryTopicConfigurationBuilder + .newInstance() + .fixedBackOff(50) + .maxAttempts(5) + .concurrency(1) + .useSingleTopicForSameIntervals() + .includeTopic(RETRY_TOPIC) + .doNotRetryOnDltFailure() + .dltHandlerMethod("myCustomDltProcessor", DLT_METHOD_NAME) + .create(template); + } + + @Bean + FirstTopicListener firstTopicListener() { + return new FirstTopicListener(); + } + + @Bean + MyCustomDltProcessor myCustomDltProcessor() { + return new MyCustomDltProcessor(); + } + + @Bean + TaskScheduler sched() { + return new ThreadPoolTaskScheduler(); + } + + @KafkaListener(id = "firstTopicId", topics = RETRY_TOPIC, concurrency = "2") + static class FirstTopicListener { + + private final CountDownLatch latch = new CountDownLatch(4); + + private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(32); + + private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(52); + + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + + final List singleValueHeaders = new ArrayList<>(); + + @KafkaHandler + public void listen( + @Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Header(SINGLE_VALUE_HEADER) String header3, + @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, + @Headers Map headers, String message) { + this.latch.countDown(); + + header1.forEach(multiValueHeaders1::add); + header2.forEach(multiValueHeaders2::add); + this.singleValueHeaders.add(header3); + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInFirstTry.countDown(); + } + } + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInRetry.countDown(); + } + } + throw new RuntimeException("Woooops... in topic " + message); + } + + } + + static class MyCustomDltProcessor { + + private final CountDownLatch latch = new CountDownLatch(1); + + private final CountDownLatch latchForSingleValueHeadersInFirstTry = new CountDownLatch(8); + + private final CountDownLatch latchForSingleValueHeadersInRetry = new CountDownLatch(13); + + final List multiValueHeaders1 = new ArrayList<>(); + + final List multiValueHeaders2 = new ArrayList<>(); + + public void processDltMessage( + @Header(MULTI_VALUE_HEADER1) List header1, + @Header(MULTI_VALUE_HEADER2) List header2, + @Headers Map headers, String message) { + + this.latch.countDown(); + + header1.forEach(multiValueHeaders1::add); + header2.forEach(multiValueHeaders2::add); + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_NOT_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInFirstTry.countDown(); + } + } + + for (String headerName : SHOULD_BE_SINGLE_VALUE_IN_RETRY) { + if (isHeadersExpected(headers, headerName)) { + latchForSingleValueHeadersInRetry.countDown(); + } + } + throw new RuntimeException("Woooops... in topic " + message); + } + + } + + } + +}