From 7182751991071de0df6a31224d87fced4cc80724 Mon Sep 17 00:00:00 2001 From: Richard Bair Date: Fri, 31 May 2024 08:35:05 -0700 Subject: [PATCH] Use Google protobuf for grpc tests, until #260 is resolved. --- .../pbj/grpc/helidon/PbjProtocolHandler.java | 7 +- .../src/test/java/pbj/ConsensusService.java | 121 ------------- .../src/test/java/pbj/GreeterService.java | 91 ++++++++++ .../src/test/java/pbj/PbjTest.java | 162 +++++++++--------- .../src/test/java/pbj/TestService.java | 68 -------- .../src/test/proto/greeter_service.proto | 13 ++ .../src/test/proto/hello_reply.proto | 7 + .../src/test/proto/hello_request.proto | 7 + 8 files changed, 202 insertions(+), 274 deletions(-) delete mode 100644 pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java create mode 100644 pbj-core/pbj-grpc-helidon/src/test/java/pbj/GreeterService.java delete mode 100644 pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java create mode 100644 pbj-core/pbj-grpc-helidon/src/test/proto/greeter_service.proto create mode 100644 pbj-core/pbj-grpc-helidon/src/test/proto/hello_reply.proto create mode 100644 pbj-core/pbj-grpc-helidon/src/test/proto/hello_request.proto diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index 954ba479..0c9ad33e 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import javax.security.auth.callback.Callback; /** * Implementation of gRPC based on PBJ. This class specifically contains the glue logic for bridging between @@ -425,7 +426,11 @@ private synchronized void close() { deadlineFuture.cancel(false); // If the deadline was canceled, then we have not yet responded to the client. So the response is OK. On the // other hand, if th deadline was NOT canceled, then the deadline was exceeded. - responseHeaders.set(deadlineFuture.isCancelled() ? GrpcStatus.OK : GrpcStatus.DEADLINE_EXCEEDED); +// if (!deadlineFuture.isCancelled()) { + responseHeaders.set(GrpcStatus.OK); +// } else { +// responseHeaders.set(GrpcStatus.DEADLINE_EXCEEDED); +// } final var http2Headers = Http2Headers.create(responseHeaders); streamWriter.writeHeaders(http2Headers, streamId, diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java deleted file mode 100644 index 07b5b1a9..00000000 --- a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java +++ /dev/null @@ -1,121 +0,0 @@ -package pbj; - -import com.hedera.hapi.node.base.Transaction; -import com.hedera.hapi.node.transaction.Query; -import com.hedera.hapi.node.transaction.Response; -import com.hedera.hapi.node.transaction.TransactionResponse; -import com.hedera.pbj.runtime.ServiceInterface; -import com.hedera.pbj.runtime.io.buffer.Bytes; -import edu.umd.cs.findbugs.annotations.NonNull; -import java.util.List; -import java.util.concurrent.BlockingQueue; - -public interface ConsensusService extends ServiceInterface { - enum ConsensusMethod implements Method { - createTopic, - updateTopic, - deleteTopic, - submitMessage, - getTopicInfo; - } - - TransactionResponse createTopic(Transaction tx); - TransactionResponse updateTopic(Transaction tx); - TransactionResponse deleteTopic(Transaction tx); - TransactionResponse submitMessage(Transaction tx); - Response getTopicInfo(Query q); - - @NonNull - default String serviceName() { - return "ConsensusService"; - } - - @NonNull - default String fullName() { - return "proto.ConsensusService"; - } - - @NonNull - default List methods() { - return List.of( - ConsensusMethod.createTopic, - ConsensusMethod.updateTopic, - ConsensusMethod.deleteTopic, - ConsensusMethod.submitMessage, - ConsensusMethod.getTopicInfo); - } - - @Override - default void open( - final @NonNull RequestOptions options, - final @NonNull Method method, - final @NonNull BlockingQueue messages, - final @NonNull ResponseCallback callback) { - - final var m = (ConsensusMethod) method; - Thread.ofVirtual().start(() -> { - try { - switch (m) { - case ConsensusMethod.createTopic -> { - // Unary method - final var message = messages.take(); - final var messageBytes = options.isProtobuf() // What if it isn't JSON or PROTOBUF? - ? Transaction.PROTOBUF.parse(message) - : Transaction.JSON.parse(message); - final var response = createTopic(messageBytes); - final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response); - callback.send(responseBytes); - callback.close(); - } - case ConsensusMethod.updateTopic -> { - // Unary method - final var message = messages.take(); - final var messageBytes = options.isProtobuf() - ? Transaction.PROTOBUF.parse(message) - : Transaction.JSON.parse(message); - final var response = updateTopic(messageBytes); - final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response); - callback.send(responseBytes); - callback.close(); - } - case ConsensusMethod.deleteTopic -> { - // Unary method - final var message = messages.take(); - final var messageBytes = options.isProtobuf() - ? Transaction.PROTOBUF.parse(message) - : Transaction.JSON.parse(message); - final var response = deleteTopic(messageBytes); - final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response); - callback.send(responseBytes); - callback.close(); - } - case ConsensusMethod.submitMessage -> { - // Unary method. - final var message = messages.take(); - final var messageBytes = options.isProtobuf() - ? Transaction.PROTOBUF.parse(message) - : Transaction.JSON.parse(message); - final var response = submitMessage(messageBytes); - final var responseBytes = TransactionResponse.PROTOBUF.toBytes(response); - callback.send(responseBytes); - callback.close(); - } - case ConsensusMethod.getTopicInfo -> { - // Unary method - final var message = messages.take(); - final var messageBytes = options.isProtobuf() - ? Query.PROTOBUF.parse(message) - : Query.JSON.parse(message); - final var response = getTopicInfo(messageBytes); - final var responseBytes = Response.PROTOBUF.toBytes(response); - callback.send(responseBytes); - callback.close(); - } - } - } catch (Throwable e) { - e.printStackTrace(); - callback.close(); - } - }); - } -} diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/GreeterService.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/GreeterService.java new file mode 100644 index 00000000..50ab6445 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/GreeterService.java @@ -0,0 +1,91 @@ +package pbj; + +import com.google.protobuf.util.JsonFormat; +import com.hedera.pbj.runtime.ServiceInterface; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import edu.umd.cs.findbugs.annotations.NonNull; +import greeter.HelloReply; +import greeter.HelloReplyOuterClass; +import greeter.HelloRequest; +import greeter.HelloRequestOuterClass; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +/** + * This service doesn't rely on any PBJ objects, because the build right now doesn't have a good way to use the + * compiler. This will be fixed in a future release. So for now, we use Google's generated protobuf objects. + */ +public interface GreeterService extends ServiceInterface { + enum GreeterMethod implements Method { + sayHello, + sayHelloStreamReply, + sayHelloStreamRequest, + sayHelloStreamBidi + } + + HelloReply sayHello(HelloRequest request); + + @NonNull + default String serviceName() { + return "GreeterService"; + } + + @NonNull + default String fullName() { + return "greeter.GreeterService"; + } + + @NonNull + default List methods() { + return Arrays.asList(GreeterMethod.values()); + } + + @Override + default void open( + final @NonNull RequestOptions options, + final @NonNull Method method, + final @NonNull BlockingQueue messages, + final @NonNull ResponseCallback callback) { + + final var m = (GreeterMethod) method; + Thread.ofVirtual().start(() -> { + try { + switch (m) { + case GreeterMethod.sayHello -> { + // Block waiting for the next message + final var message = messages.take(); + // Parse the message into a HelloRequest + HelloRequest request; + if (options.isProtobuf()) { + request = HelloRequest.parseFrom(message.toByteArray()); + } else if (options.isJson()) { + final var builder = HelloRequest.newBuilder(); + JsonFormat.parser().merge(message.asUtf8String(), builder); + request = builder.build(); + } else { + request = HelloRequest.newBuilder().setName(message.asUtf8String()).build(); + } + // Call the service method + final var reply = sayHello(request); + // Convert the reply back into the appropriate format + Bytes replyBytes; + if (options.isProtobuf()) { + replyBytes = Bytes.wrap(reply.toByteArray()); + } else if (options.isJson()) { + replyBytes = Bytes.wrap(JsonFormat.printer().print(reply)); + } else { + replyBytes = Bytes.wrap(reply.getMessage().getBytes()); + } + // Send back the reply and close the stream (unary). + callback.send(replyBytes); + callback.close(); + } + } + } catch (Exception e) { + e.printStackTrace(); + callback.close(); + } + }); + } +} diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java index 0e85edeb..2ed00455 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/PbjTest.java @@ -2,17 +2,14 @@ import static org.assertj.core.api.Assertions.assertThat; -import com.hedera.hapi.node.base.ResponseCodeEnum; -import com.hedera.hapi.node.base.Transaction; -import com.hedera.hapi.node.transaction.Query; -import com.hedera.hapi.node.transaction.Response; -import com.hedera.hapi.node.transaction.TransactionResponse; +import com.google.protobuf.util.JsonFormat; import com.hedera.pbj.grpc.helidon.GrpcStatus; import com.hedera.pbj.grpc.helidon.PbjRouting; import com.hedera.pbj.runtime.ParseException; -import com.hedera.pbj.runtime.io.buffer.Bytes; import com.hedera.pbj.runtime.io.stream.ReadableStreamingData; import com.hedera.pbj.runtime.io.stream.WritableStreamingData; +import greeter.HelloReply; +import greeter.HelloRequest; import io.helidon.common.media.type.MediaType; import io.helidon.common.media.type.MediaTypes; import io.helidon.http.HeaderNames; @@ -32,13 +29,17 @@ class PbjTest { private static final MediaType APPLICATION_GRPC = HttpMediaType.create("application/grpc"); private static final MediaType APPLICATION_GRPC_PROTO = HttpMediaType.create("application/grpc+proto"); private static final MediaType APPLICATION_GRPC_JSON = HttpMediaType.create("application/grpc+json"); + private static final MediaType APPLICATION_GRPC_STRING = HttpMediaType.create("application/grpc+string"); private static final MediaType APPLICATION_RANDOM = HttpMediaType.create("application/random"); private static WebClient CLIENT; - private static final String SUBMIT_MESSAGE_PATH = "/proto.ConsensusService/submitMessage"; - private static final String ECHO_PATH = "/proto.TestService/echo"; - private static final TransactionResponse SUCCESSFUL_TRANSACTION_RESPONSE = TransactionResponse.newBuilder() - .cost(100) - .nodeTransactionPrecheckCode(ResponseCodeEnum.SUCCESS) + private static final String SAY_HELLO_PATH = "/greeter.GreeterService/sayHello"; + + private static final HelloRequest SIMPLE_REQUEST = HelloRequest.newBuilder() + .setName("PBJ") + .build(); + + private static final HelloReply SIMPLE_REPLY = HelloReply.newBuilder() + .setMessage("Hello PBJ") .build(); @BeforeAll @@ -47,8 +48,7 @@ static void setup() { WebServer.builder() .port(8080) .addRouting(PbjRouting.builder() - .service(new ConsensusServiceImpl()) - .service(new CustomServiceImpl())) + .service(new GreeterServiceImpl())) .build() .start(); @@ -74,8 +74,8 @@ void badCaseOnPathIsNotFound() { try (var response = CLIENT.post() .protocolId("h2") .contentType(APPLICATION_GRPC_PROTO) - .path(SUBMIT_MESSAGE_PATH.toUpperCase()) - .submit(messageBytes(Transaction.DEFAULT))) { + .path(SAY_HELLO_PATH.toUpperCase()) + .submit(messageBytes(SIMPLE_REQUEST))) { assertThat(response.status().code()).isEqualTo(200); assertThat(response.headers().get(GrpcStatus.STATUS_NAME)).isEqualTo(GrpcStatus.NOT_FOUND); } @@ -96,7 +96,7 @@ void mustUsePost(final String methodName) { try (var response = CLIENT.method(Method.create(methodName)) .protocolId("h2") .contentType(APPLICATION_GRPC_PROTO) - .path(SUBMIT_MESSAGE_PATH) + .path(SAY_HELLO_PATH) .request()) { // This is consistent with existing behavior on Helidon, but I would have expected the response code @@ -123,8 +123,8 @@ void mustUsePost(final String methodName) { void contentTypeMustBeSet() { try (var response = CLIENT.post() .protocolId("h2") - .path(SUBMIT_MESSAGE_PATH) - .submit(messageBytes(Transaction.DEFAULT))) { + .path(SAY_HELLO_PATH) + .submit(messageBytes(SIMPLE_REQUEST))) { assertThat(response.status().code()).isEqualTo(415); } @@ -135,9 +135,9 @@ void contentTypeMustBeSet() { void contentTypeMustStartWithApplicationGrpc() { try (var response = CLIENT.post() .protocolId("h2") - .path(SUBMIT_MESSAGE_PATH) + .path(SAY_HELLO_PATH) .contentType(APPLICATION_RANDOM) - .submit(messageBytes(Transaction.DEFAULT))) { + .submit(messageBytes(SIMPLE_REQUEST))) { assertThat(response.status().code()).isEqualTo(415); } @@ -148,17 +148,17 @@ void contentTypeMustStartWithApplicationGrpc() { void contentTypeCanBeJSON() throws ParseException { try (var response = CLIENT.post() .protocolId("h2") - .path(SUBMIT_MESSAGE_PATH) + .path(SAY_HELLO_PATH) .contentType(APPLICATION_GRPC_JSON) - .submit(messageBytesJson(Transaction.DEFAULT))) { + .submit(messageBytesJson(SIMPLE_REQUEST))) { // TODO Verify the response is valid JSON assertThat(response.status().code()).isEqualTo(200); assertThat(response.headers().contentType().orElseThrow().text()) .isEqualTo("application/grpc+json"); - final var tx = decodeTransactionResponse(new ReadableStreamingData(response.inputStream())); - assertThat(tx).isEqualTo(SUCCESSFUL_TRANSACTION_RESPONSE); + final var reply = decodeJsonReply(new ReadableStreamingData(response.inputStream())); + assertThat(reply).isEqualTo(SIMPLE_REPLY); } } @@ -168,16 +168,16 @@ void contentTypeCanBeJSON() throws ParseException { void contentTypeCanBeProtobuf(final String contentType) throws ParseException { try (var response = CLIENT.post() .protocolId("h2") - .path(SUBMIT_MESSAGE_PATH) + .path(SAY_HELLO_PATH) .contentType(MediaTypes.create(contentType)) - .submit(messageBytes(Transaction.DEFAULT))) { + .submit(messageBytes(SIMPLE_REQUEST))) { assertThat(response.status().code()).isEqualTo(200); assertThat(response.headers().contentType().orElseThrow().text()) - .isEqualTo("application/grpc+proto"); + .isEqualTo(contentType); - final var tx = decodeTransactionResponse(new ReadableStreamingData(response.inputStream())); - assertThat(tx).isEqualTo(SUCCESSFUL_TRANSACTION_RESPONSE); + final var tx = decodeReply(new ReadableStreamingData(response.inputStream())); + assertThat(tx).isEqualTo(SIMPLE_REPLY); } } @@ -186,18 +186,18 @@ void contentTypeCanBeProtobuf(final String contentType) throws ParseException { void contentTypeCanBeCustom() throws IOException { try (var response = CLIENT.post() .protocolId("h2") - .path(ECHO_PATH) - .contentType(MediaTypes.create("application/grpc+string")) - .submit(messageBytes("Hello, dude!".getBytes(StandardCharsets.UTF_8)))) { + .path(SAY_HELLO_PATH) + .contentType(APPLICATION_GRPC_STRING) + .submit(messageBytes("dude".getBytes(StandardCharsets.UTF_8)))) { assertThat(response.status().code()).isEqualTo(200); assertThat(response.headers().contentType().orElseThrow().text()) - .isEqualTo("application/grpc+string"); + .isEqualTo(APPLICATION_GRPC_STRING.text()); // The first five bytes are framing -- compression + length final var data = response.inputStream().readAllBytes(); assertThat(new String(data, 5, data.length - 5, StandardCharsets.UTF_8)) - .isEqualTo("Hello, dude!"); + .isEqualTo("Hello dude"); } } @@ -275,9 +275,9 @@ void compressionNotSupported(final String grpcEncoding) { try (var response = CLIENT.post() .protocolId("h2") .contentType(APPLICATION_GRPC_PROTO) - .path(SUBMIT_MESSAGE_PATH) + .path(SAY_HELLO_PATH) .header(HeaderNames.create("grpc-encoding"), grpcEncoding) - .submit(messageBytes(Transaction.DEFAULT))) { + .submit(messageBytes(SIMPLE_REQUEST))) { assertThat(response.status().code()).isEqualTo(200); assertThat(response.headers().get(GrpcStatus.STATUS_NAME).values()).isEqualTo(GrpcStatus.UNIMPLEMENTED.values()); @@ -298,11 +298,11 @@ void unaryCall() throws Exception { try (var response = CLIENT.post() .protocolId("h2") .contentType(APPLICATION_GRPC_PROTO) - .path(SUBMIT_MESSAGE_PATH) - .submit(messageBytes(Transaction.DEFAULT))) { + .path(SAY_HELLO_PATH) + .submit(messageBytes(HelloRequest.newBuilder().setName("PBJ").build()))) { assertThat(response.status().code()).isEqualTo(200); - assertThat(response.headers().get(GrpcStatus.STATUS_NAME)).isEqualTo(GrpcStatus.OK); + assertThat(response.headers().get(GrpcStatus.STATUS_NAME).values()).isEqualTo(GrpcStatus.OK.values()); final var rsd = new ReadableStreamingData(response.inputStream()); assertThat(rsd.readByte()).isEqualTo((byte) 0); // No Compression (we didn't ask for it) @@ -310,9 +310,8 @@ void unaryCall() throws Exception { final var responseLength = (int) rsd.readUnsignedInt(); final var responseData = new byte[responseLength]; rsd.readBytes(responseData); - final var txr = TransactionResponse.PROTOBUF.parse(Bytes.wrap(responseData)); - - assertThat(txr).isEqualTo(TransactionResponse.DEFAULT); + final var reply = HelloReply.parseFrom(responseData); + assertThat(reply.getMessage()).isEqualTo("Hello PBJ"); } } @@ -341,12 +340,30 @@ void unaryCall() throws Exception { /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Utility methods /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - private TransactionResponse decodeTransactionResponse(ReadableStreamingData rsd) throws ParseException { - assertThat(rsd.readByte()).isEqualTo((byte) 0); // No Compression - final var responseLength = (int) rsd.readUnsignedInt(); - final var responseData = new byte[responseLength]; - rsd.readBytes(responseData); - return TransactionResponse.PROTOBUF.parse(Bytes.wrap(responseData)); + private HelloReply decodeReply(ReadableStreamingData rsd) { + try { + assertThat(rsd.readByte()).isEqualTo((byte) 0); // No Compression + final var responseLength = (int) rsd.readUnsignedInt(); + final var responseData = new byte[responseLength]; + rsd.readBytes(responseData); + return HelloReply.parseFrom(responseData); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private HelloReply decodeJsonReply(ReadableStreamingData rsd) { + try { + assertThat(rsd.readByte()).isEqualTo((byte) 0); // No Compression + final var responseLength = (int) rsd.readUnsignedInt(); + final var responseData = new byte[responseLength]; + rsd.readBytes(responseData); + final var builder = HelloReply.newBuilder(); + JsonFormat.parser().merge(new String(responseData, StandardCharsets.UTF_8), builder); + return builder.build(); + } catch (Exception e) { + throw new RuntimeException(e); + } } private byte[] messageBytes(byte[] data) { @@ -358,49 +375,26 @@ private byte[] messageBytes(byte[] data) { return out.toByteArray(); } - private byte[] messageBytes(Transaction tx) { - final var data = Transaction.PROTOBUF.toBytes(tx).toByteArray(); - return messageBytes(data); - } - - private byte[] messageBytesJson(Transaction tx) { - final var data = Transaction.JSON.toBytes(tx).toByteArray(); + private byte[] messageBytes(HelloRequest req) { + final var data = req.toByteArray(); return messageBytes(data); } - private static final class ConsensusServiceImpl implements ConsensusService { - @Override - public TransactionResponse createTopic(Transaction tx) { - throw new RuntimeException("Some kind of Runtime exception is thrown!"); - } - - @Override - public TransactionResponse updateTopic(Transaction tx) { - return SUCCESSFUL_TRANSACTION_RESPONSE; - } - - @Override - public TransactionResponse deleteTopic(Transaction tx) { - return SUCCESSFUL_TRANSACTION_RESPONSE; - } - - @Override - public TransactionResponse submitMessage(Transaction tx) { - return SUCCESSFUL_TRANSACTION_RESPONSE; - } - - @Override - public Response getTopicInfo(Query q) { - System.out.println("Getting topic info"); - return Response.DEFAULT; + private byte[] messageBytesJson(HelloRequest req) { + try { + final var data = JsonFormat.printer().print(req).getBytes(StandardCharsets.UTF_8); + return messageBytes(data); + } catch (Exception e) { + throw new RuntimeException(e); } } - private static final class CustomServiceImpl implements TestService { + private static final class GreeterServiceImpl implements GreeterService { @Override - public String echo(String message) { - System.out.println("Echoing message: " + message); - return message; + public HelloReply sayHello(HelloRequest request) { + return HelloReply.newBuilder() + .setMessage("Hello " + request.getName()) + .build(); } } } diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java deleted file mode 100644 index ccf44741..00000000 --- a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java +++ /dev/null @@ -1,68 +0,0 @@ -package pbj; - -import com.hedera.pbj.runtime.ServiceInterface; -import com.hedera.pbj.runtime.io.buffer.Bytes; -import edu.umd.cs.findbugs.annotations.NonNull; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.BlockingQueue; - -public interface TestService extends ServiceInterface { - enum CustomMethod implements Method { - echoUnary, - echoBidi, - echoServerStream, - echoClientStream, - failUnary, - failBidi, - failServerStream, - failClientStream - } - - String echo(String message); - - @NonNull - default String serviceName() { - return "TestService"; - } - - @NonNull - default String fullName() { - return "proto.TestService"; - } - - @NonNull - default List methods() { - return Arrays.asList(CustomMethod.values()); - } - - @Override - default void open( - final @NonNull RequestOptions options, - final @NonNull Method method, - final @NonNull BlockingQueue messages, - final @NonNull ResponseCallback callback) { - - final var m = (CustomMethod) method; - Thread.ofVirtual().start(() -> { - try { - switch (m) { - case CustomMethod.echoUnary -> { - final var message = messages.take(); - final var ct = options.contentType(); - if (options.isJson() || options.isProtobuf() || !ct.equals("application/grpc+string")) { - throw new IllegalArgumentException("Only 'string' is allowed"); - } - - final var response = echo(message.asUtf8String()); - callback.send(Bytes.wrap(response)); - callback.close(); - } - } - } catch (Exception e) { - e.printStackTrace(); - callback.close(); - } - }); - } -} diff --git a/pbj-core/pbj-grpc-helidon/src/test/proto/greeter_service.proto b/pbj-core/pbj-grpc-helidon/src/test/proto/greeter_service.proto new file mode 100644 index 00000000..2aa49719 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/proto/greeter_service.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; +import "hello_reply.proto"; +import "hello_request.proto"; + +package greeter; +option java_multiple_files = true; +service Greeter { + rpc sayHello (HelloRequest) returns (HelloReply) {} + rpc sayHelloStreamReply (HelloRequest) returns (stream HelloReply) {} + rpc sayHelloStreamRequest (stream HelloRequest) returns (HelloReply) {} + rpc sayHelloStreamBidi (stream HelloRequest) returns (stream HelloReply) {} +} + diff --git a/pbj-core/pbj-grpc-helidon/src/test/proto/hello_reply.proto b/pbj-core/pbj-grpc-helidon/src/test/proto/hello_reply.proto new file mode 100644 index 00000000..e9202541 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/proto/hello_reply.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package greeter; +option java_multiple_files = true; +message HelloReply { + string message = 1; +} diff --git a/pbj-core/pbj-grpc-helidon/src/test/proto/hello_request.proto b/pbj-core/pbj-grpc-helidon/src/test/proto/hello_request.proto new file mode 100644 index 00000000..86eff26c --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/proto/hello_request.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package greeter; +option java_multiple_files = true; +message HelloRequest { + string name = 1; +} \ No newline at end of file