diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PollableAmqpChannel.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PollableAmqpChannel.java index f23e9907f7..12fd0e81bf 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PollableAmqpChannel.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PollableAmqpChannel.java @@ -322,7 +322,7 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) { } @Override - public @Nullable ChannelInterceptor removeInterceptor(int index) { + public ChannelInterceptor removeInterceptor(int index) { ChannelInterceptor interceptor = super.removeInterceptor(index); if (interceptor instanceof ExecutorChannelInterceptor) { this.executorInterceptorsSize--; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aot/CoreRuntimeHints.java b/spring-integration-core/src/main/java/org/springframework/integration/aot/CoreRuntimeHints.java index b10153465a..0367dc7aac 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aot/CoreRuntimeHints.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aot/CoreRuntimeHints.java @@ -26,6 +26,8 @@ import java.util.function.Supplier; import java.util.stream.Stream; +import org.jspecify.annotations.Nullable; + import org.springframework.aop.framework.AopProxyUtils; import org.springframework.aot.hint.ExecutableMode; import org.springframework.aot.hint.MemberCategory; @@ -79,7 +81,7 @@ class CoreRuntimeHints implements RuntimeHintsRegistrar { @Override - public void registerHints(RuntimeHints hints, ClassLoader classLoader) { + public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) { ReflectionHints reflectionHints = hints.reflection(); Stream.of( GenericSelector.class, diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aot/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/aot/package-info.java index 7949a5e0ad..c3ae56e083 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aot/package-info.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aot/package-info.java @@ -1,6 +1,5 @@ /** * Provides classes to support Spring AOT. */ -@org.springframework.lang.NonNullApi -@org.springframework.lang.NonNullFields +@org.jspecify.annotations.NullMarked package org.springframework.integration.aot; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractExecutorChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractExecutorChannel.java index 2211fbdf92..40417801b1 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractExecutorChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractExecutorChannel.java @@ -59,13 +59,15 @@ public abstract class AbstractExecutorChannel extends AbstractSubscribableChannel implements ExecutorChannelInterceptorAware { - protected Executor executor; // NOSONAR + protected @Nullable Executor executor; - protected AbstractDispatcher dispatcher; // NOSONAR + @SuppressWarnings("NullAway.Init") + protected AbstractDispatcher dispatcher; - protected Integer maxSubscribers; // NOSONAR + @Nullable + protected Integer maxSubscribers; - protected int executorInterceptorsSize; // NOSONAR + protected int executorInterceptorsSize; public AbstractExecutorChannel(@Nullable Executor executor) { this.executor = executor; @@ -117,7 +119,6 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) { } @Override - @Nullable public ChannelInterceptor removeInterceptor(int index) { ChannelInterceptor interceptor = super.removeInterceptor(index); if (interceptor instanceof ExecutorChannelInterceptor) { @@ -167,7 +168,7 @@ public void run() { if (!CollectionUtils.isEmpty(interceptorStack)) { triggerAfterMessageHandled(message, ex, interceptorStack); } - if (ex instanceof MessagingException) { // NOSONAR + if (ex instanceof MessagingException) { throw new MessagingExceptionWrapper(message, (MessagingException) ex); } String description = "Failed to handle " + message + " to " + this + " in " + messageHandler; @@ -195,7 +196,7 @@ private Message applyBeforeHandle(Message message, Deque executorInterceptor.getClass().getSimpleName() + " returned null from beforeHandle, i.e. precluding the send."); } - triggerAfterMessageHandled(null, null, interceptorStack); + triggerAfterMessageHandled(message, null, interceptorStack); return null; } interceptorStack.add(executorInterceptor); @@ -204,13 +205,13 @@ private Message applyBeforeHandle(Message message, Deque message, @Nullable Exception ex, + private void triggerAfterMessageHandled(Message message, @Nullable Exception ex, Deque interceptorStack) { Iterator iterator = interceptorStack.descendingIterator(); while (iterator.hasNext()) { ExecutorChannelInterceptor interceptor = iterator.next(); try { - interceptor.afterMessageHandled(message, AbstractExecutorChannel.this, //NOSONAR + interceptor.afterMessageHandled(message, AbstractExecutorChannel.this, this.delegate.getMessageHandler(), ex); } catch (Throwable ex2) { //NOSONAR diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java index fa1d2124e6..a8f09d0240 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java @@ -95,28 +95,27 @@ public abstract class AbstractMessageChannel extends IntegrationObjectSupport private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; - @Nullable - private MessageSenderObservationConvention observationConvention; + private @Nullable MessageSenderObservationConvention observationConvention; private boolean shouldTrack = false; private Class[] datatypes = new Class[0]; - private MessageConverter messageConverter; + private @Nullable MessageConverter messageConverter; private boolean loggingEnabled = true; - private MetricsCaptor metricsCaptor; + private @Nullable MetricsCaptor metricsCaptor; - private TimerFacade successTimer; + private @Nullable TimerFacade successTimer; - private TimerFacade failureTimer; + private @Nullable TimerFacade failureTimer; - private volatile String fullChannelName; + private volatile @Nullable String fullChannelName; private volatile boolean applicationRunning; - private volatile Lifecycle applicationRunningController; + private volatile @Nullable Lifecycle applicationRunningController; @Override public String getComponentType() { @@ -235,7 +234,6 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) { } @Override - @Nullable public ChannelInterceptor removeInterceptor(int index) { return this.interceptors.remove(index); } @@ -391,16 +389,19 @@ private boolean sendWithObservation(Message message, long timeout) { Boolean observe = observation.observe(() -> { Message messageToSendInternal = messageToSend; if (message instanceof ErrorMessage errorMessage) { + Message originalMessage = errorMessage.getOriginalMessage(); messageToSendInternal = - new ErrorMessage(errorMessage.getPayload(), + (originalMessage != null) ? new ErrorMessage(errorMessage.getPayload(), messageToSend.getHeaders(), - errorMessage.getOriginalMessage()); + originalMessage) : new ErrorMessage(errorMessage.getPayload(), + messageToSend.getHeaders()); } return sendInternal(messageToSendInternal, timeout); }); return Boolean.TRUE.equals(observe); } + @SuppressWarnings("NullAway") // Dataflow analysis limitation private boolean sendWithMetrics(Message message, long timeout) { SampleFacade sample = this.metricsCaptor.start(); try { @@ -468,6 +469,7 @@ private TimerFacade sendTimer(boolean sent) { } } + @SuppressWarnings("NullAway") // Dataflow analysis limitation private TimerFacade buildSendTimer(boolean success, String exception) { TimerFacade timer = this.metricsCaptor.timerBuilder(SEND_TIMER_NAME) .tag("type", "channel") @@ -676,12 +678,9 @@ public boolean remove(ChannelInterceptor interceptor) { } } - @Nullable public ChannelInterceptor remove(int index) { ChannelInterceptor removed = this.interceptors.remove(index); - if (removed != null) { - this.size--; - } + this.size--; return removed; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractPollableChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractPollableChannel.java index 393e65891e..cf795a032e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractPollableChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractPollableChannel.java @@ -45,7 +45,7 @@ public abstract class AbstractPollableChannel extends AbstractMessageChannel private int executorInterceptorsSize; - private CounterFacade receiveCounter; + private @Nullable CounterFacade receiveCounter; @Override public IntegrationPatternType getIntegrationPatternType() { @@ -186,7 +186,6 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) { } @Override - @Nullable public ChannelInterceptor removeInterceptor(int index) { ChannelInterceptor interceptor = super.removeInterceptor(index); if (interceptor instanceof ExecutorChannelInterceptor) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/ChannelPurger.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/ChannelPurger.java index f301e5f0b0..81fc63440b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/ChannelPurger.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/ChannelPurger.java @@ -48,7 +48,7 @@ public class ChannelPurger { private final QueueChannel[] channels; - private final MessageSelector selector; + private final @Nullable MessageSelector selector; public ChannelPurger(QueueChannel... channels) { this(null, channels); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/DefaultHeaderChannelRegistry.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/DefaultHeaderChannelRegistry.java index bec75c9269..f3d739dd82 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/DefaultHeaderChannelRegistry.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/DefaultHeaderChannelRegistry.java @@ -67,7 +67,7 @@ public class DefaultHeaderChannelRegistry extends IntegrationObjectSupport private long reaperDelay; - private volatile ScheduledFuture reaperScheduledFuture; + private volatile @Nullable ScheduledFuture reaperScheduledFuture; private volatile boolean running; @@ -147,8 +147,9 @@ public void stop() { this.lock.lock(); try { this.running = false; - if (this.reaperScheduledFuture != null) { - this.reaperScheduledFuture.cancel(true); + ScheduledFuture reaperScheduledFutureToCancel = this.reaperScheduledFuture; + if (reaperScheduledFutureToCancel != null) { + reaperScheduledFutureToCancel.cancel(true); this.reaperScheduledFuture = null; } this.explicitlyStopped = true; @@ -221,8 +222,9 @@ public MessageChannel channelNameToChannel(@Nullable String name) { public void runReaper() { this.lock.lock(); try { - if (this.reaperScheduledFuture != null) { - this.reaperScheduledFuture.cancel(true); + ScheduledFuture reaperScheduledFutureToCancel = this.reaperScheduledFuture; + if (reaperScheduledFutureToCancel != null) { + reaperScheduledFutureToCancel.cancel(true); this.reaperScheduledFuture = null; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/DirectChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/DirectChannel.java index a3c803ae3b..1a85a352f7 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/DirectChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/DirectChannel.java @@ -39,7 +39,7 @@ public class DirectChannel extends AbstractSubscribableChannel { private final UnicastingDispatcher dispatcher = new UnicastingDispatcher(); - private volatile Integer maxSubscribers; + private volatile @Nullable Integer maxSubscribers; /** * Create a channel with default {@link RoundRobinLoadBalancingStrategy}. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java index 4c8e711128..a3e14698be 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java @@ -49,7 +49,7 @@ */ public class ExecutorChannel extends AbstractExecutorChannel { - private final LoadBalancingStrategy loadBalancingStrategy; + private final @Nullable LoadBalancingStrategy loadBalancingStrategy; private Predicate failoverStrategy = (exception) -> true; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/FixedSubscriberChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/FixedSubscriberChannel.java index 1dd0b0d77c..acb2d27302 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/FixedSubscriberChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/FixedSubscriberChannel.java @@ -45,6 +45,7 @@ public final class FixedSubscriberChannel implements SubscribableChannel, BeanNa private final MessageHandler handler; + @SuppressWarnings("NullAway.Init") private String beanName; public FixedSubscriberChannel() { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java index 718a922630..624a27c160 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java @@ -45,9 +45,11 @@ public class MessagePublishingErrorHandler extends ErrorMessagePublisher impleme private static final int DEFAULT_SEND_TIMEOUT = 1000; + @SuppressWarnings("NullAway") // Dataflow analysis limitation private static final ErrorMessageStrategy DEFAULT_ERROR_MESSAGE_STRATEGY = (ex, attrs) -> { - if (ex instanceof MessagingExceptionWrapper) { - return new ErrorMessage(ex.getCause(), ((MessagingExceptionWrapper) ex).getFailedMessage()); + if (ex instanceof MessagingExceptionWrapper messagingExceptionWrapper) { + return new ErrorMessage(messagingExceptionWrapper.getCause(), + messagingExceptionWrapper.getFailedMessage()); } else { return new ErrorMessage(ex); @@ -66,7 +68,7 @@ public MessagePublishingErrorHandler(DestinationResolver channel setChannelResolver(channelResolver); } - public void setDefaultErrorChannel(@Nullable MessageChannel defaultErrorChannel) { + public void setDefaultErrorChannel(MessageChannel defaultErrorChannel) { setChannel(defaultErrorChannel); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/NullChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/NullChannel.java index a311f5108d..8e6a00e0ea 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/NullChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/NullChannel.java @@ -60,13 +60,14 @@ public class NullChannel implements PollableChannel, private boolean loggingEnabled = true; + @SuppressWarnings("NullAway.Init") private String beanName; - private MetricsCaptor metricsCaptor; + private @Nullable MetricsCaptor metricsCaptor; - private TimerFacade successTimer; + private @Nullable TimerFacade successTimer; - private CounterFacade receiveCounter; + private @Nullable CounterFacade receiveCounter; @Override public void setBeanName(String beanName) { @@ -161,6 +162,7 @@ public void onComplete() { return true; } + @SuppressWarnings("NullAway") // Dataflow analysis limitation private TimerFacade sendTimer() { if (this.successTimer == null) { this.successTimer = @@ -176,7 +178,7 @@ private TimerFacade sendTimer() { } @Override - public Message receive() { + public @Nullable Message receive() { if (this.loggingEnabled) { LOG.debug("receive called on null channel"); } @@ -185,7 +187,7 @@ public Message receive() { } @Override - public Message receive(long timeout) { + public @Nullable Message receive(long timeout) { return receive(); } @@ -198,6 +200,7 @@ private void incrementReceiveCounter() { } } + @SuppressWarnings("NullAway") // Dataflow analysis limitation private CounterFacade buildReceiveCounter() { return this.metricsCaptor .counterBuilder(RECEIVE_COUNTER_NAME) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java index 37f1330d12..b9ff9aca89 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java @@ -16,6 +16,7 @@ package org.springframework.integration.channel; +import java.util.Objects; import java.util.concurrent.ThreadFactory; import java.util.function.Function; import java.util.function.Predicate; @@ -65,7 +66,8 @@ public class PartitionedChannel extends AbstractExecutorChannel { * sent to this channel. */ public PartitionedChannel(int partitionCount) { - this(partitionCount, (message) -> message.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID)); + this(partitionCount, (message) -> + Objects.requireNonNull(message.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID))); } /** diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/PriorityChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/PriorityChannel.java index 92969f603b..addfc12f6f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/PriorityChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/PriorityChannel.java @@ -135,7 +135,7 @@ protected boolean doSend(Message message, long timeout) { } @Override - protected Message doReceive(long timeout) { + protected @Nullable Message doReceive(long timeout) { Message message = super.doReceive(timeout); if (message != null) { if (!this.useMessageStore) { @@ -148,7 +148,7 @@ protected Message doReceive(long timeout) { private static final class SequenceFallbackComparator implements Comparator> { - private final Comparator> targetComparator; + private final @Nullable Comparator> targetComparator; SequenceFallbackComparator(@Nullable Comparator> targetComparator) { this.targetComparator = targetComparator; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java index ffeac95589..b7484ff3dd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java @@ -40,7 +40,7 @@ public class PublishSubscribeChannel extends AbstractExecutorChannel implements private final boolean requireSubscribers; - private ErrorHandler errorHandler; + private @Nullable ErrorHandler errorHandler; private boolean ignoreFailures; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/package-info.java index 2f75f5ef6f..5f5b24d989 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/package-info.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/package-info.java @@ -1,5 +1,5 @@ /** * Provides classes representing various channel types. */ -@org.springframework.lang.NonNullApi +@org.jspecify.annotations.NullMarked package org.springframework.integration.channel; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java index 7d43acdd4d..2a71d189bd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java @@ -185,7 +185,7 @@ private Message applyBeforeHandle(Message message, Deque executorInterceptor.getClass().getSimpleName() + " returned null from beforeHandle, i.e. precluding the send."); - triggerAfterMessageHandled(null, null, interceptorStack); + triggerAfterMessageHandled(message, null, interceptorStack); return null; } interceptorStack.add(executorInterceptor); @@ -203,7 +203,7 @@ private void triggerAfterMessageHandled(Message message, Exception ex, try { interceptor.afterMessageHandled(message, this.inputChannel, this.handler, ex); } - catch (Throwable ex2) { //NOSONAR + catch (Throwable ex2) { logger.error(ex2, () -> "Exception from afterMessageHandled in " + interceptor); } } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/PollableKafkaChannel.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/PollableKafkaChannel.java index 009f36c18c..a25993579e 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/PollableKafkaChannel.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/PollableKafkaChannel.java @@ -183,7 +183,6 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) { } @Override - @Nullable public ChannelInterceptor removeInterceptor(int index) { ChannelInterceptor interceptor = super.removeInterceptor(index); if (interceptor instanceof ExecutorChannelInterceptor) {