Skip to content

Commit

Permalink
inprocess: Fix listener race if transport is shutdown while starting
Browse files Browse the repository at this point in the history
Returning the runnable did nothing, as both the start method and the
runnable are run within the synchronization context. I believe the
Runnable used to be required in the previous implementation of
ManagedChannelImpl (the lock-based implementation before we created
SynchronizationContext).

This fixes a NPE seen in ServerImpl because the server expects proper
ordering of transport lifecycle events.
```
Uncaught exception in the SynchronizationContext. Panic!
java.lang.NullPointerException: Cannot invoke "java.util.concurrent.Future.cancel(boolean)" because "this.handshakeTimeoutFuture" is null
	at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.transportReady(ServerImpl.java:440)
	at io.grpc.inprocess.InProcessTransport$4.run(InProcessTransport.java:215)
	at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94)
```

b/338445186
  • Loading branch information
ejona86 authored May 28, 2024
1 parent 107fdb4 commit e4e7f3a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,26 @@ public void clientStartStop() throws Exception {
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
}

@Test
public void clientShutdownBeforeStartRunnable() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
Runnable runnable = client.start(mockClientTransportListener);
// Shutdown before calling 'runnable'
client.shutdown(Status.UNAVAILABLE.withDescription("shutdown called"));
runIfNotNull(runnable);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
// We should verify that clients don't call transportReady() after transportTerminated(), but
// transports do this today and nothing cares. ServerImpl, on the other hand, doesn't appreciate
// the out-of-order calls.
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);

This comment has been minimized.

Copy link
@jdcormie

jdcormie Jun 3, 2024

Member

This new test fails for me when run via BinderTransportTest because BinderClientTransport actually initiates the connection in the returned Runnable, but only if it hasn't already been shutdown. Is this test's assumption guaranteed by some API contract for transports that I don't see?

Also curious how this passed presubmits (we run AbstractTransportTest as part of :grpc-binder:connectedCheck)

assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
// Allow any status as some transports (e.g., Netty) don't communicate the original status when
// shutdown while handshaking. It won't be used anyway, so no big deal.
verify(mockClientTransportListener).transportShutdown(any(Status.class));
}

@Test
public void clientStartAndStopOnceConnected() throws Exception {
server.start(serverListener);
Expand Down Expand Up @@ -2251,6 +2271,7 @@ public void streamCreated(ServerStream stream, String method, Metadata headers)

@Override
public Attributes transportReady(Attributes attributes) {
assertFalse(terminated.isDone());
return Attributes.newBuilder()
.setAll(attributes)
.set(ADDITIONAL_TRANSPORT_ATTR_KEY, "additional attribute value")
Expand Down
23 changes: 8 additions & 15 deletions inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,21 +203,14 @@ public void run() {
}
};
}
return new Runnable() {
@Override
@SuppressWarnings("deprecation")
public void run() {
synchronized (InProcessTransport.this) {
Attributes serverTransportAttrs = Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
.build();
serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
attributes = clientTransportListener.filterTransport(attributes);
clientTransportListener.transportReady();
}
}
};
Attributes serverTransportAttrs = Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
.build();
serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
attributes = clientTransportListener.filterTransport(attributes);
clientTransportListener.transportReady();
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,9 @@ public void clientChecksInboundMetadataSize_header() {}
@Ignore("https://github.com/jetty/jetty.project/issues/11822")
@Test
public void clientChecksInboundMetadataSize_trailer() {}

@Override
@Ignore("Not yet investigated, but has been seen for multiple servlet containers")
@Test
public void clientShutdownBeforeStartRunnable() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,9 @@ public void earlyServerClose_serverFailure_withClientCancelOnListenerClosed() {}
@Ignore("regression since bumping grpc v1.46 to v1.53")
@Test
public void messageProducerOnlyProducesRequestedMessages() {}

@Override
@Ignore("Not yet investigated, but has been seen for multiple servlet containers")
@Test
public void clientShutdownBeforeStartRunnable() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,9 @@ public void clientCancel() {}
@Ignore("regression since bumping grpc v1.46 to v1.53")
@Test
public void messageProducerOnlyProducesRequestedMessages() {}

@Override
@Ignore("Not yet investigated, but has been seen for multiple servlet containers")
@Test
public void clientShutdownBeforeStartRunnable() {}
}

0 comments on commit e4e7f3a

Please sign in to comment.