From 888a07cb091064e28f84be75d84e69bb37775fa5 Mon Sep 17 00:00:00 2001 From: kkewwei Date: Sun, 9 Feb 2025 21:17:05 +0800 Subject: [PATCH] Optimize indexing performance in replica shard Signed-off-by: kkewwei Signed-off-by: kkewwei --- CHANGELOG.md | 1 + .../org/opensearch/backwards/IndexingIT.java | 86 ++++++++++++++++- .../opensearch/action/DocWriteResponse.java | 5 + .../bulk/BulkPrimaryExecutionContext.java | 6 +- .../action/bulk/TransportShardBulkAction.java | 13 ++- .../action/delete/DeleteResponse.java | 62 +++++++++++- .../action/index/IndexResponse.java | 63 ++++++++++++- .../action/update/TransportUpdateAction.java | 9 +- .../action/update/UpdateResponse.java | 60 ++++++++++++ .../org/opensearch/index/engine/Engine.java | 86 +++++++++++++++++ .../index/engine/InternalEngine.java | 94 +++++++++++++++++-- .../index/engine/NRTReplicationEngine.java | 10 +- .../opensearch/index/shard/IndexShard.java | 80 ++++++++++++---- .../RecoveryDuringReplicationTests.java | 3 +- .../index/shard/IndexShardTests.java | 30 ++++-- .../PeerRecoveryTargetServiceTests.java | 3 +- .../indices/recovery/RecoveryTests.java | 12 ++- .../index/engine/TranslogHandler.java | 6 +- .../index/shard/IndexShardTestCase.java | 3 +- 19 files changed, 569 insertions(+), 63 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa6e7bce8655d..14e9a28df30dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Convert transport-reactor-netty4 to use gradle version catalog [#17233](https://github.com/opensearch-project/OpenSearch/pull/17233) - Increase force merge threads to 1/8th of cores [#17255](https://github.com/opensearch-project/OpenSearch/pull/17255) +- Optimize indexing performance in replica shard [#17371](https://github.com/opensearch-project/OpenSearch/pull/17371) ### Deprecated diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index 13c2daeec37af..2f80c7ba79908 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -45,7 +45,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.common.xcontent.support.XContentMapValues; -import org.opensearch.core.common.Strings; +import org.opensearch.index.IndexSettings; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.core.rest.RestStatus; @@ -465,6 +465,90 @@ public void testSyncedFlushTransition() throws Exception { } } + public void testReplicasUsePrimaryIndexingStrategy() throws Exception { + Nodes nodes = buildNodeAndVersions(); + logger.info("cluster discovered:\n {}", nodes.toString()); + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1m") + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2); + final String index = "test-index"; + createIndex(index, settings.build()); + ensureNoInitializingShards(); // wait for all other shard activity to finish + ensureGreen(index); + + int docCount = 200; + try (RestClient nodeClient = buildClient(restClientSettings(), + nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + indexDocs(index, 0, docCount); + + Thread[] indexThreads = new Thread[5]; + for (int i = 0; i < indexThreads.length; i++) { + indexThreads[i] = new Thread(() -> { + try { + int idStart = randomInt(docCount / 2); + indexDocs(index, idStart, idStart + docCount / 2); + if (randomBoolean()) { + // perform a refresh + assertOK(client().performRequest(new Request("POST", index + "/_flush"))); + } + } catch (IOException e) { + throw new AssertionError("failed while indexing [" + e.getMessage() + "]"); + } + }); + indexThreads[i].start(); + } + for (Thread indexThread : indexThreads) { + indexThread.join(); + } + if (randomBoolean()) { + // perform a refresh + assertOK(client().performRequest(new Request("POST", index + "/_flush"))); + } + // verify replica catch up with primary + assertSeqNoOnShards(index, nodes, docCount, nodeClient); + assertSourceEqualWithPrimary(index, docCount); + } + } + + private void assertSourceEqualWithPrimary(final String index, final int expectedCount) throws IOException { + Request primaryRequest = new Request("GET", index + "/_search"); + primaryRequest.addParameter("preference", "_primary"); + primaryRequest.addParameter("size", String.valueOf(expectedCount+100)); + final Response primaryResponse = client().performRequest(primaryRequest); + + Map primaryHits = ObjectPath.createFromResponse(primaryResponse).evaluate("hits"); + Map totals = ObjectPath.evaluate(primaryHits, "total"); + assertEquals(expectedCount, totals.get("values")); + + List primarySources = ObjectPath.evaluate(primaryHits, "hits"); + assertEquals(expectedCount, primarySources.size()); + + Map primarys = new HashMap<>(expectedCount); + for (int i = 0; i < primarySources.size(); i++) { + primarys.put(ObjectPath.evaluate(primarySources.get(i), "_id"), primarySources.get(i)); + } + + + // replicas source + Request replicaRequest = new Request("GET", index + "/_search"); + replicaRequest.addParameter("preference", "_replica"); + replicaRequest.addParameter("size", String.valueOf(expectedCount+100)); + final Response replicaResponse = client().performRequest(replicaRequest); + + Map replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits"); + Map replicaTotals = ObjectPath.evaluate(primaryHits, "total"); + assertEquals(expectedCount, replicaTotals.get("values")); + + List replicaSources = ObjectPath.evaluate(replicaHits, "hits"); + assertEquals(expectedCount, replicaSources.size()); + + for (Object replicaSource : replicaSources) { + String id = ObjectPath.evaluate(replicaSource, "_id").toString(); + assertEquals(primarys.get(id), replicaSource); + } + } + private void assertCount(final String index, final String preference, final int expectedCount) throws IOException { Request request = new Request("GET", index + "/_count"); request.addParameter("preference", preference); diff --git a/server/src/main/java/org/opensearch/action/DocWriteResponse.java b/server/src/main/java/org/opensearch/action/DocWriteResponse.java index aada56ed93fd3..b77645d27d621 100644 --- a/server/src/main/java/org/opensearch/action/DocWriteResponse.java +++ b/server/src/main/java/org/opensearch/action/DocWriteResponse.java @@ -49,6 +49,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.seqno.SequenceNumbers; @@ -392,6 +393,10 @@ protected static void parseInnerToXContent(XContentParser parser, Builder contex } } + public InternalEngine.WriteStrategy writeStrategy() { + return null; + }; + /** * Base class of all {@link DocWriteResponse} builders. These {@link DocWriteResponse.Builder} are used during * xcontent parsing to temporarily store the parsed values, then the {@link Builder#build()} method is called to diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java index 08373481d5711..c40c95e0ae572 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java @@ -276,7 +276,8 @@ public void markOperationAsExecuted(Engine.Result result) { result.getSeqNo(), result.getTerm(), indexResult.getVersion(), - indexResult.isCreated() + indexResult.isCreated(), + indexResult.indexingStrategy() ); } else if (result.getOperationType() == Engine.Operation.TYPE.DELETE) { Engine.DeleteResult deleteResult = (Engine.DeleteResult) result; @@ -286,7 +287,8 @@ public void markOperationAsExecuted(Engine.Result result) { deleteResult.getSeqNo(), result.getTerm(), deleteResult.getVersion(), - deleteResult.isFound() + deleteResult.isFound(), + deleteResult.deletionStrategy() ); } else { diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index efe8df735d769..f7c02b0e21521 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -84,6 +84,7 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.index.get.GetResult; import org.opensearch.index.mapper.MapperException; @@ -751,7 +752,8 @@ static BulkItemResponse processUpdateResponse( indexResponse.getSeqNo(), indexResponse.getPrimaryTerm(), indexResponse.getVersion(), - indexResponse.getResult() + indexResponse.getResult(), + indexResponse.writeStrategy() ); if (updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) { @@ -783,7 +785,8 @@ static BulkItemResponse processUpdateResponse( deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(), deleteResponse.getVersion(), - deleteResponse.getResult() + deleteResponse.getResult(), + deleteResponse.writeStrategy() ); final GetResult getResult = UpdateHelper.extractGetResult( @@ -880,7 +883,8 @@ private static Engine.Result performOpOnReplica( primaryResponse.getVersion(), indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), - sourceToParse + sourceToParse, + (InternalEngine.IndexingStrategy) primaryResponse.writeStrategy() ); break; case DELETE: @@ -889,7 +893,8 @@ private static Engine.Result performOpOnReplica( primaryResponse.getSeqNo(), primaryResponse.getPrimaryTerm(), primaryResponse.getVersion(), - deleteRequest.id() + deleteRequest.id(), + (InternalEngine.DeletionStrategy) primaryResponse.writeStrategy() ); break; default: diff --git a/server/src/main/java/org/opensearch/action/delete/DeleteResponse.java b/server/src/main/java/org/opensearch/action/delete/DeleteResponse.java index 9534cd1055bcf..1d27caa13e111 100644 --- a/server/src/main/java/org/opensearch/action/delete/DeleteResponse.java +++ b/server/src/main/java/org/opensearch/action/delete/DeleteResponse.java @@ -35,13 +35,16 @@ import org.opensearch.action.DocWriteResponse; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.transport.client.Client; +import org.opensearch.index.engine.InternalEngine; import java.io.IOException; +import static org.opensearch.Version.V_3_0_0; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; /** @@ -54,21 +57,53 @@ */ @PublicApi(since = "1.0.0") public class DeleteResponse extends DocWriteResponse { + private final InternalEngine.DeletionStrategy deletionStrategy; public DeleteResponse(ShardId shardId, StreamInput in) throws IOException { super(shardId, in); + if (in.getVersion().onOrAfter(V_3_0_0)) { + this.deletionStrategy = new InternalEngine.DeletionStrategy(in); + } else { + this.deletionStrategy = null; + } } public DeleteResponse(StreamInput in) throws IOException { super(in); + if (in.getVersion().onOrAfter(V_3_0_0)) { + this.deletionStrategy = new InternalEngine.DeletionStrategy(in); + } else { + this.deletionStrategy = null; + } } public DeleteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, boolean found) { - this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND); + this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND, null); + } + + public DeleteResponse( + ShardId shardId, + String id, + long seqNo, + long primaryTerm, + long version, + boolean found, + InternalEngine.DeletionStrategy deletionStrategy + ) { + this(shardId, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND, deletionStrategy); } - private DeleteResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) { + private DeleteResponse( + ShardId shardId, + String id, + long seqNo, + long primaryTerm, + long version, + Result result, + InternalEngine.DeletionStrategy deletionStrategy + ) { super(shardId, id, seqNo, primaryTerm, version, assertDeletedOrNotFound(result)); + this.deletionStrategy = deletionStrategy; } private static Result assertDeletedOrNotFound(Result result) { @@ -93,6 +128,27 @@ public String toString() { return builder.append("]").toString(); } + @Override + public void writeThin(StreamOutput out) throws IOException { + super.writeThin(out); + if (out.getVersion().onOrAfter(V_3_0_0)) { + deletionStrategy.writeTo(out); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(V_3_0_0)) { + deletionStrategy.writeTo(out); + } + } + + @Override + public InternalEngine.DeletionStrategy writeStrategy() { + return deletionStrategy; + } + public static DeleteResponse fromXContent(XContentParser parser) throws IOException { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); @@ -122,7 +178,7 @@ public static class Builder extends DocWriteResponse.Builder { @Override public DeleteResponse build() { - DeleteResponse deleteResponse = new DeleteResponse(shardId, id, seqNo, primaryTerm, version, result); + DeleteResponse deleteResponse = new DeleteResponse(shardId, id, seqNo, primaryTerm, version, result, null); deleteResponse.setForcedRefresh(forcedRefresh); if (shardInfo != null) { deleteResponse.setShardInfo(shardInfo); diff --git a/server/src/main/java/org/opensearch/action/index/IndexResponse.java b/server/src/main/java/org/opensearch/action/index/IndexResponse.java index 0ec4b7dbf539d..925821108fa6e 100644 --- a/server/src/main/java/org/opensearch/action/index/IndexResponse.java +++ b/server/src/main/java/org/opensearch/action/index/IndexResponse.java @@ -36,14 +36,17 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.transport.client.Client; +import org.opensearch.index.engine.InternalEngine; import java.io.IOException; +import static org.opensearch.Version.V_3_0_0; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; /** @@ -57,20 +60,53 @@ @PublicApi(since = "1.0.0") public class IndexResponse extends DocWriteResponse { + private final InternalEngine.IndexingStrategy indexingStrategy; + public IndexResponse(ShardId shardId, StreamInput in) throws IOException { super(shardId, in); + if (in.getVersion().onOrAfter(V_3_0_0)) { + this.indexingStrategy = new InternalEngine.IndexingStrategy(in); + } else { + this.indexingStrategy = null; + } } public IndexResponse(StreamInput in) throws IOException { super(in); + if (in.getVersion().onOrAfter(V_3_0_0)) { + this.indexingStrategy = new InternalEngine.IndexingStrategy(in); + } else { + this.indexingStrategy = null; + } } public IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, boolean created) { - this(shardId, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED); + this(shardId, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED, null); + } + + public IndexResponse( + ShardId shardId, + String id, + long seqNo, + long primaryTerm, + long version, + boolean created, + InternalEngine.IndexingStrategy indexingStrategy + ) { + this(shardId, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED, indexingStrategy); } - private IndexResponse(ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) { + private IndexResponse( + ShardId shardId, + String id, + long seqNo, + long primaryTerm, + long version, + Result result, + InternalEngine.IndexingStrategy indexingStrategy + ) { super(shardId, id, seqNo, primaryTerm, version, assertCreatedOrUpdated(result)); + this.indexingStrategy = indexingStrategy; } private static Result assertCreatedOrUpdated(Result result) { @@ -114,6 +150,27 @@ public static void parseXContentFields(XContentParser parser, Builder context) t DocWriteResponse.parseInnerToXContent(parser, context); } + @Override + public void writeThin(StreamOutput out) throws IOException { + super.writeThin(out); + if (out.getVersion().onOrAfter(V_3_0_0)) { + indexingStrategy.writeTo(out); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(V_3_0_0)) { + indexingStrategy.writeTo(out); + } + } + + @Override + public InternalEngine.IndexingStrategy writeStrategy() { + return indexingStrategy; + } + /** * Builder class for {@link IndexResponse}. This builder is usually used during xcontent parsing to * temporarily store the parsed values, then the {@link Builder#build()} method is called to @@ -125,7 +182,7 @@ public static void parseXContentFields(XContentParser parser, Builder context) t public static class Builder extends DocWriteResponse.Builder { @Override public IndexResponse build() { - IndexResponse indexResponse = new IndexResponse(shardId, id, seqNo, primaryTerm, version, result); + IndexResponse indexResponse = new IndexResponse(shardId, id, seqNo, primaryTerm, version, result, null); indexResponse.setForcedRefresh(forcedRefresh); if (shardInfo != null) { indexResponse.setShardInfo(shardInfo); diff --git a/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java index a8fad74b3b091..4489804462035 100644 --- a/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java +++ b/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java @@ -247,7 +247,8 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), - response.getResult() + response.getResult(), + response.writeStrategy() ); if (request.fetchSource() != null && request.fetchSource().fetchSource()) { Tuple> sourceAndContent = XContentHelper.convertToMap( @@ -296,7 +297,8 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), - response.getResult() + response.getResult(), + response.writeStrategy() ); update.setGetResult( UpdateHelper.extractGetResult( @@ -324,7 +326,8 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), - response.getResult() + response.getResult(), + response.writeStrategy() ); update.setGetResult( UpdateHelper.extractGetResult( diff --git a/server/src/main/java/org/opensearch/action/update/UpdateResponse.java b/server/src/main/java/org/opensearch/action/update/UpdateResponse.java index c7ee1742af0f2..321f6a4826c6f 100644 --- a/server/src/main/java/org/opensearch/action/update/UpdateResponse.java +++ b/server/src/main/java/org/opensearch/action/update/UpdateResponse.java @@ -40,10 +40,12 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.get.GetResult; import java.io.IOException; +import static org.opensearch.Version.V_3_0_0; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; /** @@ -58,11 +60,22 @@ public class UpdateResponse extends DocWriteResponse { private GetResult getResult; + private final InternalEngine.WriteStrategy writeStrategy; + public UpdateResponse(ShardId shardId, StreamInput in) throws IOException { super(shardId, in); if (in.readBoolean()) { getResult = new GetResult(in); } + if (in.getVersion().onOrAfter(V_3_0_0)) { + if (in.readBoolean()) { + this.writeStrategy = new InternalEngine.IndexingStrategy(in); + } else { + this.writeStrategy = new InternalEngine.DeletionStrategy(in); + } + } else { + this.writeStrategy = null; + } } public UpdateResponse(StreamInput in) throws IOException { @@ -70,6 +83,15 @@ public UpdateResponse(StreamInput in) throws IOException { if (in.readBoolean()) { getResult = new GetResult(in); } + if (in.getVersion().onOrAfter(V_3_0_0)) { + if (in.readBoolean()) { + this.writeStrategy = new InternalEngine.IndexingStrategy(in); + } else { + this.writeStrategy = new InternalEngine.DeletionStrategy(in); + } + } else { + this.writeStrategy = null; + } } /** @@ -81,8 +103,22 @@ public UpdateResponse(ShardId shardId, String id, long seqNo, long primaryTerm, } public UpdateResponse(ShardInfo shardInfo, ShardId shardId, String id, long seqNo, long primaryTerm, long version, Result result) { + this(shardInfo, shardId, id, seqNo, primaryTerm, version, result, null); + } + + public UpdateResponse( + ShardInfo shardInfo, + ShardId shardId, + String id, + long seqNo, + long primaryTerm, + long version, + Result result, + InternalEngine.WriteStrategy writeStrategy + ) { super(shardId, id, seqNo, primaryTerm, version, result); setShardInfo(shardInfo); + this.writeStrategy = writeStrategy; } public void setGetResult(GetResult getResult) { @@ -102,12 +138,31 @@ public RestStatus status() { public void writeThin(StreamOutput out) throws IOException { super.writeThin(out); writeGetResult(out); + if (out.getVersion().onOrAfter(V_3_0_0)) { + if (writeStrategy instanceof InternalEngine.IndexingStrategy) { + out.writeBoolean(true); + ((InternalEngine.IndexingStrategy) writeStrategy).writeTo(out); + } else { + out.writeBoolean(false); + ((InternalEngine.DeletionStrategy) writeStrategy).writeTo(out); + } + } + } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); writeGetResult(out); + if (out.getVersion().onOrAfter(V_3_0_0)) { + if (writeStrategy instanceof InternalEngine.IndexingStrategy) { + out.writeBoolean(true); + ((InternalEngine.IndexingStrategy) writeStrategy).writeTo(out); + } else { + out.writeBoolean(false); + ((InternalEngine.DeletionStrategy) writeStrategy).writeTo(out); + } + } } private void writeGetResult(StreamOutput out) throws IOException { @@ -130,6 +185,11 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t return builder; } + @Override + public InternalEngine.WriteStrategy writeStrategy() { + return writeStrategy; + }; + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index db08ea1164f68..347c85331ee97 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -602,10 +602,18 @@ public enum Type { public static class IndexResult extends Result { private final boolean created; + private final InternalEngine.IndexingStrategy indexingStrategy; + + public IndexResult(long version, long term, long seqNo, boolean created, InternalEngine.IndexingStrategy plan) { + super(Operation.TYPE.INDEX, version, term, seqNo); + this.created = created; + this.indexingStrategy = plan; + } public IndexResult(long version, long term, long seqNo, boolean created) { super(Operation.TYPE.INDEX, version, term, seqNo); this.created = created; + this.indexingStrategy = null; } /** @@ -618,17 +626,23 @@ public IndexResult(Exception failure, long version) { public IndexResult(Exception failure, long version, long term, long seqNo) { super(Operation.TYPE.INDEX, failure, version, term, seqNo); this.created = false; + this.indexingStrategy = null; } public IndexResult(Mapping requiredMappingUpdate) { super(Operation.TYPE.INDEX, requiredMappingUpdate); this.created = false; + this.indexingStrategy = null; } public boolean isCreated() { return created; } + public InternalEngine.IndexingStrategy indexingStrategy() { + return indexingStrategy; + } + } /** @@ -640,10 +654,16 @@ public boolean isCreated() { public static class DeleteResult extends Result { private final boolean found; + private final InternalEngine.DeletionStrategy deletionStrategy; public DeleteResult(long version, long term, long seqNo, boolean found) { + this(version, term, seqNo, found, null); + } + + public DeleteResult(long version, long term, long seqNo, boolean found, InternalEngine.DeletionStrategy deletionStrategy) { super(Operation.TYPE.DELETE, version, term, seqNo); this.found = found; + this.deletionStrategy = deletionStrategy; } /** @@ -656,17 +676,23 @@ public DeleteResult(Exception failure, long version, long term) { public DeleteResult(Exception failure, long version, long term, long seqNo, boolean found) { super(Operation.TYPE.DELETE, failure, version, term, seqNo); this.found = found; + this.deletionStrategy = null; } public DeleteResult(Mapping requiredMappingUpdate) { super(Operation.TYPE.DELETE, requiredMappingUpdate); this.found = false; + this.deletionStrategy = null; } public boolean isFound() { return found; } + public InternalEngine.DeletionStrategy deletionStrategy() { + return deletionStrategy; + } + } /** @@ -1599,6 +1625,7 @@ public static class Index extends Operation { private final boolean isRetry; private final long ifSeqNo; private final long ifPrimaryTerm; + private final InternalEngine.IndexingStrategy indexingStrategy; public Index( Term uid, @@ -1613,6 +1640,38 @@ public Index( boolean isRetry, long ifSeqNo, long ifPrimaryTerm + ) { + this( + uid, + doc, + seqNo, + primaryTerm, + version, + versionType, + origin, + startTime, + autoGeneratedIdTimestamp, + isRetry, + ifSeqNo, + ifPrimaryTerm, + null + ); + } + + public Index( + Term uid, + ParsedDocument doc, + long seqNo, + long primaryTerm, + long version, + VersionType versionType, + Origin origin, + long startTime, + long autoGeneratedIdTimestamp, + boolean isRetry, + long ifSeqNo, + long ifPrimaryTerm, + InternalEngine.IndexingStrategy indexingStrategy ) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; @@ -1625,6 +1684,7 @@ public Index( this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; this.ifSeqNo = ifSeqNo; this.ifPrimaryTerm = ifPrimaryTerm; + this.indexingStrategy = indexingStrategy; } public Index(Term uid, long primaryTerm, ParsedDocument doc) { @@ -1703,6 +1763,10 @@ public long getIfSeqNo() { public long getIfPrimaryTerm() { return ifPrimaryTerm; } + + public InternalEngine.IndexingStrategy indexingStrategy() { + return indexingStrategy; + } } /** @@ -1716,6 +1780,7 @@ public static class Delete extends Operation { private final String id; private final long ifSeqNo; private final long ifPrimaryTerm; + private final InternalEngine.DeletionStrategy deletionStrategy; public Delete( String id, @@ -1728,6 +1793,22 @@ public Delete( long startTime, long ifSeqNo, long ifPrimaryTerm + ) { + this(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm, null); + } + + public Delete( + String id, + Term uid, + long seqNo, + long primaryTerm, + long version, + VersionType versionType, + Origin origin, + long startTime, + long ifSeqNo, + long ifPrimaryTerm, + InternalEngine.DeletionStrategy deletionStrategy ) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin; @@ -1738,6 +1819,7 @@ public Delete( this.id = Objects.requireNonNull(id); this.ifSeqNo = ifSeqNo; this.ifPrimaryTerm = ifPrimaryTerm; + this.deletionStrategy = deletionStrategy; } public Delete(String id, Term uid, long primaryTerm) { @@ -1792,6 +1874,10 @@ public long getIfSeqNo() { public long getIfPrimaryTerm() { return ifPrimaryTerm; } + + public InternalEngine.DeletionStrategy deletionStrategy() { + return deletionStrategy; + } } /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index ff790fa1513f1..cf8e4ef148102 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -70,6 +70,7 @@ import org.opensearch.common.Booleans; import org.opensearch.common.Nullable; import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.LoggerInfoStream; @@ -86,6 +87,9 @@ import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.Assertions; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.AppendOnlyIndexOperationRetryException; import org.opensearch.core.index.shard.ShardId; @@ -881,7 +885,11 @@ public IndexResult index(Index index) throws IOException { * or calls updateDocument. */ final IndexingStrategy plan = indexingStrategyForOperation(index); - reservedDocs = plan.reservedDocs; + if (index.origin() == Operation.Origin.REPLICA && engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode() == false) { + reservedDocs = 0; + } else { + reservedDocs = plan.reservedDocs; + } final IndexResult indexResult; if (plan.earlyResultOnPreFlightError.isPresent()) { @@ -903,7 +911,8 @@ public IndexResult index(Index index) throws IOException { index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), - index.getIfPrimaryTerm() + index.getIfPrimaryTerm(), + index.indexingStrategy() ); final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; @@ -923,7 +932,8 @@ public IndexResult index(Index index) throws IOException { plan.versionForIndexing, index.primaryTerm(), index.seqNo(), - plan.currentNotFoundOrDeleted + plan.currentNotFoundOrDeleted, + plan ); } @@ -1011,6 +1021,11 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO } else { boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); versionMap.enforceSafeAccess(); + if (index.origin() == Operation.Origin.REPLICA && segRepEnabled == false) { + assert index.indexingStrategy() != null; + return index.indexingStrategy(); + } + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { if (segRepEnabled) { @@ -1138,7 +1153,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); addDocs(index.docs(), indexWriter); } - return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted); + return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted, plan); } catch (Exception ex) { if (ex instanceof AlreadyClosedException == false && indexWriter.getTragicException() == null @@ -1221,7 +1236,8 @@ private void addStaleDocs(final List docs, final IndexWri * * @opensearch.internal */ - protected static final class IndexingStrategy { + @PublicApi(since = "3.0.0") + public static final class IndexingStrategy implements Writeable, WriteStrategy { final boolean currentNotFoundOrDeleted; final boolean useLuceneUpdateDocument; final long versionForIndexing; @@ -1259,6 +1275,17 @@ private IndexingStrategy( : Optional.of(earlyResultOnPreFlightError); } + public IndexingStrategy(StreamInput in) throws IOException { + this.currentNotFoundOrDeleted = in.readBoolean(); + this.useLuceneUpdateDocument = in.readBoolean(); + this.versionForIndexing = in.readVLong(); + this.indexIntoLucene = in.readBoolean(); + this.addStaleOpToLucene = in.readBoolean(); + this.reservedDocs = in.readVInt(); + this.earlyResultOnPreFlightError = Optional.empty(); + assert reservedDocs == 0 || indexIntoLucene || addStaleOpToLucene : reservedDocs; + } + static IndexingStrategy optimizedAppendOnly(long versionForIndexing, int reservedDocs) { return new IndexingStrategy(true, false, true, false, versionForIndexing, reservedDocs, null); } @@ -1300,6 +1327,16 @@ static IndexingStrategy failAsTooManyDocs(Exception e) { static IndexingStrategy failAsIndexAppendOnly(IndexResult result, long versionForIndexing, int reservedDocs) { return new IndexingStrategy(false, false, false, true, versionForIndexing, reservedDocs, result); } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(currentNotFoundOrDeleted); + out.writeBoolean(useLuceneUpdateDocument); + out.writeVLong(versionForIndexing); + out.writeBoolean(indexIntoLucene); + out.writeBoolean(addStaleOpToLucene); + out.writeVInt(reservedDocs); + } } /** @@ -1353,7 +1390,11 @@ public DeleteResult delete(Delete delete) throws IOException { ensureOpen(); lastWriteNanos = delete.startTime(); final DeletionStrategy plan = deletionStrategyForOperation(delete); - reservedDocs = plan.reservedDocs; + if (delete.origin() == Operation.Origin.REPLICA && engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode() == false) { + reservedDocs = 0; + } else { + reservedDocs = plan.reservedDocs; + } if (plan.earlyResultOnPreflightError.isPresent()) { assert delete.origin() == Operation.Origin.PRIMARY : delete.origin(); deleteResult = plan.earlyResultOnPreflightError.get(); @@ -1370,7 +1411,8 @@ public DeleteResult delete(Delete delete) throws IOException { delete.origin(), delete.startTime(), delete.getIfSeqNo(), - delete.getIfPrimaryTerm() + delete.getIfPrimaryTerm(), + delete.deletionStrategy() ); advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(delete.seqNo()); @@ -1399,7 +1441,8 @@ public DeleteResult delete(Delete delete) throws IOException { plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), - plan.currentlyDeleted == false + plan.currentlyDeleted == false, + plan ); } } @@ -1477,6 +1520,10 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws plan = DeletionStrategy.processButSkipLucene(false, delete.version()); } else { boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); + if (delete.origin() == Operation.Origin.REPLICA && segRepEnabled == false) { + assert delete.deletionStrategy() != null; + return delete.deletionStrategy(); + } final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { if (segRepEnabled) { @@ -1572,7 +1619,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws } else { indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField); } - return new DeleteResult(plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); + return new DeleteResult(plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false, plan); } catch (final Exception ex) { /* * Document level failures when deleting are unexpected, we likely hit something fatal such as the Lucene index being corrupt, @@ -1597,7 +1644,8 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws * * @opensearch.internal */ - protected static final class DeletionStrategy { + @PublicApi(since = "3.0.0") + public static final class DeletionStrategy implements Writeable, WriteStrategy { // of a rare double delete final boolean deleteFromLucene; final boolean addStaleOpToLucene; @@ -1631,6 +1679,15 @@ private DeletionStrategy( : Optional.of(earlyResultOnPreflightError); } + public DeletionStrategy(StreamInput in) throws IOException { + this.deleteFromLucene = in.readBoolean(); + this.addStaleOpToLucene = in.readBoolean(); + this.currentlyDeleted = in.readBoolean(); + this.versionOfDeletion = in.readVLong(); + this.reservedDocs = in.readVInt(); + this.earlyResultOnPreflightError = Optional.empty(); + } + public static DeletionStrategy skipDueToVersionConflict( VersionConflictEngineException e, long currentVersion, @@ -1669,8 +1726,25 @@ static DeletionStrategy failAsTooManyDocs(Exception e) { ); return new DeletionStrategy(false, false, false, Versions.NOT_FOUND, 0, deleteResult); } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(deleteFromLucene); + out.writeBoolean(addStaleOpToLucene); + out.writeBoolean(currentlyDeleted); + out.writeVLong(versionOfDeletion); + out.writeVInt(reservedDocs); + } } + /** + * The write Strategy + * + * @opensearch.internal + */ + @PublicApi(since = "3.0.0") + public interface WriteStrategy {} + @Override public void maybePruneDeletes() { // It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index d759423ce5a55..4318640561030 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -231,7 +231,7 @@ public boolean isThrottled() { @Override public IndexResult index(Index index) throws IOException { ensureOpen(); - IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false); + IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false, index.indexingStrategy()); final Translog.Location location = translogManager.add(new Translog.Index(index, indexResult)); indexResult.setTranslogLocation(location); indexResult.setTook(System.nanoTime() - index.startTime()); @@ -243,7 +243,13 @@ public IndexResult index(Index index) throws IOException { @Override public DeleteResult delete(Delete delete) throws IOException { ensureOpen(); - DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true); + DeleteResult deleteResult = new DeleteResult( + delete.version(), + delete.primaryTerm(), + delete.seqNo(), + true, + delete.deletionStrategy() + ); final Translog.Location location = translogManager.add(new Translog.Delete(delete, deleteResult)); deleteResult.setTranslogLocation(location); deleteResult.setTook(System.nanoTime() - delete.startTime()); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index df841dac4cf8e..b396eb017f652 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -126,6 +126,7 @@ import org.opensearch.index.engine.EngineConfigFactory; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.EngineFactory; +import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.ReadOnlyEngine; import org.opensearch.index.engine.RefreshFailedEngineException; @@ -1054,6 +1055,7 @@ public Engine.IndexResult applyIndexOperationOnPrimary( isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse, + null, null ); } @@ -1065,7 +1067,8 @@ public Engine.IndexResult applyIndexOperationOnReplica( long version, long autoGeneratedTimeStamp, boolean isRetry, - SourceToParse sourceToParse + SourceToParse sourceToParse, + InternalEngine.IndexingStrategy indexingStrategy ) throws IOException { return applyIndexOperation( getEngine(), @@ -1079,7 +1082,8 @@ public Engine.IndexResult applyIndexOperationOnReplica( isRetry, Engine.Operation.Origin.REPLICA, sourceToParse, - id + id, + indexingStrategy ); } @@ -1095,7 +1099,8 @@ private Engine.IndexResult applyIndexOperation( boolean isRetry, Engine.Operation.Origin origin, SourceToParse sourceToParse, - String id + String id, + InternalEngine.IndexingStrategy indexingStrategy ) throws IOException { // For Segment Replication enabled replica shards we can be skip parsing the documents as we directly copy segments from primary @@ -1113,7 +1118,8 @@ private Engine.IndexResult applyIndexOperation( autoGeneratedTimeStamp, isRetry, UNASSIGNED_SEQ_NO, - 0 + 0, + indexingStrategy ); return getEngine().index(index); } @@ -1136,7 +1142,8 @@ private Engine.IndexResult applyIndexOperation( autoGeneratedTimeStamp, isRetry, ifSeqNo, - ifPrimaryTerm + ifPrimaryTerm, + indexingStrategy ); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { @@ -1165,7 +1172,8 @@ public static Engine.Index prepareIndex( long autoGeneratedIdTimestamp, boolean isRetry, long ifSeqNo, - long ifPrimaryTerm + long ifPrimaryTerm, + InternalEngine.IndexingStrategy indexingStrategy ) { long startTime = System.nanoTime(); ParsedDocument doc = docMapper.getDocumentMapper().parse(source); @@ -1185,7 +1193,8 @@ public static Engine.Index prepareIndex( autoGeneratedIdTimestamp, isRetry, ifSeqNo, - ifPrimaryTerm + ifPrimaryTerm, + indexingStrategy ); } @@ -1294,11 +1303,22 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary( versionType, ifSeqNo, ifPrimaryTerm, - Engine.Operation.Origin.PRIMARY + Engine.Operation.Origin.PRIMARY, + null ); } public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, long version, String id) throws IOException { + return applyDeleteOperationOnReplica(seqNo, opPrimaryTerm, version, id, null); + } + + public Engine.DeleteResult applyDeleteOperationOnReplica( + long seqNo, + long opPrimaryTerm, + long version, + String id, + InternalEngine.DeletionStrategy deletionStrategy + ) throws IOException { if (indexSettings.isSegRepEnabledOrRemoteNode()) { final Engine.Delete delete = new Engine.Delete( id, @@ -1310,7 +1330,8 @@ public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrim Engine.Operation.Origin.REPLICA, System.nanoTime(), UNASSIGNED_SEQ_NO, - 0 + 0, + deletionStrategy ); return getEngine().delete(delete); } @@ -1323,7 +1344,8 @@ public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrim null, UNASSIGNED_SEQ_NO, 0, - Engine.Operation.Origin.REPLICA + Engine.Operation.Origin.REPLICA, + deletionStrategy ); } @@ -1336,7 +1358,8 @@ private Engine.DeleteResult applyDeleteOperation( @Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm, - Engine.Operation.Origin origin + Engine.Operation.Origin origin, + InternalEngine.DeletionStrategy deletionStrategy ) throws IOException { assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ " + opPrimaryTerm @@ -1344,7 +1367,17 @@ private Engine.DeleteResult applyDeleteOperation( + getOperationPrimaryTerm() + "]"; ensureWriteAllowed(origin); - final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm); + final Engine.Delete delete = prepareDelete( + id, + seqNo, + opPrimaryTerm, + version, + versionType, + origin, + ifSeqNo, + ifPrimaryTerm, + deletionStrategy + ); return delete(engine, delete); } @@ -1356,11 +1389,24 @@ public static Engine.Delete prepareDelete( VersionType versionType, Engine.Operation.Origin origin, long ifSeqNo, - long ifPrimaryTerm + long ifPrimaryTerm, + InternalEngine.DeletionStrategy deletionStrategy ) { long startTime = System.nanoTime(); final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id)); - return new Engine.Delete(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm); + return new Engine.Delete( + id, + uid, + seqNo, + primaryTerm, + version, + versionType, + origin, + startTime, + ifSeqNo, + ifPrimaryTerm, + deletionStrategy + ); } private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException { @@ -2417,7 +2463,8 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o MediaTypeRegistry.xContentType(index.source()), index.routing() ), - index.id() + index.id(), + null ); break; case DELETE: @@ -2431,7 +2478,8 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o versionType, UNASSIGNED_SEQ_NO, 0, - origin + origin, + null ); break; case NO_OP: diff --git a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java index b891ac63378ac..45c9f4757941b 100644 --- a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java @@ -160,7 +160,8 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { 1, randomNonNegativeLong(), false, - new SourceToParse("index", "replica", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse("index", "replica", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); shards.promoteReplicaToPrimary(promotedReplica).get(); oldPrimary.close("demoted", randomBoolean(), false); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 7614a54da52bf..613a5ce6647b0 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2344,7 +2344,8 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(shard.shardId().getIndexName(), "id", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(shard.shardId().getIndexName(), "id", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); shard.applyIndexOperationOnReplica( UUID.randomUUID().toString(), @@ -2353,7 +2354,8 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(shard.shardId().getIndexName(), "id-3", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(shard.shardId().getIndexName(), "id-3", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); // Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery. shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); @@ -2364,7 +2366,8 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { 3, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(shard.shardId().getIndexName(), "id-2", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(shard.shardId().getIndexName(), "id-2", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); shard.applyIndexOperationOnReplica( UUID.randomUUID().toString(), @@ -2373,7 +2376,8 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(shard.shardId().getIndexName(), "id-5", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(shard.shardId().getIndexName(), "id-5", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); shard.sync(); // advance local checkpoint @@ -2521,7 +2525,8 @@ public void testRecoverFromStoreWithNoOps() throws IOException { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - sourceToParse + sourceToParse, + null ); final ShardRouting primaryShardRouting = shard.routingEntry(); @@ -2649,7 +2654,8 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(indexName, "doc-0", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(indexName, "doc-0", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); flushShard(shard); shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here. @@ -2660,7 +2666,8 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(indexName, "doc-1", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(indexName, "doc-1", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); @@ -2673,7 +2680,8 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(indexName, "doc-2", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(indexName, "doc-2", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2")); @@ -4155,7 +4163,8 @@ private Result indexOnReplicaWithGaps(final IndexShard indexShard, final int ope 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - sourceToParse + sourceToParse, + null ); if (!gap && i == localCheckpoint + 1) { localCheckpoint++; @@ -4780,7 +4789,8 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(shard.shardId.getIndexName(), Long.toString(i), new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(shard.shardId.getIndexName(), Long.toString(i), new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); shard.updateGlobalCheckpointOnReplica(shard.getLocalCheckpoint(), "test"); if (randomInt(100) < 10) { diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index a8e5a02011538..5962af7825370 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -190,7 +190,8 @@ private SeqNoStats populateRandomData(IndexShard shard) throws IOException { shard.getOperationPrimaryTerm(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(shard.shardId().getIndexName(), UUIDs.randomBase64UUID(), new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(shard.shardId().getIndexName(), UUIDs.randomBase64UUID(), new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); if (randomInt(100) < 5) { shard.flush(new FlushRequest().waitIfOngoing(true)); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index 71d89e2856c6e..71b07ed192024 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -190,7 +190,8 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(indexName, "id", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(indexName, "id", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); // index #3 orgReplica.applyIndexOperationOnReplica( @@ -200,7 +201,8 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(indexName, "id-3", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(indexName, "id-3", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); // Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1. orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true)); @@ -212,7 +214,8 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(indexName, "id-2", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(indexName, "id-2", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); orgReplica.sync(); // advance local checkpoint orgReplica.updateGlobalCheckpointOnReplica(3L, "test"); @@ -224,7 +227,8 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - new SourceToParse(indexName, "id-5", new BytesArray("{}"), MediaTypeRegistry.JSON) + new SourceToParse(indexName, "id-5", new BytesArray("{}"), MediaTypeRegistry.JSON), + null ); if (randomBoolean()) { diff --git a/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java index 9e4e59d9a4d15..225b5a5ad3722 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java @@ -152,7 +152,8 @@ public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.O index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + null ); return engineIndex; case DELETE: @@ -165,7 +166,8 @@ public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.O versionType, origin, SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + null ); case NO_OP: final Translog.NoOp noOp = (Translog.NoOp) operation; diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 655a9eb7d5d38..4214508916f93 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1335,7 +1335,8 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - sourceToParse + sourceToParse, + null ); shard.sync(); // advance local checkpoint if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {