diff --git a/common/astyanax/src/main/java/com/bazaarvoice/emodb/common/cassandra/health/CassandraHealthCheck.java b/common/astyanax/src/main/java/com/bazaarvoice/emodb/common/cassandra/health/CassandraHealthCheck.java index 824472304f..03069e63af 100644 --- a/common/astyanax/src/main/java/com/bazaarvoice/emodb/common/cassandra/health/CassandraHealthCheck.java +++ b/common/astyanax/src/main/java/com/bazaarvoice/emodb/common/cassandra/health/CassandraHealthCheck.java @@ -1,7 +1,16 @@ package com.bazaarvoice.emodb.common.cassandra.health; import com.bazaarvoice.emodb.common.cassandra.CassandraKeyspace; +import com.bazaarvoice.emodb.common.dropwizard.guava.MoreSuppliers; import com.codahale.metrics.health.HealthCheck; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.QueryTrace; +import com.datastax.driver.core.ResultSet; +import com.google.common.base.Joiner; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.base.Throwables; @@ -10,7 +19,9 @@ import com.netflix.astyanax.serializers.ByteBufferSerializer; import java.nio.ByteBuffer; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; import static com.netflix.astyanax.model.ConsistencyLevel.CL_LOCAL_QUORUM; @@ -22,7 +33,8 @@ public class CassandraHealthCheck extends HealthCheck { private final CassandraKeyspace _keyspace; private final ColumnFamily _validationColumnFamily; private final Supplier _keySupplier; - private final Supplier> _pingCache; + /** Memoized health check result; recomputed by pingAllUnchecked() once its randomized 5-10 second expiration lapses. */ + private final Supplier _resultCache; + /** CQL ping statement, prepared by prepareCqlStatement() on first use and memoized thereafter. */ + private final Supplier _cqlStatement; public CassandraHealthCheck(CassandraKeyspace keyspace, String validationColumnFamily, Supplier keySupplier) { @@ 
-32,17 +44,12 @@ public CassandraHealthCheck(CassandraKeyspace keyspace, String validationColumnF ByteBufferSerializer.get(), ByteBufferSerializer.get()); _keySupplier = checkNotNull(keySupplier, "keySupplier"); - // Rate limit health check calls to Cassandra. - _pingCache = Suppliers.memoizeWithExpiration(new Supplier>() { - @Override - public OperationResult get() { - try { - return ping(); - } catch (Throwable t) { - throw Throwables.propagate(t); - } - } - }, 5, TimeUnit.SECONDS); + _cqlStatement = Suppliers.memoize(this::prepareCqlStatement); + + // Rate limit health check calls to Cassandra. Because there are typically several Cassandra connections used + // by EmoDB randomize the cache refresh time to avoid requiring all Cassandra clusters to be queried on + // a single health check in the event health checks are performed with high frequency. + _resultCache = MoreSuppliers.memoizeWithRandomExpiration(this::pingAllUnchecked, 5, 10, TimeUnit.SECONDS); } public String getName() { @@ -51,18 +58,62 @@ public String getName() { @Override protected Result check() throws Exception { - OperationResult result = _pingCache.get(); - return Result.healthy( - result.getHost() + - " " + result.getLatency(TimeUnit.MICROSECONDS) + "us" + - (result.getAttemptsCount() != 1 ? ", " + result.getAttemptsCount() + " attempts" : "")); + return _resultCache.get(); + } + + /** + * Pings Cassandra through Astyanax and then through the CQL driver using the same key, returning a healthy result + * whose message reports the host and latency seen by each. Any Throwable is rethrown via Throwables.propagate. + */ + private Result pingAllUnchecked() { + try { + // Get a random row to distribute queries among different servers in the ring. 
+ ByteBuffer key = _keySupplier.get(); + StringBuilder message = new StringBuilder(); + + OperationResult astyanaxResult = pingAstyanax(key); + message.append("Astyanax: ").append(astyanaxResult.getHost()).append(" ") + .append(astyanaxResult.getLatency(TimeUnit.MICROSECONDS)).append("us"); + + if (astyanaxResult.getAttemptsCount() != 1) { + message.append(", ").append(astyanaxResult.getAttemptsCount()).append(" attempts"); + } + + /* Ping again through the CQL driver with the same key (pingCql executes the statement with tracing enabled). */ + ResultSet cqlResult = pingCql(key); + Host host = cqlResult.getExecutionInfo().getQueriedHost(); + QueryTrace trace = cqlResult.getExecutionInfo().getQueryTrace(); + + message.append(" | CQL: ").append(host).append(" -> ").append(trace.getCoordinator()).append(" ") + .append(trace.getDurationMicros()).append("us"); + + return Result.healthy(message.toString()); + } catch (Throwable t) { + throw Throwables.propagate(t); + } } - private OperationResult ping() throws Exception { - // Get a random row to distribute queries among different servers in the ring. + /** Reads the row for the given key through Astyanax at CL_LOCAL_QUORUM and returns the operation result. */ + private OperationResult pingAstyanax(ByteBuffer key) throws Exception { // Use quorum consistency to ensure a minimum # of nodes are alive. 
return _keyspace.prepareQuery(_validationColumnFamily, CL_LOCAL_QUORUM) - .getKey(_keySupplier.get()) + .getKey(key) .execute(); } + + /** + * Prepares a CQL statement that selects the partition key columns of the validation table, filtered by a + * "token(column)=?" term per partition key column and limited to one row, at LOCAL_QUORUM consistency. + */ + private PreparedStatement prepareCqlStatement() { + List metadata = _keyspace.getKeyspaceMetadata().getTable(_validationColumnFamily.getName()).getPartitionKey(); + String query = "select " + + Joiner.on(", ").join(metadata.stream().map(ColumnMetadata::getName).collect(Collectors.toList())) + + " from " + _validationColumnFamily.getName() + + " where " + + Joiner.on(" and ").join(metadata.stream().map((md) -> "token(" + md.getName() + ")=?").collect(Collectors.toList())) + + " limit 1"; + + return _keyspace.getCqlSession().prepare(query).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + } + + /** Binds the given key to every variable of the memoized prepared statement and executes it with tracing enabled. */ + private ResultSet pingCql(ByteBuffer key) throws Exception { + PreparedStatement preparedStatement = _cqlStatement.get(); + BoundStatement statement = preparedStatement.bind(); + for (int i=0; i < preparedStatement.getVariables().size(); i++) { + statement.setBytesUnsafe(i, key); + } + + return _keyspace.getCqlSession().execute(statement.enableTracing()); + } } diff --git a/common/dropwizard/src/main/java/com/bazaarvoice/emodb/common/dropwizard/guava/MoreSuppliers.java b/common/dropwizard/src/main/java/com/bazaarvoice/emodb/common/dropwizard/guava/MoreSuppliers.java new file mode 100644 index 0000000000..ae8a8d1d12 --- /dev/null +++ b/common/dropwizard/src/main/java/com/bazaarvoice/emodb/common/dropwizard/guava/MoreSuppliers.java @@ -0,0 +1,90 @@ +package com.bazaarvoice.emodb.common.dropwizard.guava; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; + +import java.util.PrimitiveIterator; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Extension of {@link Suppliers}. 
+ */
+public final class MoreSuppliers {
+
+    private MoreSuppliers() {
+        // empty
+    }
+
+    /**
+     * Memoization similar to {@link Suppliers#memoizeWithExpiration(Supplier, long, TimeUnit)} except the expiration
+     * is randomly selected to be a value within provided bounds.  For example:
+     *
+     * <pre>{@code
+     * Supplier<Object> supplier = MoreSuppliers.memoizeWithRandomExpiration(delegate, 5, 10, TimeUnit.SECONDS);
+     * }</pre>
+     *
+     * returns a Supplier that will memoize the delegate's response for a random number of nanoseconds between 5
+     * (inclusive) and 10 (exclusive) seconds.  A new random duration is drawn every time the value is refreshed.
+     *
+     * This is useful when numerous memoized values are typically computed independently but used within the same
+     * operation.  If all values were computed and memoized together then each call that refreshes from delegate could
+     * take longer than desirable.  Similarly, if each value were memoized independently using the same expiration then
+     * herding behavior would result in the same long calls at expiration time.  A random expiration allows each value
+     * to be cached but spreads out the refresh cycle so that it is unlikely that any single call will refresh the
+     * entire value set, so long as the interval between calls is significantly shorter than minDuration.
+     *
+     * @param delegate    supplier whose result is memoized
+     * @param minDuration minimum time, inclusive, a computed value is retained
+     * @param maxDuration maximum time, exclusive, a computed value is retained; if equal to minDuration a fixed
+     *                    expiration is used via {@link Suppliers#memoizeWithExpiration(Supplier, long, TimeUnit)}
+     * @param units       time unit of minDuration and maxDuration
+     * @return a supplier that caches the delegate's value until the randomly chosen expiration has passed
+     * @throws IllegalArgumentException if minDuration is negative or maxDuration is less than minDuration
+     * @throws NullPointerException     if delegate or units is null
+     */
+    public static <T> Supplier<T> memoizeWithRandomExpiration(Supplier<T> delegate, long minDuration, long maxDuration, TimeUnit units) {
+        if (minDuration == maxDuration) {
+            // This case resolves to standard expiration
+            return Suppliers.memoizeWithExpiration(delegate, minDuration, units);
+        }
+        return new RandomExpirationSupplier<>(delegate, minDuration, maxDuration, units);
+    }
+
+    /**
+     * Class modeled off of Guava's {@code Suppliers.ExpiringMemoizingSupplier} except the expiration time is randomly
+     * selected within the provided bounds for each iteration.
+     */
+    private final static class RandomExpirationSupplier<T> implements Supplier<T> {
+
+        private final Supplier<T> _delegate;
+        // Endless stream of random durations in [minDurationNanos, maxDurationNanos); only read while holding the
+        // monitor in get(), so the iterator itself needs no further synchronization.
+        private final PrimitiveIterator.OfLong _nanosDurations;
+        private transient volatile T _value;
+        // The special value 0 means "not yet initialized".
+        private transient volatile long _expirationNanos = 0;
+
+        RandomExpirationSupplier(Supplier<T> delegate, long minDuration, long maxDuration, TimeUnit timeUnit) {
+            checkArgument(minDuration >= 0, "minDuration cannot be negative");
+            checkArgument(maxDuration > minDuration, "maxDuration must be greater than minDuration");
+            checkNotNull(delegate, "delegate");
+            checkNotNull(timeUnit, "timeUnit");
+
+            long minDurationNanos = timeUnit.toNanos(minDuration);
+            long maxDurationNanos = timeUnit.toNanos(maxDuration);
+
+            _delegate = delegate;
+            _nanosDurations = new Random().longs(minDurationNanos, maxDurationNanos).iterator();
+        }
+
+        @Override
+        public T get() {
+            // Take a single snapshot of the volatile; every decision below is made against this snapshot.
+            long expirationNanos = _expirationNanos;
+            long now = System.nanoTime();
+
+            if (expirationNanos == 0 || now - expirationNanos >= 0) {
+                synchronized (this) {
+                    // Recheck for a lost race: if another thread already refreshed the value then the field no
+                    // longer matches the snapshot and must be left alone.  Writing the stale snapshot back (as
+                    // was previously done outside this branch) would discard the new expiration and force the
+                    // next caller to invoke the delegate again.
+                    if (expirationNanos == _expirationNanos) {
+                        T value = _delegate.get();
+                        _value = value;
+                        expirationNanos = now + _nanosDurations.nextLong();
+                        // In the very unlikely event that the expiration is 0, use 1 instead since 0 is reserved
+                        // for "not yet initialized"; no one will notice 1 ns of tardiness.
+                        _expirationNanos = expirationNanos == 0 ? 1 : expirationNanos;
+                        return value;
+                    }
+                }
+            }
+
+            return _value;
+        }
+    }
+}