diff --git a/CHANGELOG.md b/CHANGELOG.md index 3aee4b27e28f8..a131422b6b62b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -79,6 +79,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763)) - Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606)) - Always use `constant_score` query for `match_only_text` field ([#16964](https://github.com/opensearch-project/OpenSearch/pull/16964)) +- Fix Shallow copy snapshot failures on closed index ([#16868](https://github.com/opensearch-project/OpenSearch/pull/16868)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index ebb911c739eb3..1c4585e38ee90 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -39,6 +39,9 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.snapshots.SnapshotInfo; +import org.opensearch.snapshots.SnapshotState; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -1078,4 +1081,79 @@ public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws Interrup Thread.sleep(10000); ensureGreen(INDEX_NAME); } + + public void testSuccessfulShallowV1SnapshotPostIndexClose() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + String dataNode = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms")); + + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + logger.info("Create shallow snapshot setting enabled repo"); + String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; + Path shallowSnapshotRepoPath = randomRepoPath(); + Settings.Builder settings = Settings.builder() + .put("location", shallowSnapshotRepoPath) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE); + createRepository(shallowSnapshotRepoName, "fs", settings); + + for (int i = 0; i < 10; i++) { + indexBulk(INDEX_NAME, 1); + } + flushAndRefresh(INDEX_NAME); + + logger.info("Verify shallow snapshot created before close"); + final String snapshot1 = "snapshot1"; + SnapshotInfo snapshotInfo1 = internalCluster().client() + .admin() + .cluster() + .prepareCreateSnapshot(shallowSnapshotRepoName, snapshot1) + .setIndices(INDEX_NAME) + .setWaitForCompletion(true) + .get() + .getSnapshotInfo(); + + assertEquals(SnapshotState.SUCCESS, snapshotInfo1.state()); + assertTrue(snapshotInfo1.successfulShards() > 0); + assertEquals(0, snapshotInfo1.failedShards()); + + for (int i = 0; i < 10; i++) { + indexBulk(INDEX_NAME, 1); + } + + // close index + client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); + Thread.sleep(1000); + logger.info("Verify shallow snapshot created after close"); + final String snapshot2 = "snapshot2"; + + SnapshotInfo snapshotInfo2 = internalCluster().client() + .admin() + .cluster() + .prepareCreateSnapshot(shallowSnapshotRepoName, snapshot2) + .setIndices(INDEX_NAME) + .setWaitForCompletion(true) + .get() + .getSnapshotInfo(); + + assertEquals(SnapshotState.SUCCESS, snapshotInfo2.state()); + assertTrue(snapshotInfo2.successfulShards() > 0); + assertEquals(0, snapshotInfo2.failedShards()); + + // delete the index + cluster().wipeIndices(INDEX_NAME); + // try restoring the snapshot + RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(shallowSnapshotRepoName, snapshot2) + .setWaitForCompletion(true) + .execute() + .actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + ensureGreen(INDEX_NAME); + flushAndRefresh(INDEX_NAME); + assertBusy(() -> { assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), 20); }); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index baa1351f15cda..c0d9f1bca1223 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1631,6 +1631,22 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() { return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion; } + /** + * Fetches the last remote uploaded segment metadata file + * @return {@link RemoteSegmentMetadata} + * @throws IOException + */ + public RemoteSegmentMetadata fetchLastRemoteUploadedSegmentMetadata() throws IOException { + if (!indexSettings.isAssignedOnRemoteNode()) { + throw new IllegalStateException("Index is not assigned on Remote Node"); + } + RemoteSegmentMetadata lastUploadedMetadata = getRemoteDirectory().readLatestMetadataFile(); + if (lastUploadedMetadata == null) { + throw new FileNotFoundException("No metadata file found in remote store"); + } + return lastUploadedMetadata; + } + /** * Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this * commit won't be freed until the commit / snapshot is closed. diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 3c24d1965744a..64a64869df7eb 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -429,6 +429,45 @@ default void snapshotRemoteStoreIndexShard( throw new UnsupportedOperationException(); } + /** + * Adds a reference of remote store data for a index commit point. + *

+ * The index commit point can be obtained by using {@link org.opensearch.index.engine.Engine#acquireLastIndexCommit} method. + * Or for closed index can be obtained by reading last remote uploaded metadata by using {@link org.opensearch.index.shard.IndexShard#fetchLastRemoteUploadedSegmentMetadata()} method. + * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. + *

+ * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check + * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted. + * @param store store to be snapshotted + * @param snapshotId snapshot id + * @param indexId id for the index being snapshotted + * @param snapshotIndexCommit commit point + * @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used + * to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier + * snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit} + * @param snapshotStatus snapshot status + * @param primaryTerm current Primary Term + * @param commitGeneration current commit generation + * @param startTime start time of the snapshot commit, this will be used as the start time for snapshot. + * @param indexFilesToFileLengthMap map of index files to file length + * @param listener listener invoked on completion + */ + default void snapshotRemoteStoreIndexShard( + Store store, + SnapshotId snapshotId, + IndexId indexId, + @Nullable IndexCommit snapshotIndexCommit, + @Nullable String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + long primaryTerm, + long commitGeneration, + long startTime, + @Nullable Map indexFilesToFileLengthMap, + ActionListener listener + ) { + throw new UnsupportedOperationException(); + } + /** * Restores snapshot of the shard. *

diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 0cbe49690e773..abb840fef5578 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -3843,6 +3843,33 @@ private void writeAtomic(BlobContainer container, final String blobName, final B } } + @Override + public void snapshotRemoteStoreIndexShard( + Store store, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + @Nullable String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + long primaryTerm, + long startTime, + ActionListener listener + ) { + snapshotRemoteStoreIndexShard( + store, + snapshotId, + indexId, + snapshotIndexCommit, + shardStateIdentifier, + snapshotStatus, + primaryTerm, + snapshotIndexCommit.getGeneration(), + startTime, + null, + listener + ); + } + @Override public void snapshotRemoteStoreIndexShard( Store store, @@ -3852,13 +3879,16 @@ public void snapshotRemoteStoreIndexShard( String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, long primaryTerm, + long commitGeneration, long startTime, + Map indexFilesToFileLengthMap, ActionListener listener ) { if (isReadOnly()) { listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository")); return; } + final ShardId shardId = store.shardId(); try { final String generation = snapshotStatus.generation(); @@ -3866,13 +3896,21 @@ public void snapshotRemoteStoreIndexShard( final BlobContainer shardContainer = shardContainer(indexId, shardId); long indexTotalFileSize = 0; - // local store is being used here to fetch the files metadata instead of remote store as currently - // remote store is mirroring the local store. - List fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames()); - Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit); - for (String fileName : fileNames) { - indexTotalFileSize += commitSnapshotMetadata.get(fileName).length(); + List fileNames; + + if (snapshotIndexCommit != null) { + // local store is being used here to fetch the files metadata instead of remote store as currently + // remote store is mirroring the local store. + fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames()); + Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit); + for (String fileName : fileNames) { + indexTotalFileSize += commitSnapshotMetadata.get(fileName).length(); + } + } else { + fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet()); + indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum(); } + int indexTotalNumberOfFiles = fileNames.size(); snapshotStatus.moveToStarted( @@ -3883,7 +3921,7 @@ public void snapshotRemoteStoreIndexShard( indexTotalFileSize ); - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(commitGeneration); // now create and write the commit point logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); @@ -3894,7 +3932,7 @@ public void snapshotRemoteStoreIndexShard( snapshotId.getName(), lastSnapshotStatus.getIndexVersion(), primaryTerm, - snapshotIndexCommit.getGeneration(), + commitGeneration, lastSnapshotStatus.getStartTime(), threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(), indexTotalNumberOfFiles, diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 1b189da13e92e..26f5cd51573a0 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.opensearch.cluster.SnapshotsInProgress.ShardState; import org.opensearch.cluster.SnapshotsInProgress.State; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -63,6 +64,7 @@ import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.IndexShardSnapshotStatus.Stage; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; @@ -75,7 +77,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.nio.file.NoSuchFileException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -377,7 +378,9 @@ private void snapshot( ActionListener listener ) { try { - final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShardOrNull(shardId.id()); + final boolean closedIndex = indexService.getMetadata().getState() == IndexMetadata.State.CLOSE; if (indexShard.routingEntry().primary() == false) { throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); } @@ -404,24 +407,42 @@ private void snapshot( if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) { long startTime = threadPool.relativeTimeInMillis(); long primaryTerm = indexShard.getOperationPrimaryTerm(); - // we flush first to make sure we get the latest writes snapshotted - wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true); - IndexCommit snapshotIndexCommit = wrappedSnapshot.get(); - long commitGeneration = snapshotIndexCommit.getGeneration(); + long commitGeneration = 0L; + Map indexFilesToFileLengthMap = null; + IndexCommit snapshotIndexCommit = null; + try { + if (closedIndex) { + RemoteSegmentMetadata lastRemoteUploadedIndexCommit = indexShard.fetchLastRemoteUploadedSegmentMetadata(); + indexFilesToFileLengthMap = lastRemoteUploadedIndexCommit.getMetadata() + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getLength())); + primaryTerm = lastRemoteUploadedIndexCommit.getPrimaryTerm(); + commitGeneration = lastRemoteUploadedIndexCommit.getGeneration(); + } else { + wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true); + snapshotIndexCommit = wrappedSnapshot.get(); + commitGeneration = snapshotIndexCommit.getGeneration(); + } indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); - } catch (NoSuchFileException e) { - wrappedSnapshot.close(); - logger.warn( - "Exception while acquiring lock on primaryTerm = {} and generation = {}", - primaryTerm, - commitGeneration - ); - indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true)); - wrappedSnapshot = indexShard.acquireLastIndexCommit(false); - snapshotIndexCommit = wrappedSnapshot.get(); - commitGeneration = snapshotIndexCommit.getGeneration(); - indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); + } catch (IOException e) { + if (closedIndex) { + logger.warn("Exception while reading latest metadata file from remote store"); + listener.onFailure(e); + } else { + wrappedSnapshot.close(); + logger.warn( + "Exception while acquiring lock on primaryTerm = {} and generation = {}", + primaryTerm, + commitGeneration + ); + indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true)); + wrappedSnapshot = indexShard.acquireLastIndexCommit(false); + snapshotIndexCommit = wrappedSnapshot.get(); + commitGeneration = snapshotIndexCommit.getGeneration(); + indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); + } } try { repository.snapshotRemoteStoreIndexShard( @@ -429,11 +450,13 @@ private void snapshot( snapshot.getSnapshotId(), indexId, snapshotIndexCommit, - getShardStateId(indexShard, snapshotIndexCommit), + null, snapshotStatus, primaryTerm, + commitGeneration, startTime, - ActionListener.runBefore(listener, wrappedSnapshot::close) + indexFilesToFileLengthMap, + closedIndex ? listener : ActionListener.runBefore(listener, wrappedSnapshot::close) ); } catch (IndexShardSnapshotFailedException e) { logger.error( diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 57a561bc8f2a3..4d85a3c491af8 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -12,6 +12,9 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.Version; import org.opensearch.action.StepListener; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.SnapshotsInProgress; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.Settings; @@ -20,6 +23,7 @@ import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.CheckpointInfoResponse; @@ -32,6 +36,11 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.snapshots.Snapshot; +import org.opensearch.snapshots.SnapshotId; +import org.opensearch.snapshots.SnapshotShardsService; import org.opensearch.test.CorruptionUtils; import org.opensearch.test.junit.annotations.TestLogging; import org.hamcrest.MatcherAssert; @@ -41,6 +50,7 @@ import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -55,6 +65,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -541,6 +553,81 @@ public void onReplicationFailure( } } + public void testShallowCopySnapshotForClosedIndexSuccessful() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings)) { + final IndexShard primaryShard = shards.getPrimary(); + shards.startAll(); + shards.indexDocs(10); + shards.refresh("test"); + shards.flush(); + shards.assertAllEqual(10); + + RepositoriesService repositoriesService = createRepositoriesService(); + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository("random"); + + doAnswer(invocation -> { + IndexShardSnapshotStatus snapshotStatus = invocation.getArgument(5); + long commitGeneration = invocation.getArgument(7); + long startTime = invocation.getArgument(8); + final Map indexFilesToFileLengthMap = invocation.getArgument(9); + ActionListener listener = invocation.getArgument(10); + if (indexFilesToFileLengthMap != null) { + List fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet()); + long indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum(); + int indexTotalNumberOfFiles = fileNames.size(); + snapshotStatus.moveToStarted(startTime, 0, indexTotalNumberOfFiles, 0, indexTotalFileSize); + // Not performing actual snapshot, just modifying the state + snapshotStatus.moveToFinalize(commitGeneration); + snapshotStatus.moveToDone(System.currentTimeMillis(), snapshotStatus.generation()); + listener.onResponse(snapshotStatus.generation()); + return null; + } + listener.onResponse(snapshotStatus.generation()); + return null; + }).when(repository) + .snapshotRemoteStoreIndexShard(any(), any(), any(), any(), any(), any(), anyLong(), anyLong(), anyLong(), any(), any()); + + final SnapshotShardsService shardsService = getSnapshotShardsService( + primaryShard, + shards.getIndexMetadata(), + true, + repositoriesService + ); + final Snapshot snapshot1 = new Snapshot( + randomAlphaOfLength(10), + new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)) + ); + + // Initialize the shallow copy snapshot + final ClusterState initState = addSnapshotIndex( + clusterService.state(), + snapshot1, + primaryShard, + SnapshotsInProgress.State.INIT, + true + ); + shardsService.clusterChanged(new ClusterChangedEvent("test", initState, clusterService.state())); + + // start the snapshot + shardsService.clusterChanged( + new ClusterChangedEvent( + "test", + addSnapshotIndex(clusterService.state(), snapshot1, primaryShard, SnapshotsInProgress.State.STARTED, true), + initState + ) + ); + + // Check the snapshot got completed successfully + assertBusy(() -> { + final IndexShardSnapshotStatus.Copy copy = shardsService.currentSnapshotShards(snapshot1) + .get(primaryShard.shardId) + .asCopy(); + final IndexShardSnapshotStatus.Stage stage = copy.getStage(); + assertEquals(IndexShardSnapshotStatus.Stage.DONE, stage); + }); + } + } + private RemoteStoreReplicationSource getRemoteStoreReplicationSource(IndexShard shard, Runnable postGetFilesRunnable) { return new RemoteStoreReplicationSource(shard) { @Override diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 2311fc582616f..f4f94baabd7b0 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -68,6 +68,7 @@ import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfoTests; @@ -892,10 +893,21 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception { replicateSegments(primaryShard, shards.getReplicas()); shards.assertAllEqual(10); - final SnapshotShardsService shardsService = getSnapshotShardsService(replicaShard); + final SnapshotShardsService shardsService = getSnapshotShardsService( + replicaShard, + shards.getIndexMetadata(), + false, + createRepositoriesService() + ); final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))); - final ClusterState initState = addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.INIT); + final ClusterState initState = addSnapshotIndex( + clusterService.state(), + snapshot, + replicaShard, + SnapshotsInProgress.State.INIT, + false + ); shardsService.clusterChanged(new ClusterChangedEvent("test", initState, clusterService.state())); CountDownLatch latch = new CountDownLatch(1); @@ -907,7 +919,7 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception { shardsService.clusterChanged( new ClusterChangedEvent( "test", - addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.STARTED), + addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.STARTED, false), initState ) ); @@ -956,21 +968,30 @@ public void testComputeReplicationCheckpointNullInfosReturnsEmptyCheckpoint() th } } - private SnapshotShardsService getSnapshotShardsService(IndexShard replicaShard) { + protected SnapshotShardsService getSnapshotShardsService( + IndexShard indexShard, + IndexMetadata indexMetadata, + boolean closedIdx, + RepositoriesService repositoriesService + ) { final TransportService transportService = mock(TransportService.class); when(transportService.getThreadPool()).thenReturn(threadPool); final IndicesService indicesService = mock(IndicesService.class); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(any())).thenReturn(indexService); - when(indexService.getShardOrNull(anyInt())).thenReturn(replicaShard); - return new SnapshotShardsService(settings, clusterService, createRepositoriesService(), transportService, indicesService); + when(indexService.getShardOrNull(anyInt())).thenReturn(indexShard); + when(indexService.getMetadata()).thenReturn( + new IndexMetadata.Builder(indexMetadata).state(closedIdx ? IndexMetadata.State.CLOSE : IndexMetadata.State.OPEN).build() + ); + return new SnapshotShardsService(settings, clusterService, repositoriesService, transportService, indicesService); } - private ClusterState addSnapshotIndex( + protected ClusterState addSnapshotIndex( ClusterState state, Snapshot snapshot, IndexShard shard, - SnapshotsInProgress.State snapshotState + SnapshotsInProgress.State snapshotState, + boolean shallowCopySnapshot ) { final Map shardsBuilder = new HashMap<>(); ShardRouting shardRouting = shard.shardRouting; @@ -991,7 +1012,7 @@ private ClusterState addSnapshotIndex( null, SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()), - false + shallowCopySnapshot ); return ClusterState.builder(state) .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Collections.singletonList(entry))) diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index 71460d6248c9e..2415b917338e8 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -779,7 +779,9 @@ public void snapshotRemoteStoreIndexShard( String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, long primaryTerm, + long commitGeneration, long startTime, + Map indexFilesToFileLengthMap, ActionListener listener ) { diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index a5dc13c334513..062ebd2051f6e 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -289,6 +289,10 @@ protected EngineConfigFactory getEngineConfigFactory(IndexSettings indexSettings return new EngineConfigFactory(indexSettings); } + public IndexMetadata getIndexMetadata() { + return indexMetadata; + } + public int indexDocs(final int numOfDoc) throws Exception { for (int doc = 0; doc < numOfDoc; doc++) { final IndexRequest indexRequest = new IndexRequest(index.getName()).id(Integer.toString(docId.incrementAndGet()))