From 66066c72427f35eb8ceb6604ceb9b810e9fc6f81 Mon Sep 17 00:00:00 2001 From: Richard Bair Date: Tue, 28 May 2024 18:28:35 -0700 Subject: [PATCH] Initial upload of a GRPC implementation for Helidon Signed-off-by: Richard Bair --- pbj-core/gradle/modules.properties | 1 + pbj-core/pbj-grpc-helidon/README.md | 3 + pbj-core/pbj-grpc-helidon/build.gradle.kts | 38 ++ .../pbj/grpc/helidon/DeadlineDetector.java | 7 + .../hedera/pbj/grpc/helidon/GrpcHeaders.java | 10 + .../hedera/pbj/grpc/helidon/GrpcStatus.java | 150 ++++++++ .../hedera/pbj/grpc/helidon/PbjConfig.java | 328 +++++++++++++++++ .../pbj/grpc/helidon/PbjConfigBlueprint.java | 54 +++ .../grpc/helidon/PbjErrorProtocolHandler.java | 63 ++++ .../pbj/grpc/helidon/PbjMethodRoute.java | 49 +++ .../helidon/PbjProtocolConfigProvider.java | 38 ++ .../pbj/grpc/helidon/PbjProtocolHandler.java | 340 ++++++++++++++++++ .../pbj/grpc/helidon/PbjProtocolProvider.java | 36 ++ .../pbj/grpc/helidon/PbjProtocolSelector.java | 118 ++++++ .../com/hedera/pbj/grpc/helidon/PbjRoute.java | 28 ++ .../hedera/pbj/grpc/helidon/PbjRouting.java | 111 ++++++ .../pbj/grpc/helidon/PbjServiceRoute.java | 49 +++ .../pbj/grpc/helidon/encoding/Encoding.java | 8 + .../grpc/helidon/encoding/GzipEncoding.java | 11 + .../helidon/encoding/IdentityEncoding.java | 8 + .../src/main/java/module-info.java | 14 + .../META-INF/helidon/config-metadata.json | 1 + .../helidon/feature-metadata.properties | 6 + ...bserver.http2.spi.Http2SubProtocolProvider | 2 + ...lidon.webserver.spi.ProtocolConfigProvider | 2 + .../src/test/java/http/HttpTest.java | 23 ++ .../src/test/java/pbj/ConsensusService.java | 122 +++++++ .../src/test/java/pbj/PbjTest.java | 337 +++++++++++++++++ .../hedera/pbj/runtime/ServiceInterface.java | 117 ++++++ pbj-core/settings.gradle.kts | 8 + 30 files changed, 2082 insertions(+) create mode 100644 pbj-core/pbj-grpc-helidon/README.md create mode 100644 pbj-core/pbj-grpc-helidon/build.gradle.kts create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/DeadlineDetector.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/GrpcHeaders.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/GrpcStatus.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfig.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfigBlueprint.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjErrorProtocolHandler.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjMethodRoute.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolConfigProvider.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolProvider.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjRoute.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjRouting.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjServiceRoute.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/module-info.java create mode 100644 pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/config-metadata.json create mode 100644 pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/feature-metadata.properties create mode 100644 pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/services/io.helidon.webserver.http2.spi.Http2SubProtocolProvider create mode 100644 pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/services/io.helidon.webserver.spi.ProtocolConfigProvider create mode 100644 pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java create mode 100644 pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java create mode 100644 pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java create mode 100644 pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java diff --git a/pbj-core/gradle/modules.properties b/pbj-core/gradle/modules.properties index 8c6cd6a4..5046b2c8 100644 --- a/pbj-core/gradle/modules.properties +++ b/pbj-core/gradle/modules.properties @@ -1,3 +1,4 @@ com.github.spotbugs.annotations=com.github.spotbugs:spotbugs-annotations com.google.protobuf=com.google.protobuf:protobuf-java org.antlr.antlr4.runtime=org.antlr:antlr4-runtime +com.hedera.node.hapi=com.hedera.hashgraph:hapi diff --git a/pbj-core/pbj-grpc-helidon/README.md b/pbj-core/pbj-grpc-helidon/README.md new file mode 100644 index 00000000..013534ee --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/README.md @@ -0,0 +1,3 @@ +# PBJ GRPC Helidon + +This project produces a module for Helidon that enables native support for PBJ gRPC services. \ No newline at end of file diff --git a/pbj-core/pbj-grpc-helidon/build.gradle.kts b/pbj-core/pbj-grpc-helidon/build.gradle.kts new file mode 100644 index 00000000..3901e88a --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/build.gradle.kts @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2022-2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id("java-library") + id("com.hedera.pbj.conventions") + id("com.google.protobuf") // protobuf plugin is only used for tests + id("me.champeau.jmh") +} + +testModuleInfo { + requires("org.assertj.core") + requires("org.junit.jupiter.api") + requires("org.junit.jupiter.params") + requires("io.helidon.webclient") + requires("io.helidon.webserver") + requires("io.helidon.webserver.http2") + requires("io.helidon.webclient.http2") + requires("com.hedera.node.hapi") + requiresStatic("com.github.spotbugs.annotations") +} + +tasks.named("compileJava") { + dependsOn(":pbj-runtime:jar") +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/DeadlineDetector.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/DeadlineDetector.java new file mode 100644 index 00000000..6e8ae207 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/DeadlineDetector.java @@ -0,0 +1,7 @@ +package com.hedera.pbj.grpc.helidon; + +import java.util.concurrent.ScheduledFuture; + +interface DeadlineDetector { + ScheduledFuture scheduleDeadline(long deadline, Runnable onDeadlineExceeded); +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/GrpcHeaders.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/GrpcHeaders.java new file mode 100644 index 00000000..d91e18ea --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/GrpcHeaders.java @@ -0,0 +1,10 @@ +package com.hedera.pbj.grpc.helidon; + +import io.helidon.http.HeaderName; +import io.helidon.http.HeaderNames; + +final class GrpcHeaders { + static final HeaderName GRPC_TIMEOUT = HeaderNames.create("grpc-timeout"); + static final HeaderName GRPC_ENCODING = HeaderNames.create("grpc-encoding"); + static final HeaderName GRPC_ACCEPT_ENCODING = HeaderNames.create("grpc-accept-encoding"); +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/GrpcStatus.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/GrpcStatus.java new file mode 100644 index 00000000..f7757802 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/GrpcStatus.java @@ -0,0 +1,150 @@ +package com.hedera.pbj.grpc.helidon; + +import io.helidon.http.Header; +import io.helidon.http.HeaderName; +import io.helidon.http.HeaderNames; +import io.helidon.http.HeaderValues; + +/** + * Status headers for defined grpc states. + * + * @see GRPC Status codes + */ +public final class GrpcStatus { + /** + * grpc status header name. + */ + public static final HeaderName STATUS_NAME = HeaderNames.createFromLowercase("grpc-status"); + /** + * grpc status message header name. + */ + public static final HeaderName MESSAGE_NAME = HeaderNames.createFromLowercase("grpc-message"); + /** + * The operation completed successfully. + */ + public static final Header OK = HeaderValues.createCached(STATUS_NAME, 0); + /** + * The operation was cancelled (typically by the caller). + */ + public static final Header CANCELLED = HeaderValues.createCached(STATUS_NAME, 1); + /** + * Unknown error. An example of where this error may be returned is + * if a Status value received from another address space belongs to + * an error-space that is not known in this address space. Also + * errors raised by APIs that do not return enough error information + * may be converted to this error. + */ + public static final Header UNKNOWN = HeaderValues.createCached(STATUS_NAME, 2); + /** + * Client specified an invalid argument. Note that this differs + * from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments + * that are problematic regardless of the state of the system + * (e.g., a malformed file name). + */ + public static final Header INVALID_ARGUMENT = HeaderValues.createCached(STATUS_NAME, 3); + /** + * Deadline expired before operation could complete. For operations + * that change the state of the system, this error may be returned + * even if the operation has completed successfully. For example, a + * successful response from a server could have been delayed long + * enough for the deadline to expire. + */ + public static final Header DEADLINE_EXCEEDED = HeaderValues.createCached(STATUS_NAME, 4); + /** + * Some requested entity (e.g., file or directory) was not found. + */ + public static final Header NOT_FOUND = HeaderValues.createCached(STATUS_NAME, 5); + /** + * Some entity that we attempted to create (e.g., file or directory) already exists. + */ + public static final Header ALREADY_EXISTS = HeaderValues.createCached(STATUS_NAME, 6); + /** + * The caller does not have permission to execute the specified + * operation. PERMISSION_DENIED must not be used for rejections + * caused by exhausting some resource (use RESOURCE_EXHAUSTED + * instead for those errors). PERMISSION_DENIED must not be + * used if the caller cannot be identified (use UNAUTHENTICATED + * instead for those errors). + */ + public static final Header PERMISSION_DENIED = HeaderValues.createCached(STATUS_NAME, 7); + /** + * Some resource has been exhausted, perhaps a per-user quota, or + * perhaps the entire file system is out of space. + */ + public static final Header RESOURCE_EXHAUSTED = HeaderValues.createCached(STATUS_NAME, 8); + /** + * Operation was rejected because the system is not in a state + * required for the operation's execution. For example, directory + * to be deleted may be non-empty, an rmdir operation is applied to + * a non-directory, etc. + * + *

A litmus test that may help a service implementor in deciding + * between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: + * (a) Use UNAVAILABLE if the client can retry just the failing call. + * (b) Use ABORTED if the client should retry at a higher-level + * (e.g., restarting a read-modify-write sequence). + * (c) Use FAILED_PRECONDITION if the client should not retry until + * the system state has been explicitly fixed. E.g., if an "rmdir" + * fails because the directory is non-empty, FAILED_PRECONDITION + * should be returned since the client should not retry unless + * they have first fixed up the directory by deleting files from it. + */ + public static final Header FAILED_PRECONDITION = HeaderValues.createCached(STATUS_NAME, 9); + /** + * The operation was aborted, typically due to a concurrency issue + * like sequencer check failures, transaction aborts, etc. + * + *

See litmus test above for deciding between FAILED_PRECONDITION, + * ABORTED, and UNAVAILABLE. + */ + public static final Header ABORTED = HeaderValues.createCached(STATUS_NAME, 10); + /** + * Operation was attempted past the valid range. E.g., seeking or + * reading past end of file. + * + *

Unlike INVALID_ARGUMENT, this error indicates a problem that may + * be fixed if the system state changes. For example, a 32-bit file + * system will generate INVALID_ARGUMENT if asked to read at an + * offset that is not in the range [0,2^32-1], but it will generate + * OUT_OF_RANGE if asked to read from an offset past the current + * file size. + * + *

There is a fair bit of overlap between FAILED_PRECONDITION and OUT_OF_RANGE. + * We recommend using OUT_OF_RANGE (the more specific error) when it applies + * so that callers who are iterating through + * a space can easily look for an OUT_OF_RANGE error to detect when they are done. + */ + public static final Header OUT_OF_RANGE = HeaderValues.createCached(STATUS_NAME, 11); + /** + * Operation is not implemented or not supported/enabled in this service. + */ + public static final Header UNIMPLEMENTED = HeaderValues.createCached(STATUS_NAME, 12); + /** + * Internal errors. Means some invariants expected by underlying + * system has been broken. If you see one of these errors, + * something is very broken. + */ + public static final Header INTERNAL = HeaderValues.createCached(STATUS_NAME, 13); + /** + * The service is currently unavailable. This is a most likely a + * transient condition and may be corrected by retrying with + * a backoff. Note that it is not always safe to retry + * non-idempotent operations. + * + *

See litmus test above for deciding between FAILED_PRECONDITION, + * ABORTED, and UNAVAILABLE. + */ + public static final Header UNAVAILABLE = HeaderValues.createCached(STATUS_NAME, 14); + /** + * Unrecoverable data loss or corruption. + */ + public static final Header DATA_LOSS = HeaderValues.createCached(STATUS_NAME, 15); + /** + * The request does not have valid authentication credentials for the + * operation. + */ + public static final Header UNAUTHENTICATED = HeaderValues.createCached(STATUS_NAME, 16); + + private GrpcStatus() { + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfig.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfig.java new file mode 100644 index 00000000..711108b0 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfig.java @@ -0,0 +1,328 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.pbj.grpc.helidon; + +import io.helidon.builder.api.Prototype; +import io.helidon.common.Errors; +import io.helidon.common.Generated; +import io.helidon.common.config.Config; +import java.util.Objects; +import java.util.Optional; + +// NOTE: This file was originally generated from a Maven build patterned after that found in the Helidon project. +// But I am not sure how to integrate that into the build process, so for now, I have just copy/pasted it here. + +/** + * Interface generated from definition. Please add javadoc to the definition interface. + * + * @see #builder() + * @see #create() + */ +@Generated(value = "io.helidon.builder.codegen.BuilderCodegen", trigger = "com.hedera.hashgraph.pbj.PbjConfigBlueprint") +public interface PbjConfig extends PbjConfigBlueprint, Prototype.Api { + + /** + * Create a new fluent API builder to customize configuration. + * + * @return a new builder + */ + static Builder builder() { + return new Builder(); + } + + /** + * Create a new fluent API builder from an existing instance. + * + * @param instance an existing instance used as a base for the builder + * @return a builder based on an instance + */ + static Builder builder(PbjConfig instance) { + return PbjConfig.builder().from(instance); + } + + /** + * Create a new instance from configuration. + * + * @param config used to configure the new instance + * @return a new instance configured from configuration + */ + static PbjConfig create(Config config) { + return PbjConfig.builder().config(config).buildPrototype(); + } + + /** + * Create a new instance with default values. + * + * @return a new instance + */ + static PbjConfig create() { + return PbjConfig.builder().buildPrototype(); + } + + /** + * Fluent API builder base for {@link PbjConfig}. + * + * @param type of the builder extending this abstract builder + * @param type of the prototype interface that would be built by {@link #buildPrototype()} + */ + abstract class BuilderBase, PROTOTYPE extends PbjConfig> implements Prototype.ConfiguredBuilder { + + private Config config; + private int maxMessageSize = 10240; + private int maxResponseBufferSize = 10240; + private String name; + + /** + * Protected to support extensibility. + */ + protected BuilderBase() { + } + + /** + * Update this builder from an existing prototype instance. This method disables automatic service discovery. + * + * @param prototype existing prototype to update this builder from + * @return updated builder instance + */ + public BUILDER from(PbjConfig prototype) { + maxMessageSize(prototype.maxMessageSize()); + maxResponseBufferSize(prototype.maxResponseBufferSize()); + name(prototype.name()); + return self(); + } + + /** + * Update this builder from an existing prototype builder instance. + * + * @param builder existing builder prototype to update this builder from + * @return updated builder instance + */ + public BUILDER from(BuilderBase builder) { + maxMessageSize(builder.maxMessageSize()); + maxResponseBufferSize(builder.maxResponseBufferSize()); + builder.name().ifPresent(this::name); + return self(); + } + + /** + * Update builder from configuration (node of this type). + * If a value is present in configuration, it would override currently configured values. + * + * @param config configuration instance used to obtain values to update this builder + * @return updated builder instance + */ + @Override + public BUILDER config(Config config) { + Objects.requireNonNull(config); + this.config = config; + config.get("max-message-size").as(Integer.class).ifPresent(this::maxMessageSize); + config.get("max-response-buffer-size").as(Integer.class).ifPresent(this::maxResponseBufferSize); + return self(); + } + + /** + * Maximum size of any message in bytes. + * Defaults to {@value #DEFAULT_MAX_MESSAGE_SIZE}. + * + * @param maxMessageSize the maximum number of bytes a single message can be + * @return updated builder instance + * @see #maxMessageSize() + */ + public BUILDER maxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + return self(); + } + + /** + * Maximum size of the response buffer in bytes. + * Defaults to {@value #DEFAULT_MAX_RESPONSE_BUFFER_SIZE}. + * + * @param maxResponseBufferSize the maximum number of bytes a response can be + * @return updated builder instance + * @see #maxResponseBufferSize() + */ + public BUILDER maxResponseBufferSize(int maxResponseBufferSize) { + this.maxResponseBufferSize = maxResponseBufferSize; + return self(); + } + + /** + * + * + * @param name + * @return updated builder instance + * @see #name() + */ + public BUILDER name(String name) { + Objects.requireNonNull(name); + this.name = name; + return self(); + } + + /** + * Maximum size of any message in bytes. + * Defaults to {@value #DEFAULT_MAX_MESSAGE_SIZE}. + * + * @return the max message size + */ + public int maxMessageSize() { + return maxMessageSize; + } + + /** + * Maximum size of the response buffer in bytes. + * Defaults to {@value #DEFAULT_MAX_RESPONSE_BUFFER_SIZE}. + * + * @return the max response buffer size + */ + public int maxResponseBufferSize() { + return maxResponseBufferSize; + } + + /** + * + * + * @return the name + */ + public Optional name() { + return Optional.ofNullable(name); + } + + /** + * If this instance was configured, this would be the config instance used. + * + * @return config node used to configure this builder, or empty if not configured + */ + public Optional config() { + return Optional.ofNullable(config); + } + + @Override + public String toString() { + return "PbjConfigBuilder{" + + "maxMessageSize=" + maxMessageSize + "," + + "maxResponseBufferSize=" + maxResponseBufferSize + "," + + "name=" + name + + "}"; + } + + /** + * Handles providers and decorators. + */ + protected void preBuildPrototype() { + } + + /** + * Validates required properties. + */ + protected void validatePrototype() { + Errors.Collector collector = Errors.collector(); + if (name == null) { + collector.fatal(getClass(), "Property \"name\" must not be null, but not set"); + } + collector.collect().checkValid(); + } + + /** + * Generated implementation of the prototype, can be extended by descendant prototype implementations. + */ + protected static class PbjConfigImpl implements PbjConfig { + + private final int maxMessageSize; + private final int maxResponseBufferSize; + private final String name; + + /** + * Create an instance providing a builder. + * + * @param builder extending builder base of this prototype + */ + protected PbjConfigImpl(BuilderBase builder) { + this.maxMessageSize = builder.maxMessageSize(); + this.maxResponseBufferSize = builder.maxResponseBufferSize(); + this.name = builder.name().get(); + } + + @Override + public int maxMessageSize() { + return maxMessageSize; + } + + @Override + public int maxResponseBufferSize() { + return maxResponseBufferSize; + } + + @Override + public String name() { + return name; + } + + @Override + public String toString() { + return "PbjConfig{" + + "maxMessageSize=" + maxMessageSize + "," + + "maxResponseBufferSize=" + maxResponseBufferSize + "," + + "name=" + name + + "}"; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof PbjConfig other)) { + return false; + } + return maxMessageSize == other.maxMessageSize() + && maxResponseBufferSize == other.maxResponseBufferSize() + && Objects.equals(name, other.name()); + } + + @Override + public int hashCode() { + return Objects.hash(maxMessageSize, maxResponseBufferSize, name); + } + + } + + } + + /** + * Fluent API builder for {@link PbjConfig}. + */ + class Builder extends BuilderBase implements io.helidon.common.Builder { + + private Builder() { + } + + @Override + public PbjConfig buildPrototype() { + preBuildPrototype(); + validatePrototype(); + return new PbjConfigImpl(this); + } + + @Override + public PbjConfig build() { + return buildPrototype(); + } + + } + +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfigBlueprint.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfigBlueprint.java new file mode 100644 index 00000000..02d6fc30 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjConfigBlueprint.java @@ -0,0 +1,54 @@ +package com.hedera.pbj.grpc.helidon; + +import com.hedera.pbj.runtime.ServiceInterface; +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.webserver.spi.ProtocolConfig; + +@Prototype.Blueprint +@Prototype.Configured +@Prototype.Provides(ProtocolConfig.class) +interface PbjConfigBlueprint extends ProtocolConfig { + /** + * Default maximum message size in bytes ({@value}). + * + * @see #maxMessageSize() + */ + int DEFAULT_MAX_MESSAGE_SIZE = 1024*10; // 10KB + + /** + * The size of the response buffer to make available to the {@link ServiceInterface}. + * + * @see #maxResponseBufferSize() + */ + int DEFAULT_MAX_RESPONSE_BUFFER_SIZE = 1024*10; // 10KB + + /** + * Maximum size of any message in bytes. + * Defaults to {@value #DEFAULT_MAX_MESSAGE_SIZE}. + * + * @return the maximum number of bytes a single message can be + */ + @Option.DefaultInt(DEFAULT_MAX_MESSAGE_SIZE) + @Option.Configured + int maxMessageSize(); + + /** + * Maximum size of the response buffer in bytes. + * Defaults to {@value #DEFAULT_MAX_RESPONSE_BUFFER_SIZE}. + * + * @return the maximum number of bytes a response can be + */ + @Option.DefaultInt(DEFAULT_MAX_RESPONSE_BUFFER_SIZE) + @Option.Configured + int maxResponseBufferSize(); + + /** + * Protocol configuration type. + * + * @return type of this configuration + */ + default String type() { + return PbjProtocolProvider.CONFIG_NAME; + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjErrorProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjErrorProtocolHandler.java new file mode 100644 index 00000000..c38e94eb --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjErrorProtocolHandler.java @@ -0,0 +1,63 @@ +package com.hedera.pbj.grpc.helidon; + +import io.helidon.common.buffers.BufferData; +import io.helidon.http.WritableHeaders; +import io.helidon.http.http2.FlowControl; +import io.helidon.http.http2.Http2Flag; +import io.helidon.http.http2.Http2FrameHeader; +import io.helidon.http.http2.Http2Headers; +import io.helidon.http.http2.Http2RstStream; +import io.helidon.http.http2.Http2StreamState; +import io.helidon.http.http2.Http2StreamWriter; +import io.helidon.http.http2.Http2WindowUpdate; +import io.helidon.webserver.http2.spi.Http2SubProtocolSelector; +import java.util.function.Consumer; + +class PbjErrorProtocolHandler implements Http2SubProtocolSelector.SubProtocolHandler { + private final Http2StreamWriter streamWriter; + private final int streamId; + private final Consumer> headerCallback; + private Http2StreamState currentStreamState; + + PbjErrorProtocolHandler(final Http2StreamWriter streamWriter, + final int streamId, + final Http2StreamState currentStreamState, + final Consumer> headerCallback) { + this.streamWriter = streamWriter; + this.streamId = streamId; + this.currentStreamState = currentStreamState; + this.headerCallback = headerCallback; + } + + @Override + public void init() { + WritableHeaders writable = WritableHeaders.create(); + headerCallback.accept(writable); + Http2Headers http2Headers = Http2Headers.create(writable); + streamWriter.writeHeaders(http2Headers, + streamId, + Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM), + FlowControl.Outbound.NOOP); + currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL; + } + + @Override + public Http2StreamState streamState() { + return currentStreamState; + } + + @Override + public void rstStream(Http2RstStream rstStream) { + // No-op + } + + @Override + public void windowUpdate(Http2WindowUpdate update) { + // No-op + } + + @Override + public void data(Http2FrameHeader header, BufferData data) { + // No-op + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjMethodRoute.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjMethodRoute.java new file mode 100644 index 00000000..d9682a62 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjMethodRoute.java @@ -0,0 +1,49 @@ +package com.hedera.pbj.grpc.helidon; + +import static java.util.Objects.requireNonNull; + +import com.hedera.pbj.runtime.ServiceInterface; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.helidon.http.HttpPrologue; +import io.helidon.http.PathMatcher; +import io.helidon.http.PathMatchers; + +/** + * Represents a route in a {@link PbjRouting} that corresponds to a specific gRPC service method. + */ +final class PbjMethodRoute extends PbjRoute { + private static final String SEP = "/"; + private final ServiceInterface service; + private final ServiceInterface.Method method; + private final PathMatcher pathMatcher; + + PbjMethodRoute( + final @NonNull ServiceInterface service, + final @NonNull ServiceInterface.Method method) { + this.service = requireNonNull(service); + this.method = requireNonNull(method); + this.pathMatcher = PathMatchers.exact(service.fullName() + SEP + method.name()); + } + + @Override + @NonNull + PbjMethodRoute toPbjMethodRoute(final @NonNull HttpPrologue grpcPrologue) { + return this; + } + + @Override + @NonNull + PathMatchers.MatchResult accepts(final @NonNull HttpPrologue prologue) { + return pathMatcher.match(prologue.uriPath()); + } + + /** The {@link ServiceInterface.Method} that this route represents. */ + ServiceInterface.Method method() { + return method; + } + + /** The {@link ServiceInterface} that this route represents. */ + ServiceInterface service() { + return service; + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolConfigProvider.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolConfigProvider.java new file mode 100644 index 00000000..18e12251 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolConfigProvider.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.pbj.grpc.helidon; + +import io.helidon.common.config.Config; +import io.helidon.webserver.spi.ProtocolConfigProvider; + +/** + * Implementation of a service provider interface to create grpc protocol configuration. + */ +public class PbjProtocolConfigProvider implements ProtocolConfigProvider { + @Override + public String configKey() { + return PbjProtocolProvider.CONFIG_NAME; + } + + @Override + public PbjConfig create(Config config, String name) { + return PbjConfig.builder() + .config(config) + .name(name) + .build(); + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java new file mode 100644 index 00000000..2f5a6374 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -0,0 +1,340 @@ +package com.hedera.pbj.grpc.helidon; + +import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ENCODING; +import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_TIMEOUT; +import static java.lang.System.Logger.Level.ERROR; +import static java.util.Objects.requireNonNull; + +import com.hedera.pbj.grpc.helidon.encoding.Encoding; +import com.hedera.pbj.runtime.ServiceInterface; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.helidon.common.buffers.BufferData; +import io.helidon.http.Header; +import io.helidon.http.HeaderNames; +import io.helidon.http.HeaderValues; +import io.helidon.http.HttpMediaType; +import io.helidon.http.HttpPrologue; +import io.helidon.http.WritableHeaders; +import io.helidon.http.http2.Http2Flag; +import io.helidon.http.http2.Http2FrameData; +import io.helidon.http.http2.Http2FrameHeader; +import io.helidon.http.http2.Http2FrameTypes; +import io.helidon.http.http2.Http2Headers; +import io.helidon.http.http2.Http2RstStream; +import io.helidon.http.http2.Http2Settings; +import io.helidon.http.http2.Http2StreamState; +import io.helidon.http.http2.Http2StreamWriter; +import io.helidon.http.http2.Http2WindowUpdate; +import io.helidon.http.http2.StreamFlowControl; +import io.helidon.webserver.http2.spi.Http2SubProtocolSelector; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Delayed; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +/** + * Implementation of gRPC relying on PBJ. This class specifically contains the glue logic for bridging between + * Helidon and the generated PBJ service handler endpoints. An instance of this class is created for each new + * connection, and each connection is made to a specific method endpoint. + */ +final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHandler { + private static final System.Logger LOGGER = System.getLogger(PbjProtocolHandler.class.getName()); + private static final Header GRPC_ENCODING_IDENTITY = HeaderValues.createCached("grpc-encoding", "identity"); + + private static final HttpMediaType APPLICATION_GRPC = HttpMediaType.create("application/grpc"); + private static final HttpMediaType APPLICATION_GRPC_PROTO = HttpMediaType.create("application/grpc+proto"); + private static final HttpMediaType APPLICATION_GRPC_JSON = HttpMediaType.create("application/grpc+json"); + + private static final String GRPC_TIMEOUT_REGEX = "(\\d{1,8})([HMSmun])"; + private static final Pattern GRPC_TIMEOUT_PATTERN = Pattern.compile(GRPC_TIMEOUT_REGEX); + + // Helidon-specific fields related to the connection itself + private final HttpPrologue prologue; + private final Http2Headers headers; + private final Http2StreamWriter streamWriter; + private final int streamId; + private final Http2Settings serverSettings; + private final Http2Settings clientSettings; + private final StreamFlowControl flowControl; + private Http2StreamState currentStreamState; + + /** The service method that this connection was created for */ + private final PbjMethodRoute route; + /** + * If there is a timeout defined for the request, then this detected is used to determine when the timeout + * deadline has been met. + */ + private final DeadlineDetector deadlineDetector; + /** A future representing the background task detecting deadlines. */ + private ScheduledFuture deadlineFuture = new NoopScheduledFuture(); + /** Whether the next incoming message is compressed. */ + private boolean isCompressed; + /** The encoding as determined by the grpc-encoding header. Will not be null. */ + private Encoding encoding; + /** The current index into {@link #entityBytes} into which data is to be read. */ + private int entityBytesIndex = 0; + /** + * The bytes of the next incoming message. This is created dynamically as a message is received, and is never + * larger than the system configured {@link PbjConfigBlueprint#maxMessageSize()}. + */ + private byte[] entityBytes = null; + private BlockingQueue incomingMessages; + + /** Create a new instance */ + PbjProtocolHandler(final @NonNull HttpPrologue prologue, + final @NonNull Http2Headers headers, + final @NonNull Http2StreamWriter streamWriter, + final int streamId, + final @NonNull Http2Settings serverSettings, + final @NonNull Http2Settings clientSettings, + final @NonNull StreamFlowControl flowControl, + final @NonNull Http2StreamState currentStreamState, + final @NonNull PbjMethodRoute route, + final @NonNull DeadlineDetector deadlineDetector) { + + this.prologue = requireNonNull(prologue); + this.headers = requireNonNull(headers); + this.streamWriter = requireNonNull(streamWriter); + this.streamId = streamId; + this.serverSettings = requireNonNull(serverSettings); + this.clientSettings = requireNonNull(clientSettings); + this.flowControl = requireNonNull(flowControl); + this.currentStreamState = requireNonNull(currentStreamState); + this.route = requireNonNull(route); + this.deadlineDetector = requireNonNull(deadlineDetector); + } + + @Override + public void init() { + try { + // If the grpc-timeout header is present, determine when that timeout would occur. + final var timeout = headers.httpHeaders().value(GRPC_TIMEOUT); + if (timeout.isPresent()) { + final var matcher = GRPC_TIMEOUT_PATTERN.matcher(timeout.get()); + if (matcher.matches()) { + final var num = Integer.parseInt(matcher.group(0)); + final var unit = matcher.group(1); + final var deadline = System.nanoTime() + num * switch (unit) { + case "H" -> 3600_000_000_000L; + case "M" -> 60_000_000_000L; + case "S" -> 1_000_000_000L; + case "m" -> 1_000_000L; + case "u" -> 1_000L; + case "n" -> 1L; + default -> throw new IllegalArgumentException("Invalid unit: " + unit); + }; + deadlineFuture = deadlineDetector.scheduleDeadline(deadline, () -> { + close(GrpcStatus.DEADLINE_EXCEEDED); + }); + } + } + + // Get the encoding to use. We always use one, even if it is just "identity". This implementation currently + // only supports receiving compressed / encoded messages, it always responds with "identity" messages. + // This could be modified in the future, there is no reason not to support compression. + final var encodingHeader = headers.httpHeaders().value(GRPC_ENCODING).orElse("identity"); + encoding = switch (encodingHeader) { + case "identity" -> Encoding.IDENTITY; + case "gzip" -> Encoding.GZIP; + default -> throw new IllegalArgumentException("Unsupported encoding: " + encodingHeader); + }; + + // We know the content type has been set and starts with "application/grpc", otherwise this handler would + // not have been called. But we don't know whether it is "application/grpc" or "application/grpc+proto" or + // "application/grpc+json". Normalize "application/grpc" to "application/grpc+proto", and otherwise just + // pass whatever the content type is along to the service handler. Maybe it will support something + final var contentType = headers.httpHeaders().contentType().orElseThrow(); + final var normalizedContentType = contentType.equals(APPLICATION_GRPC) ? APPLICATION_GRPC_PROTO : contentType; + final var contentSubType = normalizedContentType.subtype(); + final var contentTypeExt = contentSubType.substring(contentSubType.indexOf('+') + 1); + + // todo Extract any custom metadata to pass along as well + + // Create the "options" to make available to the service handler. These options are used by the service + // handler to decide on the best way to parse or handle the request. + final var options = new ServiceInterface.RequestOptions() { + @Override + public boolean isProtobuf() { + return contentTypeExt.equals(ServiceInterface.RequestOptions.APPLICATION_GRPC_PROTO); + } + + @Override + public boolean isJson() { + return contentTypeExt.equals(ServiceInterface.RequestOptions.APPLICATION_GRPC_JSON); + } + + @Override + public String contentType() { + return contentTypeExt; + } + }; + + incomingMessages = new ArrayBlockingQueue<>(10); // TODO Take from config + route.service().open(options, route.method(), incomingMessages, new ServiceInterface.ResponseCallback() { + @Override + public void start() { + // todo ignoring headers, just sending required response headers + WritableHeaders writable = WritableHeaders.create(); + writable.set(HeaderNames.CONTENT_TYPE, normalizedContentType.text()); // Respond with the same content type we received + writable.set(GRPC_ENCODING_IDENTITY); + + Http2Headers http2Headers = Http2Headers.create(writable); + http2Headers.status(io.helidon.http.Status.OK_200); + streamWriter.writeHeaders(http2Headers, + streamId, + Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS), + flowControl.outbound()); + } + + @Override + public void send(Bytes response) { + try { + final int length = (int) response.length(); + BufferData bufferData = BufferData.create(5 + length); + bufferData.write(0); + bufferData.writeUnsignedInt32(length); + bufferData.write(response.toByteArray()); + + // todo flags based on method type + // end flag should be sent when last message is sent (or just rst stream if we cannot determine this) + + Http2FrameHeader header = Http2FrameHeader.create(bufferData.available(), + Http2FrameTypes.DATA, + Http2Flag.DataFlags.create(0), + streamId); + + streamWriter.writeData(new Http2FrameData(header, bufferData), flowControl.outbound()); + } catch (Exception e) { + LOGGER.log(ERROR, "Failed to respond to grpc request: " + route.method(), e); + } + + } + + @Override + public void close() { + // If the deadline has not already been reached, then go ahead and cancel it. + deadlineFuture.cancel(false); + // If the deadline was not canceled, then it means it was already done before we got here, + // which means a separate close has already happened, so we cannot close again. + if (!deadlineFuture.isCancelled()) { + PbjProtocolHandler.this.close(GrpcStatus.OK); + } + } + }); + } catch (Throwable e) { + LOGGER.log(ERROR, "Failed to initialize grpc protocol handler", e); + throw e; + } + + } + + @Override + public Http2StreamState streamState() { + return currentStreamState; + } + + @Override + public void rstStream(Http2RstStream rstStream) { +// listener.onComplete(); + } + + @Override + public void windowUpdate(Http2WindowUpdate update) { + + } + + /** + * Called by the webserver whenever some additional data is available on the stream. The data comes in chunks, + * it may be that an entire message is available in the chunk, or it may be that the data is broken out over + * multiple chunks. + */ + @Override + public void data(Http2FrameHeader header, BufferData data) { + try { + while (data.available() > 0) { + // First chunk of data contains the compression flag and the length of the message + if (entityBytes == null) { + // Read whether this message is compressed. We do not currently support compression. + isCompressed = (data.read() == 1); + if (isCompressed) { + // TODO Proper logging and error handling + throw new IllegalArgumentException("Compression is not supported"); + } + // Read the length of the message. As per the grpc protocol specification, each message on the + // wire is prefixed with the number of bytes for the message. However, to prevent a DOS attack + // where the attacker sends us a very large length and exhausts our memory, we have a maximum + // message size configuration setting. Using that, we can detect attempts to exhaust our memory. + final long length = data.readUnsignedInt32(); + if (length > PbjConfigBlueprint.DEFAULT_MAX_MESSAGE_SIZE) { // TODO Needs proper config + // TODO Proper logging and error handling + throw new IllegalArgumentException("Message size exceeds maximum allowed size: " + + length + " > " + PbjConfigBlueprint.DEFAULT_MAX_MESSAGE_SIZE); + } + // Create a buffer to hold the message. We sadly cannot reuse this buffer because once we have + // filled it and wrapped it in Bytes and sent it to the handler, some user code may grab and hold + // that Bytes object for an arbitrary amount of time, and if we were to scribble into the same + // byte array, we would break the application. So we need a new buffer each time :-( + entityBytes = new byte[(int) length]; + entityBytesIndex = 0; + } + + // By the time we get here, entityBytes is no longer null. It may be empty, or it may already have + // been partially populated from a previous iteration. It may be that the number of bytes available + // to be read is larger than just this one message. So we need to be careful to read, from what is + // available, only up to the message length, and to leave the rest for the next iteration. + final int available = data.available(); + final int numBytesToRead = Math.min(entityBytes.length - entityBytesIndex, available); + data.read(entityBytes, entityBytesIndex, numBytesToRead); + entityBytesIndex += numBytesToRead; + + // If we have completed reading the message, then we can proceed. + if (entityBytesIndex == entityBytes.length) { + // Grab and wrap the bytes and reset to being reading the next message + final var bytes = Bytes.wrap(entityBytes); + incomingMessages.put(bytes); + entityBytesIndex = 0; + entityBytes = null; + } + } + + // The end of the stream has been reached! It is possible that a bad client will send end of stream before + // all the message data we sent. In that case, it is as if the message were never sent. + if (header.flags(Http2FrameTypes.DATA).endOfStream()) { + entityBytesIndex = 0; + entityBytes = null; +// listener.onHalfClose(); + currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL; + } + } catch (Exception e) { + LOGGER.log(ERROR, "Failed to process grpc request: " + data.debugDataHex(true), e); + } + } + + private void close(Header status) { + WritableHeaders writable = WritableHeaders.create(); + writable.set(status); + + Http2Headers http2Headers = Http2Headers.create(writable); + streamWriter.writeHeaders(http2Headers, + streamId, + Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM), + flowControl.outbound()); + currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL; + } + + private static final class NoopScheduledFuture extends CompletableFuture implements ScheduledFuture { + @Override + public long getDelay(TimeUnit unit) { + return 0; + } + + @Override + public int compareTo(Delayed o) { + return 0; + } + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolProvider.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolProvider.java new file mode 100644 index 00000000..8a3aa1ae --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolProvider.java @@ -0,0 +1,36 @@ +package com.hedera.pbj.grpc.helidon; + +import io.helidon.webserver.ProtocolConfigs; +import io.helidon.webserver.http2.spi.Http2SubProtocolProvider; +import io.helidon.webserver.http2.spi.Http2SubProtocolSelector; + +/** + * {@link java.util.ServiceLoader} provider implementation of pbj sub-protocol of HTTP/2. + */ +public class PbjProtocolProvider implements Http2SubProtocolProvider { + static final String CONFIG_NAME = "pbj"; + + /** + * Default constructor required by Java {@link java.util.ServiceLoader}. + * + * @deprecated please do not use directly outside of testing, this is reserved for Java {@link java.util.ServiceLoader} + */ + @Deprecated + public PbjProtocolProvider() { + } + + @Override + public String protocolType() { + return CONFIG_NAME; + } + + @Override + public Class protocolConfigType() { + return PbjConfig.class; + } + + @Override + public Http2SubProtocolSelector create(PbjConfig config, ProtocolConfigs configs) { + return new PbjProtocolSelector(); + } +} \ No newline at end of file diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java new file mode 100644 index 00000000..13224dd4 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolSelector.java @@ -0,0 +1,118 @@ +package com.hedera.pbj.grpc.helidon; + +import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ACCEPT_ENCODING; +import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ENCODING; + +import io.helidon.http.HeaderNames; +import io.helidon.http.HttpPrologue; +import io.helidon.http.Method; +import io.helidon.http.Status; +import io.helidon.http.http2.Http2Headers; +import io.helidon.http.http2.Http2Settings; +import io.helidon.http.http2.Http2StreamState; +import io.helidon.http.http2.Http2StreamWriter; +import io.helidon.http.http2.StreamFlowControl; +import io.helidon.webserver.ConnectionContext; +import io.helidon.webserver.Router; +import io.helidon.webserver.http2.spi.Http2SubProtocolSelector; +import io.helidon.webserver.http2.spi.SubProtocolResult; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Sub-protocol selector for HTTP/2. This is the main entry point into the PBJ implementation of gRPC. The web + * server will use this class to determine if a request is a gRPC request and if so, how to handle it. + */ +public class PbjProtocolSelector implements Http2SubProtocolSelector { + private final DeadlineDetector deadlineDetector; + + /** + * Create a new PBJ based grpc protocol selector (default). Access restricted to be package-private so as + * to limit instantiation to the {@link PbjProtocolProvider}. + */ + PbjProtocolSelector() { + deadlineDetector = new DeadlineDetector() { + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + @Override + public ScheduledFuture scheduleDeadline(long deadline, Runnable onDeadlineExceeded) { + return executorService.schedule(onDeadlineExceeded, deadline, TimeUnit.NANOSECONDS); + } + }; + } + + /** + * Called by Helidon to create the sub-protocol for PBJ gRPC requests. The {@link SubProtocolResult} returned + * will be responsible for handling the request. + */ + @Override + public SubProtocolResult subProtocol(ConnectionContext ctx, + HttpPrologue prologue, + Http2Headers headers, + Http2StreamWriter streamWriter, + int streamId, + Http2Settings serverSettings, + Http2Settings clientSettings, + StreamFlowControl flowControl, + Http2StreamState currentStreamState, + Router router) { + // As per the specification, only POST requests are supported. I would have thought that the code here should + // return a response code of 405 (Method Not Allowed) if the method is not POST, but the code here just returns + // NOT_SUPPORTED. I'm not sure if this is technically correct. + if (prologue.method() != Method.POST) { + return NOT_SUPPORTED; + } + + // If Content-Type does not begin with "application/grpc", gRPC servers SHOULD respond with HTTP status of + // 415 (Unsupported Media Type). This will prevent other HTTP/2 clients from interpreting a gRPC error + // response, which uses status 200 (OK), as successful. + // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md + final var httpHeaders = headers.httpHeaders(); + final var contentType = httpHeaders.value(HeaderNames.CONTENT_TYPE).orElse(""); + if (!contentType.startsWith("application/grpc")) { + return new SubProtocolResult(true, + new PbjErrorProtocolHandler(streamWriter, streamId, currentStreamState, h -> + h.set(Http2Headers.STATUS_NAME, Status.UNSUPPORTED_MEDIA_TYPE_415.code()))); + } + + // Look up the route based on the path. If that route does not exist, we return a 200 OK response with + // a gRPC status of NOT_FOUND. + PbjRouting routing = router.routing(PbjRouting.class, PbjRouting.EMPTY); + PbjMethodRoute route = routing.findRoute(prologue); + if (route == null) { + return new SubProtocolResult(true, + new PbjErrorProtocolHandler(streamWriter, streamId, currentStreamState, h -> { + h.set(Http2Headers.STATUS_NAME, Status.OK_200.code()); + h.set(GrpcStatus.NOT_FOUND); + })); + } + + // This implementation currently only supports "identity" and "gzip" compression. As per the documentation: + // If a client message is compressed by an algorithm that is not supported by a server, the message will result + // in an UNIMPLEMENTED error status on the server. The server will include a grpc-accept-encoding header [in] + // the response which specifies the algorithms that the server accepts. + final var encoding = httpHeaders.value(GRPC_ENCODING).orElse("identity"); + if (!"identity".equals(encoding) && !"gzip".equals(encoding)) { + return new SubProtocolResult(true, + new PbjErrorProtocolHandler(streamWriter, streamId, currentStreamState, h -> { + h.set(GrpcStatus.UNIMPLEMENTED); + h.set(GRPC_ACCEPT_ENCODING, "identity,gzip"); + })); + } + + // This looks like a valid call! We will return a new PbjProtocolHandler to handle the request. + return new SubProtocolResult(true, + new PbjProtocolHandler(prologue, + headers, + streamWriter, + streamId, + serverSettings, + clientSettings, + flowControl, + currentStreamState, + route, + deadlineDetector)); + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjRoute.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjRoute.java new file mode 100644 index 00000000..80f82cf8 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjRoute.java @@ -0,0 +1,28 @@ +package com.hedera.pbj.grpc.helidon; + +import com.hedera.pbj.runtime.ServiceInterface; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.helidon.http.HttpPrologue; +import io.helidon.http.PathMatchers; +import io.helidon.webserver.Route; + +/** + * This base class represents a route in a {@link PbjRouting}. The route could represent an entire + * {@link ServiceInterface}, or a specific method within a service. + */ +abstract class PbjRoute implements Route { + /** + * Given a {@link HttpPrologue}, locate and return the appropriate {@link PbjMethodRoute} object that represents + * the gRPC service and method that should be invoked. + * @param grpcPrologue The prologue of the HTTP request. + * @return The {@link PbjMethodRoute} object that represents the gRPC service and method that should be invoked, or null. + */ + abstract /*@Nullable*/ PbjMethodRoute toPbjMethodRoute(final @NonNull HttpPrologue grpcPrologue); + + /** + * Given a {@link HttpPrologue}, determine if this route should accept the request. + * @param prologue The prologue of the HTTP request. + * @return A {@link PathMatchers.MatchResult} that indicates if this route should accept the request. + */ + abstract @NonNull PathMatchers.MatchResult accepts(final @NonNull HttpPrologue prologue); +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjRouting.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjRouting.java new file mode 100644 index 00000000..1e31e044 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjRouting.java @@ -0,0 +1,111 @@ +package com.hedera.pbj.grpc.helidon; + +import com.hedera.pbj.runtime.ServiceInterface; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.helidon.http.HttpPrologue; +import io.helidon.webserver.Routing; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +/** + * A Helidon {@link Routing} used for constructing the routes for PBJ-based gRPC services. + * + *

+ * {@code
+ *     WebServer.builder()
+ *                 .port(8080)
+ *                 .addRouting(PbjRouting.builder().service(new HelloServiceImpl()))
+ *                 .build()
+ *                 .start();
+ * }
+ * 
+ */ +public class PbjRouting implements Routing { + /** This routing has absolutely no routes. */ + static final PbjRouting EMPTY = PbjRouting.builder().build(); + + /** The list of routes. */ + private final List routes; + + /** Create a new instance. This is private, so it can only be created using the builder method. */ + private PbjRouting(final @NonNull Builder builder) { + this.routes = new ArrayList<>(builder.routes); + } + + @Override + public Class routingType() { + return PbjRouting.class; + } + + @Override + public void beforeStart() { + for (final PbjRoute route : routes) { + route.beforeStart(); + } + } + + @Override + public void afterStop() { + for (final PbjRoute route : routes) { + route.afterStop(); + } + } + + /** + * Find a route that matches the given prologue. A prologue would be the first part of the path, for instance. + * When registered, a route may have wildcard matches to paths, etc. + * + * @param prologue the prologue to match + * @return the route that matches the prologue, or {@code null} if no route matches + */ + PbjMethodRoute findRoute(final HttpPrologue prologue) { + for (final PbjRoute route : routes) { + final var accepts = route.accepts(prologue); + if (accepts.accepted()) { + return route.toPbjMethodRoute(prologue); + } + } + + return null; + } + + /** + * Create a new builder instance to be used to construct a {@link PbjRouting} instance. + * + * @return a new builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Fluent API builder for {@link PbjRouting}. A single {@link PbjRouting} may contain multiple services. + */ + public static class Builder implements io.helidon.common.Builder { + private final List routes = new LinkedList<>(); + + private Builder() { + } + + @Override + public PbjRouting build() { + return new PbjRouting(this); + } + + /** + * Configure grpc service. + * + * @param service service to add + * @return updated builder + */ + public Builder service(final ServiceInterface service) { + return route(new PbjServiceRoute(service)); + } + + private Builder route(PbjRoute route) { + routes.add(route); + return this; + } + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjServiceRoute.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjServiceRoute.java new file mode 100644 index 00000000..db63a1cf --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjServiceRoute.java @@ -0,0 +1,49 @@ +package com.hedera.pbj.grpc.helidon; + +import static java.util.Objects.requireNonNull; + +import com.hedera.pbj.runtime.ServiceInterface; +import edu.umd.cs.findbugs.annotations.NonNull; +import io.helidon.http.HttpPrologue; +import io.helidon.http.PathMatchers; +import java.util.List; + +/** + * An implementation of {@link PbjRoute} that represents an entire service. + */ +class PbjServiceRoute extends PbjRoute { + /** The name of the service. */ + private final String serviceName; + /** One {@link PbjMethodRoute} for each method in the service. */ + private final List routes; + + PbjServiceRoute(final @NonNull ServiceInterface service) { + this.serviceName = requireNonNull(service).serviceName(); + this.routes = service.methods().stream().map(method -> new PbjMethodRoute(service, method)).toList(); + } + + @Override + @NonNull + PbjMethodRoute toPbjMethodRoute(final @NonNull HttpPrologue prologue) { + for (final PbjMethodRoute route : routes) { + final var accepts = route.accepts(prologue); + if (accepts.accepted()) { + return route; + } + } + throw new IllegalStateException("PbjServiceRoute(" + serviceName + ") accepted prologue, " + + "but cannot provide route: " + prologue); + } + + @Override + @NonNull + PathMatchers.MatchResult accepts(final @NonNull HttpPrologue prologue) { + for (final PbjMethodRoute route : routes) { + final var accepts = route.accepts(prologue); + if (accepts.accepted()) { + return accepts; + } + } + return PathMatchers.MatchResult.notAccepted(); + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java new file mode 100644 index 00000000..8c96289d --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java @@ -0,0 +1,8 @@ +package com.hedera.pbj.grpc.helidon.encoding; + +public interface Encoding { + GzipEncoding GZIP = new GzipEncoding(); + IdentityEncoding IDENTITY = new IdentityEncoding(); + + byte[] decode(byte[] data) throws Exception; +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java new file mode 100644 index 00000000..af0e004e --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java @@ -0,0 +1,11 @@ +package com.hedera.pbj.grpc.helidon.encoding; + +import java.io.ByteArrayInputStream; +import java.util.zip.GZIPInputStream; + +public class GzipEncoding implements Encoding { + @Override + public byte[] decode(byte[] data) throws Exception { + return new GZIPInputStream(new ByteArrayInputStream(data)).readAllBytes(); + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java new file mode 100644 index 00000000..59e17663 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java @@ -0,0 +1,8 @@ +package com.hedera.pbj.grpc.helidon.encoding; + +public class IdentityEncoding implements Encoding { + @Override + public byte[] decode(byte[] data) { + return data; + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/module-info.java b/pbj-core/pbj-grpc-helidon/src/main/java/module-info.java new file mode 100644 index 00000000..e7a469d8 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/java/module-info.java @@ -0,0 +1,14 @@ +/** Runtime module of code needed by PBJ generated code at runtime. */ +module com.hedera.pbj.grpc.helidon { + requires static com.github.spotbugs.annotations; + requires com.hedera.pbj.runtime; + requires io.helidon.webserver; + requires io.helidon.webserver.http2; + + exports com.hedera.pbj.grpc.helidon; + + provides io.helidon.webserver.http2.spi.Http2SubProtocolProvider with + com.hedera.pbj.grpc.helidon.PbjProtocolProvider; + provides io.helidon.webserver.spi.ProtocolConfigProvider with + com.hedera.pbj.grpc.helidon.PbjProtocolConfigProvider; +} diff --git a/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/config-metadata.json b/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/config-metadata.json new file mode 100644 index 00000000..172fa629 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/config-metadata.json @@ -0,0 +1 @@ +[{"module":"com.hedera.pbj.grpc.helidon","types":[{"annotatedType":"com.hedera.pbj.grpc.helidon.PbjConfig","type":"com.hedera.pbj.grpc.helidon.PbjConfig","producers":["com.hedera.pbj.grpc.helidon.PbjConfig#create(io.helidon.common.config.Config)","com.hedera.pbj.grpc.helidon.PbjConfig#builder()"],"provides":["io.helidon.webserver.spi.ProtocolConfig"],"options":[{"defaultValue":"10240","description":"Maximum size of any message in bytes.\n Defaults to {@value #DEFAULT_MAX_MESSAGE_SIZE}.\n\n @return the maximum number of bytes a single message can be","key":"max-message-size","method":"com.hedera.pbj.grpc.helidon.PbjConfig.Builder#maxMessageSize(int)","type":"java.lang.Integer"},{"defaultValue":"10240","description":"Maximum size of the response buffer in bytes.\n Defaults to {@value #DEFAULT_MAX_RESPONSE_BUFFER_SIZE}.\n\n @return the maximum number of bytes a response can be","key":"max-response-buffer-size","method":"com.hedera.pbj.grpc.helidon.PbjConfig.Builder#maxResponseBufferSize(int)","type":"java.lang.Integer"}]}]}] \ No newline at end of file diff --git a/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/feature-metadata.properties b/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/feature-metadata.properties new file mode 100644 index 00000000..1c6688fc --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/helidon/feature-metadata.properties @@ -0,0 +1,6 @@ +m=com.hedera.pbj.grpc.helidon +n=PBJ-GRPC +d=WebServer PBJ gRPC Support +in=SE +p=WebServer,PBJ,GRPC +pr=true diff --git a/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/services/io.helidon.webserver.http2.spi.Http2SubProtocolProvider b/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/services/io.helidon.webserver.http2.spi.Http2SubProtocolProvider new file mode 100644 index 00000000..c0a553e3 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/services/io.helidon.webserver.http2.spi.Http2SubProtocolProvider @@ -0,0 +1,2 @@ +# This file shouldn't be needed, because the module declaration? +com.hedera.pbj.grpc.helidon.PbjProtocolProvider diff --git a/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/services/io.helidon.webserver.spi.ProtocolConfigProvider b/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/services/io.helidon.webserver.spi.ProtocolConfigProvider new file mode 100644 index 00000000..018e2071 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/main/resources/META-INF/services/io.helidon.webserver.spi.ProtocolConfigProvider @@ -0,0 +1,2 @@ +# This file shouldn't be needed, because the module declaration? +com.hedera.pbj.grpc.helidon.PbjProtocolConfigProvider diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java new file mode 100644 index 00000000..81db59cf --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java @@ -0,0 +1,23 @@ +package http; + +import io.helidon.webclient.api.WebClient; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import org.junit.jupiter.api.Test; + +public class HttpTest { + @Test + void simpleHttpCall() throws InterruptedException { + final var pool = Executors.newFixedThreadPool(100); + final var latch = new CountDownLatch(1000); + for (int i=0; i<1000; i++) { + pool.submit(() -> { + final var client = WebClient.builder().baseUri("http://localhost:8080").build(); + System.out.println(client.get().path("/greet").request().as(String.class)); + latch.countDown(); + }); + } + + latch.await(); + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java new file mode 100644 index 00000000..0f3d342b --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java @@ -0,0 +1,122 @@ +package pbj; + +import com.hedera.hapi.node.base.Transaction; +import com.hedera.hapi.node.transaction.Query; +import com.hedera.hapi.node.transaction.Response; +import com.hedera.hapi.node.transaction.TransactionResponse; +import com.hedera.pbj.runtime.ServiceInterface; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +public interface ConsensusService extends ServiceInterface { + enum ConsensusMethod implements Method { + createTopic, + updateTopic, + deleteTopic, + submitMessage, + getTopicInfo; + } + + TransactionResponse createTopic(Transaction tx); + TransactionResponse updateTopic(Transaction tx); + TransactionResponse deleteTopic(Transaction tx); + TransactionResponse submitMessage(Transaction tx); + Response getTopicInfo(Query q); + + default String serviceName() { + return "ConsensusService"; + } + + default String fullName() { + return "proto.ConsensusService"; + } + + default List methods() { + return List.of( + ConsensusMethod.createTopic, + ConsensusMethod.updateTopic, + ConsensusMethod.deleteTopic, + ConsensusMethod.submitMessage, + ConsensusMethod.getTopicInfo); + } + + @Override + default void open( + final /*@NonNull*/ RequestOptions options, + final /*@NonNull*/ Method method, + final /*@NonNull*/ BlockingQueue messages, + final /*@NonNull*/ ResponseCallback callback) { + + final var m = (ConsensusMethod) method; + Thread.ofVirtual().start(() -> { + try { + switch (m) { + case ConsensusMethod.createTopic -> { + // Unary method + final var message = messages.take(); + callback.start(); + final var messageBytes = options.isProtobuf() // What if it isn't JSON or PROTOBUF? + ? Transaction.PROTOBUF.parse(message) + : Transaction.JSON.parse(message); + final var response = createTopic(messageBytes); + final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response); + callback.send(responseBytes); + callback.close(); + } + case ConsensusMethod.updateTopic -> { + // Unary method + final var message = messages.take(); + callback.start(); + final var messageBytes = options.isProtobuf() + ? Transaction.PROTOBUF.parse(message) + : Transaction.JSON.parse(message); + final var response = updateTopic(messageBytes); + final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response); + callback.send(responseBytes); + callback.close(); + } + case ConsensusMethod.deleteTopic -> { + // Unary method + final var message = messages.take(); + callback.start(); + final var messageBytes = options.isProtobuf() + ? Transaction.PROTOBUF.parse(message) + : Transaction.JSON.parse(message); + final var response = deleteTopic(messageBytes); + final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response); + callback.send(responseBytes); + callback.close(); + } + case ConsensusMethod.submitMessage -> { + // Unary method + final var message = messages.take(); + callback.start(); + final var messageBytes = options.isProtobuf() + ? Transaction.PROTOBUF.parse(message) + : Transaction.JSON.parse(message); + final var response = submitMessage(messageBytes); + final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response); + callback.send(responseBytes); + callback.close(); + } + case ConsensusMethod.getTopicInfo -> { + // Unary method + final var message = messages.take(); + callback.start(); + final var messageBytes = options.isProtobuf() + ? Query.PROTOBUF.parse(message) + : Query.JSON.parse(message); + final var response = getTopicInfo(messageBytes); + final var responseBytes = Response.PROTOBUF.toBytes(response); + callback.send(responseBytes); + callback.close(); + } + } + } catch (Exception e) { + e.printStackTrace(); + callback.close(); + } + }); + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java new file mode 100644 index 00000000..e069c5df --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java @@ -0,0 +1,337 @@ +package pbj; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.hedera.hapi.node.base.Transaction; +import com.hedera.hapi.node.transaction.Query; +import com.hedera.hapi.node.transaction.Response; +import com.hedera.hapi.node.transaction.TransactionResponse; +import com.hedera.pbj.grpc.helidon.GrpcStatus; +import com.hedera.pbj.grpc.helidon.PbjRouting; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import com.hedera.pbj.runtime.io.stream.ReadableStreamingData; +import com.hedera.pbj.runtime.io.stream.WritableStreamingData; +import io.helidon.common.media.type.MediaType; +import io.helidon.http.HttpMediaType; +import io.helidon.http.Method; +import io.helidon.webclient.api.WebClient; +import io.helidon.webserver.WebServer; +import java.io.ByteArrayOutputStream; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class PbjTest { + private static final MediaType APPLICATION_GRPC = HttpMediaType.create("application/grpc"); + private static final MediaType APPLICATION_GRPC_PROTO = HttpMediaType.create("application/grpc+proto"); + private static final MediaType APPLICATION_GRPC_JSON = HttpMediaType.create("application/grpc+json"); + private static final MediaType APPLICATION_RANDOM = HttpMediaType.create("application/random"); + private static WebClient CLIENT; + private static final String SUBMIT_MESSAGE_PATH = "/proto.ConsensusService/submitMessage"; + + @BeforeAll + static void setup() { + // Set up the server + WebServer.builder() + .port(8080) + .addRouting(PbjRouting.builder().service(new ConsensusServiceImpl())) + .build() + .start(); + + CLIENT = WebClient.builder() + .baseUri("http://localhost:8080") + .build(); + } + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // HTTP2 Path + // + // SPEC: + // + // Path is case-sensitive. Some gRPC implementations may allow the Path format shown above to be overridden, but + // this functionality is strongly discouraged. gRPC does not go out of its way to break users that are using this + // kind of override, but we do not actively support it, and some functionality (e.g., service config support) will + // not work when the path is not of the form shown above. + // + // TESTS: + // - Verify the path is case-sensitive + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + @Test + void badCaseOnPathIsNotFound() { + try (var response = CLIENT.post() + .protocolId("h2") + .contentType(APPLICATION_GRPC_PROTO) + .path(SUBMIT_MESSAGE_PATH.toUpperCase()) + .submit(messageBytes(Transaction.DEFAULT))) { + assertThat(response.status().code()).isEqualTo(200); + assertThat(response.headers().get(GrpcStatus.STATUS_NAME)).isEqualTo(GrpcStatus.NOT_FOUND); + } + } + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // HTTP2 Method + // + // SPEC: + // + // Only POST can be used for gRPC calls. + // + // TESTS: + // - Verify that only POST is supported + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + @ParameterizedTest + @ValueSource(strings = { "GET", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE" }) + void mustUsePost(final String methodName) { + try (var response = CLIENT.method(Method.create(methodName)) + .protocolId("h2") + .contentType(APPLICATION_GRPC_PROTO) + .path(SUBMIT_MESSAGE_PATH) + .request()) { + + // This is consistent with existing behavior on Helidon, but I would have expected the response code + // to be 405 Method Not Allowed instead. See PbjProtocolSelector for the check for POST. + assertThat(response.status().code()).isEqualTo(404); + } + } + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Content-Type + // + // SPEC: + // + // If Content-Type does not begin with "application/grpc", gRPC servers SHOULD respond with HTTP status of 415 + // (Unsupported Media Type). This will prevent other HTTP/2 clients from interpreting a gRPC error response, which + // uses status 200 (OK), as successful. + // + // TESTS: + // - Verify that the server responds with 415 when the Content-Type is not specified + // - Verify that the server responds with 415 when the Content-Type is not "application/grpc" + // - Verify that both "application/grpc+proto" and "application/grpc+json" are accepted + // - Verify that if "application/grpc" is used, it defaults to "application/grpc+proto" + // - Verify that the response is encoded as JSON when "application/grpc+json" is used + // - Verify that the response is encoded as protobuf when "application/grpc+proto" or "application/grpc" is used + // - Verify that a custom encoding "application/grpc+custom" can work + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + @Test + void contentTypeMustBeSet() { + try (var response = CLIENT.post() + .protocolId("h2") + .path(SUBMIT_MESSAGE_PATH) + .submit(messageBytes(Transaction.DEFAULT))) { + + assertThat(response.status().code()).isEqualTo(415); + } + } + + @Test + void contentTypeMustStartWithApplicationGrpc() { + try (var response = CLIENT.post() + .protocolId("h2") + .path(SUBMIT_MESSAGE_PATH) + .contentType(APPLICATION_RANDOM) + .submit(messageBytes(Transaction.DEFAULT))) { + + assertThat(response.status().code()).isEqualTo(415); + } + } + + @Test + void contentTypeCanBeJSON() { + try (var response = CLIENT.post() + .protocolId("h2") + .path(SUBMIT_MESSAGE_PATH) + .contentType(APPLICATION_GRPC_JSON) + .submit(messageBytesJson(Transaction.DEFAULT))) { + + // TODO Assert that the response is also encoded as JSON + assertThat(response.status().code()).isEqualTo(200); + assertThat(response.headers().contentType().orElseThrow().text()).isEqualTo("application/grpc+json"); + } + } + + @Test + void contentTypeWithoutProtoDefaultsToProto() { + + } + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // METADATA + // + // SPEC: + // + // Custom-Metadata is an arbitrary set of key-value pairs defined by the application layer. Header names starting + // with "grpc-" but not listed here are reserved for future GRPC use and should not be used by applications as + // Custom-Metadata. + // + // Note that HTTP2 does not allow arbitrary octet sequences for header values so binary header values must be + // encoded using Base64 as per https://tools.ietf.org/html/rfc4648#section-4. Implementations MUST accept padded + // and un-padded values and should emit un-padded values. Applications define binary headers by having their names + // end with "-bin". Runtime libraries use this suffix to detect binary headers and properly apply base64 encoding & + // decoding as headers are sent and received. + // + // Custom-Metadata header order is not guaranteed to be preserved except for values with duplicate header names. + // Duplicate header names may have their values joined with "," as the delimiter and be considered semantically + // equivalent. Implementations must split Binary-Headers on "," before decoding the Base64-encoded values. + // + // ASCII-Value should not have leading or trailing whitespace. If it contains leading or trailing whitespace, it + // may be stripped. The ASCII-Value character range defined is stricter than HTTP. Implementations must not error + // due to receiving an invalid ASCII-Value that's a valid field-value in HTTP, but the precise behavior is not + // strictly defined: they may throw the value away or accept the value. If accepted, care must be taken to make + // sure that the application is permitted to echo the value back as metadata. For example, if the metadata is + // provided to the application as a list in a request, the application should not trigger an error by providing + // that same list as the metadata in the response. + // + // TESTS: + // - TBD + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Request-Headers + // + // SPEC: + // + // Servers may limit the size of Request-Headers, with a default of 8 KiB suggested. Implementations are encouraged + // to compute total header size like HTTP/2's SETTINGS_MAX_HEADER_LIST_SIZE: the sum of all header fields, for each + // field the sum of the uncompressed field name and value lengths plus 32, with binary values' lengths being + // post-Base64. + // + // TESTS: + // - TBD + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Compression + // + // SPEC: + // + // The repeated sequence of Length-Prefixed-Message items is delivered in DATA frames + // + // - Length-Prefixed-Message → Compressed-Flag Message-Length Message + // - Compressed-Flag → 0 / 1 # encoded as 1 byte unsigned integer + // - Message-Length → {length of Message} # encoded as 4 byte unsigned integer (big endian) + // - Message → *{binary octet} + // + // A Compressed-Flag value of 1 indicates that the binary octet sequence of Message is compressed using the + // mechanism declared by the Message-Encoding header. A value of 0 indicates that no encoding of Message bytes has + // occurred. Compression contexts are NOT maintained over message boundaries, implementations must create a new + // context for each message in the stream. If the Message-Encoding header is omitted then the Compressed-Flag must + // be 0. + // + // TESTS: + // - TBD + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Unary Method Calls + // + // TESTS: + // - A correct unary call should return a 200 OK response with a gRPC status of OK + // - A correct unary call to a failed method should return a 200 OK response with an error code + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + @Test + void unaryCall() throws Exception { + try (var response = CLIENT.post() + .protocolId("h2") + .contentType(APPLICATION_GRPC_PROTO) + .path(SUBMIT_MESSAGE_PATH) + .submit(messageBytes(Transaction.DEFAULT))) { + + assertThat(response.status().code()).isEqualTo(200); + assertThat(response.headers().get(GrpcStatus.STATUS_NAME)).isEqualTo(GrpcStatus.OK); + + final var rsd = new ReadableStreamingData(response.inputStream()); + assertThat(rsd.readByte()).isEqualTo((byte) 0); // No Compression (we didn't ask for it) + + final var responseLength = (int) rsd.readUnsignedInt(); + assertThat(responseLength).isPositive(); + + final var responseData = new byte[responseLength]; + rsd.readBytes(responseData); + final var txr = TransactionResponse.PROTOBUF.parse(Bytes.wrap(responseData)); + + assertThat(txr).isEqualTo(TransactionResponse.DEFAULT); + } + } + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Server Streaming Method Calls + // + // TESTS: + // - TBD + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Client Streaming Method Calls + // + // TESTS: + // - TBD + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Bidi Streaming Calls + // + // TESTS: + // - TBD + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Utility methods + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + private byte[] messageBytes(Transaction tx) { + final var data = Transaction.PROTOBUF.toBytes(tx).toByteArray(); + final var out = new ByteArrayOutputStream(); + final WritableStreamingData wsd = new WritableStreamingData(out); + wsd.writeByte((byte) 0); + wsd.writeUnsignedInt(data.length); + wsd.writeBytes(data); + return out.toByteArray(); + } + + private byte[] messageBytesJson(Transaction tx) { + final var data = Transaction.JSON.toBytes(tx).toByteArray(); + final var out = new ByteArrayOutputStream(); + final WritableStreamingData wsd = new WritableStreamingData(out); + wsd.writeByte((byte) 0); + wsd.writeUnsignedInt(data.length); + wsd.writeBytes(data); + return out.toByteArray(); + } + + private static final class ConsensusServiceImpl implements ConsensusService { + @Override + public TransactionResponse createTopic(Transaction tx) { + // TODO Test when one of these returns null!! + System.out.println("Creating topic"); + return TransactionResponse.DEFAULT; + } + + @Override + public TransactionResponse updateTopic(Transaction tx) { + System.out.println("Updating topic"); + return TransactionResponse.DEFAULT; + } + + @Override + public TransactionResponse deleteTopic(Transaction tx) { + System.out.println("Deleting topic"); + return TransactionResponse.DEFAULT; + } + + @Override + public TransactionResponse submitMessage(Transaction tx) { + System.out.println("Submitting message"); + return TransactionResponse.DEFAULT; + } + + @Override + public Response getTopicInfo(Query q) { + System.out.println("Getting topic info"); + return Response.DEFAULT; + } + } +} diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java new file mode 100644 index 00000000..5e4940ee --- /dev/null +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java @@ -0,0 +1,117 @@ +package com.hedera.pbj.runtime; + +import com.hedera.pbj.runtime.io.buffer.Bytes; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +/** + * Defines a common interface for all implementations of a gRPC {@code service}. PBJ will generate a subinterface + * for each {@code service} in the protobuf schema definition files, with default implementations of each of the + * given methods in this interface. + * + *

For example, suppose I have the following protobuf file: + *

+ * {@code
+ * package example;
+ *
+ * service HelloService {
+ *   rpc SayHello (HelloRequest) returns (HelloResponse);
+ * }
+ *
+ * message HelloRequest {
+ *   string greeting = 1;
+ * }
+ *
+ * message HelloResponse {
+ *   string reply = 1;
+ * }
+ * }
+ * 
+ * + *

From this file, PBJ will generate a {@code HelloService} interface that extends {@code ServiceInterface}: + *

+ * {@code
+ * public interface HelloService extends ServiceInterface {
+ *    // ...
+ *
+ *    @NonNull
+ *    HelloResponse sayHello(final @NonNull HelloRequest request);
+ *
+ *    default String serviceName() { return "HelloService"; }
+ *    default String fullName() { return "example.HelloService"; }
+ *
+ *    // ...
+ * }
+ * }
+ * 
+ * + * In the application code, you will simply create a new class implementing the {@code HelloService} interface, and + * register it with your webserver in whatever way is appropriate for your webserver. + */ +public interface ServiceInterface { + interface Method { + String name(); + } + + interface RequestOptions { + String APPLICATION_GRPC_PROTO = "proto"; + String APPLICATION_GRPC_JSON = "json"; + + boolean isProtobuf(); + boolean isJson(); + String contentType(); + } + + /** + * Through this interface the {@link ServiceInterface} implementation will send responses back to the client. + * The {@link #start()} method is called before any responses are sent, and the {@link #close()} method + * is called after all responses have been sent. + * + *

It is not common for an application to implement or use this interface. It is typically implemented by + * a webserver to integrate PBJ into that server. + */ + interface ResponseCallback { + /** + * Called by the {@link ServiceInterface} implementation to before any responses have been sent to the client. + * This must be called before {@link #send(Bytes)} is called. + */ + void start(); + + /** + * Called to send a single response message to the client. For unary methods, this will be called once. For + * server-side streaming or bidi-streaming, this may be called many times. + * + * @param response A response message to send to the client. + */ + void send(/*@NonNull*/ Bytes response); + + /** + * Called to close the connection with the client, signaling that no more responses will be sent. + */ + void close(); + } + + /** Gets the simple name of the service. For example, "HelloService". */ + /*@NonNull*/ String serviceName(); + /** Gets the full name of the service. For example, "example.HelloService". */ + /*@NonNull*/ String fullName(); + /** Gets a list of each method in the service. This list may be empty but should never be null. */ + /*@NonNull*/ List methods(); + + /** + * Called by the webserver to open a new connection between the client and the service. This method may be called + * many times concurrently, once per connection. The implementation must therefore be thread-safe. A default + * implementation is provided by the generated PBJ code, which will handle the dispatching of messages to the + * appropriate methods in the correct way (unary, server-side streaming, etc.). + * + * @param opts Any options from the request, such as the content type. + * @param method The method that was called by the client. + * @param messages A blocking queue of messages sent by the client. + * @param callback A callback to send responses back to the client. + */ + void open( + /*@NonNull*/ RequestOptions opts, + /*@NonNull*/ Method method, + /*@NonNull*/ BlockingQueue messages, + /*@NonNull*/ ResponseCallback callback); +} diff --git a/pbj-core/settings.gradle.kts b/pbj-core/settings.gradle.kts index 12cc5a28..5659ba9c 100644 --- a/pbj-core/settings.gradle.kts +++ b/pbj-core/settings.gradle.kts @@ -23,6 +23,7 @@ plugins { include(":pbj-runtime") include(":pbj-compiler") +include(":pbj-grpc-helidon") gradleEnterprise { buildScan { @@ -39,6 +40,13 @@ dependencyResolutionManagement { version("org.antlr.antlr4.runtime", "4.11.1") version("com.github.spotbugs.annotations", "4.8.6") + // The libs of this catalog are used by the PBJ library + version("io.helidon.webserver", "4.0.8") + version("io.helidon.webserver.http2", "4.0.8") + version("io.helidon.webclient", "4.0.8") // for testing + version("io.helidon.webclient.http2", "4.0.8") // for testing + version("com.hedera.node.hapi", "0.48.0") // for testing + // Testing only versions version("com.google.protobuf", "3.21.9") version("org.assertj.core", "3.23.1")