From 3a02600e719e5c61484b91ee014ca6ac36f77841 Mon Sep 17 00:00:00 2001 From: Kaushal Sapara Date: Mon, 1 Apr 2024 22:38:51 -0300 Subject: [PATCH 1/2] Set - 1 Refactoring --- .../frontend/FrontendApplication.java | 28 +++------- .../AbstractChannelFactory.java | 52 ++++++++++++++----- 2 files changed, 48 insertions(+), 32 deletions(-) diff --git a/examples/grpc-observability/frontend/src/main/java/net/devh/boot/grpc/examples/observability/frontend/FrontendApplication.java b/examples/grpc-observability/frontend/src/main/java/net/devh/boot/grpc/examples/observability/frontend/FrontendApplication.java index 190e93d90..6d299499c 100644 --- a/examples/grpc-observability/frontend/src/main/java/net/devh/boot/grpc/examples/observability/frontend/FrontendApplication.java +++ b/examples/grpc-observability/frontend/src/main/java/net/devh/boot/grpc/examples/observability/frontend/FrontendApplication.java @@ -1,19 +1,3 @@ -/* - * Copyright (c) 2016-2023 The gRPC-Spring Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package net.devh.boot.grpc.examples.observability.frontend; import java.util.concurrent.ThreadLocalRandom; @@ -41,6 +25,10 @@ public class FrontendApplication implements CommandLineRunner { private static final Logger LOGGER = Logger.getLogger(FrontendApplication.class.getName()); + // Define constants for byte array sizes + private static final int MIN_BYTE_ARRAY_SIZE = 10240; + private static final int MAX_BYTE_ARRAY_SIZE = 20480; + public static void main(String[] args) { SpringApplication.run(FrontendApplication.class, args); } @@ -49,7 +37,7 @@ public static void main(String[] args) { private ExampleServiceStub stub; private void CallUnaryRpc() { - byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(10240, 20480)]; + byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(MIN_BYTE_ARRAY_SIZE, MAX_BYTE_ARRAY_SIZE)]; ThreadLocalRandom.current().nextBytes(bytes); UnaryRequest request = UnaryRequest.newBuilder().setMessage(new String(bytes)).build(); stub.unaryRpc(request, new StreamObserver<>() { @@ -70,7 +58,7 @@ public void onCompleted() { } private void CallClientStreamingRpc() { - byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(10240, 20480)]; + byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(MIN_BYTE_ARRAY_SIZE, MAX_BYTE_ARRAY_SIZE)]; ThreadLocalRandom.current().nextBytes(bytes); ClientStreamingRequest request = ClientStreamingRequest.newBuilder() .setMessage(new String(bytes)).build(); @@ -94,7 +82,7 @@ public void onCompleted() { } private void CallServerStreamingRpc() { - byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(10240, 20480)]; + byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(MIN_BYTE_ARRAY_SIZE, MAX_BYTE_ARRAY_SIZE)]; ThreadLocalRandom.current().nextBytes(bytes); ServerStreamingRequest request = ServerStreamingRequest.newBuilder() .setMessage(new String(bytes)).build(); @@ -115,7 +103,7 @@ public void onCompleted() { } private void CallBidStreamingRpc() { - byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(10240, 20480)]; + byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(MIN_BYTE_ARRAY_SIZE, MAX_BYTE_ARRAY_SIZE)]; ThreadLocalRandom.current().nextBytes(bytes); BidiStreamingRequest request = BidiStreamingRequest.newBuilder() .setMessage(new String(bytes)).build(); diff --git a/grpc-client-spring-boot-starter/src/main/java/net/devh/boot/grpc/client/channelfactory/AbstractChannelFactory.java b/grpc-client-spring-boot-starter/src/main/java/net/devh/boot/grpc/client/channelfactory/AbstractChannelFactory.java index 4b6bb63ed..a6d18dd80 100644 --- a/grpc-client-spring-boot-starter/src/main/java/net/devh/boot/grpc/client/channelfactory/AbstractChannelFactory.java +++ b/grpc-client-spring-boot-starter/src/main/java/net/devh/boot/grpc/client/channelfactory/AbstractChannelFactory.java @@ -213,16 +213,29 @@ protected void configureSecurity(final T builder, final String name) { final GrpcChannelProperties properties = getPropertiesFor(name); final Security security = properties.getSecurity(); - if (properties.getNegotiationType() != NegotiationType.TLS // non-default - || isNonNullAndNonBlank(security.getAuthorityOverride()) - || security.getCertificateChain() != null - || security.getPrivateKey() != null - || security.getTrustCertCollection() != null) { + if (isSecurityConfigured(properties, security)) { throw new IllegalStateException( "Security is configured but this implementation does not support security!"); } } + private boolean isSecurityConfigured(GrpcChannelProperties properties, Security security) { + return isNonDefaultNegotiationType(properties) + || isSecurityDetailsPresent(security); + } + + private boolean isNonDefaultNegotiationType(GrpcChannelProperties properties) { + return properties.getNegotiationType() != NegotiationType.TLS; + } + + private boolean isSecurityDetailsPresent(Security security) { + return isNonNullAndNonBlank(security.getAuthorityOverride()) + || security.getCertificateChain() != null + || security.getPrivateKey() != null + || security.getTrustCertCollection() != null; + } + + /** * Checks whether the given value is non null and non blank. * @@ -335,6 +348,13 @@ public synchronized void close() { return; } this.shutdown = true; + final List shutdownEntries = prepareShutdownEntries(); + awaitChannelTermination(shutdownEntries); + forceShutdownRemainingChannels(); + logShutdownCompletion(this.channels.size()); + } + + private List prepareShutdownEntries() { final List shutdownEntries = new ArrayList<>(); for (final Entry entry : this.channels.entrySet()) { final ManagedChannel channel = entry.getValue(); @@ -342,6 +362,10 @@ public synchronized void close() { final long gracePeriod = this.properties.getChannel(entry.getKey()).getShutdownGracePeriod().toMillis(); shutdownEntries.add(new ShutdownRecord(entry.getKey(), channel, gracePeriod)); } + return shutdownEntries; + } + + private void awaitChannelTermination(List shutdownEntries) { try { final long start = System.currentTimeMillis(); shutdownEntries.sort(comparingLong(ShutdownRecord::getGracePeriod)); @@ -363,15 +387,19 @@ public synchronized void close() { } catch (final InterruptedException e) { Thread.currentThread().interrupt(); log.debug("We got interrupted - Speeding up shutdown process"); - } finally { - for (final ManagedChannel channel : this.channels.values()) { - if (!channel.isTerminated()) { - log.debug("Channel not terminated yet - force shutdown now: {} ", channel); - channel.shutdownNow(); - } + } + } + + private void forceShutdownRemainingChannels() { + for (final ManagedChannel channel : this.channels.values()) { + if (!channel.isTerminated()) { + log.debug("Channel not terminated yet - force shutdown now: {} ", channel); + channel.shutdownNow(); } } - final int channelCount = this.channels.size(); + } + + private void logShutdownCompletion(int channelCount) { this.channels.clear(); this.channelStates.clear(); log.debug("GrpcChannelFactory closed (including {} channels)", channelCount); From b8664ba5c513779b9fd2d13c917b1b3bd756813b Mon Sep 17 00:00:00 2001 From: Kaushal Sapara Date: Mon, 1 Apr 2024 23:53:49 -0300 Subject: [PATCH 2/2] Set - 2 Refactoring --- .../client/config/GrpcChannelProperties.java | 8 +- .../grpc/test/config/GrpcAdviceConfig.java | 73 ++----------------- 2 files changed, 12 insertions(+), 69 deletions(-) diff --git a/grpc-client-spring-boot-starter/src/main/java/net/devh/boot/grpc/client/config/GrpcChannelProperties.java b/grpc-client-spring-boot-starter/src/main/java/net/devh/boot/grpc/client/config/GrpcChannelProperties.java index c19ad320e..156a2827b 100644 --- a/grpc-client-spring-boot-starter/src/main/java/net/devh/boot/grpc/client/config/GrpcChannelProperties.java +++ b/grpc-client-spring-boot-starter/src/main/java/net/devh/boot/grpc/client/config/GrpcChannelProperties.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import lombok.Getter; import org.springframework.boot.convert.DataSizeUnit; import org.springframework.boot.convert.DurationUnit; import org.springframework.core.io.Resource; @@ -505,12 +506,10 @@ public void copyDefaultsFrom(final GrpcChannelProperties config) { if (this.userAgent == null) { this.userAgent = config.userAgent; } + this.security.copyDefaultsFrom(config.security); } - /** - * A container with options for the channel's transport security. - */ @ToString @EqualsAndHashCode public static class Security { @@ -966,4 +965,7 @@ public void copyDefaultsFrom(final Security config) { } + + + } diff --git a/tests/src/test/java/net/devh/boot/grpc/test/config/GrpcAdviceConfig.java b/tests/src/test/java/net/devh/boot/grpc/test/config/GrpcAdviceConfig.java index 74ae6e4cb..14d36794a 100644 --- a/tests/src/test/java/net/devh/boot/grpc/test/config/GrpcAdviceConfig.java +++ b/tests/src/test/java/net/devh/boot/grpc/test/config/GrpcAdviceConfig.java @@ -44,88 +44,29 @@ public class GrpcAdviceConfig { @GrpcService public static class TestGrpcAdviceService extends TestServiceGrpc.TestServiceImplBase { - - private Supplier throwableToSimulate; - private LocationToThrow throwLocation; + private ExceptionSimulator exceptionSimulator = new ExceptionSimulator(); @Override public void normal(final Empty request, final StreamObserver responseObserver) { - - Assertions.assertThat(this.throwableToSimulate).isNotNull(); - switch (this.throwLocation) { - case METHOD: - throw this.throwableToSimulate.get(); - case RESPONSE_OBSERVER: - responseObserver.onError(this.throwableToSimulate.get()); - break; - default: - throw new UnsupportedOperationException("Unsupported LocationToThrow: " + this.throwLocation); - } + exceptionSimulator.simulateException(responseObserver); } @Override public StreamObserver echo(final StreamObserver responseObserver) { - Assertions.assertThat(this.throwableToSimulate).isNotNull(); - switch (this.throwLocation) { - case METHOD: - throw this.throwableToSimulate.get(); - case RESPONSE_OBSERVER: - responseObserver.onError(this.throwableToSimulate.get()); - return responseObserver; - case REQUEST_OBSERVER_ON_NEXT: - return new StreamObserver() { - - @Override - public void onNext(final SomeType value) { - throw TestGrpcAdviceService.this.throwableToSimulate.get(); - } - - @Override - public void onError(final Throwable t) { - responseObserver.onError(t); - } - - @Override - public void onCompleted() { - responseObserver.onCompleted(); - } - - }; - case REQUEST_OBSERVER_ON_COMPLETION: - return new StreamObserver() { - - @Override - public void onNext(final SomeType value) { - responseObserver.onNext(value); - } - - @Override - public void onError(final Throwable t) { - responseObserver.onError(t); - } - - @Override - public void onCompleted() { - throw TestGrpcAdviceService.this.throwableToSimulate.get(); - } - - }; - default: - throw new UnsupportedOperationException("Unsupported LocationToThrow: " + this.throwLocation); - } + exceptionSimulator.simulateException(responseObserver); + return responseObserver; } public void setExceptionToSimulate(final Supplier exception) { - this.throwableToSimulate = exception; + exceptionSimulator.setExceptionToSimulate(exception); } public void setThrowLocation(final LocationToThrow throwLocation) { - this.throwLocation = throwLocation; + exceptionSimulator.setThrowLocation(throwLocation); } - - } + @GrpcAdvice @Bean public TestAdviceWithOutMetadata grpcAdviceWithBean() {