diff --git a/hello-grpc-java/client_pom.xml b/hello-grpc-java/client_pom.xml index 25897d8..12da893 100644 --- a/hello-grpc-java/client_pom.xml +++ b/hello-grpc-java/client_pom.xml @@ -6,23 +6,25 @@ org.feuyeux.grpc hello-grpc-java-client 1.0.0-SNAPSHOT + 21 - 1.58.0 + 1.65.0 3.24.3 0.6.1 - 1.7.0 + 1.7.1 6.0.53 3.3.0 - 1.18.28 - 5.10.0 + 5.10.2 1.19.0 + + 3.13.0 - + io.grpc grpc-netty @@ -78,9 +80,8 @@ io.etcd jetcd-core - 0.7.6 + 0.8.2 - slf4j-api org.slf4j @@ -91,7 +92,7 @@ com.google.guava guava - 32.1.1-jre + 33.2.1-jre @@ -99,61 +100,65 @@ org.apache.logging.log4j log4j-slf4j-impl - 2.20.0 + 2.23.1 - - - lombok - org.projectlombok - provided - ${lombok.version} - org.apache.tomcat annotations-api ${annotations-api.version} provided + + + + io.netty + netty-resolver-dns-native-macos + 4.1.111.Final + osx-x86_64 + + org.junit.jupiter junit-jupiter ${junit.version} + test + - system-rules com.github.stefanbirkner + system-rules ${system-rules.version} test - - - io.netty - netty-resolver-dns-native-macos - 4.1.87.Final - osx-x86_64 - + - os-maven-plugin kr.motd.maven + os-maven-plugin ${os-maven-plugin.version} ${project.name} + org.apache.maven.plugins maven-compiler-plugin - 3.11.0 + ${maven-compiler-plugin.version} UTF-8 - ${jdk.version} + ${jdk.version} + ${jdk.version} + + --add-exports + jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED + @@ -179,6 +184,22 @@ + + + com.spotify.fmt + fmt-maven-plugin + 2.23 + + + + + + + check + + + + org.apache.maven.plugins maven-shade-plugin diff --git a/hello-grpc-java/pom.xml b/hello-grpc-java/pom.xml index 402ff38..f74ff81 100644 --- a/hello-grpc-java/pom.xml +++ b/hello-grpc-java/pom.xml @@ -10,7 +10,7 @@ 21 - 1.64.0 + 1.65.0 3.24.3 0.6.1 @@ -24,7 +24,7 @@ - + io.grpc grpc-netty @@ -82,7 +82,6 @@ jetcd-core 0.8.2 - slf4j-api org.slf4j @@ -110,45 +109,56 @@ ${annotations-api.version} provided + + + + io.netty + netty-resolver-dns-native-macos + 4.1.111.Final + osx-x86_64 + + org.junit.jupiter junit-jupiter ${junit.version} + test + - system-rules com.github.stefanbirkner + system-rules ${system-rules.version} test - - - io.netty - netty-resolver-dns-native-macos - 4.1.111.Final - osx-x86_64 - + - os-maven-plugin kr.motd.maven + os-maven-plugin ${os-maven-plugin.version} ${project.name} + org.apache.maven.plugins maven-compiler-plugin ${maven-compiler-plugin.version} UTF-8 - ${jdk.version} + ${jdk.version} + ${jdk.version} + + --add-exports + jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED + @@ -174,14 +184,17 @@ - + - com.coveo + com.spotify.fmt fmt-maven-plugin - 2.13 + 2.23 + + + format diff --git a/hello-grpc-java/server_pom.xml b/hello-grpc-java/server_pom.xml index 053b8ea..af48807 100644 --- a/hello-grpc-java/server_pom.xml +++ b/hello-grpc-java/server_pom.xml @@ -6,23 +6,25 @@ org.feuyeux.grpc hello-grpc-java-server 1.0.0-SNAPSHOT + 21 - 1.58.0 + 1.65.0 3.24.3 0.6.1 - 1.7.0 + 1.7.1 6.0.53 3.3.0 - 1.18.28 - 5.10.0 + 5.10.2 1.19.0 + + 3.13.0 - + io.grpc grpc-netty @@ -78,9 +80,8 @@ io.etcd jetcd-core - 0.7.6 + 0.8.2 - slf4j-api org.slf4j @@ -91,7 +92,7 @@ com.google.guava guava - 32.1.1-jre + 33.2.1-jre @@ -99,61 +100,65 @@ org.apache.logging.log4j log4j-slf4j-impl - 2.20.0 + 2.23.1 - - - lombok - org.projectlombok - provided - ${lombok.version} - org.apache.tomcat annotations-api ${annotations-api.version} provided + + + + io.netty + netty-resolver-dns-native-macos + 4.1.111.Final + osx-x86_64 + + org.junit.jupiter junit-jupiter ${junit.version} + test + - system-rules com.github.stefanbirkner + system-rules ${system-rules.version} test - - - io.netty - netty-resolver-dns-native-macos - 4.1.87.Final - osx-x86_64 - + - os-maven-plugin kr.motd.maven + os-maven-plugin ${os-maven-plugin.version} ${project.name} + org.apache.maven.plugins maven-compiler-plugin - 3.11.0 + ${maven-compiler-plugin.version} UTF-8 - ${jdk.version} + ${jdk.version} + ${jdk.version} + + --add-exports + jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED + @@ -179,6 +184,22 @@ + + + com.spotify.fmt + fmt-maven-plugin + 2.23 + + + + + + + check + + + + org.apache.maven.plugins maven-shade-plugin diff --git a/hello-grpc-java/sever_start.sh b/hello-grpc-java/sever_start.sh index 220c5f1..1f915c4 100755 --- a/hello-grpc-java/sever_start.sh +++ b/hello-grpc-java/sever_start.sh @@ -6,5 +6,4 @@ SCRIPT_PATH="$( )" cd "$SCRIPT_PATH" || exit sh build.sh -export JAVA_HOME=/usr/local/opt/openjdk/libexec/openjdk.jdk/Contents/Home mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.server.ProtoServer" diff --git a/hello-grpc-java/src/main/java/org/feuyeux/grpc/client/ProtoClient.java b/hello-grpc-java/src/main/java/org/feuyeux/grpc/client/ProtoClient.java index 9f87ee1..b78ed02 100644 --- a/hello-grpc-java/src/main/java/org/feuyeux/grpc/client/ProtoClient.java +++ b/hello-grpc-java/src/main/java/org/feuyeux/grpc/client/ProtoClient.java @@ -25,16 +25,12 @@ public class ProtoClient { private static final Logger log = LoggerFactory.getLogger("ProtoClient"); - private final ManagedChannel channel; - private final LandingServiceGrpc.LandingServiceBlockingStub blockingStub; - private final LandingServiceGrpc.LandingServiceStub asyncStub; + private ManagedChannel channel; + private LandingServiceGrpc.LandingServiceBlockingStub blockingStub; + private LandingServiceGrpc.LandingServiceStub asyncStub; public ProtoClient() throws SSLException { - channel = Connection.getChannel(); - ClientInterceptor interceptor = new HeaderClientInterceptor(); - Channel interceptChannel = ClientInterceptors.intercept(channel, interceptor); - blockingStub = LandingServiceGrpc.newBlockingStub(interceptChannel); - asyncStub = LandingServiceGrpc.newStub(interceptChannel); + connect(Connection.getChannel()); } public static void main(String[] args) { @@ -79,7 +75,7 @@ public static void main(String[] args) { } } - private static void printResponse(TalkResponse response) { + public static void printResponse(TalkResponse response) { response .getResultsList() .forEach( @@ -97,10 +93,6 @@ private static void printResponse(TalkResponse response) { }); } - public void shutdown() throws InterruptedException { - channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); - } - public TalkResponse talk(TalkRequest talkRequest) { return blockingStub.talk(talkRequest); } @@ -189,4 +181,21 @@ public void onCompleted() { } }; } + + public void connect(ManagedChannel channel) throws SSLException { + ClientInterceptor interceptor = new HeaderClientInterceptor(); + Channel interceptChannel = ClientInterceptors.intercept(channel, interceptor); + blockingStub = LandingServiceGrpc.newBlockingStub(interceptChannel); + asyncStub = LandingServiceGrpc.newStub(interceptChannel); + } + + public void shutdown() throws InterruptedException { + if (channel != null) { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + } + + public ManagedChannel getChannel() { + return channel; + } } diff --git a/hello-grpc-java/src/main/java/org/feuyeux/grpc/client/ProtoClientWithReconnect.java b/hello-grpc-java/src/main/java/org/feuyeux/grpc/client/ProtoClientWithReconnect.java new file mode 100644 index 0000000..4968102 --- /dev/null +++ b/hello-grpc-java/src/main/java/org/feuyeux/grpc/client/ProtoClientWithReconnect.java @@ -0,0 +1,128 @@ +package org.feuyeux.grpc.client; + +import static org.feuyeux.grpc.client.ProtoClient.printResponse; +import static org.feuyeux.grpc.common.Connection.*; +import static org.feuyeux.grpc.common.HelloUtils.buildLinkRequests; + +import io.grpc.*; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLException; +import org.feuyeux.grpc.common.Connection; +import org.feuyeux.grpc.proto.TalkRequest; +import org.feuyeux.grpc.proto.TalkResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProtoClientWithReconnect { + private static final Logger log = LoggerFactory.getLogger("ProtoClientWithReconnect"); + private final int maxReconnectBackoffMillis; + private final int initialReconnectBackoffMillis; + private final double backoffMultiplier; + private final int maxReconnectAttempts; + private volatile int reconnectAttempts; + private ProtoClient protoClient; + + public ProtoClientWithReconnect(ProtoClient protoClient) throws SSLException { + this.protoClient = protoClient; + // 最大重连退避时间(毫秒) + maxReconnectBackoffMillis = 30000; + // 初始重连退避时间(毫秒) + initialReconnectBackoffMillis = 1000; + // 退避乘数 + backoffMultiplier = 2.0; + // 最大重连尝试次数 + maxReconnectAttempts = 5; + } + + public static void main(String[] args) throws Exception { + ProtoClientWithReconnect protoClient = new ProtoClientWithReconnect(new ProtoClient()); + protoClient.start(); + } + + public void start() { + try { + talking(); + } catch (Exception e) { + log.error("", e); + } finally { + if (protoClient != null) { + try { + protoClient.shutdown(); + } catch (Exception e) { + log.error("", e); + } + } + } + } + + private ManagedChannel getChannel() { + return this.protoClient.getChannel(); + } + + private void talking() { + try { + do { + log.info("Unary RPC"); + TalkRequest talkRequest = TalkRequest.newBuilder().setMeta("JAVA").setData("0").build(); + log.info("Request data:{},meta:{}", talkRequest.getData(), talkRequest.getMeta()); + TalkResponse response = protoClient.talk(talkRequest); + printResponse(response); + + log.info("Server streaming RPC"); + talkRequest = TalkRequest.newBuilder().setMeta("JAVA").setData("0,1,2").build(); + log.info("Request data:{},meta:{}", talkRequest.getData(), talkRequest.getMeta()); + List talkResponses = protoClient.talkOneAnswerMore(talkRequest); + talkResponses.forEach(ProtoClient::printResponse); + + log.info("Client streaming RPC"); + protoClient.talkMoreAnswerOne(buildLinkRequests()); + + log.info("Bidirectional streaming RPC"); + protoClient.talkBidirectional(buildLinkRequests()); + log.info("=========="); + TimeUnit.SECONDS.sleep(3); + } while (reconnectAttempts < maxReconnectAttempts); + } catch (Exception e) { + ManagedChannel channel = getChannel(); + if (channel == null || channel.isShutdown()) { + if (reconnect()) { + talking(); + } + } else { + log.error("", e); + } + } + } + + private synchronized boolean reconnect() { + try { + if (reconnectAttempts < maxReconnectAttempts) { + long backoffTime = + Math.min( + initialReconnectBackoffMillis + * (long) Math.pow(backoffMultiplier, reconnectAttempts), + maxReconnectBackoffMillis); + reconnectAttempts++; + try { + TimeUnit.MILLISECONDS.sleep(backoffTime); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + log.info("Reconnecting to server. Attempt {}", reconnectAttempts); + this.protoClient.connect(Connection.getChannel()); + return true; + } else { + log.error("Max reconnect attempts reached. Exiting."); + return false; + } + } catch (SSLException e) { + log.error("", e); + return false; + } + } + + public void shutdown() throws InterruptedException { + this.protoClient.shutdown(); + } +} diff --git a/hello-grpc-java/src/main/java/org/feuyeux/grpc/common/Connection.java b/hello-grpc-java/src/main/java/org/feuyeux/grpc/common/Connection.java index 1f969da..4f14526 100644 --- a/hello-grpc-java/src/main/java/org/feuyeux/grpc/common/Connection.java +++ b/hello-grpc-java/src/main/java/org/feuyeux/grpc/common/Connection.java @@ -1,12 +1,16 @@ package org.feuyeux.grpc.common; +import static org.feuyeux.grpc.common.HelloUtils.getVersion; + import com.google.common.base.Charsets; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; +import io.etcd.jetcd.Lease; import io.etcd.jetcd.lease.LeaseKeepAliveResponse; import io.etcd.jetcd.options.PutOption; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.NameResolverRegistry; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; @@ -26,8 +30,6 @@ public class Connection { private static final Logger log = LoggerFactory.getLogger("Connection"); - public static String version = "grpc.version=1.58.0,protoc.version=3.24.3"; - public static final String GRPC_HELLO_SECURE = "GRPC_HELLO_SECURE"; public static final String GRPC_SERVER = "GRPC_SERVER"; public static final String GRPC_SERVER_PORT = "GRPC_SERVER_PORT"; @@ -39,11 +41,11 @@ public class Connection { private static final int port = 9996; // https://myssl.com/create_test_cert.html - private static String cert = "/var/hello_grpc/client_certs/cert.pem"; - private static String certKey = "/var/hello_grpc/client_certs/private.pkcs8.key"; - private static String certChain = "/var/hello_grpc/client_certs/full_chain.pem"; - private static String rootCert = "/var/hello_grpc/client_certs/myssl_root.cer"; - private static String serverName = "hello.grpc.io"; + private static final String cert = "/var/hello_grpc/client_certs/cert.pem"; + private static final String certKey = "/var/hello_grpc/client_certs/private.pkcs8.key"; + private static final String certChain = "/var/hello_grpc/client_certs/full_chain.pem"; + private static final String rootCert = "/var/hello_grpc/client_certs/myssl_root.cer"; + private static final String serverName = "hello.grpc.io"; public static String server = System.getenv(GRPC_SERVER); public static String currentPort = System.getenv(GRPC_SERVER_PORT); @@ -113,25 +115,23 @@ public static ManagedChannel getChannel() throws SSLException { List endpoints = new ArrayList<>(); endpoints.add(URI.create(getDiscoveryEndpoint())); EtcdNameResolverProvider nameResolver = EtcdNameResolverProvider.forEndpoints(endpoints); - builder = - ManagedChannelBuilder.forTarget(target) - .nameResolverFactory(nameResolver) - .defaultLoadBalancingPolicy(LB_ROUND_ROBIN); + builder = ManagedChannelBuilder.forTarget(target).defaultLoadBalancingPolicy(LB_ROUND_ROBIN); + NameResolverRegistry.getDefaultRegistry().register(nameResolver); } else { builder = NettyChannelBuilder.forAddress(connectTo, port); } if (secure == null || !secure.equals("Y")) { if (isDiscovery()) { - log.info("Connect with InSecure({}) [{}]", target, version); + log.info("Connect with InSecure({}) [{}]", target, getVersion()); } else { - log.info("Connect with InSecure({}:{}) [{}]", connectTo, port, version); + log.info("Connect with InSecure({}:{}) [{}]", connectTo, port, getVersion()); } return builder.usePlaintext().build(); } else { if (isDiscovery()) { - log.info("Connect with TLS({}) [{}]", target, version); + log.info("Connect with TLS({}) [{}]", target, getVersion()); } else { - log.info("Connect with TLS({}:{}) [{}]", connectTo, port, version); + log.info("Connect with TLS({}:{}) [{}]", connectTo, port, getVersion()); } return ((NettyChannelBuilder) builder) .overrideAuthority(serverName) /* Only for using provided test certs. */ @@ -150,39 +150,40 @@ private static String getDiscoveryEndpoint() { return endpoint; } - public static void register(Client etcd) throws ExecutionException, InterruptedException { + public static void register() throws ExecutionException, InterruptedException { if (isDiscovery()) { final URI uri = URI.create("http://" + getGrcServerHost() + ":" + getGrcServerPort()); - etcd = Client.builder().endpoints(URI.create(getDiscoveryEndpoint())).build(); + Client etcd = Client.builder().endpoints(URI.create(getDiscoveryEndpoint())).build(); long leaseId = etcd.getLeaseClient().grant(TTL).get().getID(); ByteSequence key = ByteSequence.from(SVC_DISC_NAME + "/" + uri.toASCIIString(), Charsets.US_ASCII); ByteSequence value = ByteSequence.from(Long.toString(leaseId), Charsets.US_ASCII); PutOption option = PutOption.builder().withLeaseId(leaseId).build(); etcd.getKVClient().put(key, value, option); - etcd.getLeaseClient() - .keepAlive( - leaseId, - new StreamObserver<>() { - @Override - public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) { - log.debug("got renewal for lease: " + leaseKeepAliveResponse.getID()); - } - - @Override - public void onError(Throwable throwable) { - log.error("", throwable); - } - - @Override - public void onCompleted() { - log.info("lease completed"); - } - }); + try (Lease leaseClient = etcd.getLeaseClient()) { + leaseClient.keepAlive( + leaseId, + new StreamObserver<>() { + @Override + public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) { + log.debug("got renewal for lease: " + leaseKeepAliveResponse.getID()); + } + + @Override + public void onError(Throwable throwable) { + log.error("", throwable); + } + + @Override + public void onCompleted() { + log.info("lease completed"); + } + }); + } } } private static boolean isDiscovery() { - return discovery != null && "etcd".equals(discovery); + return "etcd".equals(discovery); } } diff --git a/hello-grpc-java/src/main/java/org/feuyeux/grpc/common/HelloUtils.java b/hello-grpc-java/src/main/java/org/feuyeux/grpc/common/HelloUtils.java index eaf66de..e7247fd 100644 --- a/hello-grpc-java/src/main/java/org/feuyeux/grpc/common/HelloUtils.java +++ b/hello-grpc-java/src/main/java/org/feuyeux/grpc/common/HelloUtils.java @@ -2,6 +2,7 @@ import static java.util.stream.Collectors.toList; +import io.grpc.internal.GrpcUtil; import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -49,4 +50,12 @@ public static List getRandomIds(int max) { public static String getRandomId() { return String.valueOf(random.nextInt(5)); } + + public static String getVersion() { + try { + return String.format("grpc.version=%s", GrpcUtil.IMPLEMENTATION_VERSION); + } catch (Exception e) { + return ""; + } + } } diff --git a/hello-grpc-java/src/main/java/org/feuyeux/grpc/server/ProtoServer.java b/hello-grpc-java/src/main/java/org/feuyeux/grpc/server/ProtoServer.java index 87cb1c4..6174620 100644 --- a/hello-grpc-java/src/main/java/org/feuyeux/grpc/server/ProtoServer.java +++ b/hello-grpc-java/src/main/java/org/feuyeux/grpc/server/ProtoServer.java @@ -1,8 +1,8 @@ package org.feuyeux.grpc.server; import static org.feuyeux.grpc.common.Connection.*; +import static org.feuyeux.grpc.common.HelloUtils.getVersion; -import io.etcd.jetcd.Client; import io.grpc.*; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyServerBuilder; @@ -23,13 +23,12 @@ public class ProtoServer { // https://myssl.com/create_test_cert.html - private static String cert = "/var/hello_grpc/server_certs/cert.pem"; - private static String certKey = "/var/hello_grpc/server_certs/private.pkcs8.key"; - private static String certChain = "/var/hello_grpc/server_certs/full_chain.pem"; - private static String rootCert = "/var/hello_grpc/server_certs/myssl_root.cer"; + private static final String cert = "/var/hello_grpc/server_certs/cert.pem"; + private static final String certKey = "/var/hello_grpc/server_certs/private.pkcs8.key"; + private static final String certChain = "/var/hello_grpc/server_certs/full_chain.pem"; + private static final String rootCert = "/var/hello_grpc/server_certs/myssl_root.cer"; private static ManagedChannel channel; private final Server server; - private Client etcd; private static final Logger log = LoggerFactory.getLogger("ProtoServer"); @@ -73,10 +72,10 @@ public void transportTerminated(Attributes transportAttrs) { } }); if (secure == null || !secure.equals("Y")) { - log.info("Start GRPC Server :{} [{}]", port, version); + log.info("Start GRPC Server :{} [{}]", port, getVersion()); return serverBuilder.build(); } else { - log.info("Start GRPC TLS Server :{} [{}]", port, version); + log.info("Start GRPC TLS Server :{} [{}]", port, getVersion()); return serverBuilder.sslContext(getSslContextBuilder().build()).build(); } } @@ -91,17 +90,20 @@ private SslContextBuilder getSslContextBuilder() { private void start() throws IOException, ExecutionException, InterruptedException { server.start(); - register(etcd); + register(); Runtime.getRuntime() .addShutdownHook( new Thread( () -> { log.warn("shutting down Google RPC Server since JVM is shutting down"); ProtoServer.this.stop(); - log.warn("Google RPC Server shut down"); })); } + public void stop() { + log.warn("Google RPC Server shut down"); + } + public void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); @@ -110,11 +112,4 @@ public void blockUntilShutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } } - - public void stop() { - if (etcd != null) { - etcd.close(); - } - server.shutdown(); - } } diff --git a/hello-grpc-java/src/test/java/org/feuyeux/grpc/ProtoTest.java b/hello-grpc-java/src/test/java/org/feuyeux/grpc/ProtoTest.java index 988afbf..e155799 100644 --- a/hello-grpc-java/src/test/java/org/feuyeux/grpc/ProtoTest.java +++ b/hello-grpc-java/src/test/java/org/feuyeux/grpc/ProtoTest.java @@ -9,7 +9,9 @@ import java.net.SocketException; import java.util.Enumeration; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.feuyeux.grpc.client.ProtoClient; +import org.feuyeux.grpc.client.ProtoClientWithReconnect; import org.feuyeux.grpc.proto.TalkRequest; import org.feuyeux.grpc.proto.TalkResponse; import org.feuyeux.grpc.server.LandingServiceImpl; @@ -25,7 +27,8 @@ public class ProtoTest { @Rule public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); - public static String getLocalIp() { + @Test + public void testGetLocalIp() { try { Enumeration allNetInterfaces = NetworkInterface.getNetworkInterfaces(); InetAddress ip; @@ -42,22 +45,23 @@ public static String getLocalIp() { String t = ip.getHostAddress(); if (!"127.0.0.1".equals(t)) { // 只返回不是本地的IP - return t; + log.info("IP:{}", t); } } } } - return null; } catch (SocketException e) { log.error("", e); - return null; } } @Test() public void testProto() throws InterruptedException, IOException, ExecutionException { environmentVariables.set("GRPC_HELLO_SECURE", "Y"); + log.info("Start server"); ProtoServer protoServer = new ProtoServer(new LandingServiceImpl()); + TimeUnit.SECONDS.sleep(3); + log.info("Start client"); ProtoClient protoClient = new ProtoClient(); TalkRequest talkRequest = TalkRequest.newBuilder().setMeta("id=" + System.nanoTime()).setData("eric").build(); @@ -70,4 +74,17 @@ public void testProto() throws InterruptedException, IOException, ExecutionExcep protoClient.shutdown(); protoServer.stop(); } + + @Test() + public void testReconnect() throws InterruptedException, IOException, ExecutionException { + ProtoServer protoServer; + ProtoClientWithReconnect protoClient = new ProtoClientWithReconnect(new ProtoClient()); + protoClient.start(); + protoServer = new ProtoServer(new LandingServiceImpl()); + for (int i = 0; i < 3; i++) { + TimeUnit.SECONDS.sleep(5); + protoServer.stop(); + } + protoClient.shutdown(); + } }