Skip to content

Commit

Permalink
Fix exception in GrpcRetryer. (#2021)
Browse files Browse the repository at this point in the history
* Fix uncaught gRPC exception in retry logic.

* Limit retry timeout to deadline; don't query capabilities on health check RPCs.

* Oops, replace setMaximumInterval with setExpiration.

* Fix formatting.

* Back out the chnages to the interceptor, something hinky is happening.

* Add tests.
  • Loading branch information
chronos-tachyon authored Apr 17, 2024
1 parent c6cceca commit ed211fa
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 32 deletions.
1 change: 1 addition & 0 deletions temporal-serviceclient/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
api "org.slf4j:slf4j-api:$slf4jVersion"

testImplementation project(':temporal-testing')
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
testImplementation "junit:junit:${junitVersion}"
testImplementation "org.mockito:mockito-core:${mockitoVersion}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
import io.grpc.health.v1.HealthGrpc;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.MetadataUtils;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse.Capabilities;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.internal.retryer.GrpcRetryer.GrpcRetryerOptions;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -87,7 +88,7 @@ final class ChannelManager {
private final Channel interceptedChannel;
private final HealthGrpc.HealthBlockingStub healthBlockingStub;

private final CompletableFuture<GetSystemInfoResponse.Capabilities> serverCapabilitiesFuture =
private final CompletableFuture<Capabilities> serverCapabilitiesFuture =
new CompletableFuture<>();

public ChannelManager(
Expand Down Expand Up @@ -289,8 +290,8 @@ public void connect(String healthCheckServiceName, @Nullable Duration timeout) {
if (timeout == null) {
timeout = options.getRpcTimeout();
}
GrpcRetryer.GrpcRetryerOptions grpcRetryerOptions =
new GrpcRetryer.GrpcRetryerOptions(
GrpcRetryerOptions grpcRetryerOptions =
new GrpcRetryerOptions(
RpcRetryOptions.newBuilder().setExpiration(timeout).validateBuildWithDefaults(), null);

new GrpcRetryer(getServerCapabilities())
Expand All @@ -310,30 +311,24 @@ public void connect(String healthCheckServiceName, @Nullable Duration timeout) {
*/
public HealthCheckResponse healthCheck(
String healthCheckServiceName, @Nullable Duration timeout) {
HealthGrpc.HealthBlockingStub stub;
if (timeout != null) {
stub =
this.healthBlockingStub.withDeadline(
Deadline.after(
options.getHealthCheckAttemptTimeout().toMillis(), TimeUnit.MILLISECONDS));
} else {
stub = this.healthBlockingStub;
if (timeout == null) {
timeout = options.getHealthCheckAttemptTimeout();
}
return stub.check(HealthCheckRequest.newBuilder().setService(healthCheckServiceName).build());
return this.healthBlockingStub
.withDeadline(deadlineFrom(timeout))
.check(HealthCheckRequest.newBuilder().setService(healthCheckServiceName).build());
}

public Supplier<GetSystemInfoResponse.Capabilities> getServerCapabilities() {
return () -> {
synchronized (serverCapabilitiesFuture) {
GetSystemInfoResponse.Capabilities capabilities = serverCapabilitiesFuture.getNow(null);
if (capabilities == null) {
serverCapabilitiesFuture.complete(
SystemInfoInterceptor.getServerCapabilitiesOrThrow(interceptedChannel, null));
capabilities = serverCapabilitiesFuture.getNow(null);
}
return capabilities;
}
};
public Supplier<Capabilities> getServerCapabilities() {
return () ->
SystemInfoInterceptor.getServerCapabilitiesWithRetryOrThrow(
serverCapabilitiesFuture,
interceptedChannel,
deadlineFrom(options.getHealthCheckAttemptTimeout()));
}

private static Deadline deadlineFrom(Duration duration) {
return Deadline.after(duration.toMillis(), TimeUnit.MILLISECONDS);
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@
import io.grpc.*;
import io.temporal.api.workflowservice.v1.GetSystemInfoRequest;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse.Capabilities;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.internal.retryer.GrpcRetryer.GrpcRetryerOptions;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class SystemInfoInterceptor implements ClientInterceptor {

private final CompletableFuture<GetSystemInfoResponse.Capabilities> serverCapabilitiesFuture;
private final CompletableFuture<Capabilities> serverCapabilitiesFuture;

public SystemInfoInterceptor(
CompletableFuture<GetSystemInfoResponse.Capabilities> serverCapabilitiesFuture) {
public SystemInfoInterceptor(CompletableFuture<Capabilities> serverCapabilitiesFuture) {
this.serverCapabilitiesFuture = serverCapabilitiesFuture;
}

Expand Down Expand Up @@ -63,8 +69,7 @@ public void onMessage(RespT message) {
@Override
public void onClose(Status status, Metadata trailers) {
if (Status.UNIMPLEMENTED.getCode().equals(status.getCode())) {
serverCapabilitiesFuture.complete(
GetSystemInfoResponse.Capabilities.getDefaultInstance());
serverCapabilitiesFuture.complete(Capabilities.getDefaultInstance());
}
super.onClose(status, trailers);
}
Expand All @@ -87,7 +92,39 @@ public void onClose(Status status, Metadata trailers) {
};
}

public static GetSystemInfoResponse.Capabilities getServerCapabilitiesOrThrow(
public static Capabilities getServerCapabilitiesWithRetryOrThrow(
@Nonnull CompletableFuture<Capabilities> future,
@Nonnull Channel channel,
@Nullable Deadline deadline) {
Capabilities capabilities = future.getNow(null);
if (capabilities == null) {
synchronized (Objects.requireNonNull(future)) {
capabilities = future.getNow(null);
if (capabilities == null) {
if (deadline == null) {
deadline = Deadline.after(30, TimeUnit.SECONDS);
}
Deadline computedDeadline = deadline;
RpcRetryOptions rpcRetryOptions =
RpcRetryOptions.newBuilder()
.setExpiration(
Duration.ofMillis(computedDeadline.timeRemaining(TimeUnit.MILLISECONDS)))
.validateBuildWithDefaults();
GrpcRetryerOptions grpcRetryerOptions =
new GrpcRetryerOptions(rpcRetryOptions, computedDeadline);
capabilities =
new GrpcRetryer(Capabilities::getDefaultInstance)
.retryWithResult(
() -> getServerCapabilitiesOrThrow(channel, computedDeadline),
grpcRetryerOptions);
future.complete(capabilities);
}
}
}
return capabilities;
}

public static Capabilities getServerCapabilitiesOrThrow(
Channel channel, @Nullable Deadline deadline) {
try {
return WorkflowServiceGrpc.newBlockingStub(channel)
Expand All @@ -96,7 +133,7 @@ public static GetSystemInfoResponse.Capabilities getServerCapabilitiesOrThrow(
.getCapabilities();
} catch (StatusRuntimeException ex) {
if (Status.Code.UNIMPLEMENTED.equals(ex.getStatus().getCode())) {
return GetSystemInfoResponse.Capabilities.getDefaultInstance();
return Capabilities.getDefaultInstance();
}
throw ex;
}
Expand Down
Loading

0 comments on commit ed211fa

Please sign in to comment.