RATIS-1998. Add watch request metrics (apache#1009)
ivandika3 authored Jan 13, 2024
1 parent 84285d3 commit 59dfd35
Showing 6 changed files with 149 additions and 29 deletions.
53 changes: 28 additions & 25 deletions ratis-docs/src/site/markdown/metrics.md
@@ -80,31 +80,34 @@

 ### Raft Server Metrics

-| Application | Component | Name | Type | Description |
-|-------------|-----------|----------------------------------|---------|---------------------------------------------------------------------|
-| ratis | server | {peer}_lastHeartbeatElapsedTime | Gauge | Time elapsed since last heartbeat rpc response |
-| ratis | server | follower_append_entry_latency | Timer | Time taken for followers to append log entries |
-| ratis | server | {peer}_peerCommitIndex | Gauge | Commit index of peer |
-| ratis | server | clientReadRequest | Timer | Time taken to process read requests from client |
-| ratis | server | clientStaleReadRequest | Timer | Time taken to process stale-read requests from client |
-| ratis | server | clientWriteRequest | Timer | Time taken to process write requests from client |
-| ratis | server | clientWatch{level}Request | Timer | Time taken to process watch(replication_level) requests from client |
-| ratis | server | numRequestQueueLimitHits | Counter | Number of (total client requests in queue) limit hits |
-| ratis | server | numRequestsByteSizeLimitHits | Counter | Number of (total size of client requests in queue) limit hits |
-| ratis | server | numResourceLimitHits | Counter | Sum of numRequestQueueLimitHits and numRequestsByteSizeLimitHits |
-| ratis | server | numPendingRequestInQueue | Gauge | Number of pending client requests in queue |
-| ratis | server | numPendingRequestMegaByteSize | Gauge | Total size of pending client requests in queue |
-| ratis | server | retryCacheEntryCount | Gauge | Number of entries in retry cache |
-| ratis | server | retryCacheHitCount | Gauge | Number of retry cache hits |
-| ratis | server | retryCacheHitRate | Gauge | Retry cache hit rate |
-| ratis | server | retryCacheMissCount | Gauge | Number of retry cache misses |
-| ratis | server | retryCacheMissRate | Gauge | Retry cache miss rate |
-| ratis | server | numFailedClientStaleReadOnServer | Counter | Number of failed stale-read requests |
-| ratis | server | numFailedClientReadOnServer | Counter | Number of failed read requests |
-| ratis | server | numFailedClientWriteOnServer | Counter | Number of failed write requests |
-| ratis | server | numFailedClientWatchOnServer | Counter | Number of failed watch requests |
-| ratis | server | numFailedClientStreamOnServer | Counter | Number of failed stream requests |
-| ratis | server | numInstallSnapshot | Counter | Number of install-snapshot requests |
+| Application | Component | Name | Type | Description |
+|-------------|-----------|--------------------------------------|---------|---------------------------------------------------------------------|
+| ratis | server | {peer}_lastHeartbeatElapsedTime | Gauge | Time elapsed since last heartbeat rpc response |
+| ratis | server | follower_append_entry_latency | Timer | Time taken for followers to append log entries |
+| ratis | server | {peer}_peerCommitIndex | Gauge | Commit index of peer |
+| ratis | server | clientReadRequest | Timer | Time taken to process read requests from client |
+| ratis | server | clientStaleReadRequest | Timer | Time taken to process stale-read requests from client |
+| ratis | server | clientWriteRequest | Timer | Time taken to process write requests from client |
+| ratis | server | clientWatch{level}Request | Timer | Time taken to process watch(replication_level) requests from client |
+| ratis | server | numRequestQueueLimitHits | Counter | Number of (total client requests in queue) limit hits |
+| ratis | server | numRequestsByteSizeLimitHits | Counter | Number of (total size of client requests in queue) limit hits |
+| ratis | server | numResourceLimitHits | Counter | Sum of numRequestQueueLimitHits and numRequestsByteSizeLimitHits |
+| ratis | server | numPendingRequestInQueue | Gauge | Number of pending client requests in queue |
+| ratis | server | numPendingRequestMegaByteSize | Gauge | Total size of pending client requests in queue |
+| ratis | server | retryCacheEntryCount | Gauge | Number of entries in retry cache |
+| ratis | server | retryCacheHitCount | Gauge | Number of retry cache hits |
+| ratis | server | retryCacheHitRate | Gauge | Retry cache hit rate |
+| ratis | server | retryCacheMissCount | Gauge | Number of retry cache misses |
+| ratis | server | retryCacheMissRate | Gauge | Retry cache miss rate |
+| ratis | server | numFailedClientStaleReadOnServer | Counter | Number of failed stale-read requests |
+| ratis | server | numFailedClientReadOnServer | Counter | Number of failed read requests |
+| ratis | server | numFailedClientWriteOnServer | Counter | Number of failed write requests |
+| ratis | server | numFailedClientWatchOnServer | Counter | Number of failed watch requests |
+| ratis | server | numFailedClientStreamOnServer | Counter | Number of failed stream requests |
+| ratis | server | numInstallSnapshot | Counter | Number of install-snapshot requests |
+| ratis | server | numWatch{level}RequestTimeout | Counter | Number of watch(replication_level) request timeouts |
+| ratis | server | numWatch{level}RequestInQueue | Gauge | Number of watch(replication_level) requests in queue |
+| ratis | server | numWatch{level}RequestQueueLimitHits | Counter | Number of (total watch requests in queue) limit hits |

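The `{level}` placeholder in the three watch metric names corresponds to the `%s` in the name templates that `RaftServerMetricsImpl` defines later in this commit, filled in once per replication level. The sketch below illustrates that expansion under stated assumptions: the nested `Level` enum and the `render` helper are hypothetical stand-ins, and the `_LEVEL` text they produce is chosen for illustration only and may not match what Ratis' `Type.toString(replication)` returns.

```java
// Sketch only: expands the "%s" name templates once per replication level.
final class WatchMetricNamesSketch {
  // Hypothetical stand-in for Ratis' ReplicationLevel enum.
  enum Level { MAJORITY, ALL }

  // Name templates as they appear in this commit.
  static final String QUEUE_LIMIT_HITS = "numWatch%sRequestQueueLimitHits";
  static final String IN_QUEUE = "numWatch%sRequestInQueue";
  static final String TIMEOUT = "numWatch%sRequestTimeout";

  // Assumption: a level is rendered as "_" + its name; the real rendering may differ.
  static String render(Level level) {
    return "_" + level.name();
  }

  // Substitute the rendered level into a template.
  static String metricName(String template, Level level) {
    return String.format(template, render(level));
  }

  public static void main(String[] args) {
    for (Level level : Level.values()) {
      System.out.println(metricName(TIMEOUT, level));
      System.out.println(metricName(IN_QUEUE, level));
      System.out.println(metricName(QUEUE_LIMIT_HITS, level));
    }
  }
}
```

Because every level gets its own name, a dashboard can track timeouts and queue pressure separately for each kind of watch.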

## Ratis Netty Metrics
@@ -49,6 +49,13 @@ protected static <T extends Enum<T>> Map<T, Map<String, LongCounter>> newCounter
     return Collections.unmodifiableMap(maps);
   }

+  protected static <T extends Enum<T>> Map<T, LongCounter> newCounterMap(
+      Class<T> clazz, Function<T, LongCounter> constructor) {
+    final EnumMap<T, LongCounter> map = new EnumMap<>(clazz);
+    Arrays.stream(clazz.getEnumConstants()).forEach(t -> map.put(t, constructor.apply(t)));
+    return Collections.unmodifiableMap(map);
+  }
+
   protected static <T extends Enum<T>> Map<T, Timekeeper> newTimerMap(
       Class<T> clazz, Function<T, Timekeeper> constructor) {
     final EnumMap<T, Timekeeper> map = new EnumMap<>(clazz);
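The `newCounterMap` helper added above builds one counter per enum constant and returns a read-only view. A self-contained sketch of the same pattern follows; it uses `AtomicLong` as a stand-in for Ratis' `LongCounter` and a hypothetical nested `Level` enum, so it shows the technique rather than the project's actual types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

final class CounterMapSketch {
  // Hypothetical stand-in for Ratis' ReplicationLevel enum.
  enum Level { MAJORITY, ALL }

  // Same shape as newCounterMap in the diff, with AtomicLong replacing LongCounter.
  static <T extends Enum<T>> Map<T, AtomicLong> newCounterMap(
      Class<T> clazz, Function<T, AtomicLong> constructor) {
    final EnumMap<T, AtomicLong> map = new EnumMap<>(clazz);
    Arrays.stream(clazz.getEnumConstants()).forEach(t -> map.put(t, constructor.apply(t)));
    // The map itself is read-only; the counters it holds stay mutable.
    return Collections.unmodifiableMap(map);
  }

  public static void main(String[] args) {
    final Map<Level, AtomicLong> timeouts = newCounterMap(Level.class, level -> new AtomicLong());
    timeouts.get(Level.MAJORITY).incrementAndGet();
    System.out.println(timeouts);
  }
}
```

Populating the map eagerly means a later lookup by level never returns `null`, which is why the callers in this commit can call `get(replication).inc()` without a guard.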
@@ -379,7 +379,7 @@ boolean isApplied(LogEntryProto logEntry) {
     raftServerMetrics = server.getRaftServerMetrics();
     logAppenderMetrics = new LogAppenderMetrics(server.getMemberId());
     this.pendingRequests = new PendingRequests(server.getMemberId(), properties, raftServerMetrics);
-    this.watchRequests = new WatchRequests(server.getMemberId(), properties);
+    this.watchRequests = new WatchRequests(server.getMemberId(), properties, raftServerMetrics);
     this.messageStreamRequests = new MessageStreamRequests(server.getMemberId());
     this.pendingStepDown = new PendingStepDown(this);
     this.readIndexHeartbeats = new ReadIndexHeartbeats();
@@ -457,6 +457,7 @@ CompletableFuture<Void> stop() {
     logAppenderMetrics.unregister();
     raftServerMetrics.unregister();
     pendingRequests.close();
+    watchRequests.close();
     return f;
   }

@@ -24,6 +24,7 @@
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
 import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,11 +76,15 @@ private class WatchQueue {
     private final SortedMap<PendingWatch, PendingWatch> q = new TreeMap<>(
         Comparator.comparingLong(PendingWatch::getIndex).thenComparing(PendingWatch::getCreationTime));
     private final ResourceSemaphore resource;
+    private final RaftServerMetricsImpl raftServerMetrics;
     private volatile long index; //Invariant: q.isEmpty() or index < any element q

-    WatchQueue(ReplicationLevel replication, int elementLimit) {
+    WatchQueue(ReplicationLevel replication, int elementLimit, RaftServerMetricsImpl raftServerMetrics) {
       this.replication = replication;
       this.resource = new ResourceSemaphore(elementLimit);
+      this.raftServerMetrics = raftServerMetrics;
+
+      raftServerMetrics.addNumPendingWatchRequestsGauge(resource::used, replication);
     }

     long getIndex() {
@@ -103,6 +108,7 @@ CompletableFuture<Long> add(RaftClientRequest request) {

       if (computed == null) {
         // failed to acquire
+        raftServerMetrics.onWatchRequestQueueLimitHit(replication);
         return JavaUtils.completeExceptionally(new ResourceUnavailableException(
             "Failed to acquire a pending watch request in " + name + " for " + request));
       }
@@ -123,6 +129,7 @@ void handleTimeout(RaftClientRequest request, PendingWatch pending) {
         pending.getFuture().completeExceptionally(
             new NotReplicatedException(request.getCallId(), replication, pending.getIndex()));
         LOG.debug("{}: timeout {}, {}", name, pending, request);
+        raftServerMetrics.onWatchRequestTimeout(replication);
       }
     }

@@ -162,6 +169,12 @@ synchronized void failAll(Exception e) {
       q.clear();
       resource.close();
     }
+
+    void close() {
+      if (raftServerMetrics != null) {
+        raftServerMetrics.removeNumPendingWatchRequestsGauge(replication);
+      }
+    }
   }

   private final String name;
@@ -171,7 +184,7 @@ synchronized void failAll(Exception e) {
   private final TimeDuration watchTimeoutDenominationNanos;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();

-  WatchRequests(Object name, RaftProperties properties) {
+  WatchRequests(Object name, RaftProperties properties, RaftServerMetricsImpl raftServerMetrics) {
     this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());

     final TimeDuration watchTimeout = RaftServerConfigKeys.Watch.timeout(properties);
@@ -183,7 +196,8 @@ synchronized void failAll(Exception e) {
         + watchTimeoutDenomination + ").");

     final int elementLimit = RaftServerConfigKeys.Watch.elementLimit(properties);
-    Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new WatchQueue(r, elementLimit)));
+    Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r,
+        new WatchQueue(r, elementLimit, raftServerMetrics)));
   }

   CompletableFuture<Long> add(RaftClientRequest request) {
@@ -207,4 +221,8 @@ void update(ReplicationLevel replication, final long newIndex) {
   void failWatches(Exception e) {
     queues.values().forEach(q -> q.failAll(e));
   }
+
+  void close() {
+    queues.values().forEach(WatchQueue::close);
+  }
 }
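Taken together, the `WatchRequests` changes above attach three measurements to each bounded watch queue: a gauge over the slots in use, a counter for rejected additions, and a counter for timeouts. The sketch below models that bookkeeping in isolation. It is a simplification under stated assumptions: `java.util.concurrent.Semaphore` stands in for Ratis' `ResourceSemaphore`, `AtomicLong` stands in for `LongCounter`, and it assumes a timed-out request releases its slot, which the diff does not show.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

final class BoundedWatchQueueSketch {
  private final int elementLimit;
  private final Semaphore permits;                        // stand-in for ResourceSemaphore
  private final AtomicLong limitHits = new AtomicLong();  // stand-in for the limit-hit counter
  private final AtomicLong timeouts = new AtomicLong();   // stand-in for the timeout counter

  BoundedWatchQueueSketch(int elementLimit) {
    this.elementLimit = elementLimit;
    this.permits = new Semaphore(elementLimit);
  }

  // Gauge: evaluated lazily, so each read reports the current number of used slots.
  Supplier<Integer> inQueueGauge() {
    return () -> elementLimit - permits.availablePermits();
  }

  // Try to admit a watch request; a full queue counts as one limit hit.
  boolean add() {
    if (permits.tryAcquire()) {
      return true;
    }
    limitHits.incrementAndGet();
    return false;
  }

  // Assumption: only called for an admitted request, whose slot is freed on timeout.
  void onTimeout() {
    permits.release();
    timeouts.incrementAndGet();
  }

  long limitHits() {
    return limitHits.get();
  }

  long timeouts() {
    return timeouts.get();
  }

  public static void main(String[] args) {
    final BoundedWatchQueueSketch queue = new BoundedWatchQueueSketch(10);
    for (int i = 0; i < 11; i++) {
      queue.add();
    }
    System.out.println("inQueue=" + queue.inQueueGauge().get() + ", limitHits=" + queue.limitHits());
  }
}
```

With a limit of 10 and 11 additions, ten requests are admitted and one is rejected, which is the same arithmetic the new `testWatchMetrics` test relies on.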
@@ -57,9 +57,14 @@ public final class RaftServerMetricsImpl extends RatisMetrics implements RaftSer
   public static final String REQUEST_QUEUE_LIMIT_HIT_COUNTER = "numRequestQueueLimitHits";
   public static final String REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER = "numRequestsByteSizeLimitHits";
   public static final String RESOURCE_LIMIT_HIT_COUNTER = "numResourceLimitHits";
+  public static final String WATCH_REQUEST_QUEUE_LIMIT_HIT_COUNTER = "numWatch%sRequestQueueLimitHits";

   public static final String REQUEST_QUEUE_SIZE = "numPendingRequestInQueue";
   public static final String REQUEST_MEGA_BYTE_SIZE = "numPendingRequestMegaByteSize";
+
+  public static final String WATCH_REQUEST_QUEUE_SIZE = "numWatch%sRequestInQueue";
+  public static final String WATCH_REQUEST_TIMEOUT_COUNTER = "numWatch%sRequestTimeout";
+
   public static final String RETRY_CACHE_ENTRY_COUNT_METRIC = "retryCacheEntryCount";
   public static final String RETRY_CACHE_HIT_COUNT_METRIC = "retryCacheHitCount";
   public static final String RETRY_CACHE_HIT_RATE_METRIC = "retryCacheHitRate";
@@ -76,6 +81,11 @@ public final class RaftServerMetricsImpl extends RatisMetrics implements RaftSer
   private final LongCounter numRequestQueueLimitHits = getRegistry().counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER);
   private final LongCounter numRequestsByteSizeLimitHits = getRegistry().counter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER);
   private final LongCounter numResourceLimitHits = getRegistry().counter(RESOURCE_LIMIT_HIT_COUNTER);
+  private final Map<ReplicationLevel, LongCounter> numWatchRequestQueueLimitHits = newCounterMap(ReplicationLevel.class,
+      replication -> getRegistry().counter(
+          String.format(WATCH_REQUEST_QUEUE_LIMIT_HIT_COUNTER, Type.toString(replication))));
+  private final Map<ReplicationLevel, LongCounter> numWatchRequestsTimeout = newCounterMap(ReplicationLevel.class,
+      replication -> getRegistry().counter(String.format(WATCH_REQUEST_TIMEOUT_COUNTER, Type.toString(replication))));

   private final LongCounter numFailedClientStaleRead
       = getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT);
@@ -150,6 +160,14 @@ public LongCounter getNumInstallSnapshot() {
     return numInstallSnapshot;
   }

+  public LongCounter getNumWatchRequestQueueLimitHits(ReplicationLevel replication) {
+    return numWatchRequestQueueLimitHits.get(replication);
+  }
+
+  public LongCounter getNumWatchRequestsTimeout(ReplicationLevel replication) {
+    return numWatchRequestsTimeout.get(replication);
+  }
+
   private static RatisMetricRegistry createRegistry(String serverId) {
     return create(new MetricRegistryInfo(serverId,
         RATIS_APPLICATION_NAME_METRICS, RATIS_SERVER_METRICS,
@@ -237,6 +255,22 @@ public boolean removeNumPendingRequestsByteSize() {
     return getRegistry().remove(REQUEST_MEGA_BYTE_SIZE);
   }

+  public void onWatchRequestQueueLimitHit(ReplicationLevel replicationLevel) {
+    numWatchRequestQueueLimitHits.get(replicationLevel).inc();
+  }
+
+  public void onWatchRequestTimeout(ReplicationLevel replicationLevel) {
+    numWatchRequestsTimeout.get(replicationLevel).inc();
+  }
+
+  public void addNumPendingWatchRequestsGauge(Supplier<Integer> queueSize, ReplicationLevel replication) {
+    getRegistry().gauge(String.format(WATCH_REQUEST_QUEUE_SIZE, Type.toString(replication)), () -> queueSize);
+  }
+
+  public boolean removeNumPendingWatchRequestsGauge(ReplicationLevel replication) {
+    return getRegistry().remove(String.format(WATCH_REQUEST_QUEUE_SIZE, Type.toString(replication)));
+  }
+
   public void onRequestByteSizeLimitHit() {
     numRequestsByteSizeLimitHits.inc();
   }
57 changes: 57 additions & 0 deletions ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -31,8 +31,10 @@
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerConfigKeys.Watch;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
 import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.Slf4jUtils;
@@ -47,12 +49,14 @@
 import org.slf4j.event.Level;

 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;

 import static org.junit.Assert.fail;
@@ -469,6 +473,59 @@ static void runTestWatchRequestClientTimeout(TestParameters p) throws Exception
     }
   }

+  @Test
+  public void testWatchMetrics() throws Exception {
+    final RaftProperties p = getProperties();
+    RaftServerConfigKeys.Watch.setElementLimit(p, 10);
+    RaftServerConfigKeys.Watch.setTimeout(p, TimeDuration.valueOf(2, TimeUnit.SECONDS));
+    try {
+      runWithNewCluster(NUM_SERVERS,
+          cluster -> runSingleTest(WatchRequestTests::runTestWatchMetrics, cluster, LOG));
+    } finally {
+      RaftServerConfigKeys.Watch.setElementLimit(p, Watch.ELEMENT_LIMIT_DEFAULT);
+      RaftServerConfigKeys.Watch.setTimeout(p, RaftServerConfigKeys.Watch.TIMEOUT_DEFAULT);
+    }
+  }
+
+  static RaftServerMetricsImpl getRaftServerMetrics(RaftServer.Division division) {
+    return (RaftServerMetricsImpl) division.getRaftServerMetrics();
+  }
+
+  static void runTestWatchMetrics(TestParameters p) throws Exception {
+    final MiniRaftCluster cluster = p.cluster;
+
+    List<RaftClient> clients = new ArrayList<>();
+
+    final ReplicationLevel replicationLevel = ReplicationLevel.MAJORITY;
+    try {
+      long initialWatchRequestTimeoutCount = getRaftServerMetrics(cluster.getLeader())
+          .getNumWatchRequestsTimeout(replicationLevel).getCount();
+      long initialLimitHit = getRaftServerMetrics(cluster.getLeader())
+          .getNumWatchRequestQueueLimitHits(replicationLevel).getCount();
+
+      int uncommittedBaseIndex = 10000;
+      // Logs with indices 10001 - 10011 will never be committed, so each queued watch should fail with NotReplicatedException
+      for (int i = 1; i <= 11; i++) {
+        RaftClient client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
+        clients.add(client);
+        client.async().watch(uncommittedBaseIndex + i, replicationLevel);
+      }
+
+      // Each watch timeout for a unique index should increment the metric
+      RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
+          .getNumWatchRequestsTimeout(replicationLevel).getCount() == initialWatchRequestTimeoutCount + 10,
+          300, 5000);
+      // There are 11 pending watch requests, but the pending watch request limit is 10
+      RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
+          .getNumWatchRequestQueueLimitHits(replicationLevel).getCount() ==
+          initialLimitHit + 1, 300, 5000);
+    } finally {
+      for(RaftClient client : clients) {
+        client.close();
+      }
+    }
+  }
+
   static void checkTimeout(List<CompletableFuture<RaftClientReply>> replies,
       List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws Exception {
     for(int i = 0; i < replies.size(); i++) {
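The new test records each counter before issuing requests and then polls until the counter reaches `initial + delta`, so it does not depend on what earlier tests left in the registry. The sketch below shows that pattern with a hypothetical `waitFor` helper. Reading the test's `300, 5000` arguments as a poll interval and a timeout in milliseconds is an assumption, and this helper is not Ratis' `RaftTestUtil.waitFor`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

final class PollingWaitSketch {
  // Hypothetical helper: polls the condition until it holds or the timeout elapses.
  // Returns true if the condition was met, false on timeout or interruption.
  static boolean waitFor(BooleanSupplier condition, long checkEveryMillis, long timeoutMillis) {
    final long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadlineNanos >= 0) {
        return false;
      }
      try {
        Thread.sleep(checkEveryMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    // Pretend the metric already holds a value left over from earlier activity.
    final AtomicLong counter = new AtomicLong(5);
    final long initial = counter.get();

    // A background thread stands in for the server incrementing the counter.
    final Thread worker = new Thread(() -> {
      for (int i = 0; i < 10; i++) {
        counter.incrementAndGet();
      }
    });
    worker.start();

    final boolean reached = waitFor(() -> counter.get() == initial + 10, 10, 1000);
    worker.join();
    System.out.println("reached=" + reached + ", delta=" + (counter.get() - initial));
  }
}
```

Asserting on the delta rather than on an absolute count keeps the check valid even when the counter starts above zero.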