Skip to content

Commit

Permalink
Changed health check to verify both Astyanax and CQL drivers are conn…
Browse files Browse the repository at this point in the history
…ected
  • Loading branch information
billkalter committed Oct 17, 2016
1 parent 8ea8b52 commit e0c9a5c
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package com.bazaarvoice.emodb.common.cassandra.health;

import com.bazaarvoice.emodb.common.cassandra.CassandraKeyspace;
import com.bazaarvoice.emodb.common.dropwizard.guava.MoreSuppliers;
import com.codahale.metrics.health.HealthCheck;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.QueryTrace;
import com.datastax.driver.core.ResultSet;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
Expand All @@ -10,7 +19,9 @@
import com.netflix.astyanax.serializers.ByteBufferSerializer;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.netflix.astyanax.model.ConsistencyLevel.CL_LOCAL_QUORUM;
Expand All @@ -22,7 +33,8 @@ public class CassandraHealthCheck extends HealthCheck {
private final CassandraKeyspace _keyspace;
private final ColumnFamily<ByteBuffer, ByteBuffer> _validationColumnFamily;
private final Supplier<ByteBuffer> _keySupplier;
private final Supplier<OperationResult<?>> _pingCache;
private final Supplier<Result> _resultCache;
private final Supplier<PreparedStatement> _cqlStatement;

public CassandraHealthCheck(CassandraKeyspace keyspace, String validationColumnFamily,
Supplier<ByteBuffer> keySupplier) {
Expand All @@ -32,17 +44,12 @@ public CassandraHealthCheck(CassandraKeyspace keyspace, String validationColumnF
ByteBufferSerializer.get(), ByteBufferSerializer.get());
_keySupplier = checkNotNull(keySupplier, "keySupplier");

// Rate limit health check calls to Cassandra.
_pingCache = Suppliers.memoizeWithExpiration(new Supplier<OperationResult<?>>() {
@Override
public OperationResult<?> get() {
try {
return ping();
} catch (Throwable t) {
throw Throwables.propagate(t);
}
}
}, 5, TimeUnit.SECONDS);
_cqlStatement = Suppliers.memoize(this::prepareCqlStatement);

// Rate limit health check calls to Cassandra. Because there are typically several Cassandra connections used
// by EmoDB randomize the cache refresh time to avoid requiring all Cassandra clusters to be queried on
// a single health check in the event health checks are performed with high frequency.
_resultCache = MoreSuppliers.memoizeWithRandomExpiration(this::pingAllUnchecked, 5, 10, TimeUnit.SECONDS);
}

public String getName() {
Expand All @@ -51,18 +58,62 @@ public String getName() {

/**
 * Health check entry point.
 *
 * NOTE(review): this listing looks like a rendered diff with its +/- markers stripped, so the body
 * below appears to interleave two versions of the method: one that reads {@code _pingCache} and
 * formats the host, latency and attempt count itself, and one that simply returns the value held by
 * {@code _resultCache}. Confirm against the actual file which statements are live.
 */
@Override
protected Result check() throws Exception {
// Variant 1: fetch the cached Astyanax ping and build the "healthy" message inline.
OperationResult<?> result = _pingCache.get();
return Result.healthy(
result.getHost() +
" " + result.getLatency(TimeUnit.MICROSECONDS) + "us" +
(result.getAttemptsCount() != 1 ? ", " + result.getAttemptsCount() + " attempts" : ""));
// Variant 2: return the cached Result as-is.
return _resultCache.get();
}

/**
 * Pings Cassandra once through Astyanax and once through the CQL driver, using the same row key
 * for both, and reports the combined outcome as a healthy {@link Result}.
 *
 * The message has the form
 * {@code "Astyanax: <host> <latency>us[, <n> attempts] | CQL: <queried host> -> <coordinator> <duration>us"}.
 * Anything thrown along the way is rethrown via {@link Throwables#propagate(Throwable)}.
 *
 * @return a healthy result describing both pings
 */
private Result pingAllUnchecked() {
    try {
        // Ask the supplier for a row key once; per the original note this is a random row so
        // that queries are distributed among different servers in the ring.
        ByteBuffer rowKey = _keySupplier.get();

        // Astyanax ping: host, latency, and the attempt count when it was not exactly one.
        OperationResult<?> astyanax = pingAstyanax(rowKey);
        String astyanaxSummary =
                "Astyanax: " + astyanax.getHost() + " " + astyanax.getLatency(TimeUnit.MICROSECONDS) + "us";
        int attempts = astyanax.getAttemptsCount();
        if (attempts != 1) {
            astyanaxSummary += ", " + attempts + " attempts";
        }

        // CQL ping: the host that was queried plus the coordinator and duration from the query trace.
        ResultSet cql = pingCql(rowKey);
        Host queriedHost = cql.getExecutionInfo().getQueriedHost();
        QueryTrace queryTrace = cql.getExecutionInfo().getQueryTrace();
        String cqlSummary =
                " | CQL: " + queriedHost + " -> " + queryTrace.getCoordinator() + " "
                        + queryTrace.getDurationMicros() + "us";

        return Result.healthy(astyanaxSummary + cqlSummary);
    } catch (Throwable t) {
        throw Throwables.propagate(t);
    }
}

// NOTE(review): this listing looks like a rendered diff with its +/- markers stripped. Two method
// headers (ping() and pingAstyanax(ByteBuffer)) and two getKey(...) calls appear below, which
// presumably are the old and new versions of the same method interleaved; confirm against the
// actual file which lines are live.
//
// Either way the visible query reads one row of the validation column family at CL_LOCAL_QUORUM
// and returns the Astyanax OperationResult of executing it.
private OperationResult<?> ping() throws Exception {
// Get a random row to distribute queries among different servers in the ring.
private OperationResult<?> pingAstyanax(ByteBuffer key) throws Exception {
// Use quorum consistency to ensure a minimum # of nodes are alive.
return _keyspace.prepareQuery(_validationColumnFamily, CL_LOCAL_QUORUM)
.getKey(_keySupplier.get())
.getKey(key)
.execute();
}

/**
 * Builds and prepares the CQL statement used to ping Cassandra through the CQL driver.
 *
 * The statement selects the partition key columns of the validation column family's table,
 * restricted by one {@code token(<column>)=?} predicate per partition key column and limited to a
 * single row. It is prepared on the keyspace's CQL session with LOCAL_QUORUM consistency.
 *
 * @return the prepared ping statement
 */
private PreparedStatement prepareCqlStatement() {
    String tableName = _validationColumnFamily.getName();
    List<ColumnMetadata> partitionKeyColumns =
            _keyspace.getKeyspaceMetadata().getTable(tableName).getPartitionKey();

    // Projection: the partition key column names, comma separated.
    String selectedColumns = partitionKeyColumns.stream()
            .map(ColumnMetadata::getName)
            .collect(Collectors.joining(", "));

    // Restriction: one bind marker per partition key column.
    String tokenPredicates = partitionKeyColumns.stream()
            .map(column -> "token(" + column.getName() + ")=?")
            .collect(Collectors.joining(" and "));

    String cql = "select " + selectedColumns + " from " + tableName + " where " + tokenPredicates + " limit 1";

    return _keyspace.getCqlSession().prepare(cql).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
}

/**
 * Executes the memoized CQL ping statement with {@code key} bound to every bind variable.
 *
 * Tracing is enabled on the bound statement before it is executed on the keyspace's CQL session.
 *
 * @param key raw key bytes bound, unchanged, to each variable of the prepared statement
 * @return the result set produced by executing the traced statement
 */
private ResultSet pingCql(ByteBuffer key) throws Exception {
    PreparedStatement prepared = _cqlStatement.get();
    BoundStatement bound = prepared.bind();

    // The statement has one bind marker per partition key column; every one receives the same bytes.
    int variableCount = prepared.getVariables().size();
    for (int index = 0; index < variableCount; index++) {
        bound.setBytesUnsafe(index, key);
    }

    return _keyspace.getCqlSession().execute(bound.enableTracing());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.bazaarvoice.emodb.common.dropwizard.guava;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

import java.util.PrimitiveIterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
 * Extension of {@link Suppliers}.
 */
public final class MoreSuppliers {

    private MoreSuppliers() {
        // empty
    }

    /**
     * Memoization similar to {@link Suppliers#memoizeWithExpiration(Supplier, long, TimeUnit)} except the expiration
     * is randomly selected to be a value within provided bounds.  For example:
     *
     * <code>
     * Supplier&lt;Object&gt; supplier = MoreSuppliers.memoizeWithRandomExpiration(delegate, 5, 10, TimeUnit.SECONDS);
     * </code>
     *
     * returns a Supplier that will memoize the delegate's response for a random number of nanoseconds between 5
     * (inclusive) to 10 (exclusive) seconds.  A new random duration is drawn every time the value is refreshed.
     *
     * This is useful when numerous memoized values are typically computed independently but used within the same
     * operation.  If all values were computed and memoized together then each call that refreshes from delegate
     * could take longer than desirable.  Similarly, if each value were memoized independently using the same
     * expiration then herding behavior would result in the same long calls at expiration time.  A random expiration
     * allows each value to be cached but spreads out the refresh cycle so that it is unlikely that any single call
     * will refresh the entire value set, so long as the interval between calls is significantly shorter than
     * <code>minDuration</code>.
     *
     * @param delegate    supplier whose result is memoized
     * @param minDuration minimum time to keep a value, inclusive, in {@code units}
     * @param maxDuration maximum time to keep a value, exclusive, in {@code units}; when equal to
     *                    {@code minDuration} this is exactly {@code Suppliers.memoizeWithExpiration}
     * @param units       time unit of {@code minDuration} and {@code maxDuration}
     * @return a supplier that caches the delegate's value for a random duration within the bounds
     * @throws IllegalArgumentException if {@code minDuration} is negative or {@code maxDuration} is less than
     *                                  {@code minDuration}
     * @throws NullPointerException     if {@code delegate} or {@code units} is null
     */
    public static <T> Supplier<T> memoizeWithRandomExpiration(Supplier<T> delegate, long minDuration, long maxDuration, TimeUnit units) {
        if (minDuration == maxDuration) {
            // This case resolves to standard expiration; argument validation is left to Guava.
            return Suppliers.memoizeWithExpiration(delegate, minDuration, units);
        }
        return new RandomExpirationSupplier<>(delegate, minDuration, maxDuration, units);
    }

    /**
     * Class modeled off of Guava's {@code Suppliers.ExpiringMemoizingSupplier} except the expiration time is
     * randomly selected within the provided bounds for each iteration.
     */
    private static final class RandomExpirationSupplier<T> implements Supplier<T> {

        private final Supplier<T> _delegate;
        // Source of random durations in nanoseconds; only touched while holding this object's monitor.
        private final PrimitiveIterator.OfLong _nanosDurations;
        private volatile T _value;
        // The special value 0 means "not yet initialized".
        private volatile long _expirationNanos = 0;

        RandomExpirationSupplier(Supplier<T> delegate, long minDuration, long maxDuration, TimeUnit timeUnit) {
            checkArgument(minDuration >= 0, "minDuration cannot be negative");
            checkArgument(maxDuration > minDuration, "maxDuration must be greater than minDuration");
            checkNotNull(delegate, "delegate");
            checkNotNull(timeUnit, "timeUnit");

            long minDurationNanos = timeUnit.toNanos(minDuration);
            long maxDurationNanos = timeUnit.toNanos(maxDuration);

            _delegate = delegate;
            _nanosDurations = new Random().longs(minDurationNanos, maxDurationNanos).iterator();
        }

        /**
         * Returns the memoized value, first refreshing it from the delegate if it has never been computed or
         * its randomly chosen lifetime has elapsed.
         */
        @Override
        public T get() {
            long expirationNanos = _expirationNanos;
            long now = System.nanoTime();

            if (expirationNanos == 0 || now - expirationNanos >= 0) {
                synchronized (this) {
                    // Recheck for a lost race: if another thread already refreshed, _expirationNanos has moved
                    // on and must be left alone.  Writing this thread's stale snapshot back would mark the
                    // fresh value as expired and force the next caller to hit the delegate again.
                    if (expirationNanos == _expirationNanos) {
                        T value = _delegate.get();
                        _value = value;
                        expirationNanos = now + _nanosDurations.nextLong();
                        // In the very unlikely event the expiration lands on 0, use 1 instead so it is not
                        // mistaken for "not yet initialized".
                        _expirationNanos = expirationNanos == 0 ? 1 : expirationNanos;
                        return value;
                    }
                }
            }

            return _value;
        }
    }
}

0 comments on commit e0c9a5c

Please sign in to comment.