Skip to content

Commit

Permalink
fix: First draft of callback registration
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <[email protected]>
  • Loading branch information
mattp-swirldslabs committed Nov 2, 2024
1 parent b9cfdc1 commit aa392ce
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void init() {
// Setup the subscribers. The "outgoing" subscriber will send messages to the client.
// This is given to the "open" method on the service to allow it to send messages to
// the client.
final Flow.Subscriber<? super Bytes> outgoing = new SendToClientSubscriber();
final Pipeline<? super Bytes> outgoing = new SendToClientSubscriber();
pipeline = route.service().open(route.method(), options, outgoing);
} catch (final GrpcException grpcException) {
route.failedGrpcRequestCounter().increment();
Expand Down Expand Up @@ -678,7 +678,10 @@ protected void send(
* The implementation of {@link Flow.Subscriber} used to send messages to the client. It
* receives bytes from the handlers to send to the client.
*/
private final class SendToClientSubscriber implements Flow.Subscriber<Bytes> {
private final class SendToClientSubscriber implements Pipeline<Bytes> {

private Runnable onCancelRunnable;

@Override
public void onSubscribe(@NonNull final Flow.Subscription subscription) {
// FUTURE: Add support for flow control
Expand All @@ -704,6 +707,9 @@ public void onNext(@NonNull final Bytes response) {
new Http2FrameData(header, bufferData), flowControl.outbound());
} catch (final Exception e) {
LOGGER.log(ERROR, "Failed to respond to grpc request: " + route.method(), e);
if (onCancelRunnable != null) {
onCancelRunnable.run();
}
}
}

Expand Down Expand Up @@ -735,6 +741,14 @@ public void onComplete() {
return Http2StreamState.CLOSED;
});
}

@Override
public void clientEndStreamReceived() {}

@Override
public void registerCallbackHandler(Runnable runnable) {
onCancelRunnable = runnable;
}
}

/** Simple implementation of the {@link ServiceInterface.RequestOptions} interface. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import greeter.HelloReply;
import greeter.HelloRequest;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

/**
* This service doesn't rely on any PBJ objects, because the build right now doesn't have a good way
Expand All @@ -48,14 +48,14 @@ enum GreeterMethod implements Method {

// A stream of messages coming from the client, with a single response from the server.
Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies);
Pipeline<? super HelloReply> replies);

// A single request from the client, with a stream of responses from the server.
void sayHelloStreamReply(HelloRequest request, Flow.Subscriber<? super HelloReply> replies);
void sayHelloStreamReply(HelloRequest request, Pipeline<? super HelloReply> replies);

// A bidirectional stream of requests and responses between the client and the server.
Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies);
Pipeline<? super HelloReply> replies);

@NonNull
default String serviceName() {
Expand All @@ -77,7 +77,7 @@ default List<Method> methods() {
default Pipeline<? super Bytes> open(
final @NonNull Method method,
final @NonNull RequestOptions options,
final @NonNull Flow.Subscriber<? super Bytes> replies) {
final @NonNull Pipeline<? super Bytes> replies) {

final var m = (GreeterMethod) method;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ void exceptionThrownWhileOpening() {
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
@NonNull Pipeline<? super Bytes> replies) {
throw ex;
}
};
Expand Down Expand Up @@ -806,14 +806,19 @@ public HelloReply sayHello(HelloRequest request) {
// Streams of stuff coming from the client, with a single response.
@Override
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
final var names = new ArrayList<String>();
return new Pipeline<>() {
@Override
public void clientEndStreamReceived() {
onComplete();
}

@Override
public void registerCallbackHandler(Runnable runnable) {

}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE); // turn off flow control
Expand Down Expand Up @@ -843,7 +848,7 @@ public void onComplete() {

@Override
public void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {
HelloRequest request, Pipeline<? super HelloReply> replies) {
for (int i = 0; i < 10; i++) {
replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build());
}
Expand All @@ -853,7 +858,7 @@ public void sayHelloStreamReply(

@Override
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
// Here we receive info from the client. In this case, it is a stream of requests with
// names. We will respond with a stream of replies.
return new Pipeline<>() {
Expand All @@ -862,6 +867,11 @@ public void clientEndStreamReceived() {
onComplete();
}

@Override
public void registerCallbackHandler(Runnable runnable) {

}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE); // turn off flow control
Expand Down Expand Up @@ -894,17 +904,17 @@ default HelloReply sayHello(HelloRequest request) {

@Override
default Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return null;
}

@Override
default void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {}
HelloRequest request, Pipeline<? super HelloReply> replies) {}

@Override
default Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return null;
}
}
Expand All @@ -921,20 +931,20 @@ public HelloReply sayHello(HelloRequest request) {
@Override
@NonNull
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return svc.sayHelloStreamRequest(replies);
}

@Override
public void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {
HelloRequest request, Pipeline<? super HelloReply> replies) {
svc.sayHelloStreamReply(request, replies);
}

@Override
@NonNull
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
Pipeline<? super HelloReply> replies) {
return svc.sayHelloStreamBidi(replies);
}

Expand All @@ -943,7 +953,7 @@ public Pipeline<? super HelloRequest> sayHelloStreamBidi(
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
@NonNull Pipeline<? super Bytes> replies) {
return svc.open(method, options, replies);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.pbj.runtime.grpc;

import java.util.concurrent.Flow;
Expand All @@ -8,8 +24,9 @@
* @param <T> The subscribed item type
*/
public interface Pipeline<T> extends Flow.Subscriber<T> {
/**
* Called when an END_STREAM frame is received from the client.
*/
/** Called when an END_STREAM frame is received from the client. */
void clientEndStreamReceived();

/** Called when an exception is thrown */
void registerCallbackHandler(Runnable runnable);
}
Loading

0 comments on commit aa392ce

Please sign in to comment.