Skip to content

Commit

Permalink
Add NonNullApi into 'annotation' package
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanQingyangXu authored Jan 17, 2024
1 parent ef719b5 commit 474651d
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@

/**
* Annotation to determine the method that should process the DLT topic message.
* The method can have the same parameters as a {@link KafkaListener} method can (Message, Acknowledgement, etc).
* The method can have the same parameters as a {@link KafkaListener} method can have (Message, Acknowledgement, etc).
*
* <p>
* The annotated method must be in the same class as the corresponding {@link KafkaListener} annotation.
*
* @author Tomaz Fernandes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,7 @@
* bean. This annotation is meta-annotated with {@code @EnableKafka} so it is not
* necessary to specify both.
*
* <p>
* To configure the feature's components, extend the
* {@link RetryTopicConfigurationSupport} class and override the appropriate methods on a
* {@link Configuration @Configuration} class, such as:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.springframework.core.MethodParameter;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
Expand Down Expand Up @@ -59,7 +60,7 @@ public Object resolveArgument(MethodParameter parameter, Message<?> message) thr
}

@Override
protected boolean isEmptyPayload(Object payload) {
protected boolean isEmptyPayload(@Nullable Object payload) {
return payload == null || payload instanceof KafkaNull;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.springframework.core.annotation.RepeatableContainers;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.lang.Nullable;


/**
Expand All @@ -42,9 +43,11 @@
* one from a {@link RetryableTopic} annotation, or from the bean container if no
* annotation is available.
*
* <p>
* If beans are found in the container there's a check to determine whether or not the
* provided topics should be handled by any of such instances.
*
* <p>
* If the annotation is provided, a
* {@link org.springframework.kafka.annotation.DltHandler} annotated method is looked up.
*
Expand All @@ -58,10 +61,13 @@
*/
public class RetryTopicConfigurationProvider {

@Nullable
private final BeanFactory beanFactory;

@Nullable
private final BeanExpressionResolver resolver;

@Nullable
private final BeanExpressionContext expressionContext;

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryTopicConfigurationProvider.class));
Expand All @@ -71,7 +77,7 @@ public class RetryTopicConfigurationProvider {
* expression context.
* @param beanFactory the bean factory.
*/
public RetryTopicConfigurationProvider(BeanFactory beanFactory) {
public RetryTopicConfigurationProvider(@Nullable BeanFactory beanFactory) {
this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory
? new BeanExpressionContext((ConfigurableBeanFactory) beanFactory, null)
: null); // NOSONAR
Expand All @@ -83,13 +89,14 @@ public RetryTopicConfigurationProvider(BeanFactory beanFactory) {
* @param resolver the bean expression resolver.
* @param expressionContext the bean expression context.
*/
public RetryTopicConfigurationProvider(BeanFactory beanFactory, BeanExpressionResolver resolver,
BeanExpressionContext expressionContext) {
public RetryTopicConfigurationProvider(@Nullable BeanFactory beanFactory, @Nullable BeanExpressionResolver resolver,
@Nullable BeanExpressionContext expressionContext) {

this.beanFactory = beanFactory;
this.resolver = resolver;
this.expressionContext = expressionContext;
}
@Nullable
public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method method, Object bean) {
RetryableTopic annotation = MergedAnnotations.from(method, SearchStrategy.TYPE_HIERARCHY,
RepeatableContainers.none())
Expand All @@ -102,6 +109,7 @@ public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method
: maybeGetFromContext(topics);
}

@Nullable
private RetryTopicConfiguration maybeGetFromContext(String[] topics) {
if (this.beanFactory == null || !ListableBeanFactory.class.isAssignableFrom(this.beanFactory.getClass())) {
LOGGER.warn("No ListableBeanFactory found, skipping RetryTopic configuration.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanInitializationException;
Expand All @@ -38,6 +40,7 @@
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicConstants;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.lang.Nullable;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy;
Expand Down Expand Up @@ -70,10 +73,13 @@ public class RetryableTopicAnnotationProcessor {

private static final String CSQ_FOR_OSQ = "] for [";

@Nullable
private final BeanFactory beanFactory;

@Nullable
private final BeanExpressionResolver resolver;

@Nullable
private final BeanExpressionContext expressionContext;

/**
Expand All @@ -93,8 +99,8 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory) {
* @param resolver the bean expression resolver.
* @param expressionContext the bean expression context.
*/
public RetryableTopicAnnotationProcessor(BeanFactory beanFactory, BeanExpressionResolver resolver,
BeanExpressionContext expressionContext) {
public RetryableTopicAnnotationProcessor(@Nullable BeanFactory beanFactory, @Nullable BeanExpressionResolver resolver,
@Nullable BeanExpressionContext expressionContext) {

this.beanFactory = beanFactory;
this.resolver = resolver;
Expand All @@ -116,12 +122,10 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
boolean traverse = false;
if (StringUtils.hasText(annotation.traversingCauses())) {
Boolean traverseResolved = resolveExpressionAsBoolean(annotation.traversingCauses(), "traversingCauses");
if (traverseResolved != null) {
traverse = traverseResolved;
}
else {
traverse = !includes.isEmpty() || !excludes.isEmpty();
}
traverse = Objects.requireNonNullElseGet(
traverseResolved,
() -> !includes.isEmpty() || !excludes.isEmpty()
);
}
Boolean autoStartDlt = null;
if (StringUtils.hasText(annotation.autoStartDltHandler())) {
Expand Down Expand Up @@ -157,9 +161,11 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
return builder.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics));
}

private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, BeanFactory beanFactory) { // NOSONAR
private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, @Nullable BeanFactory beanFactory) { // NOSONAR
StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
evaluationContext.setBeanResolver(new BeanFactoryResolver(beanFactory));
if (beanFactory != null) {
evaluationContext.setBeanResolver(new BeanFactoryResolver(beanFactory));
}

// Code from Spring Retry
Long min = backoff.delay() == 0 ? backoff.value() : backoff.delay();
Expand Down Expand Up @@ -210,7 +216,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
.orElse(RetryTopicConfigurer.DEFAULT_DLT_HANDLER);
}

private KafkaOperations<?, ?> getKafkaTemplate(String kafkaTemplateName, String[] topics) {
private KafkaOperations<?, ?> getKafkaTemplate(@Nullable String kafkaTemplateName, String[] topics) {
if (StringUtils.hasText(kafkaTemplateName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by bean name");
try {
Expand All @@ -236,6 +242,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
}
}

@Nullable
private String resolveExpressionAsString(String value, String attribute) {
Object resolved = resolveExpression(value);
if (resolved instanceof String str) {
Expand All @@ -248,6 +255,7 @@ else if (resolved != null) {
return null;
}

@Nullable
private Integer resolveExpressionAsInteger(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Integer result = null;
Expand All @@ -268,6 +276,8 @@ else if (resolved != null || required) {
return result;
}

@SuppressWarnings("SameParameterValue")
@Nullable
private Short resolveExpressionAsShort(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Short result = null;
Expand All @@ -288,6 +298,7 @@ else if (resolved != null || required) {
return result;
}

@Nullable
private Long resolveExpressionAsLong(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Long result = null;
Expand All @@ -308,6 +319,8 @@ else if (resolved != null || required) {
return result;
}

@SuppressWarnings("SameParameterValue")
@Nullable
private Double resolveExpressionAsDouble(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Double result = null;
Expand All @@ -328,6 +341,7 @@ else if (resolved != null || required) {
return result;
}

@Nullable
private Boolean resolveExpressionAsBoolean(String value, String attribute) {
Object resolved = resolveExpression(value);
Boolean result = null;
Expand All @@ -349,7 +363,8 @@ else if (resolved != null) {
private List<Class<? extends Throwable>> resolveClasses(Class<? extends Throwable>[] fromAnnot, String[] names,
String type) {

List<Class<? extends Throwable>> classes = new ArrayList<>(Arrays.asList(fromAnnot));
List<Class<? extends Throwable>> classes = new ArrayList<>(fromAnnot.length + names.length);
Collections.addAll(classes, fromAnnot);
try {
for (String name : names) {
Class<?> clazz = ClassUtils.forName(name, ClassUtils.getDefaultClassLoader());
Expand All @@ -365,8 +380,9 @@ private List<Class<? extends Throwable>> resolveClasses(Class<? extends Throwabl
return classes;
}

@Nullable
private Object resolveExpression(String value) {
if (this.expressionContext != null) {
if (this.expressionContext != null && this.resolver != null) {
String resolved = resolve(value);
return this.resolver.evaluate(resolved, this.expressionContext);
}
Expand All @@ -375,6 +391,7 @@ private Object resolveExpression(String value) {
}
}

@Nullable
private String resolve(String value) {
if (this.beanFactory instanceof ConfigurableBeanFactory cbf) {
return cbf.resolveEmbeddedValue(value);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/**
* Package for kafka annotations
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.kafka.annotation;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.lang.Nullable;

/**
* Provide the component instances that will be used with
Expand Down Expand Up @@ -154,7 +155,7 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
* @param applicationContext the application context.
* @return the instance.
*/
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry,
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(@Nullable ListenerContainerRegistry registry,
ApplicationContext applicationContext) {

return new ContainerPartitionPausingBackOffManagerFactory(registry, applicationContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -310,7 +310,7 @@ protected Consumer<DestinationTopicResolver> configureDestinationTopicResolver()
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext,
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
ListenerContainerRegistry registry,
@Nullable ListenerContainerRegistry registry,
ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider,
@Nullable RetryTopicSchedulerWrapper wrapper,
@Nullable TaskScheduler taskScheduler) {
Expand All @@ -325,7 +325,7 @@ public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContex
}

private void configurePartitionPausingFactory(ContainerPartitionPausingBackOffManagerFactory factory,
ListenerContainerRegistry registry, @Nullable TaskScheduler scheduler) {
@Nullable ListenerContainerRegistry registry, @Nullable TaskScheduler scheduler) {

Assert.notNull(scheduler, "Either a RetryTopicSchedulerWrapper or TaskScheduler bean is required");
factory.setBackOffHandler(new ContainerPausingBackOffHandler(
Expand All @@ -346,8 +346,10 @@ protected RetryTopicComponentFactory createComponentFactory() {
*/
public static class BlockingRetriesConfigurer {

@Nullable
private BackOff backOff;

@Nullable
private Class<? extends Exception>[] retryableExceptions;

/**
Expand Down Expand Up @@ -378,10 +380,12 @@ public BlockingRetriesConfigurer backOff(BackOff backoff) {
return this;
}

@Nullable
BackOff getBackOff() {
return this.backOff;
}

@Nullable
Class<? extends Exception>[] getRetryableExceptions() {
return this.retryableExceptions;
}
Expand All @@ -393,10 +397,13 @@ Class<? extends Exception>[] getRetryableExceptions() {
*/
public static class CustomizersConfigurer {

@Nullable
private Consumer<DefaultErrorHandler> errorHandlerCustomizer;

@Nullable
private Consumer<ConcurrentMessageListenerContainer<?, ?>> listenerContainerCustomizer;

@Nullable
private Consumer<DeadLetterPublishingRecoverer> deadLetterPublishingRecovererCustomizer;

/**
Expand All @@ -406,6 +413,7 @@ public static class CustomizersConfigurer {
* @return the configurer.
* @see DefaultErrorHandler
*/
@SuppressWarnings("unused")
public CustomizersConfigurer customizeErrorHandler(Consumer<DefaultErrorHandler> errorHandlerCustomizer) {
this.errorHandlerCustomizer = errorHandlerCustomizer;
return this;
Expand Down Expand Up @@ -433,14 +441,17 @@ public CustomizersConfigurer customizeDeadLetterPublishingRecoverer(Consumer<Dea
return this;
}

@Nullable
Consumer<DefaultErrorHandler> getErrorHandlerCustomizer() {
return this.errorHandlerCustomizer;
}

@Nullable
Consumer<ConcurrentMessageListenerContainer<?, ?>> getListenerContainerCustomizer() {
return this.listenerContainerCustomizer;
}

@Nullable
Consumer<DeadLetterPublishingRecoverer> getDeadLetterPublishingRecovererCustomizer() {
return this.deadLetterPublishingRecovererCustomizer;
}
Expand Down

0 comments on commit 474651d

Please sign in to comment.