Skip to content

GH-3067: Spring Kafka support multiple headers with same key. #3874

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

chickenchickenlove
Copy link
Contributor

@chickenchickenlove chickenchickenlove commented Apr 27, 2025

I know that there is an existing PR(#3101) related to this issue, but it seems to have been suspended for long time.
Therefore, I created a new PR to address it.
If you are planning to maintain the existing one, I am happy to close mine.

I approached the problem differently from the existing PR and proposed an alternative solution.
For backward compatibility, user can configure MultiValueKafkaHeaderMapper to KafkaListenerContainerFactory.
For example,

ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
					new ConcurrentKafkaListenerContainerFactory<>();

MultiValueKafkaHeaderMapper headerMapper = new MultiValueKafkaHeaderMapper();

// Create new header mapper.
headerMapper.addSingleValueHeader(SINGLE_VALUE_HEADER);
MessagingMessageConverter converter = new MessagingMessageConverter(headerMapper);

// Set header mapper.
factory.setConsumerFactory(consumerFactory());

@chickenchickenlove chickenchickenlove changed the title spring-projectsGH-3067: Spring Kafka support multiple headers with sa… GH-3067: Spring Kafka support multiple headers with sa… Apr 28, 2025
@chickenchickenlove chickenchickenlove changed the title GH-3067: Spring Kafka support multiple headers with sa… GH-3067: Spring Kafka support multiple headers with same key. Apr 28, 2025
@sobychacko
Copy link
Contributor

@chickenchickenlove We were planning to resuscitate the other PR before the next milestone (4.0.0-M3) along with some other refactorings, so please stay tuned for that. Then we can compare your approach in this PR, but I will try first to rebase that PR on top of the latest main and see how it goes.

@chickenchickenlove
Copy link
Contributor Author

@sobychacko Thanks for your time!!
I didn't get your point clearly.
From what I understand, this PR is no longer needed since you will be taking on a new task.
Could you please confirm if my understanding is correct?

@sobychacko
Copy link
Contributor

Yes, we will need to start from the other PR first. But we will keep this open to compare.

* @since 4.0.0
*
*/
public class MultiValueKafkaHeaderMapper extends DefaultKafkaHeaderMapper {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that I agree about extra abstraction on the matter.
I think the existing HeaderMapper must support iterables for end-user headers.
So, we should expect some pattern for those custom headers which we can treat as iterable.
And I think that one should be on the in (from Kafka) only.
If we produce and some header is a collection, then populate it into Kafka headers as is.
On the other side, for convenience, we still should support first value mapping by default.
And map only those to collection values, which we have opted-in.

Make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan
There are parts that I understand and parts that I don't.
It's been a while since I last worked with Kafka in my work, so I might be misunderstanding some aspects.
Please keep that in mind as you read this.

How can Spring Kafka anticipate a custom header pattern that is iterable?
Even if Spring Kafka expects a custom header to be iterable, users might always expect the header to be a single value.
What I mean is, users may have already written their code accordingly, like @Header("blahblah") String value.

To handle such cases, I think it would be necessary to add a public method that allows users to configure the pattern for iterable custom headers as a workaround.

If the headers that should be treated as iterable are managed with a whitelist approach,
I believe DefaultKafkaHeaderMapper could support iterable headers without requiring additional abstraction as you mentioned before.

I believe that by moving part of the logic from the MultiValueKafkaHeaderMapper class which I implement to DefaultKafkaHeaderMapper and simply adding a whitelist field,
spring kafka can support multi-header values while maintaining backward compatibility.

Overall, it makes sense to me 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the @Header("blahblah") String value should work. That annotation processor is able to extract single value from an iterable header.

Yes, I would expect some configuration property on the DefaultKafkaHeaderMapper to opt-in to those Kafka headers which have to be mapped to Message header as List.
We cannot use white and black words though. So, we need to come up with other name. Like headerAsListPattern. Or even vararg String... headerAsListPatterns. Where pattern has to follow org.springframework.util.PatternMatchUtils expectations.

On the mapping from Message to Kafka there is no need to think about pattern.
The trigger is simple: if Message is an iterable, then we put it into multi-header in Kafka record.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Spring Kafka does not allow multiple headers with same key
3 participants