diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index eceb7d7a738..4c5bcef2013 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -779,8 +779,12 @@ static final class JumpToApplicationThreadServerStreamListener implements Server // Only accessed from callExecutor. private ServerStreamListener listener; - public JumpToApplicationThreadServerStreamListener(Executor executor, - Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) { + public JumpToApplicationThreadServerStreamListener( + Executor executor, + Executor cancelExecutor, + ServerStream stream, + Context.CancellableContext context, + Tag tag) { this.callExecutor = executor; this.cancelExecutor = cancelExecutor; this.stream = stream; @@ -809,9 +813,12 @@ void setListener(ServerStreamListener listener) { * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ private void internalClose(Throwable t) { - // TODO(ejona86): this is not thread-safe :) String description = "Application error processing RPC"; - stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata()); + Metadata metadata = Status.trailersFromThrowable(t); + if (metadata == null) { + metadata = new Metadata(); + } + stream.close(Status.UNKNOWN.withDescription(description).withCause(t), metadata); } @Override diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 3125edca1e6..36a6e69f73e 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -1567,7 +1567,6 @@ private void ensureServerStateNotLeaked() { assertEquals(Status.UNKNOWN.getCode(), statusCaptor.getValue().getCode()); // Used in InProcessTransport when set to include the cause with the status assertNotNull(statusCaptor.getValue().getCause()); - assertTrue(metadataCaptor.getValue().keys().isEmpty()); } private static class SimpleServer implements io.grpc.internal.InternalServer { diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index aea7ff49032..d85c54a31e3 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; @@ -1598,8 +1599,10 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception { assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); // Ensure that for a closed ServerStream, interactions are noops - server.stream.writeHeaders(new Metadata(), true); - server.stream.writeMessage(methodDescriptor.streamResponse("response")); + assertThrows(Exception.class, () -> + server.stream.writeHeaders(new Metadata(), true)); + assertThrows(Exception.class, () -> + server.stream.writeMessage(methodDescriptor.streamResponse("response"))); server.stream.close(Status.INTERNAL, new Metadata()); // Make sure new streams still work properly diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index eacf46ca4a2..0fea117a9bf 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -17,6 +17,7 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static java.lang.Math.max; @@ -414,6 +415,7 @@ private class InProcessServerStream implements ServerStream { private boolean closed; @GuardedBy("this") private int outboundSeqNo; + private boolean closeCalled; InProcessServerStream(MethodDescriptor method, Metadata headers) { statsTraceCtx = StatsTraceContext.newServerContext( @@ -431,6 +433,7 @@ public void setListener(ServerStreamListener serverStreamListener) { @Override public void request(int numMessages) { + checkState(!closeCalled, "call already closed"); boolean onReady = clientStream.serverRequested(numMessages); if (onReady) { synchronized (this) { @@ -487,6 +490,7 @@ private void clientCancelled(Status status) { @Override public void writeMessage(InputStream message) { + checkState(!closeCalled, "call already closed"); long messageLength = 0; if (isEnabledSupportTracingMessageSizes) { try { @@ -546,6 +550,7 @@ public synchronized boolean isReady() { @Override public void writeHeaders(Metadata headers, boolean flush) { + checkState(!closeCalled, "call already closed"); if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { int metadataSize = metadataSize(headers); if (metadataSize > clientMaxInboundMetadataSize) { @@ -581,6 +586,7 @@ public void close(Status status, Metadata trailers) { // clientStreamListener.closed can trigger clientStream.cancel (see code in // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are // calling internalCancel(). + closeCalled = true; clientStream.serverClosed(Status.OK, status); if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) { diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java index d97aa8cd36c..cfde8a0dad2 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java @@ -131,7 +131,6 @@ public void onCompleted() { assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS)); assertEquals(fakeResponse, responseRef.get()); - assertNull(throwableRef.get()); } @Test