Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make server keep connection open after receiving ES #305

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.hedera.pbj.grpc.helidon.config.PbjConfig;
import com.hedera.pbj.runtime.grpc.GrpcException;
import com.hedera.pbj.runtime.grpc.GrpcStatus;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.grpc.ServiceInterface;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Down Expand Up @@ -137,14 +138,14 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa
private int entityBytesIndex = 0;

/**
* The subscriber that will receive incoming messages from the client.
* The communication pipeline between server and client
*
* <p>This member isn't final because it is set in the {@link #init()} method. It should not be
* set at any other time.
*
* <p>Method calls on this object are thread-safe.
*/
private Flow.Subscriber<? super Bytes> incoming;
private Pipeline<? super Bytes> pipeline;

/** Create a new instance */
PbjProtocolHandler(
Expand Down Expand Up @@ -267,27 +268,27 @@ public void init() {
// Setup the subscribers. The "outgoing" subscriber will send messages to the client.
// This is given to the "open" method on the service to allow it to send messages to
// the client.
final var outgoing = new SendToClientSubscriber();
incoming = route.service().open(route.method(), options, outgoing);
final Flow.Subscriber<? super Bytes> outgoing = new SendToClientSubscriber();
pipeline = route.service().open(route.method(), options, outgoing);
} catch (final GrpcException grpcException) {
route.failedGrpcRequestCounter().increment();
new TrailerOnlyBuilder()
.grpcStatus(grpcException.status())
.statusMessage(grpcException.getMessage())
.send();
close();
error();
} catch (final HttpException httpException) {
route.failedHttpRequestCounter().increment();
new TrailerOnlyBuilder()
.httpStatus(httpException.status())
.grpcStatus(GrpcStatus.INVALID_ARGUMENT)
.send();
close();
error();
} catch (final Exception unknown) {
route.failedUnknownRequestCounter().increment();
LOGGER.log(ERROR, "Failed to initialize grpc protocol handler", unknown);
new TrailerOnlyBuilder().grpcStatus(GrpcStatus.UNKNOWN).send();
close();
error();
}
}

Expand All @@ -299,7 +300,7 @@ public Http2StreamState streamState() {

@Override
public void rstStream(@NonNull final Http2RstStream rstStream) {
// Nothing to do
pipeline.onComplete();
}

@Override
Expand Down Expand Up @@ -368,7 +369,7 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa
if (entityBytesIndex == entityBytes.length) {
// Grab and wrap the bytes and reset to being reading the next message
final var bytes = Bytes.wrap(entityBytes);
incoming.onNext(bytes);
pipeline.onNext(bytes);
entityBytesIndex = 0;
entityBytes = null;
}
Expand All @@ -380,19 +381,23 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa
if (header.flags(Http2FrameTypes.DATA).endOfStream()) {
entityBytesIndex = 0;
entityBytes = null;
currentStreamState.set(Http2StreamState.HALF_CLOSED_LOCAL);
incoming.onComplete();
currentStreamState.set(Http2StreamState.HALF_CLOSED_REMOTE);
pipeline.clientEndStreamReceived();
}
} catch (final Exception e) {
// I have to propagate this error through the service interface, so it can respond to
// errors in the connection, tear down resources, etc. It will also forward this on
// to the client, causing the connection to be torn down.
incoming.onError(e);
pipeline.onError(e);
}
}

/** Close the connection with the client. May be called by different threads concurrently. */
private void close() {
/**
* An error has occurred. Cancel the deadline future if it's still active, and set the stream state accordingly.
* <p>
* May be called by different threads concurrently.
*/
private void error() {
// Canceling a future that has already completed has no effect. So by canceling here, we are
// saying:
// "If you have not yet executed, never execute. If you have already executed, then just
Expand All @@ -401,7 +406,7 @@ private void close() {

// cancel is threadsafe
deadlineFuture.cancel(false);
currentStreamState.set(Http2StreamState.HALF_CLOSED_LOCAL);
currentStreamState.set(Http2StreamState.CLOSED);
}

/**
Expand Down Expand Up @@ -453,7 +458,7 @@ private ScheduledFuture<?> scheduleDeadline(@NonNull final String timeout) {
deadline,
() -> {
route.deadlineExceededCounter().increment();
incoming.onError(new GrpcException(GrpcStatus.DEADLINE_EXCEEDED));
pipeline.onError(new GrpcException(GrpcStatus.DEADLINE_EXCEEDED));
});
}

Expand Down Expand Up @@ -650,13 +655,22 @@ public void onError(@NonNull final Throwable throwable) {
LOGGER.log(ERROR, "Failed to send response", throwable);
new TrailerBuilder().grpcStatus(GrpcStatus.INTERNAL).send();
}
close();
error();
}

@Override
public void onComplete() {
new TrailerBuilder().send();
close();

deadlineFuture.cancel(false);

currentStreamState.getAndUpdate(
currentValue -> {
if (requireNonNull(currentValue) == Http2StreamState.OPEN) {
return Http2StreamState.HALF_CLOSED_LOCAL;
}
return Http2StreamState.CLOSED;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.grpc.Pipelines;
import com.hedera.pbj.runtime.grpc.ServiceInterface;
import com.hedera.pbj.runtime.io.buffer.Bytes;
Expand Down Expand Up @@ -46,14 +47,14 @@ enum GreeterMethod implements Method {
HelloReply sayHello(HelloRequest request);

// A stream of messages coming from the client, with a single response from the server.
Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies);

// A single request from the client, with a stream of responses from the server.
void sayHelloStreamReply(HelloRequest request, Flow.Subscriber<? super HelloReply> replies);

// A bidirectional stream of requests and responses between the client and the server.
Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies);

@NonNull
Expand All @@ -73,7 +74,7 @@ default List<Method> methods() {

@Override
@NonNull
default Flow.Subscriber<? super Bytes> open(
default Pipeline<? super Bytes> open(
final @NonNull Method method,
final @NonNull RequestOptions options,
final @NonNull Flow.Subscriber<? super Bytes> replies) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.protobuf.util.JsonFormat;
import com.hedera.pbj.runtime.grpc.GrpcException;
import com.hedera.pbj.runtime.grpc.GrpcStatus;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.hedera.pbj.runtime.io.stream.ReadableStreamingData;
import com.hedera.pbj.runtime.io.stream.WritableStreamingData;
Expand Down Expand Up @@ -509,7 +510,7 @@ void exceptionThrownWhileOpening() {
new GreeterAdapter() {
@Override
@NonNull
public Flow.Subscriber<? super Bytes> open(
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
Expand Down Expand Up @@ -804,10 +805,15 @@ public HelloReply sayHello(HelloRequest request) {

// Streams of stuff coming from the client, with a single response.
@Override
public Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
final var names = new ArrayList<String>();
return new Flow.Subscriber<>() {
return new Pipeline<>() {
@Override
public void clientEndStreamReceived() {
onComplete();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE); // turn off flow control
Expand Down Expand Up @@ -846,11 +852,16 @@ public void sayHelloStreamReply(
}

@Override
public Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
// Here we receive info from the client. In this case, it is a stream of requests with
// names. We will respond with a stream of replies.
return new Flow.Subscriber<>() {
return new Pipeline<>() {
@Override
public void clientEndStreamReceived() {
onComplete();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE); // turn off flow control
Expand Down Expand Up @@ -882,7 +893,7 @@ default HelloReply sayHello(HelloRequest request) {
}

@Override
default Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
default Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
return null;
}
Expand All @@ -892,7 +903,7 @@ default void sayHelloStreamReply(
HelloRequest request, Flow.Subscriber<? super HelloReply> replies) {}

@Override
default Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
default Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
return null;
}
Expand All @@ -909,7 +920,7 @@ public HelloReply sayHello(HelloRequest request) {

@Override
@NonNull
public Flow.Subscriber<? super HelloRequest> sayHelloStreamRequest(
public Pipeline<? super HelloRequest> sayHelloStreamRequest(
Flow.Subscriber<? super HelloReply> replies) {
return svc.sayHelloStreamRequest(replies);
}
Expand All @@ -922,14 +933,14 @@ public void sayHelloStreamReply(

@Override
@NonNull
public Flow.Subscriber<? super HelloRequest> sayHelloStreamBidi(
public Pipeline<? super HelloRequest> sayHelloStreamBidi(
Flow.Subscriber<? super HelloReply> replies) {
return svc.sayHelloStreamBidi(replies);
}

@Override
@NonNull
public Flow.Subscriber<? super Bytes> open(
public Pipeline<? super Bytes> open(
@NonNull Method method,
@NonNull RequestOptions options,
@NonNull Flow.Subscriber<? super Bytes> replies) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.hedera.pbj.runtime.grpc;

import java.util.concurrent.Flow;

/**
* Represents a pipeline of data that is being processed by a gRPC service.
*
* @param <T> The subscribed item type
*/
public interface Pipeline<T> extends Flow.Subscriber<T> {
/**
* Called when an END_STREAM frame is received from the client.
*/
void clientEndStreamReceived();
}
Loading
Loading