From 638ee013e1b742c1e0af21b565f1e49af94932da Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 4 Oct 2023 13:16:54 -0400 Subject: [PATCH] Get rid of proxy in DefaultKafkaConsumerFactory (#2822) * Get rid of proxy in DefaultKafkaConsumerFactory The proxy in the `DefaultKafkaConsumerFactory` for `KafkaConsumer` is created only to intercept `close()` call to remove an instance from the `listeners`. There is no need in such a proxy since simple `KafkaConsumer` class extension can handle that scenario. The reason behind this change is to avoid a `Serializable` (`java.lang.reflect.Proxy`) header in the produced message for the `KafkaConsumer` where we still fail to serialize it because other properties of the proxy are not `Serializable` * Introduce `DefaultKafkaConsumerFactory.ExtendedKafkaConsumer` to handle `listeners` interaction * The `createRawConsumer()` might be considered as breaking change since now end-user must extend this `ExtendedKafkaConsumer` to be able to handle `listeners` same way as before * Adjust `DefaultKafkaConsumerFactoryTests.listener()` test for the current code state * * Log warn for ignored `listeners` and not an `ExtendedKafkaConsumer` * Log polishing. --------- Co-authored-by: Gary Russell --- .../core/DefaultKafkaConsumerFactory.java | 94 +++++++++---------- .../DefaultKafkaConsumerFactoryTests.java | 35 ++----- 2 files changed, 56 insertions(+), 73 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java index 78a6e92175..69dba9da56 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.kafka.core; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; @@ -28,19 +29,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; -import org.aopalliance.aop.Advice; -import org.aopalliance.intercept.MethodInterceptor; -import org.aopalliance.intercept.MethodInvocation; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.serialization.Deserializer; -import org.springframework.aop.framework.ProxyFactory; -import org.springframework.aop.support.NameMatchMethodPointcutAdvisor; import org.springframework.beans.factory.BeanNameAware; import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; @@ -445,27 +440,15 @@ private void checkInaccessible(Properties properties, Map modifi } } - @SuppressWarnings("resource") protected Consumer createKafkaConsumer(Map configProps) { checkBootstrap(configProps); Consumer kafkaConsumer = createRawConsumer(configProps); - - if (this.listeners.size() > 0) { - Map metrics = kafkaConsumer.metrics(); - Iterator metricIterator = metrics.keySet().iterator(); - String clientId; - if (metricIterator.hasNext()) { - clientId = metricIterator.next().tags().get("client-id"); - } - else { - clientId = "unknown"; - } - String id = this.beanName + "." + clientId; - kafkaConsumer = createProxy(kafkaConsumer, id); - for (Listener listener : this.listeners) { - listener.consumerAdded(id, kafkaConsumer); - } + if (!this.listeners.isEmpty() && !(kafkaConsumer instanceof ExtendedKafkaConsumer)) { + LOGGER.warn("The 'ConsumerFactory.Listener' configuration is ignored " + + "because the consumer is not an instance of 'ExtendedKafkaConsumer'." + + "Consider extending 'ExtendedKafkaConsumer' or implement your own 'ConsumerFactory'."); } + for (ConsumerPostProcessor pp : this.postProcessors) { kafkaConsumer = pp.apply(kafkaConsumer); } @@ -473,40 +456,57 @@ protected Consumer createKafkaConsumer(Map configProps) { } /** - * Create a Consumer. + * Create a {@link Consumer}. + * By default, this method returns an internal {@link ExtendedKafkaConsumer} + * which is aware of provided into this {@link #listeners}, therefore it is recommended + * to extend that class if {@link #listeners} are still involved for a custom {@link Consumer}. * @param configProps the configuration properties. * @return the consumer. * @since 2.5 */ protected Consumer createRawConsumer(Map configProps) { - return new KafkaConsumer<>(configProps, this.keyDeserializerSupplier.get(), - this.valueDeserializerSupplier.get()); + return new ExtendedKafkaConsumer(configProps); } - @SuppressWarnings("unchecked") - private Consumer createProxy(Consumer kafkaConsumer, String id) { - ProxyFactory pf = new ProxyFactory(kafkaConsumer); - Advice advice = new MethodInterceptor() { + @Override + public boolean isAutoCommit() { + Object auto = this.configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + return auto instanceof Boolean + ? (Boolean) auto + : !(auto instanceof String) || Boolean.parseBoolean((String) auto); + } + + protected class ExtendedKafkaConsumer extends KafkaConsumer { + + private String idForListeners; - @Override - public Object invoke(MethodInvocation invocation) throws Throwable { - DefaultKafkaConsumerFactory.this.listeners.forEach(listener -> - listener.consumerRemoved(id, kafkaConsumer)); - return invocation.proceed(); + protected ExtendedKafkaConsumer(Map configProps) { + super(configProps, + DefaultKafkaConsumerFactory.this.keyDeserializerSupplier.get(), + DefaultKafkaConsumerFactory.this.valueDeserializerSupplier.get()); + + if (!DefaultKafkaConsumerFactory.this.listeners.isEmpty()) { + Iterator metricIterator = metrics().keySet().iterator(); + String clientId = "unknown"; + if (metricIterator.hasNext()) { + clientId = metricIterator.next().tags().get("client-id"); + } + this.idForListeners = DefaultKafkaConsumerFactory.this.beanName + "." + clientId; + for (Listener listener : DefaultKafkaConsumerFactory.this.listeners) { + listener.consumerAdded(this.idForListeners, this); + } } + } - }; - NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice); - advisor.addMethodName("close"); - pf.addAdvisor(advisor); - return (Consumer) pf.getProxy(); - } + @Override + public void close(Duration timeout) { + super.close(timeout); + + for (Listener listener : DefaultKafkaConsumerFactory.this.listeners) { + listener.consumerRemoved(this.idForListeners, this); + } + } - @Override - public boolean isAutoCommit() { - Object auto = this.configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); - return auto instanceof Boolean ? (Boolean) auto - : auto instanceof String ? Boolean.valueOf((String) auto) : true; } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index ab51365ca2..ae12c8e6aa 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.kafka.core; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -38,14 +37,11 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Test; import org.springframework.aop.framework.ProxyFactory; -import org.springframework.aop.support.AopUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.ConsumerFactory.Listener; @@ -63,6 +59,8 @@ /** * @author Gary Russell * @author Chris Gilbert + * @author Artem Bilan + * * @since 1.0.6 */ @EmbeddedKafka(topics = { "txCache1", "txCache2", "txCacheSendFromListener" }, @@ -345,7 +343,7 @@ public void testNestedTxProducerIsCached() throws Exception { ProxyFactory prox = new ProxyFactory(); prox.setTarget(consumer); @SuppressWarnings("unchecked") - Consumer proxy = (Consumer) prox.getProxy(); + Consumer proxy = (Consumer) prox.getProxy(); wrapped.set(proxy); return proxy; }); @@ -381,25 +379,12 @@ public void testNestedTxProducerIsCached() throws Exception { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void listener() { - Consumer consumer = mock(Consumer.class); - Map metrics = new HashMap<>(); - metrics.put(new MetricName("test", "group", "desc", Collections.singletonMap("client-id", "foo-0")), null); - given(consumer.metrics()).willReturn(metrics); - DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(Collections.emptyMap()) { - - @Override - protected Consumer createRawConsumer(Map configProps) { - return consumer; - } - - }; + Map consumerConfig = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka); + consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "foo-0"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerConfig); List adds = new ArrayList<>(); List removals = new ArrayList<>(); - Consumer consum = cf.createConsumer(); - assertThat(AopUtils.isAopProxy(consum)).isFalse(); - assertThat(adds).hasSize(0); - cf.addListener(new Listener() { @Override @@ -415,13 +400,11 @@ public void consumerRemoved(String id, Consumer consumer) { }); cf.setBeanName("cf"); - consum = cf.createConsumer(); - assertThat(AopUtils.isAopProxy(consum)).isTrue(); - assertThat(AopUtils.isJdkDynamicProxy(consum)).isTrue(); + Consumer consumer = cf.createConsumer(); assertThat(adds).hasSize(1); assertThat(adds.get(0)).isEqualTo("cf.foo-0"); assertThat(removals).hasSize(0); - consum.close(); + consumer.close(); assertThat(removals).hasSize(1); }