Add support for Search replica to return segrep stats #16677

Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489))
- Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271))
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Added stats calculation support for search and regular replica shards ([#16677](https://github.com/opensearch-project/OpenSearch/pull/16677))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
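Not part of the diff, but for orientation: a minimal sketch of consuming the combined stats once this change is in place. The index name, the Client handle, and the print statement are placeholders; only accessors that appear elsewhere in this diff are relied on.

```java
import java.util.List;
import java.util.Map;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.client.Client; // assumed client type; any admin-capable client handle works here
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;

class SegRepStatsUsageSketch {
    // Sketch only: "my-index" is a placeholder index name.
    static void printReplicaState(Client client) {
        SegmentReplicationStatsResponse response = client.admin()
            .indices()
            .prepareSegmentReplicationStats("my-index")
            .setDetailed(true)
            .get();
        for (Map.Entry<String, List<SegmentReplicationPerGroupStats>> entry : response.getReplicationStats().entrySet()) {
            for (SegmentReplicationPerGroupStats group : entry.getValue()) {
                // With this change, search-only replicas are listed here alongside write replicas.
                for (SegmentReplicationShardStats shardStats : group.getReplicaStats()) {
                    System.out.println(shardStats.getAllocationId() + " -> " + shardStats.getCurrentReplicationState());
                }
            }
        }
    }
}
```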
@@ -16,6 +16,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
@@ -433,4 +434,103 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception {

}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();

int numShards = 2;
assertAcked(
prepareCreate(
INDEX_NAME,
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put("number_of_search_only_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index(INDEX_NAME, "doc", Integer.toString(i));
}
refresh(INDEX_NAME);
ensureSearchable(INDEX_NAME);

assertBusy(() -> {
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats()
.stream()
.findFirst()
.get()
.getCurrentReplicationState();
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
assertEquals(segmentReplicationStatsResponse.getTotalShards(), numShards * 3);
assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), numShards * 3);
assertNotNull(currentReplicationState);
assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0);
}, 1, TimeUnit.MINUTES);
}

public void testSegmentReplicationStatsResponseWithOnlySearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();

int numShards = 1;
assertAcked(
prepareCreate(
INDEX_NAME,
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 0)
.put("number_of_search_only_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index(INDEX_NAME, "doc", Integer.toString(i));
}
refresh(INDEX_NAME);
ensureSearchable(INDEX_NAME);

assertBusy(() -> {
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
final SegmentReplicationState currentReplicationState = perGroupStats.getReplicaStats()
.stream()
.findFirst()
.get()
.getCurrentReplicationState();
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
assertEquals(segmentReplicationStatsResponse.getTotalShards(), 2);
assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), 2);
assertNotNull(currentReplicationState);
assertEquals(currentReplicationState.getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(currentReplicationState.getIndex().recoveredFileCount() > 0);
}, 1, TimeUnit.MINUTES);
}
}
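A note for reviewers: search-only replicas, and with them the new stats path these tests exercise, sit behind the experimental reader/writer split flag, which is why the class overrides featureFlagSettings() above. A minimal sketch of the same opt-in outside this test class (it mirrors that override exactly):

```java
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

class ReaderWriterSplitOptIn {
    // Sketch only: enables the experimental reader/writer split so that
    // number_of_search_only_replicas (and the stats added in this PR) take effect.
    static Settings enableSearchReplicas(Settings base) {
        return Settings.builder().put(base).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
    }
}
```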
@@ -13,6 +13,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.indices.replication.SegmentReplicationState;

import java.io.IOException;
@@ -31,19 +32,31 @@ public class SegmentReplicationShardStatsResponse implements Writeable {
@Nullable
private final SegmentReplicationState replicaStats;

@Nullable
private final SegmentReplicationShardStats segmentReplicationShardStats;

public SegmentReplicationShardStatsResponse(StreamInput in) throws IOException {
this.primaryStats = in.readOptionalWriteable(SegmentReplicationPerGroupStats::new);
this.replicaStats = in.readOptionalWriteable(SegmentReplicationState::new);
this.segmentReplicationShardStats = in.readOptionalWriteable(SegmentReplicationShardStats::new);
}

public SegmentReplicationShardStatsResponse(SegmentReplicationPerGroupStats primaryStats) {
this.primaryStats = primaryStats;
this.replicaStats = null;
this.segmentReplicationShardStats = null;
}

public SegmentReplicationShardStatsResponse(SegmentReplicationState replicaStats) {
this.replicaStats = replicaStats;
this.primaryStats = null;
this.segmentReplicationShardStats = null;
}

public SegmentReplicationShardStatsResponse(SegmentReplicationShardStats segmentReplicationShardStats) {
this.primaryStats = null;
this.replicaStats = null;
this.segmentReplicationShardStats = segmentReplicationShardStats;
}

public SegmentReplicationPerGroupStats getPrimaryStats() {
@@ -54,10 +67,15 @@ public SegmentReplicationState getReplicaStats() {
return replicaStats;
}

public SegmentReplicationShardStats getSegmentReplicationShardStats() {
return segmentReplicationShardStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(primaryStats);
out.writeOptionalWriteable(replicaStats);
out.writeOptionalWriteable(segmentReplicationShardStats);
}

@Override
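A short sketch (not in the diff) of how a caller can tell the three mutually exclusive payloads apart: each constructor above sets exactly one field and leaves the other two null, and newResponse(...) in the transport action below branches on the same null checks. The wrapper class and package here are assumptions.

```java
import org.opensearch.action.admin.indices.replication.SegmentReplicationShardStatsResponse;

class ShardStatsResponseDispatchSketch {
    // Sketch only: mirrors the per-response null checks performed in newResponse(...).
    static String describe(SegmentReplicationShardStatsResponse response) {
        if (response.getPrimaryStats() != null) {
            return "primary shard: per-group stats for its replication group";
        } else if (response.getSegmentReplicationShardStats() != null) {
            return "search-only replica: locally computed shard stats (new in this change)";
        } else if (response.getReplicaStats() != null) {
            return "write replica: its current segment replication state";
        }
        return "empty response";
    }
}
```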
@@ -26,18 +26,23 @@
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
@@ -101,6 +106,9 @@ protected SegmentReplicationStatsResponse newResponse(
final Map<String, SegmentReplicationState> replicaStats = new HashMap<>();
// map of index name to list of replication group stats.
final Map<String, List<SegmentReplicationPerGroupStats>> primaryStats = new HashMap<>();
// search replica responses
final Set<SegmentReplicationShardStats> searchReplicaSegRepShardStats = new HashSet<>();

for (SegmentReplicationShardStatsResponse response : responses) {
if (response != null) {
if (response.getReplicaStats() != null) {
@@ -109,6 +117,11 @@ protected SegmentReplicationStatsResponse newResponse(
replicaStats.putIfAbsent(shardRouting.allocationId().getId(), response.getReplicaStats());
}
}

if (response.getSegmentReplicationShardStats() != null) {
searchReplicaSegRepShardStats.add(response.getSegmentReplicationShardStats());
}

if (response.getPrimaryStats() != null) {
final ShardId shardId = response.getPrimaryStats().getShardId();
if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) {
@@ -134,6 +147,15 @@ protected SegmentReplicationStatsResponse newResponse(
}
}
}
// combine the search replica stats with the stats of other replicas
for (Map.Entry<String, List<SegmentReplicationPerGroupStats>> entry : primaryStats.entrySet()) {
for (SegmentReplicationPerGroupStats group : entry.getValue()) {
Set<SegmentReplicationShardStats> updatedSet = new HashSet<>(group.getReplicaStats());
updatedSet.addAll(searchReplicaSegRepShardStats);
group.setReplicaStats(updatedSet);
}
}

return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures);
}

@@ -154,13 +176,17 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication

if (shardRouting.primary()) {
return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard));
} else if (shardRouting.isSearchOnly()) {
SegmentReplicationShardStats segmentReplicationShardStats = calculateSegmentReplicationShardStats(
shardRouting,
indexShard,
shardId,
request.activeOnly()
);
return new SegmentReplicationShardStatsResponse(segmentReplicationShardStats);
} else {
return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly()));
}

// return information about only on-going segment replication events.
if (request.activeOnly()) {
return new SegmentReplicationShardStatsResponse(targetService.getOngoingEventSegmentReplicationState(shardId));
}
return new SegmentReplicationShardStatsResponse(targetService.getSegmentReplicationState(shardId));
}

@Override
@@ -181,4 +207,75 @@ protected ClusterBlockException checkRequestBlock(
) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

private SegmentReplicationShardStats calculateSegmentReplicationShardStats(
ShardRouting shardRouting,
IndexShard indexShard,
ShardId shardId,
boolean isActiveOnly
) {
ReplicationCheckpoint indexReplicationCheckpoint = indexShard.getLatestReplicationCheckpoint();
SegmentReplicationState segmentReplicationState = getSegmentReplicationState(shardId, isActiveOnly);
if (segmentReplicationState != null) {
ReplicationCheckpoint latestReplicationCheckpointReceived = segmentReplicationState.getLatestReplicationCheckpoint();

SegmentReplicationShardStats segmentReplicationShardStats = new SegmentReplicationShardStats(
shardRouting.allocationId().getId(),
calculateCheckpointsBehind(indexReplicationCheckpoint, latestReplicationCheckpointReceived),
calculateBytesBehind(indexReplicationCheckpoint, latestReplicationCheckpointReceived),
0,
calculateCurrentReplicationLag(shardId),
getLastCompletedReplicationLag(shardId)
);

segmentReplicationShardStats.setCurrentReplicationState(segmentReplicationState);
return segmentReplicationShardStats;
} else {
return new SegmentReplicationShardStats(shardRouting.allocationId().getId(), 0, 0, 0, 0, 0);
}
}

private SegmentReplicationState getSegmentReplicationState(ShardId shardId, boolean isActiveOnly) {
if (isActiveOnly) {
return targetService.getOngoingEventSegmentReplicationState(shardId);
} else {
return targetService.getSegmentReplicationState(shardId);
}
}

private long calculateCheckpointsBehind(
ReplicationCheckpoint indexReplicationCheckpoint,
ReplicationCheckpoint latestReplicationCheckpointReceived
) {
if (latestReplicationCheckpointReceived != null) {
return latestReplicationCheckpointReceived.getSegmentInfosVersion() - indexReplicationCheckpoint.getSegmentInfosVersion();
}
return 0;
}

private long calculateBytesBehind(
ReplicationCheckpoint indexReplicationCheckpoint,
ReplicationCheckpoint latestReplicationCheckpointReceived
) {
if (latestReplicationCheckpointReceived != null) {
Store.RecoveryDiff diff = Store.segmentReplicationDiff(
latestReplicationCheckpointReceived.getMetadataMap(),
indexReplicationCheckpoint.getMetadataMap()
);
return diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
}
return 0;
}

private long calculateCurrentReplicationLag(ShardId shardId) {
SegmentReplicationState ongoingEventSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId);
return ongoingEventSegmentReplicationState != null ? ongoingEventSegmentReplicationState.getTimer().time() : 0;
}

private long getLastCompletedReplicationLag(ShardId shardId) {
SegmentReplicationState lastCompletedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState(
shardId
);
return lastCompletedSegmentReplicationState != null ? lastCompletedSegmentReplicationState.getTimer().time() : 0;
}
}
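To restate the new search-replica lag math in one place (no additional behavior beyond the helpers above), with a small worked example in the comments:

```java
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

class SearchReplicaLagMathSketch {
    // Example: if the latest checkpoint received from the primary carries SegmentInfos
    // version 8 and the shard's local checkpoint is at version 5, the shard is
    // 8 - 5 = 3 checkpoints behind; bytes behind is the combined length of the segment
    // files present in the received checkpoint but still missing locally.
    static long checkpointsBehind(ReplicationCheckpoint local, ReplicationCheckpoint received) {
        return received == null ? 0 : received.getSegmentInfosVersion() - local.getSegmentInfosVersion();
    }

    static long bytesBehind(ReplicationCheckpoint local, ReplicationCheckpoint received) {
        if (received == null) {
            return 0;
        }
        Store.RecoveryDiff diff = Store.segmentReplicationDiff(received.getMetadataMap(), local.getMetadataMap());
        return diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
    }
}
```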
@@ -28,7 +28,7 @@
public class SegmentReplicationPerGroupStats implements Writeable, ToXContentFragment {

private final ShardId shardId;
private final Set<SegmentReplicationShardStats> replicaStats;
private Set<SegmentReplicationShardStats> replicaStats;
private final long rejectedRequestCount;

public SegmentReplicationPerGroupStats(ShardId shardId, Set<SegmentReplicationShardStats> replicaStats, long rejectedRequestCount) {
@@ -55,6 +55,10 @@ public ShardId getShardId() {
return shardId;
}

public void setReplicaStats(Set<SegmentReplicationShardStats> replicaStats) {
this.replicaStats = replicaStats;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("rejected_requests", rejectedRequestCount);
@@ -64,6 +64,7 @@ public SegmentReplicationShardStats(StreamInput in) throws IOException {
this.currentReplicationTimeMillis = in.readVLong();
this.lastCompletedReplicationTimeMillis = in.readVLong();
this.currentReplicationLagMillis = in.readVLong();
this.currentReplicationState = in.readOptionalWriteable(SegmentReplicationState::new);
}

public String getAllocationId() {
@@ -118,7 +119,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("current_replication_lag", new TimeValue(currentReplicationLagMillis));
builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis));
if (currentReplicationState != null) {
builder.startObject();
builder.startObject("current_replication_state");
currentReplicationState.toXContent(builder, params);
builder.endObject();
}
@@ -134,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(currentReplicationTimeMillis);
out.writeVLong(lastCompletedReplicationTimeMillis);
out.writeVLong(currentReplicationLagMillis);
out.writeOptionalWriteable(currentReplicationState);
}

@Override
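Because the wire format now carries the optional currentReplicationState, a test-style round-trip sketch; BytesStreamOutput is assumed to be the usual serialization helper available in this code base.

```java
import java.io.IOException;

import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.index.SegmentReplicationShardStats;

class ShardStatsWireSketch {
    // Sketch only: the new out.writeOptionalWriteable(currentReplicationState) must stay
    // paired with the readOptionalWriteable(...) added to the StreamInput constructor,
    // otherwise the stream is misaligned; a round trip like this exercises both sides.
    static SegmentReplicationShardStats roundTrip(SegmentReplicationShardStats original) throws IOException {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            original.writeTo(out);
            return new SegmentReplicationShardStats(out.bytes().streamInput());
        }
    }
}
```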