Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SnapshotV2] Add timestamp of last successful fetch of pinned timestamps in node stats #15611

Merged
merged 5 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.remotestore;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -20,6 +22,8 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
static final String INDEX_NAME = "remote-store-test-idx-1";
Expand Down Expand Up @@ -180,4 +184,41 @@ public void onFailure(Exception e) {
assertBusy(() -> assertEquals(Set.of(timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2()));
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}

public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception {
logger.info("Starting up cluster manager");
logger.info("cluster.remote_store.pinned_timestamps.enabled set to true");
logger.info("cluster.remote_store.pinned_timestamps.scheduler_interval set to minimum value of 1minute");
Settings pinnedTimestampEnabledSettings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.getKey(), "1m")
.build();
internalCluster().startClusterManagerOnlyNode(pinnedTimestampEnabledSettings);
String remoteNodeName = internalCluster().startDataOnlyNodes(1, pinnedTimestampEnabledSettings).get(0);
ensureStableCluster(2);
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
remoteNodeName
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

assertBusy(() -> {
long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L);
NodesStatsResponse nodesStatsResponse = internalCluster().client()
.admin()
.cluster()
.prepareNodesStats()
.addMetric(REMOTE_STORE.metricName())
.execute()
.actionGet();
for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
long lastRecordedFetch = nodeStats.getRemoteStoreNodeStats().getLastSuccessfulFetchOfPinnedTimestamps();
assertTrue(lastRecordedFetch >= lastSuccessfulFetchOfPinnedTimestamps);
}
});

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
import org.opensearch.repositories.RepositoriesStats;
import org.opensearch.script.ScriptCacheStats;
Expand Down Expand Up @@ -162,6 +163,9 @@
@Nullable
private NodeCacheStats nodeCacheStats;

@Nullable
private RemoteStoreNodeStats remoteStoreNodeStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -243,6 +247,12 @@
} else {
nodeCacheStats = null;
}
// TODO: change version to V_2_18_0
if (in.getVersion().onOrAfter(Version.CURRENT)) {
remoteStoreNodeStats = in.readOptionalWriteable(RemoteStoreNodeStats::new);
} else {
remoteStoreNodeStats = null;

Check warning on line 254 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L254

Added line #L254 was not covered by tests
}
}

public NodeStats(
Expand Down Expand Up @@ -274,7 +284,8 @@
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@Nullable RepositoriesStats repositoriesStats,
@Nullable AdmissionControlStats admissionControlStats,
@Nullable NodeCacheStats nodeCacheStats
@Nullable NodeCacheStats nodeCacheStats,
@Nullable RemoteStoreNodeStats remoteStoreNodeStats
) {
super(node);
this.timestamp = timestamp;
Expand Down Expand Up @@ -305,6 +316,7 @@
this.repositoriesStats = repositoriesStats;
this.admissionControlStats = admissionControlStats;
this.nodeCacheStats = nodeCacheStats;
this.remoteStoreNodeStats = remoteStoreNodeStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -467,6 +479,11 @@
return nodeCacheStats;
}

@Nullable
public RemoteStoreNodeStats getRemoteStoreNodeStats() {
return remoteStoreNodeStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -525,6 +542,10 @@
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeOptionalWriteable(nodeCacheStats);
}
// TODO: change version to V_2_18_0
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalWriteable(remoteStoreNodeStats);
}
}

@Override
Expand Down Expand Up @@ -631,6 +652,9 @@
if (getNodeCacheStats() != null) {
getNodeCacheStats().toXContent(builder, params);
}
if (getRemoteStoreNodeStats() != null) {
getRemoteStoreNodeStats().toXContent(builder, params);

Check warning on line 656 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L656

Added line #L656 was not covered by tests
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ public enum Metric {
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories"),
ADMISSION_CONTROL("admission_control"),
CACHE_STATS("caches");
CACHE_STATS("caches"),
REMOTE_STORE("remote_store");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics),
NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics),
NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.REMOTE_STORE.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.MonitorService;
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -241,7 +242,8 @@ public NodeStats stats(
boolean segmentReplicationTrackerStats,
boolean repositoriesStats,
boolean admissionControl,
boolean cacheService
boolean cacheService,
boolean remoteStoreNodeStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand Down Expand Up @@ -274,7 +276,8 @@ public NodeStats stats(
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
admissionControl ? this.admissionControlService.stats() : null,
cacheService ? this.cacheService.stats(indices) : null
cacheService ? this.cacheService.stats(indices) : null,
remoteStoreNodeStats ? new RemoteStoreNodeStats() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.node.remotestore;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Node level remote store stats
* @opensearch.internal
*/
public class RemoteStoreNodeStats implements Writeable, ToXContentFragment {

public static final String STATS_NAME = "remote_store";
public static final String LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS = "last_successful_fetch_of_pinned_timestamps";

/**
* Time stamp for the last successful fetch of pinned timestamps by the {@linkplain RemoteStorePinnedTimestampService}
*/
private final long lastSuccessfulFetchOfPinnedTimestamps;

public RemoteStoreNodeStats() {
this.lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
}

public long getLastSuccessfulFetchOfPinnedTimestamps() {
return this.lastSuccessfulFetchOfPinnedTimestamps;

Check warning on line 39 in server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java#L39

Added line #L39 was not covered by tests
}

public RemoteStoreNodeStats(StreamInput in) throws IOException {
this.lastSuccessfulFetchOfPinnedTimestamps = in.readLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(this.lastSuccessfulFetchOfPinnedTimestamps);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(STATS_NAME);
builder.field(LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS, this.lastSuccessfulFetchOfPinnedTimestamps);
return builder.endObject();

Check warning on line 55 in server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java#L53-L55

Added lines #L53 - L55 were not covered by tests
}

@Override
public String toString() {
return "RemoteStoreNodeStats{ lastSuccessfulFetchOfPinnedTimestamps=" + lastSuccessfulFetchOfPinnedTimestamps + "}";

Check warning on line 60 in server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java#L60

Added line #L60 was not covered by tests
}

@Override
public boolean equals(Object o) {
if (o == null) {
return false;

Check warning on line 66 in server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java#L66

Added line #L66 was not covered by tests
}
if (o.getClass() != RemoteStoreNodeStats.class) {
return false;

Check warning on line 69 in server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java#L69

Added line #L69 was not covered by tests
}
RemoteStoreNodeStats other = (RemoteStoreNodeStats) o;
return this.lastSuccessfulFetchOfPinnedTimestamps == other.lastSuccessfulFetchOfPinnedTimestamps;
}

@Override
public int hashCode() {
return Objects.hash(lastSuccessfulFetchOfPinnedTimestamps);

Check warning on line 77 in server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java#L77

Added line #L77 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.opensearch.node.NodeResourceUsageStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.node.remotestore.RemoteStoreNodeStats;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
Expand Down Expand Up @@ -614,6 +615,14 @@ public void testSerialization() throws IOException {
} else {
assertEquals(nodeCacheStats, deserializedNodeCacheStats);
}

RemoteStoreNodeStats remoteStoreNodeStats = nodeStats.getRemoteStoreNodeStats();
RemoteStoreNodeStats deserializedRemoteStoreNodeStats = deserializedNodeStats.getRemoteStoreNodeStats();
if (remoteStoreNodeStats == null) {
assertNull(deserializedRemoteStoreNodeStats);
} else {
assertEquals(remoteStoreNodeStats, deserializedRemoteStoreNodeStats);
}
}
}
}
Expand Down Expand Up @@ -996,6 +1005,16 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
nodeCacheStats = new NodeCacheStats(cacheStatsMap, flags);
}

RemoteStoreNodeStats remoteStoreNodeStats = null;
if (frequently()) {
remoteStoreNodeStats = new RemoteStoreNodeStats() {
@Override
public long getLastSuccessfulFetchOfPinnedTimestamps() {
return 123456L;
}
};
}

// TODO: Only remote_store based aspects of NodeIndicesStats are being tested here.
// It is possible to test other metrics in NodeIndicesStats as well since it extends Writeable now
return new NodeStats(
Expand Down Expand Up @@ -1027,7 +1046,8 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
segmentReplicationRejectionStats,
null,
admissionControlStats,
nodeCacheStats
nodeCacheStats,
remoteStoreNodeStats
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ private ClusterStatsNodeResponse createClusterStatsNodeResponse(
null,
null,
null,
null,
null
);
if (defaultBehavior) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -226,6 +227,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -257,6 +259,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -319,6 +322,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -350,6 +354,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -381,6 +386,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
)
);
Expand Down
Loading
Loading