From 2a766c7a689f667b850b50228abc8798116503cc Mon Sep 17 00:00:00 2001 From: George Gensure Date: Thu, 10 Jul 2025 09:49:54 -0400 Subject: [PATCH] Guarantee missing stream promise delivery In observed cases, whether RST_STREAM or another failure from netty or the server, listeners can fail to be notified when a connection yields a null stream for the selected streamId. This causes hangs in clients, despite deadlines, with no obvious resolution. Tests which relied upon this promise succeeding must now change. --- .../io/grpc/netty/NettyClientHandler.java | 19 ++++++++++++------- .../io/grpc/netty/NettyClientHandlerTest.java | 4 ++-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index a5fa0f80077..276fa623c38 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -738,14 +738,19 @@ public void operationComplete(ChannelFuture future) throws Exception { // Attach the client stream to the HTTP/2 stream object as user data. stream.setHttp2Stream(http2Stream); + promise.setSuccess(); + } else { + // Otherwise, the stream has been cancelled and Netty is sending a + // RST_STREAM frame which causes it to purge pending writes from the + // flow-controller and delete the http2Stream. The stream listener has already + // been notified of cancellation so there is nothing to do. + // + // This process has been observed to fail in some circumstances, leaving listeners + // unanswered. Ensure that some exception has been delivered consistent with the + // implied RST_STREAM result above. + Status status = Status.INTERNAL.withDescription("unknown stream for connection"); + promise.setFailure(status.asRuntimeException()); } - // Otherwise, the stream has been cancelled and Netty is sending a - // RST_STREAM frame which causes it to purge pending writes from the - // flow-controller and delete the http2Stream. The stream listener has already - // been notified of cancellation so there is nothing to do. - - // Just forward on the success status to the original promise. - promise.setSuccess(); } else { Throwable cause = future.cause(); if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) { diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index f8fbeea9b82..dd4fcb4ea6c 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -268,7 +268,7 @@ public void cancelBufferedStreamShouldChangeClientStreamStatus() throws Exceptio // Cancel the stream. cancelStream(Status.CANCELLED); - assertTrue(createFuture.isSuccess()); + assertFalse(createFuture.isSuccess()); verify(streamListener).closed(eq(Status.CANCELLED), same(PROCESSED), any(Metadata.class)); } @@ -311,7 +311,7 @@ public void cancelWhileBufferedShouldSucceed() throws Exception { ChannelFuture cancelFuture = cancelStream(Status.CANCELLED); assertTrue(cancelFuture.isSuccess()); assertTrue(createFuture.isDone()); - assertTrue(createFuture.isSuccess()); + assertFalse(createFuture.isSuccess()); } /**