Skip to content

Commit

Permalink
Add per-shard Redis circuit breakers
Browse files Browse the repository at this point in the history
  • Loading branch information
eager-signal authored Apr 12, 2024
1 parent 05a9249 commit 2dc707d
Show file tree
Hide file tree
Showing 22 changed files with 1,677 additions and 313 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,10 @@
import org.whispersystems.textsecuregcm.push.PushLatencyManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.redis.ClusterFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ConnectionEventLogger;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.ShardFaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.registration.RegistrationServiceClient;
import org.whispersystems.textsecuregcm.s3.PolicySigner;
import org.whispersystems.textsecuregcm.s3.PostPolicyGenerator;
Expand Down Expand Up @@ -412,18 +414,25 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
final VerificationSessions verificationSessions = new VerificationSessions(dynamoDbAsyncClient,
config.getDynamoDbTables().getVerificationSessions().getTableName(), clock);

final ClientResources redisClientResources = ClientResources.builder()
.commandLatencyRecorder(new MicrometerCommandLatencyRecorder(Metrics.globalRegistry, MicrometerOptions.builder().build()))
.build();
final ClientResources.Builder redisClientResourcesBuilder = ClientResources.builder()
.commandLatencyRecorder(
new MicrometerCommandLatencyRecorder(Metrics.globalRegistry, MicrometerOptions.builder().build()));
final ClientResources redisClientResources = redisClientResourcesBuilder.build();

ConnectionEventLogger.logConnectionEvents(redisClientResources);

FaultTolerantRedisCluster cacheCluster = new FaultTolerantRedisCluster("main_cache_cluster", config.getCacheClusterConfiguration(), redisClientResources);
FaultTolerantRedisCluster messagesCluster = new FaultTolerantRedisCluster("messages_cluster", config.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResources);
FaultTolerantRedisCluster clientPresenceCluster = new FaultTolerantRedisCluster("client_presence_cluster", config.getClientPresenceClusterConfiguration(), redisClientResources);
FaultTolerantRedisCluster metricsCluster = new FaultTolerantRedisCluster("metrics_cluster", config.getMetricsClusterConfiguration(), redisClientResources);
FaultTolerantRedisCluster pushSchedulerCluster = new FaultTolerantRedisCluster("push_scheduler", config.getPushSchedulerCluster(), redisClientResources);
FaultTolerantRedisCluster rateLimitersCluster = new FaultTolerantRedisCluster("rate_limiters", config.getRateLimitersCluster(), redisClientResources);
FaultTolerantRedisCluster cacheCluster = new ClusterFaultTolerantRedisCluster("main_cache_cluster",
config.getCacheClusterConfiguration(), redisClientResources);
FaultTolerantRedisCluster messagesCluster = new ClusterFaultTolerantRedisCluster("messages_cluster",
config.getMessageCacheConfiguration().getRedisClusterConfiguration(), redisClientResources);
FaultTolerantRedisCluster clientPresenceCluster = new ClusterFaultTolerantRedisCluster("client_presence_cluster",
config.getClientPresenceClusterConfiguration(), redisClientResources);
FaultTolerantRedisCluster metricsCluster = new ShardFaultTolerantRedisCluster("metrics",
config.getMetricsClusterConfiguration(), redisClientResourcesBuilder);
FaultTolerantRedisCluster pushSchedulerCluster = new ClusterFaultTolerantRedisCluster("push_scheduler",
config.getPushSchedulerCluster(), redisClientResources);
FaultTolerantRedisCluster rateLimitersCluster = new ClusterFaultTolerantRedisCluster("rate_limiters",
config.getRateLimitersCluster(), redisClientResources);

final BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000);
Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public class CircuitBreakerConfiguration {

@JsonProperty
@NotNull
@Min(1)
private long waitDurationInOpenStateInSeconds = 10;
private Duration waitDurationInOpenState = Duration.ofSeconds(10);

@JsonProperty
private List<String> ignoredExceptions = Collections.emptyList();
Expand All @@ -64,8 +63,8 @@ public int getSlidingWindowMinimumNumberOfCalls() {
return slidingWindowMinimumNumberOfCalls;
}

public long getWaitDurationInOpenStateInSeconds() {
return waitDurationInOpenStateInSeconds;
public Duration getWaitDurationInOpenState() {
return waitDurationInOpenState;
}

public List<Class<?>> getIgnoredExceptions() {
Expand Down Expand Up @@ -101,8 +100,8 @@ public void setPermittedNumberOfCallsInHalfOpenState(int size) {
}

@VisibleForTesting
public void setWaitDurationInOpenStateInSeconds(int seconds) {
this.waitDurationInOpenStateInSeconds = seconds;
public void setWaitDurationInOpenState(Duration duration) {
this.waitDurationInOpenState = duration;
}

@VisibleForTesting
Expand All @@ -115,7 +114,7 @@ public CircuitBreakerConfig toCircuitBreakerConfig() {
.failureRateThreshold(getFailureRateThreshold())
.ignoreExceptions(getIgnoredExceptions().toArray(new Class[0]))
.permittedNumberOfCallsInHalfOpenState(getPermittedNumberOfCallsInHalfOpenState())
.waitDurationInOpenState(Duration.ofSeconds(getWaitDurationInOpenStateInSeconds()))
.waitDurationInOpenState(getWaitDurationInOpenState())
.slidingWindow(getSlidingWindowSize(), getSlidingWindowMinimumNumberOfCalls(),
CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import io.micrometer.core.instrument.Tags;
import org.glassfish.jersey.SslConfigurator;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
Expand Down Expand Up @@ -56,7 +57,7 @@ public static Builder newBuilder() {
this.defaultRequestTimeout = defaultRequestTimeout;
this.breaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());

CircuitBreakerUtil.registerMetrics(breaker, FaultTolerantHttpClient.class);
CircuitBreakerUtil.registerMetrics(breaker, FaultTolerantHttpClient.class, Tags.empty());

if (retryConfiguration != null) {
if (this.retryExecutor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public ProvisioningManager(final String redisUri,

this.circuitBreaker = CircuitBreaker.of("pubsub-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());

CircuitBreakerUtil.registerMetrics(circuitBreaker, ProvisioningManager.class);
CircuitBreakerUtil.registerMetrics(circuitBreaker, ProvisioningManager.class, Tags.empty());

Metrics.gaugeMapSize(ACTIVE_LISTENERS_GAUGE_NAME, Tags.empty(), listenersByProvisioningAddress);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.redis;

import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.scheduler.Scheduler;

public class ClusterFaultTolerantPubSubConnection<K, V> implements FaultTolerantPubSubConnection<K, V> {

private static final Logger logger = LoggerFactory.getLogger(ClusterFaultTolerantPubSubConnection.class);


private final String name;
private final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection;

private final CircuitBreaker circuitBreaker;
private final Retry retry;
private final Retry resubscribeRetry;
private final Scheduler topologyChangedEventScheduler;

private final Timer executeTimer;

public ClusterFaultTolerantPubSubConnection(final String name,
final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection, final CircuitBreaker circuitBreaker,
final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) {
this.name = name;
this.pubSubConnection = pubSubConnection;
this.circuitBreaker = circuitBreaker;
this.retry = retry;
this.resubscribeRetry = resubscribeRetry;
this.topologyChangedEventScheduler = topologyChangedEventScheduler;

this.pubSubConnection.setNodeMessagePropagation(true);

this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub");

CircuitBreakerUtil.registerMetrics(circuitBreaker, ClusterFaultTolerantPubSubConnection.class, Tags.empty());
}

@Override
public void usePubSubConnection(final Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer) {
try {
circuitBreaker.executeCheckedRunnable(
() -> retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection))));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}

@Override
public <T> T withPubSubConnection(final Function<StatefulRedisClusterPubSubConnection<K, V>, T> function) {
try {
return circuitBreaker.executeCheckedSupplier(
() -> retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection))));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}


@Override
public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) {

usePubSubConnection(connection -> connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribeOn(topologyChangedEventScheduler)
.subscribe(event -> {
logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name);

resubscribeRetry.executeRunnable(() -> {
try {
eventHandler.run();
} catch (final RuntimeException e) {
logger.warn("Resubscribe for {} failed", name, e);
throw e;
}
});
}));
}

}
Loading

0 comments on commit 2dc707d

Please sign in to comment.