From 1d6360f3f4dd8d832c433a504670b27272dffc57 Mon Sep 17 00:00:00 2001 From: litt <102969658+litt3@users.noreply.github.com> Date: Mon, 21 Oct 2024 16:20:42 -0400 Subject: [PATCH 1/3] fix: Make server keep connection open after receiving ES (#305) Signed-off-by: litt --- .../pbj/grpc/helidon/PbjProtocolHandler.java | 50 +++++++++----- .../pbj/grpc/helidon/GreeterService.java | 7 +- .../com/hedera/pbj/grpc/helidon/PbjTest.java | 31 ++++++--- .../com/hedera/pbj/runtime/grpc/Pipeline.java | 15 ++++ .../hedera/pbj/runtime/grpc/Pipelines.java | 69 ++++++++++++++----- .../pbj/runtime/grpc/ServiceInterface.java | 2 +- 6 files changed, 123 insertions(+), 51 deletions(-) create mode 100644 pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index a32181c9..837fb1c3 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -30,6 +30,7 @@ import com.hedera.pbj.grpc.helidon.config.PbjConfig; import com.hedera.pbj.runtime.grpc.GrpcException; import com.hedera.pbj.runtime.grpc.GrpcStatus; +import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.grpc.ServiceInterface; import com.hedera.pbj.runtime.io.buffer.Bytes; import edu.umd.cs.findbugs.annotations.NonNull; @@ -137,14 +138,14 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa private int entityBytesIndex = 0; /** - * The subscriber that will receive incoming messages from the client. + * The communication pipeline between server and client * *

This member isn't final because it is set in the {@link #init()} method. It should not be * set at any other time. * *

Method calls on this object are thread-safe. */ - private Flow.Subscriber incoming; + private Pipeline pipeline; /** Create a new instance */ PbjProtocolHandler( @@ -267,27 +268,27 @@ public void init() { // Setup the subscribers. The "outgoing" subscriber will send messages to the client. // This is given to the "open" method on the service to allow it to send messages to // the client. - final var outgoing = new SendToClientSubscriber(); - incoming = route.service().open(route.method(), options, outgoing); + final Flow.Subscriber outgoing = new SendToClientSubscriber(); + pipeline = route.service().open(route.method(), options, outgoing); } catch (final GrpcException grpcException) { route.failedGrpcRequestCounter().increment(); new TrailerOnlyBuilder() .grpcStatus(grpcException.status()) .statusMessage(grpcException.getMessage()) .send(); - close(); + error(); } catch (final HttpException httpException) { route.failedHttpRequestCounter().increment(); new TrailerOnlyBuilder() .httpStatus(httpException.status()) .grpcStatus(GrpcStatus.INVALID_ARGUMENT) .send(); - close(); + error(); } catch (final Exception unknown) { route.failedUnknownRequestCounter().increment(); LOGGER.log(ERROR, "Failed to initialize grpc protocol handler", unknown); new TrailerOnlyBuilder().grpcStatus(GrpcStatus.UNKNOWN).send(); - close(); + error(); } } @@ -299,7 +300,7 @@ public Http2StreamState streamState() { @Override public void rstStream(@NonNull final Http2RstStream rstStream) { - // Nothing to do + pipeline.onComplete(); } @Override @@ -368,7 +369,7 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa if (entityBytesIndex == entityBytes.length) { // Grab and wrap the bytes and reset to being reading the next message final var bytes = Bytes.wrap(entityBytes); - incoming.onNext(bytes); + pipeline.onNext(bytes); entityBytesIndex = 0; entityBytes = null; } @@ -380,19 +381,23 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa if (header.flags(Http2FrameTypes.DATA).endOfStream()) { entityBytesIndex = 0; entityBytes = null; - currentStreamState.set(Http2StreamState.HALF_CLOSED_LOCAL); - incoming.onComplete(); + currentStreamState.set(Http2StreamState.HALF_CLOSED_REMOTE); + pipeline.clientEndStreamReceived(); } } catch (final Exception e) { // I have to propagate this error through the service interface, so it can respond to // errors in the connection, tear down resources, etc. It will also forward this on // to the client, causing the connection to be torn down. - incoming.onError(e); + pipeline.onError(e); } } - /** Close the connection with the client. May be called by different threads concurrently. */ - private void close() { + /** + * An error has occurred. Cancel the deadline future if it's still active, and set the stream state accordingly. + *

+ * May be called by different threads concurrently. + */ + private void error() { // Canceling a future that has already completed has no effect. So by canceling here, we are // saying: // "If you have not yet executed, never execute. If you have already executed, then just @@ -401,7 +406,7 @@ private void close() { // cancel is threadsafe deadlineFuture.cancel(false); - currentStreamState.set(Http2StreamState.HALF_CLOSED_LOCAL); + currentStreamState.set(Http2StreamState.CLOSED); } /** @@ -453,7 +458,7 @@ private ScheduledFuture scheduleDeadline(@NonNull final String timeout) { deadline, () -> { route.deadlineExceededCounter().increment(); - incoming.onError(new GrpcException(GrpcStatus.DEADLINE_EXCEEDED)); + pipeline.onError(new GrpcException(GrpcStatus.DEADLINE_EXCEEDED)); }); } @@ -650,13 +655,22 @@ public void onError(@NonNull final Throwable throwable) { LOGGER.log(ERROR, "Failed to send response", throwable); new TrailerBuilder().grpcStatus(GrpcStatus.INTERNAL).send(); } - close(); + error(); } @Override public void onComplete() { new TrailerBuilder().send(); - close(); + + deadlineFuture.cancel(false); + + currentStreamState.getAndUpdate( + currentValue -> { + if (requireNonNull(currentValue) == Http2StreamState.OPEN) { + return Http2StreamState.HALF_CLOSED_LOCAL; + } + return Http2StreamState.CLOSED; + }); } } diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java index 44c95465..ae4b4caf 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java @@ -18,6 +18,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; +import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.grpc.Pipelines; import com.hedera.pbj.runtime.grpc.ServiceInterface; import com.hedera.pbj.runtime.io.buffer.Bytes; @@ -46,14 +47,14 @@ enum GreeterMethod implements Method { HelloReply sayHello(HelloRequest request); // A stream of messages coming from the client, with a single response from the server. - Flow.Subscriber sayHelloStreamRequest( + Pipeline sayHelloStreamRequest( Flow.Subscriber replies); // A single request from the client, with a stream of responses from the server. void sayHelloStreamReply(HelloRequest request, Flow.Subscriber replies); // A bidirectional stream of requests and responses between the client and the server. - Flow.Subscriber sayHelloStreamBidi( + Pipeline sayHelloStreamBidi( Flow.Subscriber replies); @NonNull @@ -73,7 +74,7 @@ default List methods() { @Override @NonNull - default Flow.Subscriber open( + default Pipeline open( final @NonNull Method method, final @NonNull RequestOptions options, final @NonNull Flow.Subscriber replies) { diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java index 0e28f30f..95705827 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java @@ -22,6 +22,7 @@ import com.google.protobuf.util.JsonFormat; import com.hedera.pbj.runtime.grpc.GrpcException; import com.hedera.pbj.runtime.grpc.GrpcStatus; +import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.io.buffer.Bytes; import com.hedera.pbj.runtime.io.stream.ReadableStreamingData; import com.hedera.pbj.runtime.io.stream.WritableStreamingData; @@ -509,7 +510,7 @@ void exceptionThrownWhileOpening() { new GreeterAdapter() { @Override @NonNull - public Flow.Subscriber open( + public Pipeline open( @NonNull Method method, @NonNull RequestOptions options, @NonNull Flow.Subscriber replies) { @@ -804,10 +805,15 @@ public HelloReply sayHello(HelloRequest request) { // Streams of stuff coming from the client, with a single response. @Override - public Flow.Subscriber sayHelloStreamRequest( + public Pipeline sayHelloStreamRequest( Flow.Subscriber replies) { final var names = new ArrayList(); - return new Flow.Subscriber<>() { + return new Pipeline<>() { + @Override + public void clientEndStreamReceived() { + onComplete(); + } + @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); // turn off flow control @@ -846,11 +852,16 @@ public void sayHelloStreamReply( } @Override - public Flow.Subscriber sayHelloStreamBidi( + public Pipeline sayHelloStreamBidi( Flow.Subscriber replies) { // Here we receive info from the client. In this case, it is a stream of requests with // names. We will respond with a stream of replies. - return new Flow.Subscriber<>() { + return new Pipeline<>() { + @Override + public void clientEndStreamReceived() { + onComplete(); + } + @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); // turn off flow control @@ -882,7 +893,7 @@ default HelloReply sayHello(HelloRequest request) { } @Override - default Flow.Subscriber sayHelloStreamRequest( + default Pipeline sayHelloStreamRequest( Flow.Subscriber replies) { return null; } @@ -892,7 +903,7 @@ default void sayHelloStreamReply( HelloRequest request, Flow.Subscriber replies) {} @Override - default Flow.Subscriber sayHelloStreamBidi( + default Pipeline sayHelloStreamBidi( Flow.Subscriber replies) { return null; } @@ -909,7 +920,7 @@ public HelloReply sayHello(HelloRequest request) { @Override @NonNull - public Flow.Subscriber sayHelloStreamRequest( + public Pipeline sayHelloStreamRequest( Flow.Subscriber replies) { return svc.sayHelloStreamRequest(replies); } @@ -922,14 +933,14 @@ public void sayHelloStreamReply( @Override @NonNull - public Flow.Subscriber sayHelloStreamBidi( + public Pipeline sayHelloStreamBidi( Flow.Subscriber replies) { return svc.sayHelloStreamBidi(replies); } @Override @NonNull - public Flow.Subscriber open( + public Pipeline open( @NonNull Method method, @NonNull RequestOptions options, @NonNull Flow.Subscriber replies) { diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java new file mode 100644 index 00000000..3360cf9f --- /dev/null +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java @@ -0,0 +1,15 @@ +package com.hedera.pbj.runtime.grpc; + +import java.util.concurrent.Flow; + +/** + * Represents a pipeline of data that is being processed by a gRPC service. + * + * @param The subscribed item type + */ +public interface Pipeline extends Flow.Subscriber { + /** + * Called when an END_STREAM frame is received from the client. + */ + void clientEndStreamReceived(); +} diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java index 00822f96..a25f2bae 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java @@ -38,8 +38,13 @@ private Pipelines() { * * @return A No-op subscriber. */ - public static Flow.Subscriber noop() { - return new Flow.Subscriber<>() { + public static Pipeline noop() { + return new Pipeline<>() { + @Override + public void clientEndStreamReceived() { + // Nothing to do + } + private Flow.Subscription subscription; @Override public void onSubscribe(@NonNull final Flow.Subscription subscription) { @@ -157,12 +162,13 @@ public interface UnaryBuilder { UnaryBuilder respondTo(@NonNull Flow.Subscriber replies); /** - * Builds the pipeline and returns the subscriber that should be used to receive the incoming messages. + * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains + * the replies that are sent back to the client. * - * @return The subscriber to receive the incoming messages. + * @return the communication pipeline */ @NonNull - Flow.Subscriber build(); + Pipeline build(); } /** @@ -214,12 +220,13 @@ public interface BidiStreamingBuilder { BidiStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); /** - * Builds the pipeline and returns the subscriber that should be used to receive the incoming messages. + * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains + * the replies that are sent back to the client. * - * @return The subscriber to receive the incoming messages. + * @return the communication pipeline */ @NonNull - Flow.Subscriber build(); + Pipeline build(); } /** @@ -273,12 +280,13 @@ ClientStreamingBuilder method( ClientStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); /** - * Builds the pipeline and returns the subscriber that should be used to receive the incoming messages. + * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains + * the replies that are sent back to the client. * - * @return The subscriber to receive the incoming messages. + * @return the communication pipeline */ @NonNull - Flow.Subscriber build(); + Pipeline build(); } /** @@ -328,12 +336,13 @@ public interface ServerStreamingBuilder { ServerStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); /** - * Builds the pipeline and returns the subscriber that should be used to receive the incoming messages. + * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains + * the replies that are sent back to the client. * - * @return The subscriber to receive the incoming messages. + * @return the communication pipeline */ @NonNull - Flow.Subscriber build(); + Pipeline build(); } /** @@ -407,7 +416,7 @@ public interface BidiStreamingMethod * @param The type of the request message. * @param The type of the response message. */ - private abstract static class PipelineBuilderImpl implements Flow.Subscriber, Flow.Subscription { + private abstract static class PipelineBuilderImpl implements Pipeline, Flow.Subscription { protected ExceptionalFunction requestMapper; protected ExceptionalFunction responseMapper; protected Flow.Subscriber replies; @@ -501,7 +510,7 @@ public UnaryBuilder respondTo(@NonNull final Flow.Subscriber build() { + public Pipeline build() { validateParams(); if (method == null) { throw new IllegalStateException("The method must be specified."); @@ -532,6 +541,11 @@ public void onNext(@NonNull final Bytes message) { replies.onError(e); } } + + @Override + public void clientEndStreamReceived() { + // nothing to do, as onComplete is always called inside onNext + } } /** @@ -575,7 +589,7 @@ public BidiStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber build() { + public Pipeline build() { validateParams(); if (method == null) { throw new IllegalStateException("The method must be specified."); @@ -616,6 +630,12 @@ public void onComplete() { incoming.onComplete(); super.onComplete(); } + + @Override + public void clientEndStreamReceived() { + // if the client stream is ended, the entire pipeline is ended + onComplete(); + } } /** @@ -658,7 +678,7 @@ public ClientStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber @Override @NonNull - public Flow.Subscriber build() { + public Pipeline build() { validateParams(); if (method == null) { throw new IllegalStateException("The method must be specified."); @@ -694,6 +714,11 @@ public void onComplete() { incoming.onComplete(); super.onComplete(); } + + @Override + public void clientEndStreamReceived() { + onComplete(); + } } /** @@ -737,7 +762,7 @@ public ServerStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber @Override @NonNull - public Flow.Subscriber build() { + public Pipeline build() { validateParams(); if (method == null) { throw new IllegalStateException("The method must be specified."); @@ -763,6 +788,12 @@ public void onNext(@NonNull final Bytes message) { replies.onError(e); } } + + @Override + public void clientEndStreamReceived() { + // nothing to do + // the server will continue streaming, since the message coming from the client is a subscription request + } } /** diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java index 8775adc3..a7e11d67 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java @@ -141,7 +141,7 @@ interface RequestOptions { * @param responses The subscriber used by the service to push responses back to the client. */ @NonNull - Flow.Subscriber open( + Pipeline open( @NonNull Method method, @NonNull RequestOptions opts, @NonNull Flow.Subscriber responses) throws GrpcException; } From b9cfdc13cade459f23e5bb39fdad358e315ba929 Mon Sep 17 00:00:00 2001 From: jasperpotts Date: Wed, 23 Oct 2024 16:19:07 -0700 Subject: [PATCH 2/3] Fixed issue of ArrayOutOfBoundsExceptions when trying to read 4 byte integer for entity length from buffer with less than 4 bytes avilable. Signed-off-by: jasperpotts --- .../pbj/grpc/helidon/PbjProtocolHandler.java | 165 ++++++++++++------ 1 file changed, 114 insertions(+), 51 deletions(-) diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index 837fb1c3..28f8b8a4 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -137,6 +137,30 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa */ private int entityBytesIndex = 0; + /** States for currentReadState state ,machine */ + enum ReadState { + /** + * Start state, when we are looking for first byte that says if data is compressed or not + */ + START, + /** + * State were we are reading length, can be partial length of final point when we have all + * length bytes + */ + READ_LENGTH, + /** State where we are reading the protobuf entity bytes */ + READ_ENTITY_BYTES + } + + /** State machine as we read bytes from incoming data */ + private ReadState currentReadState = ReadState.START; + + /** Number of read bytes between 0 and {@code Integer.BYTES} = 4 */ + private int numOfPartReadBytes = 0; + + /** Byte array to store bytes as we build up to a full 4 byte integer */ + private final byte[] partReadLengthBytes = new byte[Integer.BYTES]; + /** * The communication pipeline between server and client * @@ -324,54 +348,92 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa // There is some asynchronous behavior here, but in the worst case, we handle a few more // bytes before the stream is closed. while (data.available() > 0) { - // First chunk of data contains the compression flag and the length of the message - if (entityBytes == null) { - // Read whether this message is compressed. We do not currently support - // compression. - final var isCompressed = (data.read() == 1); - if (isCompressed) { - // The error will eventually result in the stream being closed - throw new GrpcException( - GrpcStatus.UNIMPLEMENTED, "Compression is not supported"); - } - // Read the length of the message. As per the grpc protocol specification, each - // message on the wire is prefixed with the number of bytes for the message. - // However, to prevent a DOS attack where the attacker sends us a very large - // length and exhausts our memory, we have a maximum message size configuration - // setting. Using that, we can detect attempts to exhaust our memory. - final long length = data.readUnsignedInt32(); - if (length > config.maxMessageSizeBytes()) { - throw new GrpcException( - GrpcStatus.INVALID_ARGUMENT, - "Message size exceeds maximum allowed size"); - } - // Create a buffer to hold the message. We sadly cannot reuse this buffer - // because once we have filled it and wrapped it in Bytes and sent it to the - // handler, some user code may grab and hold that Bytes object for an arbitrary - // amount of time, and if we were to scribble into the same byte array, we - // would break the application. So we need a new buffer each time :-( - entityBytes = new byte[(int) length]; - entityBytesIndex = 0; - } - - // By the time we get here, entityBytes is no longer null. It may be empty, or it - // may already have been partially populated from a previous iteration. It may be - // that the number of bytes available to be read is larger than just this one - // message. So we need to be careful to read, from what is available, only up to - // the message length, and to leave the rest for the next iteration. - final int available = data.available(); - final int numBytesToRead = - Math.min(entityBytes.length - entityBytesIndex, available); - data.read(entityBytes, entityBytesIndex, numBytesToRead); - entityBytesIndex += numBytesToRead; - - // If we have completed reading the message, then we can proceed. - if (entityBytesIndex == entityBytes.length) { - // Grab and wrap the bytes and reset to being reading the next message - final var bytes = Bytes.wrap(entityBytes); - pipeline.onNext(bytes); - entityBytesIndex = 0; - entityBytes = null; + switch (currentReadState) { + case START: + { + // Read whether this message is compressed. We do not currently support + // compression. + final var isCompressed = (data.read() == 1); + if (isCompressed) { + // The error will eventually result in the stream being closed + throw new GrpcException( + GrpcStatus.UNIMPLEMENTED, "Compression is not supported"); + } + currentReadState = ReadState.READ_LENGTH; + numOfPartReadBytes = 0; + break; + } + case READ_LENGTH: + { + // if I have not read a full int yet then read more from available bytes + if (numOfPartReadBytes < Integer.BYTES) { + // we do not have enough bytes yet to read a 4 byte int + // read the bytes we do have and store them for next time + final int bytesToRead = + Math.min( + data.available(), + Integer.BYTES - numOfPartReadBytes); + data.read(partReadLengthBytes, numOfPartReadBytes, bytesToRead); + numOfPartReadBytes += bytesToRead; + } + // check if we have read all the 4 bytes of the length int32 + if (numOfPartReadBytes == Integer.BYTES) { + final long length = + ((long) partReadLengthBytes[0] & 0xFF) << 24 + | ((long) partReadLengthBytes[1] & 0xFF) << 16 + | ((long) partReadLengthBytes[2] & 0xFF) << 8 + | ((long) partReadLengthBytes[3] & 0xFF); + if (length > config.maxMessageSizeBytes()) { + throw new GrpcException( + GrpcStatus.INVALID_ARGUMENT, + "Message size exceeds maximum allowed size"); + } + // Create a buffer to hold the message. We sadly cannot reuse this + // buffer + // because once we have filled it and wrapped it in Bytes and sent + // it to the + // handler, some user code may grab and hold that Bytes object for + // an arbitrary + // amount of time, and if we were to scribble into the same byte + // array, we + // would break the application. So we need a new buffer each time + // :-( + entityBytes = new byte[(int) length]; + entityBytesIndex = 0; + // done with length now, so move on to next state + currentReadState = ReadState.READ_ENTITY_BYTES; + } + break; + } + case READ_ENTITY_BYTES: + { + // By the time we get here, entityBytes is no longer null. It may be + // empty, or it + // may already have been partially populated from a previous iteration. + // It may be + // that the number of bytes available to be read is larger than just + // this one + // message. So we need to be careful to read, from what is available, + // only up to + // the message length, and to leave the rest for the next iteration. + final int available = data.available(); + final int numBytesToRead = + Math.min(entityBytes.length - entityBytesIndex, available); + data.read(entityBytes, entityBytesIndex, numBytesToRead); + entityBytesIndex += numBytesToRead; + + // If we have completed reading the message, then we can proceed. + if (entityBytesIndex == entityBytes.length) { + currentReadState = ReadState.START; + // Grab and wrap the bytes and reset to being reading the next + // message + final var bytes = Bytes.wrap(entityBytes); + pipeline.onNext(bytes); + entityBytesIndex = 0; + entityBytes = null; + } + break; + } } } @@ -393,9 +455,10 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa } /** - * An error has occurred. Cancel the deadline future if it's still active, and set the stream state accordingly. - *

- * May be called by different threads concurrently. + * An error has occurred. Cancel the deadline future if it's still active, and set the stream + * state accordingly. + * + *

May be called by different threads concurrently. */ private void error() { // Canceling a future that has already completed has no effect. So by canceling here, we are From ff71839f85d4a15de7bae5d2817c0a0d84475f37 Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Mon, 4 Nov 2024 10:36:37 -0700 Subject: [PATCH 3/3] fix: First draft of registering a callback to remedy 00303 Signed-off-by: Matt Peterson --- .../pbj/grpc/helidon/PbjProtocolHandler.java | 18 ++++- .../pbj/grpc/helidon/GreeterService.java | 10 +-- .../com/hedera/pbj/grpc/helidon/PbjTest.java | 32 ++++++--- .../com/hedera/pbj/runtime/grpc/Pipeline.java | 23 ++++++- .../hedera/pbj/runtime/grpc/Pipelines.java | 67 ++++++++++++++----- .../pbj/runtime/grpc/ServiceInterface.java | 2 +- .../pbj/runtime/grpc/PipelinesTest.java | 26 ++++--- 7 files changed, 131 insertions(+), 47 deletions(-) diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index 28f8b8a4..9e62707f 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -292,7 +292,7 @@ public void init() { // Setup the subscribers. The "outgoing" subscriber will send messages to the client. // This is given to the "open" method on the service to allow it to send messages to // the client. - final Flow.Subscriber outgoing = new SendToClientSubscriber(); + final Pipeline outgoing = new SendToClientSubscriber(); pipeline = route.service().open(route.method(), options, outgoing); } catch (final GrpcException grpcException) { route.failedGrpcRequestCounter().increment(); @@ -678,7 +678,10 @@ protected void send( * The implementation of {@link Flow.Subscriber} used to send messages to the client. It * receives bytes from the handlers to send to the client. */ - private final class SendToClientSubscriber implements Flow.Subscriber { + private final class SendToClientSubscriber implements Pipeline { + + private Runnable onCancelRunnable; + @Override public void onSubscribe(@NonNull final Flow.Subscription subscription) { // FUTURE: Add support for flow control @@ -704,6 +707,9 @@ public void onNext(@NonNull final Bytes response) { new Http2FrameData(header, bufferData), flowControl.outbound()); } catch (final Exception e) { LOGGER.log(ERROR, "Failed to respond to grpc request: " + route.method(), e); + if (onCancelRunnable != null) { + onCancelRunnable.run(); + } } } @@ -735,6 +741,14 @@ public void onComplete() { return Http2StreamState.CLOSED; }); } + + @Override + public void clientEndStreamReceived() {} + + @Override + public void registerCallbackHandler(Runnable runnable) { + onCancelRunnable = runnable; + } } /** Simple implementation of the {@link ServiceInterface.RequestOptions} interface. */ diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java index ae4b4caf..01ac4759 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java @@ -25,10 +25,10 @@ import edu.umd.cs.findbugs.annotations.NonNull; import greeter.HelloReply; import greeter.HelloRequest; + import java.util.Arrays; import java.util.List; import java.util.Objects; -import java.util.concurrent.Flow; /** * This service doesn't rely on any PBJ objects, because the build right now doesn't have a good way @@ -48,14 +48,14 @@ enum GreeterMethod implements Method { // A stream of messages coming from the client, with a single response from the server. Pipeline sayHelloStreamRequest( - Flow.Subscriber replies); + Pipeline replies); // A single request from the client, with a stream of responses from the server. - void sayHelloStreamReply(HelloRequest request, Flow.Subscriber replies); + void sayHelloStreamReply(HelloRequest request, Pipeline replies); // A bidirectional stream of requests and responses between the client and the server. Pipeline sayHelloStreamBidi( - Flow.Subscriber replies); + Pipeline replies); @NonNull default String serviceName() { @@ -77,7 +77,7 @@ default List methods() { default Pipeline open( final @NonNull Method method, final @NonNull RequestOptions options, - final @NonNull Flow.Subscriber replies) { + final @NonNull Pipeline replies) { final var m = (GreeterMethod) method; try { diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java index 95705827..36e92671 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java @@ -513,7 +513,7 @@ void exceptionThrownWhileOpening() { public Pipeline open( @NonNull Method method, @NonNull RequestOptions options, - @NonNull Flow.Subscriber replies) { + @NonNull Pipeline replies) { throw ex; } }; @@ -806,7 +806,7 @@ public HelloReply sayHello(HelloRequest request) { // Streams of stuff coming from the client, with a single response. @Override public Pipeline sayHelloStreamRequest( - Flow.Subscriber replies) { + Pipeline replies) { final var names = new ArrayList(); return new Pipeline<>() { @Override @@ -814,6 +814,11 @@ public void clientEndStreamReceived() { onComplete(); } + @Override + public void registerCallbackHandler(Runnable runnable) { + + } + @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); // turn off flow control @@ -843,7 +848,7 @@ public void onComplete() { @Override public void sayHelloStreamReply( - HelloRequest request, Flow.Subscriber replies) { + HelloRequest request, Pipeline replies) { for (int i = 0; i < 10; i++) { replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build()); } @@ -853,7 +858,7 @@ public void sayHelloStreamReply( @Override public Pipeline sayHelloStreamBidi( - Flow.Subscriber replies) { + Pipeline replies) { // Here we receive info from the client. In this case, it is a stream of requests with // names. We will respond with a stream of replies. return new Pipeline<>() { @@ -862,6 +867,11 @@ public void clientEndStreamReceived() { onComplete(); } + @Override + public void registerCallbackHandler(Runnable runnable) { + + } + @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); // turn off flow control @@ -894,17 +904,17 @@ default HelloReply sayHello(HelloRequest request) { @Override default Pipeline sayHelloStreamRequest( - Flow.Subscriber replies) { + Pipeline replies) { return null; } @Override default void sayHelloStreamReply( - HelloRequest request, Flow.Subscriber replies) {} + HelloRequest request, Pipeline replies) {} @Override default Pipeline sayHelloStreamBidi( - Flow.Subscriber replies) { + Pipeline replies) { return null; } } @@ -921,20 +931,20 @@ public HelloReply sayHello(HelloRequest request) { @Override @NonNull public Pipeline sayHelloStreamRequest( - Flow.Subscriber replies) { + Pipeline replies) { return svc.sayHelloStreamRequest(replies); } @Override public void sayHelloStreamReply( - HelloRequest request, Flow.Subscriber replies) { + HelloRequest request, Pipeline replies) { svc.sayHelloStreamReply(request, replies); } @Override @NonNull public Pipeline sayHelloStreamBidi( - Flow.Subscriber replies) { + Pipeline replies) { return svc.sayHelloStreamBidi(replies); } @@ -943,7 +953,7 @@ public Pipeline sayHelloStreamBidi( public Pipeline open( @NonNull Method method, @NonNull RequestOptions options, - @NonNull Flow.Subscriber replies) { + @NonNull Pipeline replies) { return svc.open(method, options, replies); } } diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java index 3360cf9f..69482be3 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.hedera.pbj.runtime.grpc; import java.util.concurrent.Flow; @@ -8,8 +24,9 @@ * @param The subscribed item type */ public interface Pipeline extends Flow.Subscriber { - /** - * Called when an END_STREAM frame is received from the client. - */ + /** Called when an END_STREAM frame is received from the client. */ void clientEndStreamReceived(); + + /** Called when an exception is thrown */ + void registerCallbackHandler(Runnable runnable); } diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java index a25f2bae..782503d0 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java @@ -45,6 +45,9 @@ public void clientEndStreamReceived() { // Nothing to do } + @Override + public void registerCallbackHandler(Runnable runnable) {} + private Flow.Subscription subscription; @Override public void onSubscribe(@NonNull final Flow.Subscription subscription) { @@ -159,7 +162,7 @@ public interface UnaryBuilder { * @return This builder. */ @NonNull - UnaryBuilder respondTo(@NonNull Flow.Subscriber replies); + UnaryBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -217,7 +220,7 @@ public interface BidiStreamingBuilder { * @return This builder. */ @NonNull - BidiStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); + BidiStreamingBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -277,7 +280,7 @@ ClientStreamingBuilder method( * @return This builder. */ @NonNull - ClientStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); + ClientStreamingBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -333,7 +336,7 @@ public interface ServerStreamingBuilder { * @return This builder. */ @NonNull - ServerStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); + ServerStreamingBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -373,7 +376,7 @@ public interface ExceptionalFunction { */ @FunctionalInterface public interface ClientStreamingMethod - extends ExceptionalFunction, Flow.Subscriber> {} + extends ExceptionalFunction, Pipeline> {} /** * A function that handles a server streaming gRPC service method. A single request is received from the client, @@ -390,7 +393,7 @@ public interface ServerStreamingMethod { * @param replies The subscriber to send responses to. * @throws Exception If an error occurs during processing. */ - void apply(@NonNull T request, @NonNull Flow.Subscriber replies) throws Exception; + void apply(@NonNull T request, @NonNull Pipeline replies) throws Exception; } /** @@ -401,7 +404,7 @@ public interface ServerStreamingMethod { * @param The type of the response message. */ public interface BidiStreamingMethod - extends ExceptionalFunction, Flow.Subscriber> {} + extends ExceptionalFunction, Pipeline> {} /** * A convenient base class for the different builders. All builders have to hold state for request and @@ -419,7 +422,7 @@ public interface BidiStreamingMethod private abstract static class PipelineBuilderImpl implements Pipeline, Flow.Subscription { protected ExceptionalFunction requestMapper; protected ExceptionalFunction responseMapper; - protected Flow.Subscriber replies; + protected Pipeline replies; private Flow.Subscription sourceSubscription; protected boolean completed = false; @@ -503,7 +506,7 @@ public UnaryBuilder mapResponse(@NonNull final ExceptionalFunction respondTo(@NonNull final Flow.Subscriber replies) { + public UnaryBuilder respondTo(@NonNull final Pipeline replies) { this.replies = requireNonNull(replies); return this; } @@ -546,6 +549,11 @@ public void onNext(@NonNull final Bytes message) { public void clientEndStreamReceived() { // nothing to do, as onComplete is always called inside onNext } + + @Override + public void registerCallbackHandler(Runnable runnable) { + + } } /** @@ -557,7 +565,7 @@ public void clientEndStreamReceived() { private static final class BidiStreamingBuilderImpl extends PipelineBuilderImpl implements BidiStreamingBuilder { private BidiStreamingMethod method; - private Flow.Subscriber incoming; + private Pipeline incoming; @Override @NonNull @@ -582,7 +590,7 @@ public BidiStreamingBuilderImpl mapResponse(@NonNull final ExceptionalFunc @Override @NonNull - public BidiStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber replies) { + public BidiStreamingBuilderImpl respondTo(@NonNull final Pipeline replies) { this.replies = replies; return this; } @@ -636,6 +644,11 @@ public void clientEndStreamReceived() { // if the client stream is ended, the entire pipeline is ended onComplete(); } + + @Override + public void registerCallbackHandler(Runnable runnable) { + + } } /** @@ -646,7 +659,7 @@ public void clientEndStreamReceived() { private static final class ClientStreamingBuilderImpl extends PipelineBuilderImpl implements ClientStreamingBuilder { private ClientStreamingMethod method; - private Flow.Subscriber incoming; + private Pipeline incoming; @Override @NonNull @@ -671,7 +684,7 @@ public ClientStreamingBuilderImpl mapResponse(@NonNull final ExceptionalFu @Override @NonNull - public ClientStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber replies) { + public ClientStreamingBuilderImpl respondTo(@NonNull final Pipeline replies) { this.replies = replies; return this; } @@ -719,6 +732,11 @@ public void onComplete() { public void clientEndStreamReceived() { onComplete(); } + + @Override + public void registerCallbackHandler(Runnable runnable) { + + } } /** @@ -730,7 +748,7 @@ public void clientEndStreamReceived() { private static final class ServerStreamingBuilderImpl extends PipelineBuilderImpl implements ServerStreamingBuilder { private ServerStreamingMethod method; - private Flow.Subscriber responseConverter; + private Pipeline responseConverter; @Override @NonNull @@ -755,7 +773,7 @@ public ServerStreamingBuilderImpl mapResponse(@NonNull final ExceptionalFu @Override @NonNull - public ServerStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber replies) { + public ServerStreamingBuilderImpl respondTo(@NonNull final Pipeline replies) { this.replies = replies; return this; } @@ -794,6 +812,11 @@ public void clientEndStreamReceived() { // nothing to do // the server will continue streaming, since the message coming from the client is a subscription request } + + @Override + public void registerCallbackHandler(Runnable runnable) { + + } } /** @@ -805,8 +828,8 @@ public void clientEndStreamReceived() { * @param The type of the input. * @param The type of the output. */ - private record MapSubscriber(Flow.Subscriber next, ExceptionalFunction mapper) - implements Flow.Subscriber, Flow.Subscription { + private record MapSubscriber(Pipeline next, ExceptionalFunction mapper) + implements Pipeline, Flow.Subscription { private MapSubscriber { next.onSubscribe(this); @@ -846,5 +869,15 @@ public void onError(Throwable throwable) { public void onComplete() { next.onComplete(); } + + @Override + public void clientEndStreamReceived() { + + } + + @Override + public void registerCallbackHandler(Runnable runnable) { + next.registerCallbackHandler(runnable); + } } } diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java index a7e11d67..367164a2 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java @@ -142,6 +142,6 @@ interface RequestOptions { */ @NonNull Pipeline open( - @NonNull Method method, @NonNull RequestOptions opts, @NonNull Flow.Subscriber responses) + @NonNull Method method, @NonNull RequestOptions opts, @NonNull Pipeline responses) throws GrpcException; } diff --git a/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java b/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java index 7acd3900..921d224b 100644 --- a/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java +++ b/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java @@ -71,7 +71,7 @@ void noopOnCompleteDoesNothing() { @Nested @ExtendWith(MockitoExtension.class) class UnaryTest { - @Mock Flow.Subscriber replies; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -184,8 +184,8 @@ void positive() { @Nested @ExtendWith(MockitoExtension.class) class BidiTest { - @Mock Flow.Subscriber client; - @Mock Flow.Subscriber replies; + @Mock Pipeline client; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -341,7 +341,7 @@ void positive() { @Nested @ExtendWith(MockitoExtension.class) class ServerStreamingTest { - @Mock Flow.Subscriber replies; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -470,7 +470,7 @@ void positive() { @Nested @ExtendWith(MockitoExtension.class) class ClientStreamingTest { - @Mock Flow.Subscriber replies; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -595,11 +595,11 @@ void positive() { verify(replies).onNext(Bytes.wrap("hello:world")); } - private static final class ConcatenatingHandler implements Flow.Subscriber { + private static final class ConcatenatingHandler implements Pipeline { private final List strings = new ArrayList<>(); - private final Flow.Subscriber sink; + private final Pipeline sink; - private ConcatenatingHandler(Flow.Subscriber sink) { + private ConcatenatingHandler(Pipeline sink) { this.sink = sink; } @@ -622,6 +622,16 @@ public void onError(Throwable throwable) { public void onComplete() { sink.onNext(String.join(":", strings)); } + + @Override + public void clientEndStreamReceived() { + + } + + @Override + public void registerCallbackHandler(Runnable runnable) { + + } } } }