From f470f8cc5a0c669ea864a14127b400fb971e2e99 Mon Sep 17 00:00:00 2001 From: Richard Bair Date: Thu, 30 May 2024 13:45:37 -0700 Subject: [PATCH] WIP: writing tests Signed-off-by: Richard Bair --- .../pbj/grpc/helidon/encoding/Encoding.java | 8 --- .../grpc/helidon/encoding/GzipEncoding.java | 11 --- .../helidon/encoding/IdentityEncoding.java | 8 --- .../src/test/java/http/HttpTest.java | 23 ------- .../src/test/java/pbj/ConsensusService.java | 21 +++--- .../src/test/java/pbj/TestService.java | 68 +++++++++++++++++++ .../hedera/pbj/runtime/ServiceInterface.java | 31 ++++----- 7 files changed, 91 insertions(+), 79 deletions(-) delete mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java delete mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java delete mode 100644 pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java delete mode 100644 pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java create mode 100644 pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java deleted file mode 100644 index 8c96289d..00000000 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/Encoding.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.hedera.pbj.grpc.helidon.encoding; - -public interface Encoding { - GzipEncoding GZIP = new GzipEncoding(); - IdentityEncoding IDENTITY = new IdentityEncoding(); - - byte[] decode(byte[] data) throws Exception; -} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java deleted file mode 100644 index af0e004e..00000000 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/GzipEncoding.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.hedera.pbj.grpc.helidon.encoding; - -import java.io.ByteArrayInputStream; -import java.util.zip.GZIPInputStream; - -public class GzipEncoding implements Encoding { - @Override - public byte[] decode(byte[] data) throws Exception { - return new GZIPInputStream(new ByteArrayInputStream(data)).readAllBytes(); - } -} diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java deleted file mode 100644 index 59e17663..00000000 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/encoding/IdentityEncoding.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.hedera.pbj.grpc.helidon.encoding; - -public class IdentityEncoding implements Encoding { - @Override - public byte[] decode(byte[] data) { - return data; - } -} diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java b/pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java deleted file mode 100644 index 81db59cf..00000000 --- a/pbj-core/pbj-grpc-helidon/src/test/java/http/HttpTest.java +++ /dev/null @@ -1,23 +0,0 @@ -package http; - -import io.helidon.webclient.api.WebClient; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import org.junit.jupiter.api.Test; - -public class HttpTest { - @Test - void simpleHttpCall() throws InterruptedException { - final var pool = Executors.newFixedThreadPool(100); - final var latch = new CountDownLatch(1000); - for (int i=0; i<1000; i++) { - pool.submit(() -> { - final var client = WebClient.builder().baseUri("http://localhost:8080").build(); - System.out.println(client.get().path("/greet").request().as(String.class)); - latch.countDown(); - }); - } - - latch.await(); - } -} diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java index 0f3d342b..07b5b1a9 100644 --- a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java +++ b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/ConsensusService.java @@ -6,6 +6,7 @@ import com.hedera.hapi.node.transaction.TransactionResponse; import com.hedera.pbj.runtime.ServiceInterface; import com.hedera.pbj.runtime.io.buffer.Bytes; +import edu.umd.cs.findbugs.annotations.NonNull; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -24,14 +25,17 @@ enum ConsensusMethod implements Method { TransactionResponse submitMessage(Transaction tx); Response getTopicInfo(Query q); + @NonNull default String serviceName() { return "ConsensusService"; } + @NonNull default String fullName() { return "proto.ConsensusService"; } + @NonNull default List methods() { return List.of( ConsensusMethod.createTopic, @@ -43,10 +47,10 @@ default List methods() { @Override default void open( - final /*@NonNull*/ RequestOptions options, - final /*@NonNull*/ Method method, - final /*@NonNull*/ BlockingQueue messages, - final /*@NonNull*/ ResponseCallback callback) { + final @NonNull RequestOptions options, + final @NonNull Method method, + final @NonNull BlockingQueue messages, + final @NonNull ResponseCallback callback) { final var m = (ConsensusMethod) method; Thread.ofVirtual().start(() -> { @@ -55,7 +59,6 @@ default void open( case ConsensusMethod.createTopic -> { // Unary method final var message = messages.take(); - callback.start(); final var messageBytes = options.isProtobuf() // What if it isn't JSON or PROTOBUF? ? Transaction.PROTOBUF.parse(message) : Transaction.JSON.parse(message); @@ -67,7 +70,6 @@ default void open( case ConsensusMethod.updateTopic -> { // Unary method final var message = messages.take(); - callback.start(); final var messageBytes = options.isProtobuf() ? Transaction.PROTOBUF.parse(message) : Transaction.JSON.parse(message); @@ -79,7 +81,6 @@ default void open( case ConsensusMethod.deleteTopic -> { // Unary method final var message = messages.take(); - callback.start(); final var messageBytes = options.isProtobuf() ? Transaction.PROTOBUF.parse(message) : Transaction.JSON.parse(message); @@ -89,9 +90,8 @@ default void open( callback.close(); } case ConsensusMethod.submitMessage -> { - // Unary method + // Unary method. final var message = messages.take(); - callback.start(); final var messageBytes = options.isProtobuf() ? Transaction.PROTOBUF.parse(message) : Transaction.JSON.parse(message); @@ -103,7 +103,6 @@ default void open( case ConsensusMethod.getTopicInfo -> { // Unary method final var message = messages.take(); - callback.start(); final var messageBytes = options.isProtobuf() ? Query.PROTOBUF.parse(message) : Query.JSON.parse(message); @@ -113,7 +112,7 @@ default void open( callback.close(); } } - } catch (Exception e) { + } catch (Throwable e) { e.printStackTrace(); callback.close(); } diff --git a/pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java new file mode 100644 index 00000000..ccf44741 --- /dev/null +++ b/pbj-core/pbj-grpc-helidon/src/test/java/pbj/TestService.java @@ -0,0 +1,68 @@ +package pbj; + +import com.hedera.pbj.runtime.ServiceInterface; +import com.hedera.pbj.runtime.io.buffer.Bytes; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +public interface TestService extends ServiceInterface { + enum CustomMethod implements Method { + echoUnary, + echoBidi, + echoServerStream, + echoClientStream, + failUnary, + failBidi, + failServerStream, + failClientStream + } + + String echo(String message); + + @NonNull + default String serviceName() { + return "TestService"; + } + + @NonNull + default String fullName() { + return "proto.TestService"; + } + + @NonNull + default List methods() { + return Arrays.asList(CustomMethod.values()); + } + + @Override + default void open( + final @NonNull RequestOptions options, + final @NonNull Method method, + final @NonNull BlockingQueue messages, + final @NonNull ResponseCallback callback) { + + final var m = (CustomMethod) method; + Thread.ofVirtual().start(() -> { + try { + switch (m) { + case CustomMethod.echoUnary -> { + final var message = messages.take(); + final var ct = options.contentType(); + if (options.isJson() || options.isProtobuf() || !ct.equals("application/grpc+string")) { + throw new IllegalArgumentException("Only 'string' is allowed"); + } + + final var response = echo(message.asUtf8String()); + callback.send(Bytes.wrap(response)); + callback.close(); + } + } + } catch (Exception e) { + e.printStackTrace(); + callback.close(); + } + }); + } +} diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java index 5e4940ee..cf6b07f6 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/ServiceInterface.java @@ -1,6 +1,7 @@ package com.hedera.pbj.runtime; import com.hedera.pbj.runtime.io.buffer.Bytes; +import edu.umd.cs.findbugs.annotations.NonNull; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -54,8 +55,9 @@ interface Method { } interface RequestOptions { - String APPLICATION_GRPC_PROTO = "proto"; - String APPLICATION_GRPC_JSON = "json"; + String APPLICATION_GRPC = "application/grpc"; + String APPLICATION_GRPC_PROTO = "application/grpc+proto"; + String APPLICATION_GRPC_JSON = "application/grpc+json"; boolean isProtobuf(); boolean isJson(); @@ -64,26 +66,19 @@ interface RequestOptions { /** * Through this interface the {@link ServiceInterface} implementation will send responses back to the client. - * The {@link #start()} method is called before any responses are sent, and the {@link #close()} method - * is called after all responses have been sent. + * The {@link #close()} method is called after all responses have been sent. * *

It is not common for an application to implement or use this interface. It is typically implemented by * a webserver to integrate PBJ into that server. */ interface ResponseCallback { - /** - * Called by the {@link ServiceInterface} implementation to before any responses have been sent to the client. - * This must be called before {@link #send(Bytes)} is called. - */ - void start(); - /** * Called to send a single response message to the client. For unary methods, this will be called once. For * server-side streaming or bidi-streaming, this may be called many times. * * @param response A response message to send to the client. */ - void send(/*@NonNull*/ Bytes response); + void send(@NonNull Bytes response); /** * Called to close the connection with the client, signaling that no more responses will be sent. @@ -92,11 +87,11 @@ interface ResponseCallback { } /** Gets the simple name of the service. For example, "HelloService". */ - /*@NonNull*/ String serviceName(); + @NonNull String serviceName(); /** Gets the full name of the service. For example, "example.HelloService". */ - /*@NonNull*/ String fullName(); + @NonNull String fullName(); /** Gets a list of each method in the service. This list may be empty but should never be null. */ - /*@NonNull*/ List methods(); + @NonNull List methods(); /** * Called by the webserver to open a new connection between the client and the service. This method may be called @@ -110,8 +105,8 @@ interface ResponseCallback { * @param callback A callback to send responses back to the client. */ void open( - /*@NonNull*/ RequestOptions opts, - /*@NonNull*/ Method method, - /*@NonNull*/ BlockingQueue messages, - /*@NonNull*/ ResponseCallback callback); + @NonNull RequestOptions opts, + @NonNull Method method, + @NonNull BlockingQueue messages, + @NonNull ResponseCallback callback); }