Skip to content

Apply NullAbility to rsocket module #10303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package org.springframework.integration.rsocket;

import org.jspecify.annotations.Nullable;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
Expand Down Expand Up @@ -64,8 +65,7 @@ public void setDataMimeType(@Nullable MimeType dataMimeType) {
this.rSocketMessageHandler.setDefaultDataMimeType(dataMimeType);
}

@Nullable
protected MimeType getDataMimeType() {
protected @Nullable MimeType getDataMimeType() {
return this.rSocketMessageHandler.getDefaultDataMimeType();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ public class ClientRSocketConnector extends AbstractRSocketConnector {
private RSocketConnectorConfigurer connectorConfigurer = (connector) -> {
};

@SuppressWarnings("NullAway.Init")
private Object setupData;

@SuppressWarnings("NullAway.Init")
private String setupRoute;

private Object[] setupRouteVars = new Object[0];

private boolean autoConnect;

@SuppressWarnings("NullAway.Init")
private RSocketRequester rsocketRequester;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@

import io.rsocket.Payload;
import io.rsocket.frame.FrameType;
import org.jspecify.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.context.ApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.Encoder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.ReactiveMessageHandler;
import org.springframework.messaging.handler.CompositeMessageCondition;
Expand All @@ -60,6 +60,7 @@
*/
class IntegrationRSocketMessageHandler extends RSocketMessageHandler {

@SuppressWarnings("NullAway") // Reflection
private static final Method HANDLE_MESSAGE_METHOD =
ReflectionUtils.findMethod(ReactiveMessageHandler.class, "handleMessage", Message.class);

Expand Down Expand Up @@ -105,7 +106,7 @@ public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
registerHandlerMethod(endpoint, HANDLE_MESSAGE_METHOD,
new CompositeMessageCondition(
frameTypeMessageCondition,
new DestinationPatternsMessageCondition(endpoint.getPath(), getRouteMatcher()))); // NOSONAR
new DestinationPatternsMessageCondition(endpoint.getPath(), obtainRouteMatcher()))); // NOSONAR
}

@Override
Expand Down Expand Up @@ -173,9 +174,8 @@ public Mono<Void> handleReturnValue(@Nullable Object returnValue, MethodParamete
}
}

@Nullable
@SuppressWarnings("unchecked")
private static AtomicReference<Flux<Payload>> getResponseReference(Message<?> message) {
private static @Nullable AtomicReference<Flux<Payload>> getResponseReference(Message<?> message) {
Object headerValue = message.getHeaders().get(RESPONSE_HEADER);
Assert.state(headerValue == null || headerValue instanceof AtomicReference, "Expected AtomicReference");
return (AtomicReference<Flux<Payload>>) headerValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import org.jspecify.annotations.Nullable;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;
Expand All @@ -34,7 +35,6 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
Expand All @@ -51,11 +51,12 @@
*/
public class ServerRSocketConnector extends AbstractRSocketConnector implements ApplicationEventPublisherAware {

private final ServerTransport<CloseableChannel> serverTransport;
private final @Nullable ServerTransport<CloseableChannel> serverTransport;

private Consumer<RSocketServer> serverConfigurer = (rsocketServer) -> {
};

@SuppressWarnings("NullAway.Init")
private Mono<CloseableChannel> serverMono;

/**
Expand Down Expand Up @@ -191,8 +192,7 @@ public Map<Object, RSocketRequester> getClientRSocketRequesters() {
* @return the {@link RSocketRequester} or null.
* @see ServerRSocketMessageHandler#getClientRSocketRequester(Object)
*/
@Nullable
public RSocketRequester getClientRSocketRequester(Object key) {
public @Nullable RSocketRequester getClientRSocketRequester(Object key) {
return serverRSocketMessageHandler().getClientRSocketRequester(key);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import java.util.Map;
import java.util.function.BiFunction;

import org.jspecify.annotations.Nullable;

import org.springframework.aot.hint.annotation.Reflective;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.CompositeMessageCondition;
Expand Down Expand Up @@ -60,6 +61,7 @@
public class ServerRSocketMessageHandler extends IntegrationRSocketMessageHandler
implements ApplicationEventPublisherAware {

@SuppressWarnings("NullAway") // Reflection
private static final Method HANDLE_CONNECTION_SETUP_METHOD =
ReflectionUtils.findMethod(ServerRSocketMessageHandler.class, "handleConnectionSetup", Message.class);

Expand All @@ -68,7 +70,7 @@ public class ServerRSocketMessageHandler extends IntegrationRSocketMessageHandle
private BiFunction<Map<String, Object>, DataBuffer, Object> clientRSocketKeyStrategy =
(headers, data) -> data.toString(StandardCharsets.UTF_8);

private ApplicationEventPublisher applicationEventPublisher;
private @Nullable ApplicationEventPublisher applicationEventPublisher;

/**
* Create an service side RSocket message handler instance for delegating
Expand Down Expand Up @@ -120,8 +122,7 @@ public Map<Object, RSocketRequester> getClientRSocketRequesters() {
* @param key the key for mapped {@link RSocketRequester} if any.
* @return the mapped {@link RSocketRequester} or null.
*/
@Nullable
public RSocketRequester getClientRSocketRequester(Object key) {
public @Nullable RSocketRequester getClientRSocketRequester(Object key) {
return this.clientRSocketRequesters.get(key);
}

Expand All @@ -146,6 +147,7 @@ private void handleConnectionSetup(Message<DataBuffer> connectMessage) {
RSocketRequester rsocketRequester =
messageHeaders.get(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER,
RSocketRequester.class);
Assert.notNull(rsocketRequester, "'rsocketRequester' can not be null");
this.clientRSocketRequesters.put(rsocketRequesterKey, rsocketRequester);
RSocketConnectedEvent rSocketConnectedEvent =
new RSocketConnectedEvent(this, messageHeaders, dataBuffer, rsocketRequester); // NOSONAR
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Provides classes for RSocket XML namespace parsing and configuration support.
*/
@org.springframework.lang.NonNullApi
@org.jspecify.annotations.NullMarked
package org.springframework.integration.rsocket.config;
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/**
* Provides RSocket Components support for Spring Integration Java DSL.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
@org.jspecify.annotations.NullMarked
package org.springframework.integration.rsocket.dsl;
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -37,7 +38,6 @@
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHeaders;
Expand Down Expand Up @@ -78,11 +78,9 @@ public class RSocketInboundGateway extends MessagingGatewaySupport implements In

private RSocketStrategies rsocketStrategies = RSocketStrategies.create();

@Nullable
private AbstractRSocketConnector rsocketConnector;
private @Nullable AbstractRSocketConnector rsocketConnector;

@Nullable
private ResolvableType requestElementType;
private @Nullable ResolvableType requestElementType;

private boolean decodeFluxAsUnit;

Expand Down Expand Up @@ -188,8 +186,8 @@ protected void onInit() {
@Override
protected void doStart() {
super.doStart();
if (this.rsocketConnector instanceof ClientRSocketConnector) {
((ClientRSocketConnector) this.rsocketConnector).connect();
if (this.rsocketConnector instanceof ClientRSocketConnector clientRSocketConnector) {
clientRSocketConnector.connect();
}
}

Expand Down Expand Up @@ -265,8 +263,8 @@ private Object decodePayload(Message<?> requestMessage) {

// The MessagingRSocket logic ensures that we can have only a single DataBuffer payload or Flux<DataBuffer>.
Decoder<Object> decoder = this.rsocketStrategies.decoder(elementType, mimeType);
if (payload instanceof DataBuffer) {
return decoder.decode((DataBuffer) payload, elementType, mimeType, null);
if (payload instanceof DataBuffer dataBuffer) {
return decoder.decode(dataBuffer, elementType, mimeType, null);
}
else if (this.decodeFluxAsUnit) {
return decoder.decode((Publisher<DataBuffer>) payload, elementType, mimeType, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Provides classes representing inbound RSocket components.
*/
@org.springframework.lang.NonNullApi
@org.jspecify.annotations.NullMarked
package org.springframework.integration.rsocket.inbound;
Loading