Skip to content

Commit

Permalink
Optimize NodeIndicesStats output behind flag
Browse files Browse the repository at this point in the history
Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S committed Jul 1, 2024
1 parent 93d507a commit f6d3b58
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.opensearch.ExceptionsHelper;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
Expand All @@ -19,6 +21,7 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
Expand All @@ -31,6 +34,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -242,6 +246,47 @@ public void testNodeIndicesStatsDocStatusStatsCreateDeleteUpdate() {
}
}
}
public void testNodeIndicesStatsOptimisedResponse() {
internalCluster().startNode();
ensureGreen();
String indexName = "test1";
index(indexName, "type", "1", "f", "f");
refresh();
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();

NodesStatsResponse response = client().admin().cluster().prepareNodesStats().get();
response.getNodes().forEach(nodeStats -> {
assertNotNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
});
CommonStatsFlags commonStatsFlags = new CommonStatsFlags();
commonStatsFlags.optimizeNodeIndicesStatsOnLevel(true);
response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get();
response.getNodes().forEach(nodeStats -> {
assertNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));

});
ArrayList<String> level_arg = new ArrayList<>();
level_arg.add("indices");
commonStatsFlags.setLevels(level_arg.toArray(new String[0]));
response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get();
response.getNodes().forEach(nodeStats -> {
assertNotNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
assertNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
});

level_arg.clear();
level_arg.add("shards");
commonStatsFlags.setLevels(level_arg.toArray(new String[0]));
response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get();
response.getNodes().forEach(nodeStats -> {
assertNotNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
});
}



private void assertDocStatusStats() {
DocStatusStats docStatusStats = client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ public NodesStatsRequest indices(boolean indices) {
return this;
}

/**
* Use Optimized Response filtered based on level
*/
public NodesStatsRequest useOptimizedNodeIndicesStats(boolean useOptimizedNodeIndicesStats){
if (this.indices!=null) {
this.indices.optimizeNodeIndicesStatsOnLevel(true);
}
return this;
}

/**
* Get the names of requested metrics, excluding indices, which are
* handled separately.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class CommonStatsFlags implements Writeable, Cloneable {
// Used for metric CACHE_STATS, to determine which caches to report stats for
private EnumSet<CacheType> includeCaches = EnumSet.noneOf(CacheType.class);
private String[] levels = new String[0];
private boolean optimizeNodeIndicesStatsOnLevel = false;


/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
Expand Down Expand Up @@ -100,6 +102,9 @@ public CommonStatsFlags(StreamInput in) throws IOException {
includeCaches = in.readEnumSet(CacheType.class);
levels = in.readStringArray();
}
if (in.getVersion().onOrAfter(Version.V_2_15_0)) {
optimizeNodeIndicesStatsOnLevel = in.readBoolean();
}
}

@Override
Expand All @@ -124,6 +129,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeEnumSet(includeCaches);
out.writeStringArrayNullable(levels);
}
if (out.getVersion().onOrAfter(Version.V_2_15_0)){
out.writeBoolean(optimizeNodeIndicesStatsOnLevel);
}
}

/**
Expand Down Expand Up @@ -262,6 +270,14 @@ public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}

public void optimizeNodeIndicesStatsOnLevel(boolean optimizeNodeIndicesStatsOnLevel) {
this.optimizeNodeIndicesStatsOnLevel = optimizeNodeIndicesStatsOnLevel;
}

public boolean optimizeNodeIndicesStatsOnLevel() {
return this.optimizeNodeIndicesStatsOnLevel;
}

public boolean isSet(Flag flag) {
return flags.contains(flag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,10 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
break;
}
}

if (flags.optimizeNodeIndicesStatsOnLevel()) {
logger.info("Picked NodeIndicesStats");
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, flags.getLevels());
}
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
}

Expand Down
142 changes: 127 additions & 15 deletions server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

package org.opensearch.indices;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndexShardStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand Down Expand Up @@ -63,6 +66,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -75,22 +79,21 @@
@PublicApi(since = "1.0.0")
public class NodeIndicesStats implements Writeable, ToXContentFragment {
private CommonStats stats;
private Map<Index, CommonStats> statsByIndex;
private Map<Index, List<IndexShardStats>> statsByShard;

public NodeIndicesStats(StreamInput in) throws IOException {
stats = new CommonStats(in);
if (in.getVersion().onOrAfter(Version.V_2_15_0)) {
// contains statsByIndex
if (in.readBoolean()) {
statsByIndex = new HashMap<>();
readStatsByIndex(in);
}
}
if (in.readBoolean()) {
int entries = in.readVInt();
statsByShard = new HashMap<>();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}
readStatsByShards(in);
}
}

Expand All @@ -112,6 +115,57 @@ public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>>
}
}

public NodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
SearchRequestStats searchRequestStats,
String[] levels
) {
// make a total common stats from old ones and current ones
this.stats = oldStats;
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
for (IndexShardStats indexShardStats : shardStatsList) {
for (ShardStats shardStats : indexShardStats.getShards()) {
stats.add(shardStats.getStats());
}
}
}

if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);
}

if (levels != null) {
if (Arrays.stream(levels).anyMatch(NodeIndicesStats.levels.indices::equals)) {
this.statsByIndex = createStatsByIndex(statsByShard);
} else if (Arrays.stream(levels).anyMatch(NodeIndicesStats.levels.shards::equals)) {
this.statsByShard = statsByShard;
}
}
}

private void readStatsByIndex(StreamInput in) throws IOException {
int indexEntries = in.readVInt();
for (int i = 0; i < indexEntries; i++) {
Index index = new Index(in);
CommonStats commonStats = new CommonStats(in);
statsByIndex.put(index, commonStats);
}
}

private void readStatsByShards(StreamInput in) throws IOException {
int entries = in.readVInt();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}
}

@Nullable
public StoreStats getStore() {
return stats.getStore();
Expand Down Expand Up @@ -195,7 +249,31 @@ public RecoveryStats getRecoveryStats() {
@Override
public void writeTo(StreamOutput out) throws IOException {
stats.writeTo(out);

if (out.getVersion().onOrAfter(Version.V_2_15_0)) {
out.writeBoolean(statsByIndex != null);
if (statsByIndex != null) {
writeStatsByIndex(out);
}
}

out.writeBoolean(statsByShard != null);
if (statsByShard != null) {
writeStatsByShards(out);
}
}

private void writeStatsByIndex(StreamOutput out) throws IOException {
if (statsByIndex != null) {
out.writeVInt(statsByIndex.size());
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
entry.getKey().writeTo(out);
entry.getValue().writeTo(out);
}
}
}

private void writeStatsByShards(StreamOutput out) throws IOException {
if (statsByShard != null) {
out.writeVInt(statsByShard.size());
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
Expand All @@ -222,16 +300,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject(Fields.INDICES);
stats.toXContent(builder, params);

if ("indices".equals(level)) {
Map<Index, CommonStats> indexStats = createStatsByIndex();
if (levels.indices.equals(level)) {
builder.startObject(Fields.INDICES);
for (Map.Entry<Index, CommonStats> entry : indexStats.entrySet()) {
if (statsByIndex == null && statsByShard!=null) {
statsByIndex = createStatsByIndex(statsByShard);
}
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
builder.startObject(entry.getKey().getName());
entry.getValue().toXContent(builder, params);
builder.endObject();
}
builder.endObject();
} else if ("shards".equals(level)) {
} else if (levels.shards.equals(level)) {
builder.startObject("shards");
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
builder.startArray(entry.getKey().getName());
Expand All @@ -251,7 +331,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

private Map<Index, CommonStats> createStatsByIndex() {
private Map<Index, CommonStats> createStatsByIndex(Map<Index, List<IndexShardStats>> statsByShard) {
Map<Index, CommonStats> statsMap = new HashMap<>();
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
if (!statsMap.containsKey(entry.getKey())) {
Expand All @@ -276,6 +356,14 @@ public List<IndexShardStats> getShardStats(Index index) {
}
}

public CommonStats getIndexStats(Index index) {
if (statsByIndex == null) {
return null;
} else {
return statsByIndex.get(index);
}
}

/**
* Fields used for parsing and toXContent
*
Expand All @@ -284,4 +372,28 @@ public List<IndexShardStats> getShardStats(Index index) {
static final class Fields {
static final String INDICES = "indices";
}

/**
* Levels for the NodeIndicesStats
*/
public enum levels {
node("node"),
indices("indices"),
shards("shards");

private final String name;

levels(String name) {
this.name = name;
}

@Override
public String toString() {
return name;
}

public boolean equals(String value) {
return this.name.equals(value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.indices().optimizeNodeIndicesStatsOnLevel(true);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.clear()
.indices(true)
.indices(true).useOptimizedNodeIndicesStats(true)
.addMetrics(
NodesStatsRequest.Metric.JVM.metricName(),
NodesStatsRequest.Metric.OS.metricName(),
Expand Down

0 comments on commit f6d3b58

Please sign in to comment.