Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

00303 runnable registration #314

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.grpc.Pipelines;
import com.hedera.pbj.runtime.grpc.ServiceInterface;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import greeter.HelloReply;
import greeter.HelloRequest;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

/**
* This service doesn't rely on any PBJ objects, because the build right now doesn't have a good way
Expand All @@ -46,15 +47,15 @@ enum GreeterMethod implements Method {
HelloReply sayHello(HelloRequest request);

// A stream of messages coming from the client, with a single response from the server.
Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies);
Pipeline<? super HelloRequest> sayHelloStreamRequest(
Pipeline<? super HelloReply> replies);

// A single request from the client, with a stream of responses from the server.
void sayHelloStreamReply(HelloRequest request, Flow.Subscriber<? super HelloReply> replies);
void sayHelloStreamReply(HelloRequest request, Pipeline<? super HelloReply> replies);

// A bidirectional stream of requests and responses between the client and the server.
Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies);
Pipeline<? super HelloRequest> sayHelloStreamBidi(
Pipeline<? super HelloReply> replies);

@NonNull
default String serviceName() {
Expand All @@ -73,10 +74,10 @@ default List<Method> methods() {

@Override
@NonNull
default Flow.Subscriber<? super Bytes> open(
default Pipeline<? super Bytes> open(
final @NonNull Method method,
final @NonNull RequestOptions options,
final @NonNull Flow.Subscriber<? super Bytes> replies) {
final @NonNull Pipeline<? super Bytes> replies) {

final var m = (GreeterMethod) method;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.protobuf.util.JsonFormat;
import com.hedera.pbj.runtime.grpc.GrpcException;
import com.hedera.pbj.runtime.grpc.GrpcStatus;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.hedera.pbj.runtime.io.stream.ReadableStreamingData;
import com.hedera.pbj.runtime.io.stream.WritableStreamingData;
Expand Down Expand Up @@ -509,10 +510,10 @@ void exceptionThrownWhileOpening() {
new GreeterAdapter() {
@Override
@NonNull
public Flow.Subscriber<? super Bytes> open(
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
@NonNull Pipeline<? super Bytes> replies) {
throw ex;
}
};
Expand Down Expand Up @@ -804,10 +805,20 @@ public HelloReply sayHello(HelloRequest request) {

// Streams of stuff coming from the client, with a single response.
@Override
public Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Pipeline<? super HelloReply> replies) {
final var names = new ArrayList<String>();
return new Flow.Subscriber<>() {
return new Pipeline<>() {
@Override
public void clientEndStreamReceived() {
onComplete();
}

@Override
public void registerCallbackHandler(Runnable runnable) {

}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE); // turn off flow control
Expand Down Expand Up @@ -837,7 +848,7 @@ public void onComplete() {

@Override
public void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {
HelloRequest request, Pipeline<? super HelloReply> replies) {
for (int i = 0; i < 10; i++) {
replies.onNext(HelloReply.newBuilder().setMessage("Hello!").build());
}
Expand All @@ -846,11 +857,21 @@ public void sayHelloStreamReply(
}

@Override
public Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Pipeline<? super HelloReply> replies) {
// Here we receive info from the client. In this case, it is a stream of requests with
// names. We will respond with a stream of replies.
return new Flow.Subscriber<>() {
return new Pipeline<>() {
@Override
public void clientEndStreamReceived() {
onComplete();
}

@Override
public void registerCallbackHandler(Runnable runnable) {

}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE); // turn off flow control
Expand Down Expand Up @@ -882,18 +903,18 @@ default HelloReply sayHello(HelloRequest request) {
}

@Override
default Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
default Pipeline<? super HelloRequest> sayHelloStreamRequest(
Pipeline<? super HelloReply> replies) {
return null;
}

@Override
default void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {}
HelloRequest request, Pipeline<? super HelloReply> replies) {}

@Override
default Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
default Pipeline<? super HelloRequest> sayHelloStreamBidi(
Pipeline<? super HelloReply> replies) {
return null;
}
}
Expand All @@ -909,30 +930,30 @@ public HelloReply sayHello(HelloRequest request) {

@Override
@NonNull
public Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Pipeline<? super HelloReply> replies) {
return svc.sayHelloStreamRequest(replies);
}

@Override
public void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {
HelloRequest request, Pipeline<? super HelloReply> replies) {
svc.sayHelloStreamReply(request, replies);
}

@Override
@NonNull
public Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Pipeline<? super HelloReply> replies) {
return svc.sayHelloStreamBidi(replies);
}

@Override
@NonNull
public Flow.Subscriber<? super Bytes> open(
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
@NonNull Pipeline<? super Bytes> replies) {
return svc.open(method, options, replies);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.pbj.runtime.grpc;

import java.util.concurrent.Flow;

/**
* Represents a pipeline of data that is being processed by a gRPC service.
*
* @param <T> The subscribed item type
*/
public interface Pipeline<T> extends Flow.Subscriber<T> {
/** Called when an END_STREAM frame is received from the client. */
void clientEndStreamReceived();

/** Called when an exception is thrown */
void registerCallbackHandler(Runnable runnable);
}
Loading
Loading