Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring Code Smells #1083

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
/*
* Copyright (c) 2016-2023 The gRPC-Spring Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.devh.boot.grpc.examples.observability.frontend;

import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -41,6 +25,10 @@ public class FrontendApplication implements CommandLineRunner {

private static final Logger LOGGER = Logger.getLogger(FrontendApplication.class.getName());

// Define constants for byte array sizes
private static final int MIN_BYTE_ARRAY_SIZE = 10240;
private static final int MAX_BYTE_ARRAY_SIZE = 20480;

public static void main(String[] args) {
SpringApplication.run(FrontendApplication.class, args);
}
Expand All @@ -49,7 +37,7 @@ public static void main(String[] args) {
private ExampleServiceStub stub;

private void CallUnaryRpc() {
byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(10240, 20480)];
byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(MIN_BYTE_ARRAY_SIZE, MAX_BYTE_ARRAY_SIZE)];
ThreadLocalRandom.current().nextBytes(bytes);
UnaryRequest request = UnaryRequest.newBuilder().setMessage(new String(bytes)).build();
stub.unaryRpc(request, new StreamObserver<>() {
Expand All @@ -70,7 +58,7 @@ public void onCompleted() {
}

private void CallClientStreamingRpc() {
byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(10240, 20480)];
byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(MIN_BYTE_ARRAY_SIZE, MAX_BYTE_ARRAY_SIZE)];
ThreadLocalRandom.current().nextBytes(bytes);
ClientStreamingRequest request = ClientStreamingRequest.newBuilder()
.setMessage(new String(bytes)).build();
Expand All @@ -94,7 +82,7 @@ public void onCompleted() {
}

private void CallServerStreamingRpc() {
byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(10240, 20480)];
byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(MIN_BYTE_ARRAY_SIZE, MAX_BYTE_ARRAY_SIZE)];
ThreadLocalRandom.current().nextBytes(bytes);
ServerStreamingRequest request = ServerStreamingRequest.newBuilder()
.setMessage(new String(bytes)).build();
Expand All @@ -115,7 +103,7 @@ public void onCompleted() {
}

private void CallBidStreamingRpc() {
byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(10240, 20480)];
byte[] bytes = new byte[ThreadLocalRandom.current().nextInt(MIN_BYTE_ARRAY_SIZE, MAX_BYTE_ARRAY_SIZE)];
ThreadLocalRandom.current().nextBytes(bytes);
BidiStreamingRequest request = BidiStreamingRequest.newBuilder()
.setMessage(new String(bytes)).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,29 @@ protected void configureSecurity(final T builder, final String name) {
final GrpcChannelProperties properties = getPropertiesFor(name);
final Security security = properties.getSecurity();

if (properties.getNegotiationType() != NegotiationType.TLS // non-default
|| isNonNullAndNonBlank(security.getAuthorityOverride())
|| security.getCertificateChain() != null
|| security.getPrivateKey() != null
|| security.getTrustCertCollection() != null) {
if (isSecurityConfigured(properties, security)) {
throw new IllegalStateException(
"Security is configured but this implementation does not support security!");
}
}

private boolean isSecurityConfigured(GrpcChannelProperties properties, Security security) {
return isNonDefaultNegotiationType(properties)
|| isSecurityDetailsPresent(security);
}

private boolean isNonDefaultNegotiationType(GrpcChannelProperties properties) {
return properties.getNegotiationType() != NegotiationType.TLS;
}

private boolean isSecurityDetailsPresent(Security security) {
return isNonNullAndNonBlank(security.getAuthorityOverride())
|| security.getCertificateChain() != null
|| security.getPrivateKey() != null
|| security.getTrustCertCollection() != null;
}


/**
* Checks whether the given value is non null and non blank.
*
Expand Down Expand Up @@ -335,13 +348,24 @@ public synchronized void close() {
return;
}
this.shutdown = true;
final List<ShutdownRecord> shutdownEntries = prepareShutdownEntries();
awaitChannelTermination(shutdownEntries);
forceShutdownRemainingChannels();
logShutdownCompletion(this.channels.size());
}

private List<ShutdownRecord> prepareShutdownEntries() {
final List<ShutdownRecord> shutdownEntries = new ArrayList<>();
for (final Entry<String, ManagedChannel> entry : this.channels.entrySet()) {
final ManagedChannel channel = entry.getValue();
channel.shutdown();
final long gracePeriod = this.properties.getChannel(entry.getKey()).getShutdownGracePeriod().toMillis();
shutdownEntries.add(new ShutdownRecord(entry.getKey(), channel, gracePeriod));
}
return shutdownEntries;
}

private void awaitChannelTermination(List<ShutdownRecord> shutdownEntries) {
try {
final long start = System.currentTimeMillis();
shutdownEntries.sort(comparingLong(ShutdownRecord::getGracePeriod));
Expand All @@ -363,15 +387,19 @@ public synchronized void close() {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
log.debug("We got interrupted - Speeding up shutdown process");
} finally {
for (final ManagedChannel channel : this.channels.values()) {
if (!channel.isTerminated()) {
log.debug("Channel not terminated yet - force shutdown now: {} ", channel);
channel.shutdownNow();
}
}
}

private void forceShutdownRemainingChannels() {
for (final ManagedChannel channel : this.channels.values()) {
if (!channel.isTerminated()) {
log.debug("Channel not terminated yet - force shutdown now: {} ", channel);
channel.shutdownNow();
}
}
final int channelCount = this.channels.size();
}

private void logShutdownCompletion(int channelCount) {
this.channels.clear();
this.channelStates.clear();
log.debug("GrpcChannelFactory closed (including {} channels)", channelCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import lombok.Getter;
import org.springframework.boot.convert.DataSizeUnit;
import org.springframework.boot.convert.DurationUnit;
import org.springframework.core.io.Resource;
Expand Down Expand Up @@ -505,12 +506,10 @@ public void copyDefaultsFrom(final GrpcChannelProperties config) {
if (this.userAgent == null) {
this.userAgent = config.userAgent;
}

this.security.copyDefaultsFrom(config.security);
}

/**
* A container with options for the channel's transport security.
*/
@ToString
@EqualsAndHashCode
public static class Security {
Expand Down Expand Up @@ -966,4 +965,7 @@ public void copyDefaultsFrom(final Security config) {

}




}
Original file line number Diff line number Diff line change
Expand Up @@ -44,88 +44,29 @@ public class GrpcAdviceConfig {

@GrpcService
public static class TestGrpcAdviceService extends TestServiceGrpc.TestServiceImplBase {

private Supplier<? extends RuntimeException> throwableToSimulate;
private LocationToThrow throwLocation;
private ExceptionSimulator exceptionSimulator = new ExceptionSimulator();

@Override
public void normal(final Empty request, final StreamObserver<SomeType> responseObserver) {

Assertions.assertThat(this.throwableToSimulate).isNotNull();
switch (this.throwLocation) {
case METHOD:
throw this.throwableToSimulate.get();
case RESPONSE_OBSERVER:
responseObserver.onError(this.throwableToSimulate.get());
break;
default:
throw new UnsupportedOperationException("Unsupported LocationToThrow: " + this.throwLocation);
}
exceptionSimulator.simulateException(responseObserver);
}

@Override
public StreamObserver<SomeType> echo(final StreamObserver<SomeType> responseObserver) {
Assertions.assertThat(this.throwableToSimulate).isNotNull();
switch (this.throwLocation) {
case METHOD:
throw this.throwableToSimulate.get();
case RESPONSE_OBSERVER:
responseObserver.onError(this.throwableToSimulate.get());
return responseObserver;
case REQUEST_OBSERVER_ON_NEXT:
return new StreamObserver<SomeType>() {

@Override
public void onNext(final SomeType value) {
throw TestGrpcAdviceService.this.throwableToSimulate.get();
}

@Override
public void onError(final Throwable t) {
responseObserver.onError(t);
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}

};
case REQUEST_OBSERVER_ON_COMPLETION:
return new StreamObserver<SomeType>() {

@Override
public void onNext(final SomeType value) {
responseObserver.onNext(value);
}

@Override
public void onError(final Throwable t) {
responseObserver.onError(t);
}

@Override
public void onCompleted() {
throw TestGrpcAdviceService.this.throwableToSimulate.get();
}

};
default:
throw new UnsupportedOperationException("Unsupported LocationToThrow: " + this.throwLocation);
}
exceptionSimulator.simulateException(responseObserver);
return responseObserver;
}

public void setExceptionToSimulate(final Supplier<? extends RuntimeException> exception) {
this.throwableToSimulate = exception;
exceptionSimulator.setExceptionToSimulate(exception);
}

public void setThrowLocation(final LocationToThrow throwLocation) {
this.throwLocation = throwLocation;
exceptionSimulator.setThrowLocation(throwLocation);
}


}


@GrpcAdvice
@Bean
public TestAdviceWithOutMetadata grpcAdviceWithBean() {
Expand Down