Remove proxy in DefaultKafkaConsumerFactory #2822
Conversation
The proxy in the `DefaultKafkaConsumerFactory` for `KafkaConsumer` is created only to intercept the `close()` call and remove the instance from the `listeners`. There is no need for such a proxy, since a simple `KafkaConsumer` subclass can handle that scenario. The motivation for this change is to avoid a `Serializable` (`java.lang.reflect.Proxy`) `KafkaConsumer` header in the produced message, which still fails to serialize because other properties of the proxy are not `Serializable`.

* Introduce `DefaultKafkaConsumerFactory.ExtendedKafkaConsumer` to handle the `listeners` interaction
* The `createRawConsumer()` change might be considered breaking, since end users must now extend `ExtendedKafkaConsumer` to handle `listeners` the same way as before
* Adjust the `DefaultKafkaConsumerFactoryTests.listener()` test for the current code state
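The subclass-instead-of-proxy idea can be sketched in isolation. The names below (`SimpleConsumer`, `ConsumerTracker`) are hypothetical stand-ins, not the real `KafkaConsumer` or factory API; they only show how overriding `close()` replaces the interception that the `java.lang.reflect.Proxy` used to perform:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaConsumer: just a closeable resource.
class SimpleConsumer implements AutoCloseable {

	@Override
	public void close() {
		// release resources
	}

}

// Hypothetical stand-in for the factory tracking active consumers for its listeners.
class ConsumerTracker {

	final List<SimpleConsumer> active = new ArrayList<>();

	// Instead of wrapping the new consumer in a java.lang.reflect.Proxy to
	// intercept close(), hand out a subclass that de-registers itself on close().
	SimpleConsumer create() {
		SimpleConsumer consumer = new SimpleConsumer() {

			@Override
			public void close() {
				ConsumerTracker.this.active.remove(this);
				super.close();
			}

		};
		this.active.add(consumer);
		return consumer;
	}

}
```

The subclass is an ordinary object, so no `Serializable` proxy ever ends up in the `kafka_consumer` message header.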
Fixes spring-cloud/stream-applications#500

When `listeners` are provided for `DefaultKafkaConsumerFactory`, the target `KafkaConsumer` instance is proxied. The `java.lang.reflect.Proxy` is `Serializable`, but the value it wraps is not. When the `MessageHeaders` is serialized (e.g. into a persistent `MessageStore`), only the top-level object of each header is checked for `Serializable`. The `Proxy` therefore passes the check, but we eventually fail with a `NotSerializableException`, since the proxied object is not serializable.

* Remove the `kafka_consumer` header from a message before it reaches an aggregator, whose logic serializes the message into the store

This is a workaround until Spring for Apache Kafka is released with the fix: spring-projects/spring-kafka#2822
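The serialization trap can be reproduced with plain JDK classes: a dynamic proxy is always an `instanceof Serializable` (because `java.lang.reflect.Proxy` implements it), so a top-level header check passes, yet actually writing the proxy fails when its `InvocationHandler` is not serializable. A minimal, self-contained sketch (all names here are illustrative, not spring-kafka API):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class ProxySerializationDemo {

	interface Greeter {

		String greet();

	}

	static Greeter newProxy() {
		// The lambda handler is NOT Serializable, like the wrapped consumer state.
		InvocationHandler handler = (proxy, method, args) -> "hello";
		return (Greeter) Proxy.newProxyInstance(
				Greeter.class.getClassLoader(), new Class<?>[] { Greeter.class }, handler);
	}

	// A top-level check, like the one applied to each message header, passes:
	// every dynamic proxy class extends java.lang.reflect.Proxy, which is Serializable.
	static boolean looksSerializable(Object candidate) {
		return candidate instanceof Serializable;
	}

	// Actually serializing the proxy fails, because its handler is written too.
	static boolean reallySerializable(Object candidate) {
		try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
			out.writeObject(candidate);
			return true;
		}
		catch (NotSerializableException ex) {
			return false;
		}
		catch (IOException ex) {
			throw new IllegalStateException(ex);
		}
	}

}
```

This is exactly why the `MessageHeaders` check is fooled: it sees the proxy, not the non-serializable state behind it.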
@@ -445,68 +440,68 @@ private void checkInaccessible(Properties properties, Map<String, Object> modifi
	}
}

@SuppressWarnings("resource")
protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
	checkBootstrap(configProps);
	Consumer<K, V> kafkaConsumer = createRawConsumer(configProps);
Perhaps we could throw an exception here, if listeners are present and the created consumer is not an instance of `ExtendedKafkaConsumer`?
On the other hand, Java doesn't support multiple inheritance (the user might already be extending some other class), so a hard failure would not be so good; perhaps log a warning that listeners must be invoked when closing a custom consumer.

Or, simply revert the code and have `ExtendedKafkaConsumer` wrap the proxy instead, so that the consumer is not `Serializable`. But, of course, that would mean we'd have to delegate all methods - ugh!
I wish we had never had this protected `Consumer<K, V> createKafkaConsumer()` on our default CF: for any custom client you can always implement your own CF.

So, I'm just going to log a WARN that listeners are ignored because the consumer is not an `ExtendedKafkaConsumer`...
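The "warn instead of fail" decision described above could look roughly as follows; `RawConsumer`, `ExtendedConsumer`, and the factory shape are simplified, hypothetical stand-ins for the real spring-kafka types, and `System.err` stands in for the real logger:

```java
import java.util.List;

// Hypothetical, simplified model of the factory's "warn instead of fail" check.
class ListenerGuard {

	static class RawConsumer { }

	// Stand-in for DefaultKafkaConsumerFactory.ExtendedKafkaConsumer.
	static class ExtendedConsumer extends RawConsumer { }

	private final List<Object> listeners;

	ListenerGuard(List<Object> listeners) {
		this.listeners = listeners;
	}

	// Java has no multiple inheritance, so a custom consumer may legitimately
	// extend something else; log a WARN instead of throwing a hard failure.
	boolean listenersIgnored(RawConsumer consumer) {
		boolean ignored = !this.listeners.isEmpty() && !(consumer instanceof ExtendedConsumer);
		if (ignored) {
			System.err.println("WARN: listeners are ignored because the consumer "
					+ "returned by createRawConsumer() is not an ExtendedKafkaConsumer");
		}
		return ignored;
	}

}
```

With no listeners configured, or with a consumer that does extend the marker type, nothing is logged and behavior is unchanged.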
I agree; makes sense; maybe also deprecate that method?
I would let it live for now, until we really fail for some peculiar use-case to learn from.

If we deprecate and remove it (I guess there was a reason behind that method), we may never learn about the mentioned use-case 😉