Get rid of proxy in DefaultKafkaConsumerFactory (#2822)
* Get rid of proxy in DefaultKafkaConsumerFactory

The proxy around the `KafkaConsumer` in the `DefaultKafkaConsumerFactory`
is created only to intercept the `close()` call and remove the
instance from the `listeners`.
There is no need for such a proxy, since a simple `KafkaConsumer` subclass
can handle that scenario.

The reason behind this change is to avoid a `Serializable` (`java.lang.reflect.Proxy`) header
in the produced message for the `KafkaConsumer`: serialization of such a header still fails
because the other properties of the proxy are not `Serializable`.

* Introduce `DefaultKafkaConsumerFactory.ExtendedKafkaConsumer`
to handle the `listeners` interaction
* The `createRawConsumer()` change might be considered breaking, since end users
must now extend this `ExtendedKafkaConsumer` to handle the `listeners`
the same way as before (see the sketch after the file summary below)
* Adjust the `DefaultKafkaConsumerFactoryTests.listener()` test to the current code state

* Log a warning when the `listeners` are ignored because the consumer is not an `ExtendedKafkaConsumer`

* Log polishing.

---------

Co-authored-by: Gary Russell <[email protected]>
artembilan and garyrussell authored Oct 4, 2023
1 parent e95f041 commit 638ee01
Showing 2 changed files with 56 additions and 73 deletions.
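
As a minimal sketch of the migration path mentioned above, assuming a hypothetical custom factory (`MyConsumerFactory` is illustrative, not part of this commit): a factory that overrides `createRawConsumer()` keeps the `listeners` working by extending the new `ExtendedKafkaConsumer`, here via an anonymous subclass because the constructor is `protected`.

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical subclass illustrating the new extension point.
public class MyConsumerFactory extends DefaultKafkaConsumerFactory<String, String> {

	public MyConsumerFactory(Map<String, Object> configs) {
		super(configs);
	}

	@Override
	protected Consumer<String, String> createRawConsumer(Map<String, Object> configProps) {
		// Before this commit, any Consumer returned here was proxied so that
		// close() notified the registered listeners. The listener wiring now
		// lives in ExtendedKafkaConsumer, so a custom consumer should extend it;
		// returning a plain KafkaConsumer makes the factory log a warning and
		// skip the listeners.
		return new ExtendedKafkaConsumer(configProps) {
			// place for custom behavior, e.g. tracing around poll()
		};
	}

}

Factories that do not override `createRawConsumer()` need no changes and get the new behavior automatically.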
spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@

 package org.springframework.kafka.core;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -28,19 +29,13 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
 
-import org.aopalliance.aop.Advice;
-import org.aopalliance.intercept.MethodInterceptor;
-import org.aopalliance.intercept.MethodInvocation;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Deserializer;
 
-import org.springframework.aop.framework.ProxyFactory;
-import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
 import org.springframework.beans.factory.BeanNameAware;
 import org.springframework.core.log.LogAccessor;
 import org.springframework.lang.Nullable;
@@ -445,68 +440,73 @@ private void checkInaccessible(Properties properties, Map<String, Object> modifi
 		}
 	}
 
+	@SuppressWarnings("resource")
 	protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
 		checkBootstrap(configProps);
 		Consumer<K, V> kafkaConsumer = createRawConsumer(configProps);
 
-		if (this.listeners.size() > 0) {
-			Map<MetricName, ? extends Metric> metrics = kafkaConsumer.metrics();
-			Iterator<MetricName> metricIterator = metrics.keySet().iterator();
-			String clientId;
-			if (metricIterator.hasNext()) {
-				clientId = metricIterator.next().tags().get("client-id");
-			}
-			else {
-				clientId = "unknown";
-			}
-			String id = this.beanName + "." + clientId;
-			kafkaConsumer = createProxy(kafkaConsumer, id);
-			for (Listener<K, V> listener : this.listeners) {
-				listener.consumerAdded(id, kafkaConsumer);
-			}
+		if (!this.listeners.isEmpty() && !(kafkaConsumer instanceof ExtendedKafkaConsumer)) {
+			LOGGER.warn("The 'ConsumerFactory.Listener' configuration is ignored " +
+					"because the consumer is not an instance of 'ExtendedKafkaConsumer'. " +
+					"Consider extending 'ExtendedKafkaConsumer' or implementing your own 'ConsumerFactory'.");
 		}
 
 		for (ConsumerPostProcessor<K, V> pp : this.postProcessors) {
 			kafkaConsumer = pp.apply(kafkaConsumer);
 		}
 		return kafkaConsumer;
 	}

 	/**
-	 * Create a Consumer.
+	 * Create a {@link Consumer}.
+	 * By default, this method returns an internal {@link ExtendedKafkaConsumer},
+	 * which is aware of the {@link #listeners} provided to this factory; it is therefore
+	 * recommended to extend that class when the {@link #listeners} are still needed
+	 * for a custom {@link Consumer}.
 	 * @param configProps the configuration properties.
 	 * @return the consumer.
 	 * @since 2.5
 	 */
 	protected Consumer<K, V> createRawConsumer(Map<String, Object> configProps) {
-		return new KafkaConsumer<>(configProps, this.keyDeserializerSupplier.get(),
-				this.valueDeserializerSupplier.get());
+		return new ExtendedKafkaConsumer(configProps);
 	}

@SuppressWarnings("unchecked")
private Consumer<K, V> createProxy(Consumer<K, V> kafkaConsumer, String id) {
ProxyFactory pf = new ProxyFactory(kafkaConsumer);
Advice advice = new MethodInterceptor() {
@Override
public boolean isAutoCommit() {
Object auto = this.configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
return auto instanceof Boolean
? (Boolean) auto
: !(auto instanceof String) || Boolean.parseBoolean((String) auto);
}

protected class ExtendedKafkaConsumer extends KafkaConsumer<K, V> {

private String idForListeners;

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
DefaultKafkaConsumerFactory.this.listeners.forEach(listener ->
listener.consumerRemoved(id, kafkaConsumer));
return invocation.proceed();
protected ExtendedKafkaConsumer(Map<String, Object> configProps) {
super(configProps,
DefaultKafkaConsumerFactory.this.keyDeserializerSupplier.get(),
DefaultKafkaConsumerFactory.this.valueDeserializerSupplier.get());

if (!DefaultKafkaConsumerFactory.this.listeners.isEmpty()) {
Iterator<MetricName> metricIterator = metrics().keySet().iterator();
String clientId = "unknown";
if (metricIterator.hasNext()) {
clientId = metricIterator.next().tags().get("client-id");
}
this.idForListeners = DefaultKafkaConsumerFactory.this.beanName + "." + clientId;
for (Listener<K, V> listener : DefaultKafkaConsumerFactory.this.listeners) {
listener.consumerAdded(this.idForListeners, this);
}
}
}

};
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice);
advisor.addMethodName("close");
pf.addAdvisor(advisor);
return (Consumer<K, V>) pf.getProxy();
}
@Override
public void close(Duration timeout) {
super.close(timeout);

for (Listener<K, V> listener : DefaultKafkaConsumerFactory.this.listeners) {
listener.consumerRemoved(this.idForListeners, this);
}
}

@Override
public boolean isAutoCommit() {
Object auto = this.configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
return auto instanceof Boolean ? (Boolean) auto
: auto instanceof String ? Boolean.valueOf((String) auto) : true;
}

}
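
For context, a minimal usage sketch (not part of this commit) of the `ConsumerFactory.Listener` callbacks that `ExtendedKafkaConsumer` now drives directly; the broker address, group, and client id are placeholder values:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class ListenerLifecycleDemo {

	public static void main(String[] args) {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "demoGroup");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.CLIENT_ID_CONFIG, "demo-client");

		DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(props);
		cf.setBeanName("cf");
		cf.addListener(new ConsumerFactory.Listener<String, String>() {

			@Override
			public void consumerAdded(String id, Consumer<String, String> consumer) {
				// id is "<beanName>.<client-id>", e.g. "cf.demo-client"
				System.out.println("added: " + id);
			}

			@Override
			public void consumerRemoved(String id, Consumer<String, String> consumer) {
				System.out.println("removed: " + id);
			}

		});

		// consumerAdded() now fires from the ExtendedKafkaConsumer constructor;
		// consumerRemoved() fires from its close(Duration) override, with no proxy involved.
		try (Consumer<String, String> consumer = cf.createConsumer()) {
			// use the consumer
		}
	}

}

The test changes below exercise exactly this lifecycle against an embedded broker.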
spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2022 the original author or authors.
+ * Copyright 2017-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
 package org.springframework.kafka.core;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;

@@ -38,14 +37,11 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.jupiter.api.Test;
 
 import org.springframework.aop.framework.ProxyFactory;
-import org.springframework.aop.support.AopUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.core.ConsumerFactory.Listener;
@@ -63,6 +59,8 @@
 /**
  * @author Gary Russell
  * @author Chris Gilbert
+ * @author Artem Bilan
+ *
  * @since 1.0.6
  */
 @EmbeddedKafka(topics = { "txCache1", "txCache2", "txCacheSendFromListener" },
@@ -345,7 +343,7 @@ public void testNestedTxProducerIsCached() throws Exception {
 			ProxyFactory prox = new ProxyFactory();
 			prox.setTarget(consumer);
 			@SuppressWarnings("unchecked")
-			Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
+			Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
 			wrapped.set(proxy);
 			return proxy;
 		});
@@ -381,25 +379,12 @@ public void testNestedTxProducerIsCached() throws Exception {
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	void listener() {
-		Consumer consumer = mock(Consumer.class);
-		Map<MetricName, ? extends Metric> metrics = new HashMap<>();
-		metrics.put(new MetricName("test", "group", "desc", Collections.singletonMap("client-id", "foo-0")), null);
-		given(consumer.metrics()).willReturn(metrics);
-		DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(Collections.emptyMap()) {
-
-			@Override
-			protected Consumer createRawConsumer(Map configProps) {
-				return consumer;
-			}
-
-		};
+		Map<String, Object> consumerConfig = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
+		consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "foo-0");
+		DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerConfig);
 		List<String> adds = new ArrayList<>();
 		List<String> removals = new ArrayList<>();
 
-		Consumer consum = cf.createConsumer();
-		assertThat(AopUtils.isAopProxy(consum)).isFalse();
-		assertThat(adds).hasSize(0);
-
 		cf.addListener(new Listener() {
 
 			@Override
@@ -415,13 +400,11 @@ public void consumerRemoved(String id, Consumer consumer) {
 		});
 		cf.setBeanName("cf");
 
-		consum = cf.createConsumer();
-		assertThat(AopUtils.isAopProxy(consum)).isTrue();
-		assertThat(AopUtils.isJdkDynamicProxy(consum)).isTrue();
+		Consumer consumer = cf.createConsumer();
 		assertThat(adds).hasSize(1);
 		assertThat(adds.get(0)).isEqualTo("cf.foo-0");
 		assertThat(removals).hasSize(0);
-		consum.close();
+		consumer.close();
 		assertThat(removals).hasSize(1);
 	}

