Skip to content

Commit

Permalink
GH-2976: Cleanup in KafkaListener infra classes
Browse files Browse the repository at this point in the history
Fixes: #2976 

Minor improvement in `DelegatingInvocableHandler` and AKLE.
  • Loading branch information
Wzy19930507 authored Jan 17, 2024
1 parent f245b59 commit ef719b5
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory) {
this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, null);
if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) {
this.resolver = configurableListableBeanFactory.getBeanExpressionResolver();
this.expressionContext = new BeanExpressionContext(configurableListableBeanFactory, null);
}
this.beanResolver = new BeanFactoryResolver(beanFactory);
}
Expand Down Expand Up @@ -275,7 +275,7 @@ public void setGroup(String group) {
* @since 1.1
*/
public boolean isBatchListener() {
return this.batchListener == null ? false : this.batchListener;
return this.batchListener != null && this.batchListener;
}

/**
Expand Down Expand Up @@ -530,11 +530,10 @@ private void setupMessageListener(MessageListenerContainer container,
.acceptIfNotNull(this.correlationHeaderName, adapter::setCorrelationHeaderName);
adapter.setSplitIterables(this.splitIterables);
Object messageListener = adapter;
boolean isBatchListener = isBatchListener();
Assert.state(messageListener != null,
() -> "Endpoint [" + this + "] must provide a non null message listener");
if (this.recordFilterStrategy != null) {
if (isBatchListener) {
if (isBatchListener()) {
if (((MessagingMessageListenerAdapter<K, V>) messageListener).isConsumerRecords()) {
this.logger.warn(() -> "Filter strategy ignored when consuming 'ConsumerRecords' instead of a List"
+ (this.id != null ? " id: " + this.id : ""));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,22 +207,22 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(

MessagingMessageListenerAdapter<K, V> listener;
if (isBatchListener()) {
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<K, V>(
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<>(
this.bean, this.method, this.errorHandler);
BatchToRecordAdapter<K, V> batchToRecordAdapter = getBatchToRecordAdapter();
if (batchToRecordAdapter != null) {
messageListener.setBatchToRecordAdapter(batchToRecordAdapter);
}
if (messageConverter instanceof BatchMessageConverter) {
messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
if (messageConverter instanceof BatchMessageConverter batchMessageConverter) {
messageListener.setBatchMessageConverter(batchMessageConverter);
}
listener = messageListener;
}
else {
RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<K, V>(
RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<>(
this.bean, this.method, this.errorHandler);
if (messageConverter instanceof RecordMessageConverter) {
messageListener.setMessageConverter((RecordMessageConverter) messageConverter);
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) {
messageListener.setMessageConverter(recordMessageConverter);
}
listener = messageListener;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,7 +71,7 @@ public void setValidator(Validator validator) {

@Override
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<InvocableHandlerMethod>();
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<>();
InvocableHandlerMethod defaultHandler = null;
for (Method method : this.methods) {
InvocableHandlerMethod handler = getMessageHandlerMethodFactory()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,10 +67,9 @@ public static Object buildConsumerRecordMetadataFromArray(Object... data) {
*/
@Nullable
public static ConsumerRecordMetadata buildConsumerRecordMetadata(Object data) {
if (!(data instanceof ConsumerRecord)) {
if (!(data instanceof ConsumerRecord<?, ?> record)) {
return null;
}
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) data;
return new ConsumerRecordMetadata(new RecordMetadata(new TopicPartition(record.topic(), record.partition()),
record.offset(), 0, record.timestamp(), record.serializedKeySize(),
record.serializedValueSize()), record.timestampType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
this.bean = bean;
this.resolver = beanExpressionResolver;
this.beanExpressionContext = beanExpressionContext;
this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory
? (ConfigurableListableBeanFactory) beanFactory
this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory
? configurableListableBeanFactory
: null;
this.validator = validator == null ? null : new PayloadValidator(validator);
}
Expand All @@ -124,7 +124,7 @@ private void checkSpecial(@Nullable InvocableHandlerMethod handler) {
}
Parameter[] parameters = handler.getMethod().getParameters();
for (Parameter parameter : parameters) {
if (parameter.getType().equals(ConsumerRecordMetadata.class)) {
if (ConsumerRecordMetadata.class.equals(parameter.getType())) {
this.handlerMetadataAware.put(handler, true);
return;
}
Expand All @@ -148,7 +148,7 @@ public Object getBean() {
* or the method raised an exception.
*/
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
Class<? extends Object> payloadClass = message.getPayload().getClass();
Class<?> payloadClass = message.getPayload().getClass();
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
if (this.validator != null && this.defaultHandler != null) {
MethodParameter parameter = this.payloadMethodParameters.get(handler);
Expand All @@ -175,7 +175,7 @@ public Object invoke(Message<?> message, Object... providedArgs) throws Exceptio
* @param payloadClass the payload class.
* @return the handler.
*/
protected InvocableHandlerMethod getHandlerForPayload(Class<? extends Object> payloadClass) {
protected InvocableHandlerMethod getHandlerForPayload(Class<?> payloadClass) {
InvocableHandlerMethod handler = this.cachedHandlers.get(payloadClass);
if (handler == null) {
handler = findHandlerForPayload(payloadClass);
Expand Down Expand Up @@ -246,36 +246,32 @@ protected InvocableHandlerMethod findHandlerForPayload(Class<? extends Object> p
InvocableHandlerMethod result = null;
for (InvocableHandlerMethod handler : this.handlers) {
if (matchHandlerMethod(payloadClass, handler)) {
if (result != null) {
boolean resultIsDefault = result.equals(this.defaultHandler);
if (!handler.equals(this.defaultHandler) && !resultIsDefault) {
if (result != null && !result.equals(this.defaultHandler)) {
if (!handler.equals(this.defaultHandler)) {
throw new KafkaException("Ambiguous methods for payload type: " + payloadClass + ": " +
result.getMethod().getName() + " and " + handler.getMethod().getName());
}
if (!resultIsDefault) {
continue; // otherwise replace the result with the actual match
}
continue; // otherwise replace the result with the actual match
}
result = handler;
}
}
return result != null ? result : this.defaultHandler;
}

protected boolean matchHandlerMethod(Class<? extends Object> payloadClass, InvocableHandlerMethod handler) {
protected boolean matchHandlerMethod(Class<?> payloadClass, InvocableHandlerMethod handler) {
Method method = handler.getMethod();
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
// Single param; no annotation or not @Header
if (parameterAnnotations.length == 1) {
MethodParameter methodParameter = new MethodParameter(method, 0);
if ((methodParameter.getParameterAnnotations().length == 0
|| !methodParameter.hasParameterAnnotation(Header.class))
&& methodParameter.getParameterType().isAssignableFrom(payloadClass)) {
boolean isPayload = assignPayload(methodParameter, payloadClass);
if (isPayload) {
if (this.validator != null) {
this.payloadMethodParameters.put(handler, methodParameter);
}
return true;
}
return isPayload;
}

MethodParameter foundCandidate = findCandidate(payloadClass, method, parameterAnnotations);
Expand All @@ -285,14 +281,12 @@ protected boolean matchHandlerMethod(Class<? extends Object> payloadClass, Invoc
return foundCandidate != null;
}

private MethodParameter findCandidate(Class<? extends Object> payloadClass, Method method,
Annotation[][] parameterAnnotations) {
@Nullable
private MethodParameter findCandidate(Class<?> payloadClass, Method method, Annotation[][] parameterAnnotations) {
MethodParameter foundCandidate = null;
for (int i = 0; i < parameterAnnotations.length; i++) {
MethodParameter methodParameter = new MethodParameter(method, i);
if ((methodParameter.getParameterAnnotations().length == 0
|| !methodParameter.hasParameterAnnotation(Header.class))
&& methodParameter.getParameterType().isAssignableFrom(payloadClass)) {
if (assignPayload(methodParameter, payloadClass)) {
if (foundCandidate != null) {
throw new KafkaException("Ambiguous payload parameter for " + method.toGenericString());
}
Expand All @@ -316,15 +310,20 @@ public boolean hasDefaultHandler() {
return this.defaultHandler != null;
}

private boolean assignPayload(MethodParameter methodParameter, Class<?> payloadClass) {
return (methodParameter.getParameterAnnotations().length == 0
|| !methodParameter.hasParameterAnnotation(Header.class))
&& methodParameter.getParameterType().isAssignableFrom(payloadClass);
}

private static final class PayloadValidator extends PayloadMethodArgumentResolver {

PayloadValidator(Validator validator) {
super(new MessageConverter() { // Required but never used

@Override
@Nullable
public Message<?> toMessage(Object payload, @Nullable
MessageHeaders headers) {
public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
return null;
}

Expand Down

0 comments on commit ef719b5

Please sign in to comment.