Skip to content

Commit

Permalink
[Backport 2.x] Add cluster state stats (#10904)
Browse files Browse the repository at this point in the history
* Add cluster state stats (#10670)
* Add cluster state update stats along with remote upload stats around success/ failure, latency metric
* Change backward compatibility version to 2.12 for 2.x merge

Signed-off-by: Aman Khare <[email protected]>
(cherry picked from commit 54e74a8)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
amkhar authored Oct 25, 2023
1 parent db18e84 commit a08cca1
Show file tree
Hide file tree
Showing 18 changed files with 511 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add search query categorizor ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.NoClusterManagerBlockService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.MediaTypeRegistry;
Expand Down Expand Up @@ -195,6 +196,8 @@ public void testIsolateClusterManagerAndVerifyClusterStateConsensus() throws Exc
}

}
ClusterStateStats clusterStateStats = internalCluster().clusterService().getClusterManagerService().getClusterStateStats();
assertTrue(clusterStateStats.getUpdateFailed() > 0);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -19,6 +22,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
Expand Down Expand Up @@ -94,6 +98,45 @@ public void testFullClusterRestoreStaleDelete() throws Exception {
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}

public void testRemoteStateStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
String clusterManagerNode = internalCluster().getClusterManagerName();
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

// Fetch _nodes/stats
NodesStatsResponse nodesStatsResponse = client().admin()
.cluster()
.prepareNodesStats(clusterManagerNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

// assert cluster state stats
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();

assertNotNull(discoveryStats.getClusterStateStats());
assertTrue(discoveryStats.getClusterStateStats().getUpdateSuccess() > 1);
assertEquals(0, discoveryStats.getClusterStateStats().getUpdateFailed());
assertTrue(discoveryStats.getClusterStateStats().getUpdateTotalTimeInMillis() > 0);
// assert remote state stats
assertTrue(discoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 1);
assertEquals(0, discoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
assertTrue(discoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);

NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponseDataNode.getNodes().get(0).getDiscoveryStats();
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
}

private void setReplicaCount(int replicaCount) {
client().admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,12 @@ public interface PersistedState extends Closeable {
*/
void setLastAcceptedState(ClusterState clusterState);

/**
* Returns the stats for the persistence layer for {@link CoordinationState}.
* @return PersistedStateStats
*/
PersistedStateStats getStats();

/**
* Marks the last accepted cluster state as committed.
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -866,7 +867,16 @@ protected void doStart() {

@Override
public DiscoveryStats stats() {
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats());
ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats();
ArrayList<PersistedStateStats> stats = new ArrayList<>();
Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> {
if (persistedStateRegistry.getPersistedState(stateType) != null
&& persistedStateRegistry.getPersistedState(stateType).getStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public void setLastAcceptedState(ClusterState clusterState) {
this.acceptedState = clusterState;
}

@Override
public PersistedStateStats getStats() {
return null;
}

@Override
public long getCurrentTerm() {
return currentTerm;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Persisted cluster state related stats.
*
* @opensearch.internal
*/
public class PersistedStateStats implements Writeable, ToXContentObject {
private String statsName;
private AtomicLong totalTimeInMillis = new AtomicLong(0);
private AtomicLong failedCount = new AtomicLong(0);
private AtomicLong successCount = new AtomicLong(0);
private Map<String, AtomicLong> extendedFields = new HashMap<>(); // keeping minimal extensibility

public PersistedStateStats(String statsName) {
this.statsName = statsName;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(successCount.get());
out.writeVLong(failedCount.get());
out.writeVLong(totalTimeInMillis.get());
if (extendedFields.size() > 0) {
out.writeBoolean(true);
out.writeVInt(extendedFields.size());
for (Map.Entry<String, AtomicLong> extendedField : extendedFields.entrySet()) {
out.writeString(extendedField.getKey());
out.writeVLong(extendedField.getValue().get());
}
} else {
out.writeBoolean(false);
}
}

public PersistedStateStats(StreamInput in) throws IOException {
this.successCount = new AtomicLong(in.readVLong());
this.failedCount = new AtomicLong(in.readVLong());
this.totalTimeInMillis = new AtomicLong(in.readVLong());
if (in.readBoolean()) {
int extendedFieldsSize = in.readVInt();
this.extendedFields = new HashMap<>();
for (int fieldNumber = 0; fieldNumber < extendedFieldsSize; fieldNumber++) {
extendedFields.put(in.readString(), new AtomicLong(in.readVLong()));
}
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(statsName);
builder.field(Fields.SUCCESS_COUNT, getSuccessCount());
builder.field(Fields.FAILED_COUNT, getFailedCount());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getTotalTimeInMillis());
if (extendedFields.size() > 0) {
for (Map.Entry<String, AtomicLong> extendedField : extendedFields.entrySet()) {
builder.field(extendedField.getKey(), extendedField.getValue().get());
}
}
builder.endObject();
return builder;
}

public void stateFailed() {
failedCount.incrementAndGet();
}

public void stateSucceeded() {
successCount.incrementAndGet();
}

/**
* Expects user to send time taken in milliseconds.
*
* @param timeTakenInUpload time taken in uploading the cluster state to remote
*/
public void stateTook(long timeTakenInUpload) {
totalTimeInMillis.addAndGet(timeTakenInUpload);
}

public long getTotalTimeInMillis() {
return totalTimeInMillis.get();
}

public long getFailedCount() {
return failedCount.get();
}

public long getSuccessCount() {
return successCount.get();
}

protected void addToExtendedFields(String extendedField, AtomicLong extendedFieldValue) {
this.extendedFields.put(extendedField, extendedFieldValue);
}

/**
* Fields for parsing and toXContent
*
* @opensearch.internal
*/
static final class Fields {
static final String SUCCESS_COUNT = "success_count";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String FAILED_COUNT = "failed_count";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.cluster.coordination.PersistedStateStats;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Cluster state related stats.
*
* @opensearch.internal
*/
public class ClusterStateStats implements Writeable, ToXContentObject {

private AtomicLong updateSuccess = new AtomicLong(0);
private AtomicLong updateTotalTimeInMillis = new AtomicLong(0);
private AtomicLong updateFailed = new AtomicLong(0);
private List<PersistedStateStats> persistenceStats = new ArrayList<>();

public ClusterStateStats() {}

public long getUpdateSuccess() {
return updateSuccess.get();
}

public long getUpdateTotalTimeInMillis() {
return updateTotalTimeInMillis.get();
}

public long getUpdateFailed() {
return updateFailed.get();
}

public List<PersistedStateStats> getPersistenceStats() {
return persistenceStats;
}

public void stateUpdated() {
updateSuccess.incrementAndGet();
}

public void stateUpdateFailed() {
updateFailed.incrementAndGet();
}

public void stateUpdateTook(long stateUpdateTime) {
updateTotalTimeInMillis.addAndGet(stateUpdateTime);
}

public ClusterStateStats setPersistenceStats(List<PersistedStateStats> persistenceStats) {
this.persistenceStats = persistenceStats;
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(updateSuccess.get());
out.writeVLong(updateTotalTimeInMillis.get());
out.writeVLong(updateFailed.get());
out.writeVInt(persistenceStats.size());
for (PersistedStateStats stats : persistenceStats) {
stats.writeTo(out);
}
}

public ClusterStateStats(StreamInput in) throws IOException {
this.updateSuccess = new AtomicLong(in.readVLong());
this.updateTotalTimeInMillis = new AtomicLong(in.readVLong());
this.updateFailed = new AtomicLong(in.readVLong());
int persistedStatsSize = in.readVInt();
this.persistenceStats = new ArrayList<>();
for (int statsNumber = 0; statsNumber < persistedStatsSize; statsNumber++) {
PersistedStateStats stats = new PersistedStateStats(in);
this.persistenceStats.add(stats);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.CLUSTER_STATE_STATS);
builder.startObject(Fields.OVERALL);
builder.field(Fields.UPDATE_COUNT, getUpdateSuccess());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getUpdateTotalTimeInMillis());
builder.field(Fields.FAILED_COUNT, getUpdateFailed());
builder.endObject();
for (PersistedStateStats stats : persistenceStats) {
stats.toXContent(builder, params);
}
builder.endObject();
return builder;
}

/**
* Fields for parsing and toXContent
*
* @opensearch.internal
*/
static final class Fields {
static final String CLUSTER_STATE_STATS = "cluster_state_stats";
static final String OVERALL = "overall";
static final String UPDATE_COUNT = "update_count";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String FAILED_COUNT = "failed_count";
}
}
Loading

0 comments on commit a08cca1

Please sign in to comment.