Skip to content

Commit

Permalink
fix: Make server keep connection open after receiving ES (#305)
Browse files Browse the repository at this point in the history
Signed-off-by: litt <[email protected]>
  • Loading branch information
litt3 authored Oct 21, 2024
1 parent 845c93d commit 1d6360f
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.hedera.pbj.grpc.helidon.config.PbjConfig;
import com.hedera.pbj.runtime.grpc.GrpcException;
import com.hedera.pbj.runtime.grpc.GrpcStatus;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.grpc.ServiceInterface;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Down Expand Up @@ -137,14 +138,14 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa
private int entityBytesIndex = 0;

/**
* The subscriber that will receive incoming messages from the client.
* The communication pipeline between server and client
*
* <p>This member isn't final because it is set in the {@link #init()} method. It should not be
* set at any other time.
*
* <p>Method calls on this object are thread-safe.
*/
private Flow.Subscriber<? super Bytes> incoming;
private Pipeline<? super Bytes> pipeline;

/** Create a new instance */
PbjProtocolHandler(
Expand Down Expand Up @@ -267,27 +268,27 @@ public void init() {
// Setup the subscribers. The "outgoing" subscriber will send messages to the client.
// This is given to the "open" method on the service to allow it to send messages to
// the client.
final var outgoing = new SendToClientSubscriber();
incoming = route.service().open(route.method(), options, outgoing);
final Flow.Subscriber<? super Bytes> outgoing = new SendToClientSubscriber();
pipeline = route.service().open(route.method(), options, outgoing);
} catch (final GrpcException grpcException) {
route.failedGrpcRequestCounter().increment();
new TrailerOnlyBuilder()
.grpcStatus(grpcException.status())
.statusMessage(grpcException.getMessage())
.send();
close();
error();
} catch (final HttpException httpException) {
route.failedHttpRequestCounter().increment();
new TrailerOnlyBuilder()
.httpStatus(httpException.status())
.grpcStatus(GrpcStatus.INVALID_ARGUMENT)
.send();
close();
error();
} catch (final Exception unknown) {
route.failedUnknownRequestCounter().increment();
LOGGER.log(ERROR, "Failed to initialize grpc protocol handler", unknown);
new TrailerOnlyBuilder().grpcStatus(GrpcStatus.UNKNOWN).send();
close();
error();
}
}

Expand All @@ -299,7 +300,7 @@ public Http2StreamState streamState() {

@Override
public void rstStream(@NonNull final Http2RstStream rstStream) {
// Nothing to do
pipeline.onComplete();
}

@Override
Expand Down Expand Up @@ -368,7 +369,7 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa
if (entityBytesIndex == entityBytes.length) {
// Grab and wrap the bytes and reset to being reading the next message
final var bytes = Bytes.wrap(entityBytes);
incoming.onNext(bytes);
pipeline.onNext(bytes);
entityBytesIndex = 0;
entityBytes = null;
}
Expand All @@ -380,19 +381,23 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa
if (header.flags(Http2FrameTypes.DATA).endOfStream()) {
entityBytesIndex = 0;
entityBytes = null;
currentStreamState.set(Http2StreamState.HALF_CLOSED_LOCAL);
incoming.onComplete();
currentStreamState.set(Http2StreamState.HALF_CLOSED_REMOTE);
pipeline.clientEndStreamReceived();
}
} catch (final Exception e) {
// I have to propagate this error through the service interface, so it can respond to
// errors in the connection, tear down resources, etc. It will also forward this on
// to the client, causing the connection to be torn down.
incoming.onError(e);
pipeline.onError(e);
}
}

/** Close the connection with the client. May be called by different threads concurrently. */
private void close() {
/**
* An error has occurred. Cancel the deadline future if it's still active, and set the stream state accordingly.
* <p>
* May be called by different threads concurrently.
*/
private void error() {
// Canceling a future that has already completed has no effect. So by canceling here, we are
// saying:
// "If you have not yet executed, never execute. If you have already executed, then just
Expand All @@ -401,7 +406,7 @@ private void close() {

// cancel is threadsafe
deadlineFuture.cancel(false);
currentStreamState.set(Http2StreamState.HALF_CLOSED_LOCAL);
currentStreamState.set(Http2StreamState.CLOSED);
}

/**
Expand Down Expand Up @@ -453,7 +458,7 @@ private ScheduledFuture<?> scheduleDeadline(@NonNull final String timeout) {
deadline,
() -> {
route.deadlineExceededCounter().increment();
incoming.onError(new GrpcException(GrpcStatus.DEADLINE_EXCEEDED));
pipeline.onError(new GrpcException(GrpcStatus.DEADLINE_EXCEEDED));
});
}

Expand Down Expand Up @@ -650,13 +655,22 @@ public void onError(@NonNull final Throwable throwable) {
LOGGER.log(ERROR, "Failed to send response", throwable);
new TrailerBuilder().grpcStatus(GrpcStatus.INTERNAL).send();
}
close();
error();
}

@Override
public void onComplete() {
new TrailerBuilder().send();
close();

deadlineFuture.cancel(false);

currentStreamState.getAndUpdate(
currentValue -> {
if (requireNonNull(currentValue) == Http2StreamState.OPEN) {
return Http2StreamState.HALF_CLOSED_LOCAL;
}
return Http2StreamState.CLOSED;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.grpc.Pipelines;
import com.hedera.pbj.runtime.grpc.ServiceInterface;
import com.hedera.pbj.runtime.io.buffer.Bytes;
Expand Down Expand Up @@ -46,14 +47,14 @@ enum GreeterMethod implements Method {
HelloReply sayHello(HelloRequest request);

// A stream of messages coming from the client, with a single response from the server.
Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies);

// A single request from the client, with a stream of responses from the server.
void sayHelloStreamReply(HelloRequest request, Flow.Subscriber<? super HelloReply> replies);

// A bidirectional stream of requests and responses between the client and the server.
Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies);

@NonNull
Expand All @@ -73,7 +74,7 @@ default List<Method> methods() {

@Override
@NonNull
default Flow.Subscriber<? super Bytes> open(
default Pipeline<? super Bytes> open(
final @NonNull Method method,
final @NonNull RequestOptions options,
final @NonNull Flow.Subscriber<? super Bytes> replies) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.protobuf.util.JsonFormat;
import com.hedera.pbj.runtime.grpc.GrpcException;
import com.hedera.pbj.runtime.grpc.GrpcStatus;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.hedera.pbj.runtime.io.stream.ReadableStreamingData;
import com.hedera.pbj.runtime.io.stream.WritableStreamingData;
Expand Down Expand Up @@ -509,7 +510,7 @@ void exceptionThrownWhileOpening() {
new GreeterAdapter() {
@Override
@NonNull
public Flow.Subscriber<? super Bytes> open(
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
Expand Down Expand Up @@ -804,10 +805,15 @@ public HelloReply sayHello(HelloRequest request) {

// Streams of stuff coming from the client, with a single response.
@Override
public Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
final var names = new ArrayList<String>();
return new Flow.Subscriber<>() {
return new Pipeline<>() {
@Override
public void clientEndStreamReceived() {
onComplete();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE); // turn off flow control
Expand Down Expand Up @@ -846,11 +852,16 @@ public void sayHelloStreamReply(
}

@Override
public Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
// Here we receive info from the client. In this case, it is a stream of requests with
// names. We will respond with a stream of replies.
return new Flow.Subscriber<>() {
return new Pipeline<>() {
@Override
public void clientEndStreamReceived() {
onComplete();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE); // turn off flow control
Expand Down Expand Up @@ -882,7 +893,7 @@ default HelloReply sayHello(HelloRequest request) {
}

@Override
default Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
default Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
return null;
}
Expand All @@ -892,7 +903,7 @@ default void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {}

@Override
default Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
default Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
return null;
}
Expand All @@ -909,7 +920,7 @@ public HelloReply sayHello(HelloRequest request) {

@Override
@NonNull
public Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
return svc.sayHelloStreamRequest(replies);
}
Expand All @@ -922,14 +933,14 @@ public void sayHelloStreamReply(

@Override
@NonNull
public Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
return svc.sayHelloStreamBidi(replies);
}

@Override
@NonNull
public Flow.Subscriber<? super Bytes> open(
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.hedera.pbj.runtime.grpc;

import java.util.concurrent.Flow;

/**
* Represents a pipeline of data that is being processed by a gRPC service.
*
* @param <T> The subscribed item type
*/
public interface Pipeline<T> extends Flow.Subscriber<T> {
/**
* Called when an END_STREAM frame is received from the client.
*/
void clientEndStreamReceived();
}
Loading

0 comments on commit 1d6360f

Please sign in to comment.