Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Optimize NodeIndicesStats output behind flag #15504

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343)))
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
- Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
// Used for metric CACHE_STATS, to determine which caches to report stats for
private EnumSet<CacheType> includeCaches = EnumSet.noneOf(CacheType.class);
private String[] levels = new String[0];
private boolean includeIndicesStatsByLevel = false;

/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
Expand Down Expand Up @@ -106,6 +107,9 @@ public CommonStatsFlags(StreamInput in) throws IOException {
includeCaches = in.readEnumSet(CacheType.class);
levels = in.readStringArray();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
includeIndicesStatsByLevel = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -135,6 +139,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeEnumSet(includeCaches);
out.writeStringArrayNullable(levels);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(includeIndicesStatsByLevel);
}
}

/**
Expand Down Expand Up @@ -273,6 +280,14 @@ public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}

public void setIncludeIndicesStatsByLevel(boolean includeIndicesStatsByLevel) {
this.includeIndicesStatsByLevel = includeIndicesStatsByLevel;
}

public boolean getIncludeIndicesStatsByLevel() {
return this.includeIndicesStatsByLevel;
}

public boolean isSet(Flag flag) {
return flags.contains(flag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,8 +758,12 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
break;
}
}

return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
if (flags.getIncludeIndicesStatsByLevel()) {
NodeIndicesStats.StatsLevel statsLevel = NodeIndicesStats.getAcceptedLevel(flags.getLevels());
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, statsLevel);
} else {
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
}
}

Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {
Expand Down
199 changes: 171 additions & 28 deletions server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.indices;

import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndexShardStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand Down Expand Up @@ -63,9 +64,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Global information on indices stats running on a specific node.
Expand All @@ -74,26 +77,27 @@
*/
@PublicApi(since = "1.0.0")
public class NodeIndicesStats implements Writeable, ToXContentFragment {
private CommonStats stats;
private Map<Index, List<IndexShardStats>> statsByShard;
protected CommonStats stats;
protected Map<Index, CommonStats> statsByIndex;
protected Map<Index, List<IndexShardStats>> statsByShard;

public NodeIndicesStats(StreamInput in) throws IOException {
stats = new CommonStats(in);
if (in.readBoolean()) {
int entries = in.readVInt();
statsByShard = new HashMap<>();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
// contains statsByIndex
if (in.readBoolean()) {
statsByIndex = readStatsByIndex(in);
}
}
if (in.readBoolean()) {
statsByShard = readStatsByShard(in);
}
}

/**
* Without passing the information of the levels to the constructor, we return the Node-level aggregated stats as
* {@link CommonStats} along with a hash-map containing Index to List of Shard Stats.
*/
public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>> statsByShard, SearchRequestStats searchRequestStats) {
// this.stats = stats;
this.statsByShard = statsByShard;
Expand All @@ -112,6 +116,90 @@ public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>>
}
}

/**
* Passing the level information to the nodes allows us to aggregate the stats based on the level passed. This
* allows us to aggregate based on NodeLevel (default - if no level is passed) or Index level if `indices` level is
* passed and finally return the statsByShards map if `shards` level is passed. This allows us to reduce ser/de of
* stats and return only the information that is required while returning to the client.
*/
public NodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
SearchRequestStats searchRequestStats,
StatsLevel level
) {
// make a total common stats from old ones and current ones
this.stats = oldStats;
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
for (IndexShardStats indexShardStats : shardStatsList) {
for (ShardStats shardStats : indexShardStats.getShards()) {
stats.add(shardStats.getStats());
}
}
}

if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);
}

if (level != null) {
switch (level) {
case INDICES:
this.statsByIndex = createStatsByIndex(statsByShard);
break;
case SHARDS:
this.statsByShard = statsByShard;
break;
}
}
}

/**
* By default, the levels passed from the transport action will be a list of strings, since NodeIndicesStats can
* only aggregate on one level, we pick the first accepted level else we ignore if no known level is passed. Level is
* selected based on enum defined in {@link StatsLevel}
*
* Note - we are picking the first level as multiple levels are not supported in the previous versions.
* @param levels - levels sent in the request.
*
* @return Corresponding identified enum {@link StatsLevel}
*/
public static StatsLevel getAcceptedLevel(String[] levels) {
if (levels != null && levels.length > 0) {
Optional<StatsLevel> level = Arrays.stream(StatsLevel.values())
.filter(field -> field.getRestName().equals(levels[0]))
.findFirst();
return level.orElseThrow(() -> new IllegalArgumentException("Level provided is not supported by NodeIndicesStats"));
}
return null;
}

private Map<Index, CommonStats> readStatsByIndex(StreamInput in) throws IOException {
Map<Index, CommonStats> statsByIndex = new HashMap<>();
int indexEntries = in.readVInt();
for (int i = 0; i < indexEntries; i++) {
Index index = new Index(in);
CommonStats commonStats = new CommonStats(in);
statsByIndex.put(index, commonStats);
}
return statsByIndex;
}

private Map<Index, List<IndexShardStats>> readStatsByShard(StreamInput in) throws IOException {
Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
int entries = in.readVInt();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}
return statsByShard;
}

@Nullable
public StoreStats getStore() {
return stats.getStore();
Expand Down Expand Up @@ -195,7 +283,31 @@ public RecoveryStats getRecoveryStats() {
@Override
public void writeTo(StreamOutput out) throws IOException {
stats.writeTo(out);

if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(statsByIndex != null);
if (statsByIndex != null) {
writeStatsByIndex(out);
}
}

out.writeBoolean(statsByShard != null);
if (statsByShard != null) {
writeStatsByShards(out);
}
}

private void writeStatsByIndex(StreamOutput out) throws IOException {
if (statsByIndex != null) {
out.writeVInt(statsByIndex.size());
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
entry.getKey().writeTo(out);
entry.getValue().writeTo(out);
}
}
}

private void writeStatsByShards(StreamOutput out) throws IOException {
if (statsByShard != null) {
out.writeVInt(statsByShard.size());
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
Expand All @@ -210,29 +322,46 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
final String level = params.param("level", "node");
final boolean isLevelValid = "indices".equalsIgnoreCase(level)
|| "node".equalsIgnoreCase(level)
|| "shards".equalsIgnoreCase(level);
final String level = params.param("level", StatsLevel.NODE.getRestName());
final boolean isLevelValid = StatsLevel.NODE.getRestName().equalsIgnoreCase(level)
|| StatsLevel.INDICES.getRestName().equalsIgnoreCase(level)
|| StatsLevel.SHARDS.getRestName().equalsIgnoreCase(level);
if (!isLevelValid) {
throw new IllegalArgumentException("level parameter must be one of [indices] or [node] or [shards] but was [" + level + "]");
throw new IllegalArgumentException(
"level parameter must be one of ["
+ StatsLevel.INDICES.getRestName()
+ "] or ["
+ StatsLevel.NODE.getRestName()
+ "] or ["
+ StatsLevel.SHARDS.getRestName()
+ "] but was ["
+ level
+ "]"
);
}

// "node" level
builder.startObject(Fields.INDICES);
builder.startObject(StatsLevel.INDICES.getRestName());
stats.toXContent(builder, params);

if ("indices".equals(level)) {
Map<Index, CommonStats> indexStats = createStatsByIndex();
builder.startObject(Fields.INDICES);
for (Map.Entry<Index, CommonStats> entry : indexStats.entrySet()) {
if (StatsLevel.INDICES.getRestName().equals(level)) {
assert statsByIndex != null || statsByShard != null : "Expected shard stats or index stats in response for generating ["
+ StatsLevel.INDICES
+ "] field";
if (statsByIndex == null) {
statsByIndex = createStatsByIndex(statsByShard);
}

builder.startObject(StatsLevel.INDICES.getRestName());
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
builder.startObject(entry.getKey().getName());
entry.getValue().toXContent(builder, params);
builder.endObject();
}
builder.endObject();
} else if ("shards".equals(level)) {
builder.startObject("shards");
} else if (StatsLevel.SHARDS.getRestName().equals(level)) {
builder.startObject(StatsLevel.SHARDS.getRestName());
assert statsByShard != null : "Expected shard stats in response for generating [" + StatsLevel.SHARDS + "] field";
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
builder.startArray(entry.getKey().getName());
for (IndexShardStats indexShardStats : entry.getValue()) {
Expand All @@ -251,7 +380,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

private Map<Index, CommonStats> createStatsByIndex() {
private Map<Index, CommonStats> createStatsByIndex(Map<Index, List<IndexShardStats>> statsByShard) {
Map<Index, CommonStats> statsMap = new HashMap<>();
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
if (!statsMap.containsKey(entry.getKey())) {
Expand Down Expand Up @@ -281,7 +410,21 @@ public List<IndexShardStats> getShardStats(Index index) {
*
* @opensearch.internal
*/
static final class Fields {
static final String INDICES = "indices";
@PublicApi(since = "3.0.0")
public enum StatsLevel {
INDICES("indices"),
SHARDS("shards"),
NODE("node");

private final String restName;

StatsLevel(String restName) {
this.restName = restName;
}

public String getRestName() {
return restName;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest.Metric.PROCESS.metricName(),
NodesStatsRequest.Metric.SCRIPT.metricName()
);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);
client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
@Override
public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception {
Expand Down
Loading
Loading