From aa392ce4b0e2ce81d1fc9f85563a41cc9b8c9c0e Mon Sep 17 00:00:00 2001 From: Matt Peterson Date: Sat, 2 Nov 2024 08:59:09 -0600 Subject: [PATCH] fix: First draft of callback registration Signed-off-by: Matt Peterson --- .../pbj/grpc/helidon/PbjProtocolHandler.java | 18 ++++- .../pbj/grpc/helidon/GreeterService.java | 10 +-- .../com/hedera/pbj/grpc/helidon/PbjTest.java | 32 ++++++--- .../com/hedera/pbj/runtime/grpc/Pipeline.java | 23 ++++++- .../hedera/pbj/runtime/grpc/Pipelines.java | 67 ++++++++++++++----- .../pbj/runtime/grpc/ServiceInterface.java | 2 +- .../pbj/runtime/grpc/PipelinesTest.java | 26 ++++--- 7 files changed, 131 insertions(+), 47 deletions(-) diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index 28f8b8a4..9e62707f 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -292,7 +292,7 @@ public void init() { // Setup the subscribers. The "outgoing" subscriber will send messages to the client. // This is given to the "open" method on the service to allow it to send messages to // the client. - final Flow.Subscriber outgoing = new SendToClientSubscriber(); + final Pipeline outgoing = new SendToClientSubscriber(); pipeline = route.service().open(route.method(), options, outgoing); } catch (final GrpcException grpcException) { route.failedGrpcRequestCounter().increment(); @@ -678,7 +678,10 @@ protected void send( * The implementation of {@link Flow.Subscriber} used to send messages to the client. It * receives bytes from the handlers to send to the client. */ - private final class SendToClientSubscriber implements Flow.Subscriber { + private final class SendToClientSubscriber implements Pipeline { + + private Runnable onCancelRunnable; + @Override public void onSubscribe(@NonNull final Flow.Subscription subscription) { // FUTURE: Add support for flow control @@ -704,6 +707,9 @@ public void onNext(@NonNull final Bytes response) { new Http2FrameData(header, bufferData), flowControl.outbound()); } catch (final Exception e) { LOGGER.log(ERROR, "Failed to respond to grpc request: " + route.method(), e); + if (onCancelRunnable != null) { + onCancelRunnable.run(); + } } } @@ -735,6 +741,14 @@ public void onComplete() { return Http2StreamState.CLOSED; }); } + + @Override + public void clientEndStreamReceived() {} + + @Override + public void registerCallbackHandler(Runnable runnable) { + onCancelRunnable = runnable; + } } /** Simple implementation of the {@link ServiceInterface.RequestOptions} interface. */ diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java index ae4b4caf..01ac4759 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/GreeterService.java @@ -25,10 +25,10 @@ import edu.umd.cs.findbugs.annotations.NonNull; import greeter.HelloReply; import greeter.HelloRequest; + import java.util.Arrays; import java.util.List; import java.util.Objects; -import java.util.concurrent.Flow; /** * This service doesn't rely on any PBJ objects, because the build right now doesn't have a good way @@ -48,14 +48,14 @@ enum GreeterMethod implements Method { // A stream of messages coming from the client, with a single response from the server. Pipeline sayHelloStreamRequest( - Flow.Subscriber replies); + Pipeline replies); // A single request from the client, with a stream of responses from the server. - void sayHelloStreamReply(HelloRequest request, Flow.Subscriber replies); + void sayHelloStreamReply(HelloRequest request, Pipeline replies); // A bidirectional stream of requests and responses between the client and the server. Pipeline sayHelloStreamBidi( - Flow.Subscriber replies); + Pipeline replies); @NonNull default String serviceName() { @@ -77,7 +77,7 @@ default List methods() { default Pipeline open( final @NonNull Method method, final @NonNull RequestOptions options, - final @NonNull Flow.Subscriber replies) { + final @NonNull Pipeline replies) { final var m = (GreeterMethod) method; try { diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java index 95705827..36e92671 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/com/hedera/pbj/grpc/helidon/PbjTest.java @@ -513,7 +513,7 @@ void exceptionThrownWhileOpening() { public Pipeline open( @NonNull Method method, @NonNull RequestOptions options, - @NonNull Flow.Subscriber replies) { + @NonNull Pipeline replies) { throw ex; } }; @@ -806,7 +806,7 @@ public HelloReply sayHello(HelloRequest request) { // Streams of stuff coming from the client, with a single response. @Override public Pipeline sayHelloStreamRequest( - Flow.Subscriber replies) { + Pipeline replies) { final var names = new ArrayList(); return new Pipeline<>() { @Override @@ -814,6 +814,11 @@ public void clientEndStreamReceived() { onComplete(); } + @Override + public void registerCallbackHandler(Runnable runnable) { + + } + @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); // turn off flow control @@ -843,7 +848,7 @@ public void onComplete() { @Override public void sayHelloStreamReply( - HelloRequest request, Flow.Subscriber replies) { + HelloRequest request, Pipeline replies) { for (int i = 0; i < 10; i++) { replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build()); } @@ -853,7 +858,7 @@ public void sayHelloStreamReply( @Override public Pipeline sayHelloStreamBidi( - Flow.Subscriber replies) { + Pipeline replies) { // Here we receive info from the client. In this case, it is a stream of requests with // names. We will respond with a stream of replies. return new Pipeline<>() { @@ -862,6 +867,11 @@ public void clientEndStreamReceived() { onComplete(); } + @Override + public void registerCallbackHandler(Runnable runnable) { + + } + @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); // turn off flow control @@ -894,17 +904,17 @@ default HelloReply sayHello(HelloRequest request) { @Override default Pipeline sayHelloStreamRequest( - Flow.Subscriber replies) { + Pipeline replies) { return null; } @Override default void sayHelloStreamReply( - HelloRequest request, Flow.Subscriber replies) {} + HelloRequest request, Pipeline replies) {} @Override default Pipeline sayHelloStreamBidi( - Flow.Subscriber replies) { + Pipeline replies) { return null; } } @@ -921,20 +931,20 @@ public HelloReply sayHello(HelloRequest request) { @Override @NonNull public Pipeline sayHelloStreamRequest( - Flow.Subscriber replies) { + Pipeline replies) { return svc.sayHelloStreamRequest(replies); } @Override public void sayHelloStreamReply( - HelloRequest request, Flow.Subscriber replies) { + HelloRequest request, Pipeline replies) { svc.sayHelloStreamReply(request, replies); } @Override @NonNull public Pipeline sayHelloStreamBidi( - Flow.Subscriber replies) { + Pipeline replies) { return svc.sayHelloStreamBidi(replies); } @@ -943,7 +953,7 @@ public Pipeline sayHelloStreamBidi( public Pipeline open( @NonNull Method method, @NonNull RequestOptions options, - @NonNull Flow.Subscriber replies) { + @NonNull Pipeline replies) { return svc.open(method, options, replies); } } diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java index 3360cf9f..69482be3 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipeline.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.hedera.pbj.runtime.grpc; import java.util.concurrent.Flow; @@ -8,8 +24,9 @@ * @param The subscribed item type */ public interface Pipeline extends Flow.Subscriber { - /** - * Called when an END_STREAM frame is received from the client. - */ + /** Called when an END_STREAM frame is received from the client. */ void clientEndStreamReceived(); + + /** Called when an exception is thrown */ + void registerCallbackHandler(Runnable runnable); } diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java index a25f2bae..782503d0 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/Pipelines.java @@ -45,6 +45,9 @@ public void clientEndStreamReceived() { // Nothing to do } + @Override + public void registerCallbackHandler(Runnable runnable) {} + private Flow.Subscription subscription; @Override public void onSubscribe(@NonNull final Flow.Subscription subscription) { @@ -159,7 +162,7 @@ public interface UnaryBuilder { * @return This builder. */ @NonNull - UnaryBuilder respondTo(@NonNull Flow.Subscriber replies); + UnaryBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -217,7 +220,7 @@ public interface BidiStreamingBuilder { * @return This builder. */ @NonNull - BidiStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); + BidiStreamingBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -277,7 +280,7 @@ ClientStreamingBuilder method( * @return This builder. */ @NonNull - ClientStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); + ClientStreamingBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -333,7 +336,7 @@ public interface ServerStreamingBuilder { * @return This builder. */ @NonNull - ServerStreamingBuilder respondTo(@NonNull Flow.Subscriber replies); + ServerStreamingBuilder respondTo(@NonNull Pipeline replies); /** * Builds the pipeline and returns it. The returned pipeline receives the incoming messages, and contains @@ -373,7 +376,7 @@ public interface ExceptionalFunction { */ @FunctionalInterface public interface ClientStreamingMethod - extends ExceptionalFunction, Flow.Subscriber> {} + extends ExceptionalFunction, Pipeline> {} /** * A function that handles a server streaming gRPC service method. A single request is received from the client, @@ -390,7 +393,7 @@ public interface ServerStreamingMethod { * @param replies The subscriber to send responses to. * @throws Exception If an error occurs during processing. */ - void apply(@NonNull T request, @NonNull Flow.Subscriber replies) throws Exception; + void apply(@NonNull T request, @NonNull Pipeline replies) throws Exception; } /** @@ -401,7 +404,7 @@ public interface ServerStreamingMethod { * @param The type of the response message. */ public interface BidiStreamingMethod - extends ExceptionalFunction, Flow.Subscriber> {} + extends ExceptionalFunction, Pipeline> {} /** * A convenient base class for the different builders. All builders have to hold state for request and @@ -419,7 +422,7 @@ public interface BidiStreamingMethod private abstract static class PipelineBuilderImpl implements Pipeline, Flow.Subscription { protected ExceptionalFunction requestMapper; protected ExceptionalFunction responseMapper; - protected Flow.Subscriber replies; + protected Pipeline replies; private Flow.Subscription sourceSubscription; protected boolean completed = false; @@ -503,7 +506,7 @@ public UnaryBuilder mapResponse(@NonNull final ExceptionalFunction respondTo(@NonNull final Flow.Subscriber replies) { + public UnaryBuilder respondTo(@NonNull final Pipeline replies) { this.replies = requireNonNull(replies); return this; } @@ -546,6 +549,11 @@ public void onNext(@NonNull final Bytes message) { public void clientEndStreamReceived() { // nothing to do, as onComplete is always called inside onNext } + + @Override + public void registerCallbackHandler(Runnable runnable) { + + } } /** @@ -557,7 +565,7 @@ public void clientEndStreamReceived() { private static final class BidiStreamingBuilderImpl extends PipelineBuilderImpl implements BidiStreamingBuilder { private BidiStreamingMethod method; - private Flow.Subscriber incoming; + private Pipeline incoming; @Override @NonNull @@ -582,7 +590,7 @@ public BidiStreamingBuilderImpl mapResponse(@NonNull final ExceptionalFunc @Override @NonNull - public BidiStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber replies) { + public BidiStreamingBuilderImpl respondTo(@NonNull final Pipeline replies) { this.replies = replies; return this; } @@ -636,6 +644,11 @@ public void clientEndStreamReceived() { // if the client stream is ended, the entire pipeline is ended onComplete(); } + + @Override + public void registerCallbackHandler(Runnable runnable) { + + } } /** @@ -646,7 +659,7 @@ public void clientEndStreamReceived() { private static final class ClientStreamingBuilderImpl extends PipelineBuilderImpl implements ClientStreamingBuilder { private ClientStreamingMethod method; - private Flow.Subscriber incoming; + private Pipeline incoming; @Override @NonNull @@ -671,7 +684,7 @@ public ClientStreamingBuilderImpl mapResponse(@NonNull final ExceptionalFu @Override @NonNull - public ClientStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber replies) { + public ClientStreamingBuilderImpl respondTo(@NonNull final Pipeline replies) { this.replies = replies; return this; } @@ -719,6 +732,11 @@ public void onComplete() { public void clientEndStreamReceived() { onComplete(); } + + @Override + public void registerCallbackHandler(Runnable runnable) { + + } } /** @@ -730,7 +748,7 @@ public void clientEndStreamReceived() { private static final class ServerStreamingBuilderImpl extends PipelineBuilderImpl implements ServerStreamingBuilder { private ServerStreamingMethod method; - private Flow.Subscriber responseConverter; + private Pipeline responseConverter; @Override @NonNull @@ -755,7 +773,7 @@ public ServerStreamingBuilderImpl mapResponse(@NonNull final ExceptionalFu @Override @NonNull - public ServerStreamingBuilderImpl respondTo(@NonNull final Flow.Subscriber replies) { + public ServerStreamingBuilderImpl respondTo(@NonNull final Pipeline replies) { this.replies = replies; return this; } @@ -794,6 +812,11 @@ public void clientEndStreamReceived() { // nothing to do // the server will continue streaming, since the message coming from the client is a subscription request } + + @Override + public void registerCallbackHandler(Runnable runnable) { + + } } /** @@ -805,8 +828,8 @@ public void clientEndStreamReceived() { * @param The type of the input. * @param The type of the output. */ - private record MapSubscriber(Flow.Subscriber next, ExceptionalFunction mapper) - implements Flow.Subscriber, Flow.Subscription { + private record MapSubscriber(Pipeline next, ExceptionalFunction mapper) + implements Pipeline, Flow.Subscription { private MapSubscriber { next.onSubscribe(this); @@ -846,5 +869,15 @@ public void onError(Throwable throwable) { public void onComplete() { next.onComplete(); } + + @Override + public void clientEndStreamReceived() { + + } + + @Override + public void registerCallbackHandler(Runnable runnable) { + next.registerCallbackHandler(runnable); + } } } diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java index a7e11d67..367164a2 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/ServiceInterface.java @@ -142,6 +142,6 @@ interface RequestOptions { */ @NonNull Pipeline open( - @NonNull Method method, @NonNull RequestOptions opts, @NonNull Flow.Subscriber responses) + @NonNull Method method, @NonNull RequestOptions opts, @NonNull Pipeline responses) throws GrpcException; } diff --git a/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java b/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java index 7acd3900..921d224b 100644 --- a/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java +++ b/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/grpc/PipelinesTest.java @@ -71,7 +71,7 @@ void noopOnCompleteDoesNothing() { @Nested @ExtendWith(MockitoExtension.class) class UnaryTest { - @Mock Flow.Subscriber replies; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -184,8 +184,8 @@ void positive() { @Nested @ExtendWith(MockitoExtension.class) class BidiTest { - @Mock Flow.Subscriber client; - @Mock Flow.Subscriber replies; + @Mock Pipeline client; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -341,7 +341,7 @@ void positive() { @Nested @ExtendWith(MockitoExtension.class) class ServerStreamingTest { - @Mock Flow.Subscriber replies; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -470,7 +470,7 @@ void positive() { @Nested @ExtendWith(MockitoExtension.class) class ClientStreamingTest { - @Mock Flow.Subscriber replies; + @Mock Pipeline replies; @Mock Flow.Subscription subscription; @Test @@ -595,11 +595,11 @@ void positive() { verify(replies).onNext(Bytes.wrap("hello:world")); } - private static final class ConcatenatingHandler implements Flow.Subscriber { + private static final class ConcatenatingHandler implements Pipeline { private final List strings = new ArrayList<>(); - private final Flow.Subscriber sink; + private final Pipeline sink; - private ConcatenatingHandler(Flow.Subscriber sink) { + private ConcatenatingHandler(Pipeline sink) { this.sink = sink; } @@ -622,6 +622,16 @@ public void onError(Throwable throwable) { public void onComplete() { sink.onNext(String.join(":", strings)); } + + @Override + public void clientEndStreamReceived() { + + } + + @Override + public void registerCallbackHandler(Runnable runnable) { + + } } } }