From 95a866ad8f9ef4115425246d467f31dbccaf8af5 Mon Sep 17 00:00:00 2001
From: Suraj Singh <surajrider@gmail.com>
Date: Mon, 8 Jan 2024 10:20:50 -0800
Subject: [PATCH] Fix original shard level unit test
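
Run the post-copy hook in RemoteIndexShardTests only after the segment
file copy completes, move coverage for reusing previously copied on-disk
files from the mocked unit test in SegmentReplicationTargetTests into a
shard-level test, and generate metadata snapshots from a dedicated Store
instead of the shard under test.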

Signed-off-by: Suraj Singh <surajrider@gmail.com>
---
 .../index/shard/RemoteIndexShardTests.java    | 111 +++++++++++++++++-
 .../SegmentReplicationTargetTests.java        |  81 +++----------
 2 files changed, 126 insertions(+), 66 deletions(-)

diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
index 0561a7cedd44f..eacc504428ef1 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
@@ -11,6 +11,7 @@
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.util.Version;
+import org.opensearch.action.StepListener;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.settings.Settings;
@@ -366,6 +367,100 @@ public void testPrimaryRestart() throws Exception {
         }
     }
 
+    /**
+     * This test validates that unreferenced on-disk files are ignored when requesting files from the replication
+     * source, preventing a FileAlreadyExistsException. It does so by copying files in the first round of segment
+     * replication without committing locally, so that the next round does not consider those files for download again.
+     */
+    public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
+        try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
+            shards.startAll();
+            IndexShard primary = shards.getPrimary();
+            final IndexShard replica = shards.getReplicas().get(0);
+
+            shards.indexDocs(10);
+            primary.refresh("Test");
+
+            final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
+            final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
+            when(sourceFactory.get(any())).thenReturn(
+                getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
+            );
+            CountDownLatch latch = new CountDownLatch(1);
+
+            logger.info("--> Starting first round of replication");
+            // Start the first round of segment replication. It should fail with the simulated error, leaving the
+            // replica with files in its local store that are not yet referenced by the active reader.
+            final SegmentReplicationTarget target = targetService.startReplication(
+                replica,
+                primary.getLatestReplicationCheckpoint(),
+                new SegmentReplicationTargetService.SegmentReplicationListener() {
+                    @Override
+                    public void onReplicationDone(SegmentReplicationState state) {
+                        latch.countDown();
+                        Assert.fail("Replication should fail with simulated error");
+                    }
+
+                    @Override
+                    public void onReplicationFailure(
+                        SegmentReplicationState state,
+                        ReplicationFailedException e,
+                        boolean sendShardFailure
+                    ) {
+                        latch.countDown();
+                        assertFalse(sendShardFailure);
+                        logger.error("Replication error", e);
+                    }
+                }
+            );
+            latch.await();
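+            // Verify the failed round left copied files on disk without committing them to the active reader.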
+            Set<String> onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll()));
+            onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS));
+            List<String> activeFiles = replica.getSegmentMetadataMap()
+                .values()
+                .stream()
+                .map(metadata -> metadata.name())
+                .collect(Collectors.toList());
+            assertTrue("Files should not be committed", activeFiles.isEmpty());
+            assertFalse("Files should be copied to disk", onDiskFiles.isEmpty());
+            assertEquals(SegmentReplicationState.Stage.GET_FILES, target.state().getStage());
+
+            // Start the next round of segment replication without throwing an exception, resulting in a commit on the replica
+            when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
+            CountDownLatch waitForSecondRound = new CountDownLatch(1);
+            logger.info("--> Starting second round of replication");
+            final SegmentReplicationTarget newTarget = targetService.startReplication(
+                replica,
+                primary.getLatestReplicationCheckpoint(),
+                new SegmentReplicationTargetService.SegmentReplicationListener() {
+                    @Override
+                    public void onReplicationDone(SegmentReplicationState state) {
+                        waitForSecondRound.countDown();
+                    }
+
+                    @Override
+                    public void onReplicationFailure(
+                        SegmentReplicationState state,
+                        ReplicationFailedException e,
+                        boolean sendShardFailure
+                    ) {
+                        waitForSecondRound.countDown();
+                        logger.error("Replication error", e);
+                        Assert.fail("Replication should not fail");
+                    }
+                }
+            );
+            waitForSecondRound.await();
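+            // After the successful commit, the active reader should reference every file copied in the first round.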
+            assertEquals(SegmentReplicationState.Stage.DONE, newTarget.state().getStage());
+            activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> metadata.name()).collect(Collectors.toList());
+            assertTrue("Replica should have consistent disk & reader", activeFiles.containsAll(onDiskFiles));
+            shards.removeReplica(replica);
+            closeShards(replica);
+        }
+    }
+
     /**
      * This test validates that local non-readable (corrupt, partially) on disk are deleted vs failing the
      * replication event. This test mimics local files (not referenced by reader) by throwing exception post file copy and
@@ -469,8 +562,20 @@ public void getSegmentFiles(
                 BiConsumer<String, Long> fileProgressTracker,
                 ActionListener<GetSegmentFilesResponse> listener
             ) {
-                super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
-                postGetFilesRunnable.run();
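+                // Run the post-copy hook only after the file copy completes, then forward the result to the caller.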
+                StepListener<GetSegmentFilesResponse> waitForCopyFilesListener = new StepListener<>();
+                super.getSegmentFiles(
+                    replicationId,
+                    checkpoint,
+                    filesToFetch,
+                    indexShard,
+                    (fileName, bytesRecovered) -> {},
+                    waitForCopyFilesListener
+                );
+                waitForCopyFilesListener.whenComplete(response -> {
+                    postGetFilesRunnable.run();
+                    listener.onResponse(response);
+                }, listener::onFailure);
             }
 
             @Override
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index 56fc90d130da3..8b4b3aff701b4 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -29,16 +29,21 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.IndexSettings;
 import org.opensearch.index.engine.NRTReplicationEngineFactory;
 import org.opensearch.index.replication.TestReplicationSource;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardTestCase;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
+import org.opensearch.index.store.StoreTests;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationFailedException;
 import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.test.DummyShardLock;
+import org.opensearch.test.IndexSettingsModule;
 import org.junit.Assert;
 
 import java.io.FileNotFoundException;
@@ -76,6 +81,11 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {
 
     private static final Map<String, StoreFileMetadata> SI_SNAPSHOT_DIFFERENT = Map.of(SEGMENT_FILE_DIFF.name(), SEGMENT_FILE_DIFF);
 
+    private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
+        "index",
+        Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
+    );
+
     private SegmentInfos testSegmentInfos;
 
     @Override
@@ -431,10 +441,7 @@ public void test_MissingFiles_NotCausingFailure() throws IOException {
         // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
         // generated due to delete operations. These two snapshots can then be used in test to mock the primary shard
         // snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element).
-        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
-        // Delete on-disk files so that they are not considered for file diff
-        deleteContent(spyIndexShard.store().directory());
-        spyIndexShard.store().close();
+        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);
 
         SegmentReplicationSource segrepSource = new TestReplicationSource() {
             @Override
@@ -479,72 +486,14 @@ public void onFailure(Exception e) {
         });
     }
 
-    /**
-     * This tests ensures that on-disk files on replica are taken into consideration while evaluating the files diff
-     * from primary. The test mocks the files referred by active reader to a smaller subset so that logic to filter
-     * out on-disk files be exercised.
-     * @throws IOException if an indexing operation fails or segment replication fails
-     */
-    public void test_OnDiskFiles_ReusedForReplication() throws IOException {
-        int docCount = 1 + random().nextInt(10);
-        // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
-        // generated due to delete operations. These two snapshots can then be used in test to mock the primary shard
-        // snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element).
-        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
-
-        SegmentReplicationSource segrepSource = new TestReplicationSource() {
-            @Override
-            public void getCheckpointMetadata(
-                long replicationId,
-                ReplicationCheckpoint checkpoint,
-                ActionListener<CheckpointInfoResponse> listener
-            ) {
-                listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy()));
-            }
-
-            @Override
-            public void getSegmentFiles(
-                long replicationId,
-                ReplicationCheckpoint checkpoint,
-                List<StoreFileMetadata> filesToFetch,
-                IndexShard indexShard,
-                BiConsumer<String, Long> fileProgressTracker,
-                ActionListener<GetSegmentFilesResponse> listener
-            ) {
-                // No files should be requested from replication source
-                assertEquals(0, filesToFetch.size());
-                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
-            }
-        };
-
-        segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource,  mock(
-            SegmentReplicationTargetService.SegmentReplicationListener.class
-        ));
-        // Mask the files returned by active reader. This is needed so that logic to filter out on disk is exercised
-        when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
-        segrepTarget.startReplication(new ActionListener<Void>() {
-            @Override
-            public void onResponse(Void replicationResponse) {
-                segrepTarget.markAsDone();
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                Assert.fail();
-            }
-        });
-    }
-
-
     /**
      * Generates a list of Store.MetadataSnapshot with two elements where second snapshot has extra files due to delete
      * operation. A list of snapshots is returned so that identical files have same checksum.
      * @param docCount the number of documents to index in the first snapshot
-     * @param shard The IndexShard object to use for writing
      * @return a list of Store.MetadataSnapshot with two elements where second snapshot has extra files due to delete
      * @throws IOException if one of the indexing operations fails
      */
-    private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount, IndexShard shard) throws IOException {
+    private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
         List<Document> docList = new ArrayList<>();
         for (int i = 0; i < docCount; i++) {
             Document document = new Document();
@@ -558,7 +507,9 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount,
         IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
         iwc.setMergePolicy(NoMergePolicy.INSTANCE);
         iwc.setUseCompoundFile(true);
-        Store store = shard.store();
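+        // Create a dedicated Store so snapshot generation no longer writes to the shard under test.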
+        final ShardId shardId = new ShardId("index", "_na_", 1);
+        Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
         IndexWriter writer = new IndexWriter(store.directory(), iwc);
         for (Document d : docList) {
             writer.addDocument(d);
@@ -569,6 +519,8 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount,
         writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
         writer.commit();
         Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
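+        // Delete the on-disk files so they are not considered for the file diff; only the metadata snapshots are returned.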
+        deleteContent(store.directory());
         writer.close();
         store.close();
         return Arrays.asList(storeMetadata, storeMetadataWithDeletes);