From 1d6360f3f4dd8d832c433a504670b27272dffc57 Mon Sep 17 00:00:00 2001 From: litt <102969658+litt3@users.noreply.github.com> Date: Mon, 21 Oct 2024 16:20:42 -0400 Subject: [PATCH] fix: Make server keep connection open after receiving ES (#305) Signed-off-by: litt --- .../pbj/grpc/helidon/PbjProtocolHandler.java | 50 +++++++++----- .../pbj/grpc/helidon/GreeterService.java | 7 +- .../com/hedera/pbj/grpc/helidon/PbjTest.java | 31 ++++++--- .../com/hedera/pbj/runtime/grpc/Pipeline.java | 15 ++++ .../hedera/pbj/runtime/grpc/Pipelines.java | 69 ++++++++++++++----- .../pbj/runtime/grpc/ServiceInterface.java | 2 +- 6 files changed, 123 insertions(+), 51 deletions(-) create mode 100644 pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index a32181c9..837fb1c3 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -30,6 +30,7 @@ import com.hedera.pbj.grpc.helidon.config.PbjConfig; import com.hedera.pbj.runtime.grpc.GrpcException; import com.hedera.pbj.runtime.grpc.GrpcStatus; +import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.grpc.ServiceInterface; import com.hedera.pbj.runtime.io.buffer.Bytes; import edu.umd.cs.findbugs.annotations.NonNull; @@ -137,14 +138,14 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa private int entityBytesIndex = 0; /** - * The subscriber that will receive incoming messages from the client. + * The communication pipeline between server and client * *

This member isn't final because it is set in the {@link #init()} method. It should not be * set at any other time. * *

Method calls on this object are thread-safe. */ - private Flow.Subscriber incoming; + private Pipeline pipeline; /** Create a new instance */ PbjProtocolHandler( @@ -267,27 +268,27 @@ public void init() { // Setup the subscribers. The "outgoing" subscriber will send messages to the client. // This is given to the "open" method on the service to allow it to send messages to // the client. - final var outgoing = new SendToClientSubscriber(); - incoming = route.service().open(route.method(), options, outgoing); + final Flow.Subscriber outgoing = new SendToClientSubscriber(); + pipeline = route.service().open(route.method(), options, outgoing); } catch (final GrpcException grpcException) { route.failedGrpcRequestCounter().increment(); new TrailerOnlyBuilder() .grpcStatus(grpcException.status()) .statusMessage(grpcException.getMessage()) .send(); - close(); + error(); } catch (final HttpException httpException) { route.failedHttpRequestCounter().increment(); new TrailerOnlyBuilder() .httpStatus(httpException.status()) .grpcStatus(GrpcStatus.INVALID_ARGUMENT) .send(); - close(); + error(); } catch (final Exception unknown) { route.failedUnknownRequestCounter().increment(); LOGGER.log(ERROR, "Failed to initialize grpc protocol handler", unknown); new TrailerOnlyBuilder().grpcStatus(GrpcStatus.UNKNOWN).send(); - close(); + error(); } } @@ -299,7 +300,7 @@ public Http2StreamState streamState() { @Override public void rstStream(@NonNull final Http2RstStream rstStream) { - // Nothing to do + pipeline.onComplete(); } @Override @@ -368,7 +369,7 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa if (entityBytesIndex == entityBytes.length) { // Grab and wrap the bytes and reset to being reading the next message final var bytes = Bytes.wrap(entityBytes); - incoming.onNext(bytes); + pipeline.onNext(bytes); entityBytesIndex = 0; entityBytes = null; } @@ -380,19 +381,23 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa if (header.flags(Http2FrameTypes.DATA).endOfStream()) { entityBytesIndex = 0; entityBytes = null; - currentStreamState.set(Http2StreamState.HALF_CLOSED_LOCAL); - incoming.onComplete(); + currentStreamState.set(Http2StreamState.HALF_CLOSED_REMOTE); + pipeline.clientEndStreamReceived(); } } catch (final Exception e) { // I have to propagate this error through the service interface, so it can respond to // errors in the connection, tear down resources, etc. It will also forward this on // to the client, causing the connection to be torn down. - incoming.onError(e); + pipeline.onError(e); } } - /** Close the connection with the client. May be called by different threads concurrently. */ - private void close() { + /** + * An error has occurred. Cancel the deadline future if it's still active, and set the stream state accordingly. + *

+ * May be called by different threads concurrently. + */ + private void error() { // Canceling a future that has already completed has no effect. So by canceling here, we are // saying: // "If you have not yet executed, never execute. If you have already executed, then just @@ -401,7 +406,7 @@ private void close() { // cancel is threadsafe deadlineFuture.cancel(false); - currentStreamState.set(Http2StreamState.HALF_CLOSED_LOCAL); + currentStreamState.set(Http2StreamState.CLOSED); } /** @@ -453,7 +458,7 @@ private ScheduledFuture scheduleDeadline(@NonNull final String timeout) { deadline, () -> { route.deadlineExceededCounter().increment(); - incoming.onError(new GrpcException(GrpcStatus.DEADLINE_EXCEEDED)); + pipeline.onError(new GrpcException(GrpcStatus.DEADLINE_EXCEEDED)); }); } @@ -650,13 +655,22 @@ public void onError(@NonNull final Throwable throwable) { LOGGER.log(ERROR, "Failed to send response", throwable); new TrailerBuilder().grpcStatus(GrpcStatus.INTERNAL).send(); } - close(); + error(); } @Override public void onComplete() { new TrailerBuilder().send(); - close(); + + deadlineFuture.cancel(false); + + currentStreamState.getAndUpdate( + currentValue -> { + if (requireNonNull(currentValue) == Http2StreamState.OPEN) { + return Http2StreamState.HALF_CLOSED_LOCAL; + } + return Http2StreamState.CLOSED; + }); } } diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java index 44c95465..ae4b4caf 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java @@ -18,6 +18,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; +import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.grpc.Pipelines; import com.hedera.pbj.runtime.grpc.ServiceInterface; import com.hedera.pbj.runtime.io.buffer.Bytes; @@ -46,14 +47,14 @@ enum GreeterMethod implements Method { HelloReply sayHello(HelloRequest request); // A stream of messages coming from the client, with a single response from the server. - Flow.Subscriber sayHelloStreamRequest( + Pipeline sayHelloStreamRequest( Flow.Subscriber replies); // A single request from the client, with a stream of responses from the server. void sayHelloStreamReply(HelloRequest request, Flow.Subscriber replies); // A bidirectional stream of requests and responses between the client and the server. - Flow.Subscriber sayHelloStreamBidi( + Pipeline sayHelloStreamBidi( Flow.Subscriber replies); @NonNull @@ -73,7 +74,7 @@ default List methods() { @Override @NonNull - default Flow.Subscriber open( + default Pipeline open( final @NonNull Method method, final @NonNull RequestOptions options, final @NonNull Flow.Subscriber replies) { diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java index 0e28f30f..95705827 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java @@ -22,6 +22,7 @@ import com.google.protobuf.util.JsonFormat; import com.hedera.pbj.runtime.grpc.GrpcException; import com.hedera.pbj.runtime.grpc.GrpcStatus; +import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.io.buffer.Bytes; import com.hedera.pbj.runtime.io.stream.ReadableStreamingData; import com.hedera.pbj.runtime.io.stream.WritableStreamingData; @@ -509,7 +510,7 @@ void exceptionThrownWhileOpening() { new GreeterAdapter() { @Override @NonNull - public Flow.Subscriber open( + public Pipeline open( @NonNull Method method, @NonNull RequestOptions options, @NonNull Flow.Subscriber replies) { @@ -804,10 +805,15 @@ public HelloReply sayHello(HelloRequest request) { // Streams of stuff coming from the client, with a single response. @Override - public Flow.Subscriber sayHelloStreamRequest( + public Pipeline sayHelloStreamRequest( Flow.Subscriber replies) { final var names = new ArrayList(); - return new Flow.Subscriber<>() { + return new Pipeline<>() { + @Override + public void clientEndStreamReceived() { + onComplete(); + } + @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); // turn off flow control @@ -846,11 +852,16 @@ public void sayHelloStreamReply( } @Override - public Flow.Subscriber sayHelloStreamBidi( + public Pipeline sayHelloStreamBidi( Flow.Subscriber replies) { // Here we receive info from the client. In this case, it is a stream of requests with // names. We will respond with a stream of replies. - return new Flow.Subscriber<>() { + return new Pipeline<>() { + @Override + public void clientEndStreamReceived() { + onComplete(); + } + @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); // turn off flow control @@ -882,7 +893,7 @@ default HelloReply sayHello(HelloRequest request) { } @Override - default Flow.Subscriber sayHelloStreamRequest( + default Pipeline sayHelloStreamRequest( Flow.Subscriber replies) { return null; } @@ -892,7 +903,7 @@ default void sayHelloStreamReply( HelloRequest request, Flow.Subscriber replies) {} @Override - default Flow.Subscriber sayHelloStreamBidi( + default Pipeline sayHelloStreamBidi( Flow.Subscriber replies) { return null; } @@ -909,7 +920,7 @@ public HelloReply sayHello(HelloRequest request) { @Override @NonNull - public Flow.Subscriber sayHelloStreamRequest( + public Pipeline sayHelloStreamRequest( Flow.Subscriber replies) { return svc.sayHelloStreamRequest(replies); } @@ -922,14 +933,14 @@ public void sayHelloStreamReply( @Override @NonNull - public Flow.Subscriber sayHelloStreamBidi( + public Pipeline sayHelloStreamBidi( Flow.Subscriber replies) { return svc.sayHelloStreamBidi(replies); } @Override @NonNull - public Flow.Subscriber open( + public Pipeline open( @NonNull Method method, @NonNull RequestOptions options, @NonNull Flow.Subscriber replies) { diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java new file mode 100644 index 00000000..3360cf9f --- /dev/null +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java @@ -0,0 +1,15 @@ +package com.hedera.pbj.runtime.grpc; + +import java.util.concurrent.Flow; + +/** + * Represents a pipeline of data that is being processed by a gRPC service. + * + * @param The subscribed item type + */ +public interface Pipeline extends Flow.Subscriber { + /** + * Called when an END_STREAM frame is received from the client. + */ + void clientEndStreamReceived(); +} diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java index 00822f96..a25f2bae 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java @@ -38,8 +38,13 @@ private Pipelines() { * * @return A No-op subscriber. */ - public static Flow.Subscriber noop() { - return new Flow.Subscriber<>() { + public static Pipeline noop() { + return new Pipeline<>() { + @Override + public void clientEndStreamReceived() { + // Nothing to do + } + private Flow.Subscription subscription; @Override public void onSubscribe(@NonNull final Flow.Subscription subscription) { @@ -157,12 +162,13 @@ public interface UnaryBuilder { UnaryBuilder respondTo(@NonNull Flow.Subscriber replies); /** - * Builds the pipeline and returns the subscriber that should be used to receive the incoming messages. + * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains + * the replies that are sent back to the client. * - * @return The subscriber to receive the incoming messages. + * @return the communication pipeline */ @NonNull - Flow.Subscriber build(); + Pipeline build(); } /** @@ -214,12 +220,13 @@ public interface BidiStreamingBuilder { BidiStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); /** - * Builds the pipeline and returns the subscriber that should be used to receive the incoming messages. + * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains + * the replies that are sent back to the client. * - * @return The subscriber to receive the incoming messages. + * @return the communication pipeline */ @NonNull - Flow.Subscriber build(); + Pipeline build(); } /** @@ -273,12 +280,13 @@ ClientStreamingBuilder method( ClientStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); /** - * Builds the pipeline and returns the subscriber that should be used to receive the incoming messages. + * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains + * the replies that are sent back to the client. * - * @return The subscriber to receive the incoming messages. + * @return the communication pipeline */ @NonNull - Flow.Subscriber build(); + Pipeline build(); } /** @@ -328,12 +336,13 @@ public interface ServerStreamingBuilder { ServerStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); /** - * Builds the pipeline and returns the subscriber that should be used to receive the incoming messages. + * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains + * the replies that are sent back to the client. * - * @return The subscriber to receive the incoming messages. + * @return the communication pipeline */ @NonNull - Flow.Subscriber build(); + Pipeline build(); } /** @@ -407,7 +416,7 @@ public interface BidiStreamingMethod * @param The type of the request message. * @param The type of the response message. */ - private abstract static class PipelineBuilderImpl implements Flow.Subscriber, Flow.Subscription { + private abstract static class PipelineBuilderImpl implements Pipeline, Flow.Subscription { protected ExceptionalFunction requestMapper; protected ExceptionalFunction responseMapper; protected Flow.Subscriber replies; @@ -501,7 +510,7 @@ public UnaryBuilder respondTo(@NonNull final Flow.Subscriber build() { + public Pipeline build() { validateParams(); if (method == null) { throw new IllegalStateException("The method must be specified."); @@ -532,6 +541,11 @@ public void onNext(@NonNull final Bytes message) { replies.onError(e); } } + + @Override + public void clientEndStreamReceived() { + // nothing to do, as onComplete is always called inside onNext + } } /** @@ -575,7 +589,7 @@ public BidiStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber build() { + public Pipeline build() { validateParams(); if (method == null) { throw new IllegalStateException("The method must be specified."); @@ -616,6 +630,12 @@ public void onComplete() { incoming.onComplete(); super.onComplete(); } + + @Override + public void clientEndStreamReceived() { + // if the client stream is ended, the entire pipeline is ended + onComplete(); + } } /** @@ -658,7 +678,7 @@ public ClientStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber @Override @NonNull - public Flow.Subscriber build() { + public Pipeline build() { validateParams(); if (method == null) { throw new IllegalStateException("The method must be specified."); @@ -694,6 +714,11 @@ public void onComplete() { incoming.onComplete(); super.onComplete(); } + + @Override + public void clientEndStreamReceived() { + onComplete(); + } } /** @@ -737,7 +762,7 @@ public ServerStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber @Override @NonNull - public Flow.Subscriber build() { + public Pipeline build() { validateParams(); if (method == null) { throw new IllegalStateException("The method must be specified."); @@ -763,6 +788,12 @@ public void onNext(@NonNull final Bytes message) { replies.onError(e); } } + + @Override + public void clientEndStreamReceived() { + // nothing to do + // the server will continue streaming, since the message coming from the client is a subscription request + } } /** diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java index 8775adc3..a7e11d67 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java @@ -141,7 +141,7 @@ interface RequestOptions { * @param responses The subscriber used by the service to push responses back to the client. */ @NonNull - Flow.Subscriber open( + Pipeline open( @NonNull Method method, @NonNull RequestOptions opts, @NonNull Flow.Subscriber responses) throws GrpcException; }