Skip to content

Commit

Permalink
Optimize indexing performance in replica shard
Browse files Browse the repository at this point in the history
Signed-off-by: kkewwei <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei committed Feb 16, 2025
1 parent 56825f6 commit 888a07c
Show file tree
Hide file tree
Showing 19 changed files with 569 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Convert transport-reactor-netty4 to use gradle version catalog [#17233](https://github.com/opensearch-project/OpenSearch/pull/17233)
- Increase force merge threads to 1/8th of cores [#17255](https://github.com/opensearch-project/OpenSearch/pull/17255)
- Optimize indexing performance in replica shard [#17371](https://github.com/opensearch-project/OpenSearch/pull/17371)

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.support.XContentMapValues;
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.core.rest.RestStatus;
Expand Down Expand Up @@ -465,6 +465,90 @@ public void testSyncedFlushTransition() throws Exception {
}
}

public void testReplicasUsePrimaryIndexingStrategy() throws Exception {
Nodes nodes = buildNodeAndVersions();
logger.info("cluster discovered:\n {}", nodes.toString());
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1m")
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2);
final String index = "test-index";
createIndex(index, settings.build());
ensureNoInitializingShards(); // wait for all other shard activity to finish
ensureGreen(index);

int docCount = 200;
try (RestClient nodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
indexDocs(index, 0, docCount);

Thread[] indexThreads = new Thread[5];
for (int i = 0; i < indexThreads.length; i++) {
indexThreads[i] = new Thread(() -> {
try {
int idStart = randomInt(docCount / 2);
indexDocs(index, idStart, idStart + docCount / 2);
if (randomBoolean()) {
// perform a refresh
assertOK(client().performRequest(new Request("POST", index + "/_flush")));
}
} catch (IOException e) {
throw new AssertionError("failed while indexing [" + e.getMessage() + "]");
}
});
indexThreads[i].start();
}
for (Thread indexThread : indexThreads) {
indexThread.join();
}
if (randomBoolean()) {
// perform a refresh
assertOK(client().performRequest(new Request("POST", index + "/_flush")));
}
// verify replica catch up with primary
assertSeqNoOnShards(index, nodes, docCount, nodeClient);
assertSourceEqualWithPrimary(index, docCount);
}
}

private void assertSourceEqualWithPrimary(final String index, final int expectedCount) throws IOException {
Request primaryRequest = new Request("GET", index + "/_search");
primaryRequest.addParameter("preference", "_primary");
primaryRequest.addParameter("size", String.valueOf(expectedCount+100));
final Response primaryResponse = client().performRequest(primaryRequest);

Map<String, Object> primaryHits = ObjectPath.createFromResponse(primaryResponse).evaluate("hits");
Map<String, Object> totals = ObjectPath.evaluate(primaryHits, "total");
assertEquals(expectedCount, totals.get("values"));

List<Object> primarySources = ObjectPath.evaluate(primaryHits, "hits");
assertEquals(expectedCount, primarySources.size());

Map<String, Object> primarys = new HashMap<>(expectedCount);
for (int i = 0; i < primarySources.size(); i++) {
primarys.put(ObjectPath.evaluate(primarySources.get(i), "_id"), primarySources.get(i));
}


// replicas source
Request replicaRequest = new Request("GET", index + "/_search");
replicaRequest.addParameter("preference", "_replica");
replicaRequest.addParameter("size", String.valueOf(expectedCount+100));
final Response replicaResponse = client().performRequest(replicaRequest);

Map<String, Object> replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits");
Map<String, Object> replicaTotals = ObjectPath.evaluate(primaryHits, "total");
assertEquals(expectedCount, replicaTotals.get("values"));

List<Object> replicaSources = ObjectPath.evaluate(replicaHits, "hits");
assertEquals(expectedCount, replicaSources.size());

for (Object replicaSource : replicaSources) {
String id = ObjectPath.evaluate(replicaSource, "_id").toString();
assertEquals(primarys.get(id), replicaSource);
}
}

private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
Request request = new Request("GET", index + "/_count");
request.addParameter("preference", preference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;

Expand Down Expand Up @@ -392,6 +393,10 @@ protected static void parseInnerToXContent(XContentParser parser, Builder contex
}
}

public InternalEngine.WriteStrategy writeStrategy() {
return null;
};

/**
* Base class of all {@link DocWriteResponse} builders. These {@link DocWriteResponse.Builder} are used during
* xcontent parsing to temporarily store the parsed values, then the {@link Builder#build()} method is called to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ public void markOperationAsExecuted(Engine.Result result) {
result.getSeqNo(),
result.getTerm(),
indexResult.getVersion(),
indexResult.isCreated()
indexResult.isCreated(),
indexResult.indexingStrategy()
);
} else if (result.getOperationType() == Engine.Operation.TYPE.DELETE) {
Engine.DeleteResult deleteResult = (Engine.DeleteResult) result;
Expand All @@ -286,7 +287,8 @@ public void markOperationAsExecuted(Engine.Result result) {
deleteResult.getSeqNo(),
result.getTerm(),
deleteResult.getVersion(),
deleteResult.isFound()
deleteResult.isFound(),
deleteResult.deletionStrategy()
);

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.get.GetResult;
import org.opensearch.index.mapper.MapperException;
Expand Down Expand Up @@ -751,7 +752,8 @@ static BulkItemResponse processUpdateResponse(
indexResponse.getSeqNo(),
indexResponse.getPrimaryTerm(),
indexResponse.getVersion(),
indexResponse.getResult()
indexResponse.getResult(),
indexResponse.writeStrategy()
);

if (updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) {
Expand Down Expand Up @@ -783,7 +785,8 @@ static BulkItemResponse processUpdateResponse(
deleteResponse.getSeqNo(),
deleteResponse.getPrimaryTerm(),
deleteResponse.getVersion(),
deleteResponse.getResult()
deleteResponse.getResult(),
deleteResponse.writeStrategy()
);

final GetResult getResult = UpdateHelper.extractGetResult(
Expand Down Expand Up @@ -880,7 +883,8 @@ private static Engine.Result performOpOnReplica(
primaryResponse.getVersion(),
indexRequest.getAutoGeneratedTimestamp(),
indexRequest.isRetry(),
sourceToParse
sourceToParse,
(InternalEngine.IndexingStrategy) primaryResponse.writeStrategy()
);
break;
case DELETE:
Expand All @@ -889,7 +893,8 @@ private static Engine.Result performOpOnReplica(
primaryResponse.getSeqNo(),
primaryResponse.getPrimaryTerm(),
primaryResponse.getVersion(),
deleteRequest.id()
deleteRequest.id(),
(InternalEngine.DeletionStrategy) primaryResponse.writeStrategy()
);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@
import org.opensearch.action.DocWriteResponse;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.transport.client.Client;
import org.opensearch.index.engine.InternalEngine;

import java.io.IOException;

import static org.opensearch.Version.V_3_0_0;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
Expand All @@ -54,21 +57,53 @@
*/
@PublicApi(since = "1.0.0")
public class DeleteResponse extends DocWriteResponse {
private final InternalEngine.DeletionStrategy deletionStrategy;

public DeleteResponse(ShardId shardId, StreamInput in) throws IOException {
super(shardId, in);
if (in.getVersion().onOrAfter(V_3_0_0)) {
this.deletionStrategy = new InternalEngine.DeletionStrategy(in);
} else {
this.deletionStrategy = null;
}
}

public DeleteResponse(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(V_3_0_0)) {
this.deletionStrategy = new InternalEngine.DeletionStrategy(in);
} else {
this.deletionStrategy = null;
}
}

public DeleteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, boolean found) {
this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND);
this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND, null);
}

public DeleteResponse(
ShardId shardId,
String id,
long seqNo,
long primaryTerm,
long version,
boolean found,
InternalEngine.DeletionStrategy deletionStrategy
) {
this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND, deletionStrategy);
}

private DeleteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) {
private DeleteResponse(
ShardId shardId,
String id,
long seqNo,
long primaryTerm,
long version,
Result result,
InternalEngine.DeletionStrategy deletionStrategy
) {
super(shardId, id, seqNo, primaryTerm, version, assertDeletedOrNotFound(result));
this.deletionStrategy = deletionStrategy;
}

private static Result assertDeletedOrNotFound(Result result) {
Expand All @@ -93,6 +128,27 @@ public String toString() {
return builder.append("]").toString();
}

@Override
public void writeThin(StreamOutput out) throws IOException {
super.writeThin(out);
if (out.getVersion().onOrAfter(V_3_0_0)) {
deletionStrategy.writeTo(out);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(V_3_0_0)) {
deletionStrategy.writeTo(out);
}
}

@Override
public InternalEngine.DeletionStrategy writeStrategy() {
return deletionStrategy;
}

public static DeleteResponse fromXContent(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

Expand Down Expand Up @@ -122,7 +178,7 @@ public static class Builder extends DocWriteResponse.Builder {

@Override
public DeleteResponse build() {
DeleteResponse deleteResponse = new DeleteResponse(shardId, id, seqNo, primaryTerm, version, result);
DeleteResponse deleteResponse = new DeleteResponse(shardId, id, seqNo, primaryTerm, version, result, null);
deleteResponse.setForcedRefresh(forcedRefresh);
if (shardInfo != null) {
deleteResponse.setShardInfo(shardInfo);
Expand Down
Loading

0 comments on commit 888a07c

Please sign in to comment.