Skip to content

Apply JSpecify Nullify to the Channel and AOT packages #10182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) {
}

@Override
public @Nullable ChannelInterceptor removeInterceptor(int index) {
public ChannelInterceptor removeInterceptor(int index) {
ChannelInterceptor interceptor = super.removeInterceptor(index);
if (interceptor instanceof ExecutorChannelInterceptor) {
this.executorInterceptorsSize--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.jspecify.annotations.Nullable;

import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aot.hint.ExecutableMode;
import org.springframework.aot.hint.MemberCategory;
Expand Down Expand Up @@ -79,7 +81,7 @@
class CoreRuntimeHints implements RuntimeHintsRegistrar {

@Override
public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) {
ReflectionHints reflectionHints = hints.reflection();
Stream.of(
GenericSelector.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/**
* Provides classes to support Spring AOT.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
@org.jspecify.annotations.NullMarked
package org.springframework.integration.aot;
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@
public abstract class AbstractExecutorChannel extends AbstractSubscribableChannel
implements ExecutorChannelInterceptorAware {

protected Executor executor; // NOSONAR
protected @Nullable Executor executor;

protected AbstractDispatcher dispatcher; // NOSONAR
@SuppressWarnings("NullAway.Init")
protected AbstractDispatcher dispatcher;

protected Integer maxSubscribers; // NOSONAR
@Nullable
protected Integer maxSubscribers;

protected int executorInterceptorsSize; // NOSONAR
protected int executorInterceptorsSize;

public AbstractExecutorChannel(@Nullable Executor executor) {
this.executor = executor;
Expand Down Expand Up @@ -117,7 +119,6 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) {
}

@Override
@Nullable
public ChannelInterceptor removeInterceptor(int index) {
ChannelInterceptor interceptor = super.removeInterceptor(index);
if (interceptor instanceof ExecutorChannelInterceptor) {
Expand Down Expand Up @@ -167,7 +168,7 @@ public void run() {
if (!CollectionUtils.isEmpty(interceptorStack)) {
triggerAfterMessageHandled(message, ex, interceptorStack);
}
if (ex instanceof MessagingException) { // NOSONAR
if (ex instanceof MessagingException) {
throw new MessagingExceptionWrapper(message, (MessagingException) ex);
}
String description = "Failed to handle " + message + " to " + this + " in " + messageHandler;
Expand Down Expand Up @@ -195,7 +196,7 @@ private Message<?> applyBeforeHandle(Message<?> message, Deque<ExecutorChannelIn
logger.debug(() -> executorInterceptor.getClass().getSimpleName()
+ " returned null from beforeHandle, i.e. precluding the send.");
}
triggerAfterMessageHandled(null, null, interceptorStack);
triggerAfterMessageHandled(message, null, interceptorStack);
return null;
}
interceptorStack.add(executorInterceptor);
Expand All @@ -204,13 +205,13 @@ private Message<?> applyBeforeHandle(Message<?> message, Deque<ExecutorChannelIn
return theMessage;
}

private void triggerAfterMessageHandled(@Nullable Message<?> message, @Nullable Exception ex,
private void triggerAfterMessageHandled(Message<?> message, @Nullable Exception ex,
Deque<ExecutorChannelInterceptor> interceptorStack) {
Iterator<ExecutorChannelInterceptor> iterator = interceptorStack.descendingIterator();
while (iterator.hasNext()) {
ExecutorChannelInterceptor interceptor = iterator.next();
try {
interceptor.afterMessageHandled(message, AbstractExecutorChannel.this, //NOSONAR
interceptor.afterMessageHandled(message, AbstractExecutorChannel.this,
this.delegate.getMessageHandler(), ex);
}
catch (Throwable ex2) { //NOSONAR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,27 @@ public abstract class AbstractMessageChannel extends IntegrationObjectSupport

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

@Nullable
private MessageSenderObservationConvention observationConvention;
private @Nullable MessageSenderObservationConvention observationConvention;

private boolean shouldTrack = false;

private Class<?>[] datatypes = new Class<?>[0];

private MessageConverter messageConverter;
private @Nullable MessageConverter messageConverter;

private boolean loggingEnabled = true;

private MetricsCaptor metricsCaptor;
private @Nullable MetricsCaptor metricsCaptor;

private TimerFacade successTimer;
private @Nullable TimerFacade successTimer;

private TimerFacade failureTimer;
private @Nullable TimerFacade failureTimer;

private volatile String fullChannelName;
private volatile @Nullable String fullChannelName;

private volatile boolean applicationRunning;

private volatile Lifecycle applicationRunningController;
private volatile @Nullable Lifecycle applicationRunningController;

@Override
public String getComponentType() {
Expand Down Expand Up @@ -235,7 +234,6 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) {
}

@Override
@Nullable
public ChannelInterceptor removeInterceptor(int index) {
return this.interceptors.remove(index);
}
Expand Down Expand Up @@ -391,16 +389,19 @@ private boolean sendWithObservation(Message<?> message, long timeout) {
Boolean observe = observation.observe(() -> {
Message<?> messageToSendInternal = messageToSend;
if (message instanceof ErrorMessage errorMessage) {
Message<?> originalMessage = errorMessage.getOriginalMessage();
messageToSendInternal =
new ErrorMessage(errorMessage.getPayload(),
(originalMessage != null) ? new ErrorMessage(errorMessage.getPayload(),
messageToSend.getHeaders(),
errorMessage.getOriginalMessage());
originalMessage) : new ErrorMessage(errorMessage.getPayload(),
messageToSend.getHeaders());
}
return sendInternal(messageToSendInternal, timeout);
});
return Boolean.TRUE.equals(observe);
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
private boolean sendWithMetrics(Message<?> message, long timeout) {
SampleFacade sample = this.metricsCaptor.start();
try {
Expand Down Expand Up @@ -468,6 +469,7 @@ private TimerFacade sendTimer(boolean sent) {
}
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
private TimerFacade buildSendTimer(boolean success, String exception) {
TimerFacade timer = this.metricsCaptor.timerBuilder(SEND_TIMER_NAME)
.tag("type", "channel")
Expand Down Expand Up @@ -676,12 +678,9 @@ public boolean remove(ChannelInterceptor interceptor) {
}
}

@Nullable
public ChannelInterceptor remove(int index) {
ChannelInterceptor removed = this.interceptors.remove(index);
if (removed != null) {
this.size--;
}
this.size--;
return removed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class AbstractPollableChannel extends AbstractMessageChannel

private int executorInterceptorsSize;

private CounterFacade receiveCounter;
private @Nullable CounterFacade receiveCounter;

@Override
public IntegrationPatternType getIntegrationPatternType() {
Expand Down Expand Up @@ -186,7 +186,6 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) {
}

@Override
@Nullable
public ChannelInterceptor removeInterceptor(int index) {
ChannelInterceptor interceptor = super.removeInterceptor(index);
if (interceptor instanceof ExecutorChannelInterceptor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ChannelPurger {

private final QueueChannel[] channels;

private final MessageSelector selector;
private final @Nullable MessageSelector selector;

public ChannelPurger(QueueChannel... channels) {
this(null, channels);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class DefaultHeaderChannelRegistry extends IntegrationObjectSupport

private long reaperDelay;

private volatile ScheduledFuture<?> reaperScheduledFuture;
private volatile @Nullable ScheduledFuture<?> reaperScheduledFuture;

private volatile boolean running;

Expand Down Expand Up @@ -147,8 +147,9 @@ public void stop() {
this.lock.lock();
try {
this.running = false;
if (this.reaperScheduledFuture != null) {
this.reaperScheduledFuture.cancel(true);
ScheduledFuture<?> reaperScheduledFutureToCancel = this.reaperScheduledFuture;
if (reaperScheduledFutureToCancel != null) {
reaperScheduledFutureToCancel.cancel(true);
this.reaperScheduledFuture = null;
}
this.explicitlyStopped = true;
Expand Down Expand Up @@ -221,8 +222,9 @@ public MessageChannel channelNameToChannel(@Nullable String name) {
public void runReaper() {
this.lock.lock();
try {
if (this.reaperScheduledFuture != null) {
this.reaperScheduledFuture.cancel(true);
ScheduledFuture<?> reaperScheduledFutureToCancel = this.reaperScheduledFuture;
if (reaperScheduledFutureToCancel != null) {
reaperScheduledFutureToCancel.cancel(true);
this.reaperScheduledFuture = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class DirectChannel extends AbstractSubscribableChannel {

private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();

private volatile Integer maxSubscribers;
private volatile @Nullable Integer maxSubscribers;

/**
* Create a channel with default {@link RoundRobinLoadBalancingStrategy}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
*/
public class ExecutorChannel extends AbstractExecutorChannel {

private final LoadBalancingStrategy loadBalancingStrategy;
private final @Nullable LoadBalancingStrategy loadBalancingStrategy;

private Predicate<Exception> failoverStrategy = (exception) -> true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public final class FixedSubscriberChannel implements SubscribableChannel, BeanNa

private final MessageHandler handler;

@SuppressWarnings("NullAway.Init")
private String beanName;

public FixedSubscriberChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ public class MessagePublishingErrorHandler extends ErrorMessagePublisher impleme

private static final int DEFAULT_SEND_TIMEOUT = 1000;

@SuppressWarnings("NullAway") // Dataflow analysis limitation
private static final ErrorMessageStrategy DEFAULT_ERROR_MESSAGE_STRATEGY = (ex, attrs) -> {
if (ex instanceof MessagingExceptionWrapper) {
return new ErrorMessage(ex.getCause(), ((MessagingExceptionWrapper) ex).getFailedMessage());
if (ex instanceof MessagingExceptionWrapper messagingExceptionWrapper) {
return new ErrorMessage(messagingExceptionWrapper.getCause(),
messagingExceptionWrapper.getFailedMessage());
}
else {
return new ErrorMessage(ex);
Expand All @@ -66,7 +68,7 @@ public MessagePublishingErrorHandler(DestinationResolver<MessageChannel> channel
setChannelResolver(channelResolver);
}

public void setDefaultErrorChannel(@Nullable MessageChannel defaultErrorChannel) {
public void setDefaultErrorChannel(MessageChannel defaultErrorChannel) {
setChannel(defaultErrorChannel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ public class NullChannel implements PollableChannel,

private boolean loggingEnabled = true;

@SuppressWarnings("NullAway.Init")
private String beanName;

private MetricsCaptor metricsCaptor;
private @Nullable MetricsCaptor metricsCaptor;

private TimerFacade successTimer;
private @Nullable TimerFacade successTimer;

private CounterFacade receiveCounter;
private @Nullable CounterFacade receiveCounter;

@Override
public void setBeanName(String beanName) {
Expand Down Expand Up @@ -161,6 +162,7 @@ public void onComplete() {
return true;
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
private TimerFacade sendTimer() {
if (this.successTimer == null) {
this.successTimer =
Expand All @@ -176,7 +178,7 @@ private TimerFacade sendTimer() {
}

@Override
public Message<?> receive() {
public @Nullable Message<?> receive() {
if (this.loggingEnabled) {
LOG.debug("receive called on null channel");
}
Expand All @@ -185,7 +187,7 @@ public Message<?> receive() {
}

@Override
public Message<?> receive(long timeout) {
public @Nullable Message<?> receive(long timeout) {
return receive();
}

Expand All @@ -198,6 +200,7 @@ private void incrementReceiveCounter() {
}
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
private CounterFacade buildReceiveCounter() {
return this.metricsCaptor
.counterBuilder(RECEIVE_COUNTER_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.integration.channel;

import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -65,7 +66,8 @@ public class PartitionedChannel extends AbstractExecutorChannel {
* sent to this channel.
*/
public PartitionedChannel(int partitionCount) {
this(partitionCount, (message) -> message.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID));
this(partitionCount, (message) ->
Objects.requireNonNull(message.getHeaders().get(IntegrationMessageHeaderAccessor.CORRELATION_ID)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected boolean doSend(Message<?> message, long timeout) {
}

@Override
protected Message<?> doReceive(long timeout) {
protected @Nullable Message<?> doReceive(long timeout) {
Message<?> message = super.doReceive(timeout);
if (message != null) {
if (!this.useMessageStore) {
Expand All @@ -148,7 +148,7 @@ protected Message<?> doReceive(long timeout) {

private static final class SequenceFallbackComparator implements Comparator<Message<?>> {

private final Comparator<Message<?>> targetComparator;
private final @Nullable Comparator<Message<?>> targetComparator;

SequenceFallbackComparator(@Nullable Comparator<Message<?>> targetComparator) {
this.targetComparator = targetComparator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class PublishSubscribeChannel extends AbstractExecutorChannel implements

private final boolean requireSubscribers;

private ErrorHandler errorHandler;
private @Nullable ErrorHandler errorHandler;

private boolean ignoreFailures;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Provides classes representing various channel types.
*/
@org.springframework.lang.NonNullApi
@org.jspecify.annotations.NullMarked
package org.springframework.integration.channel;
Loading