From 44e72173789fced100f5b49ce39815c6a4156867 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Tue, 9 Apr 2024 10:23:15 +0530 Subject: [PATCH] Integrate local recovery with remote store seeding during migration Signed-off-by: Bhumika Saini --- .../RemotePrimaryLocalRecoveryIT.java | 161 ++++++++++++++++++ .../org/opensearch/index/IndexService.java | 5 + .../opensearch/index/shard/IndexShard.java | 2 +- .../opensearch/index/shard/StoreRecovery.java | 7 + 4 files changed, 174 insertions(+), 1 deletion(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java new file mode 100644 index 0000000000000..3cb7848903840 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java @@ -0,0 +1,161 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.client.Client; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.remote.RemoteSegmentStats; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +import java.util.Map; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemotePrimaryLocalRecoveryIT extends MigrationBaseTestCase { + + /** + * Tests local recovery sanity in the happy path flow + * @throws Exception + */ + public void testLocalRecoveryRollingRestart() throws Exception { + triggerRollingRestartForRemoteMigration(); + } + + /** + * Tests local recovery sanity during remote migration with a node restart in between + * @throws Exception + */ + public void testLocalRecoveryRollingRestartAndNodeFailure() throws Exception { + triggerRollingRestartForRemoteMigration(); + + DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); + String nodeToRestart = discoveryNodes.getClusterManagerNodeId(); + internalCluster().restartNode(nodeToRestart); + + Map shardStatsMap = internalCluster().client().admin().indices().prepareStats("idx1").get().asMap(); + shardStatsMap.forEach((shardRouting, shardStats) -> { + if (nodeToRestart.equals(shardRouting.currentNodeId())) { + RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + } + }); + } + + /** + * Tests local recovery sanity during remote migration with a node replacement in between + * @throws Exception + */ + public void testLocalRecoveryRollingRestartAndNodeReplacement() throws Exception { + triggerRollingRestartForRemoteMigration(); + + internalCluster().stopCurrentClusterManagerNode(); + String newNode = internalCluster().startNode(); + + Map shardStatsMap = internalCluster().client().admin().indices().prepareStats("idx1").get().asMap(); + shardStatsMap.forEach((shardRouting, shardStats) -> { + if (newNode.equals(shardRouting.currentNodeId())) { + RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + } + }); + } + + /** + * Tests local recovery sanity during remote migration with all node replacement in between + * @throws Exception + */ + public void testLocalRecoveryRollingRestartAndAllNodeReplacement() throws Exception { + triggerRollingRestartForRemoteMigration(); + + internalCluster().stopAllNodes(); + String newNode1 = internalCluster().startNode(); + String newNode2 = internalCluster().startNode(); + + Map shardStatsMap = internalCluster().client().admin().indices().prepareStats("idx1").get().asMap(); + DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); + shardStatsMap.forEach((shardRouting, shardStats) -> { + if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()) { + RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + } + }); + } + + /** + * Helper method to run a rolling restart for migration to remote backed cluster + * @throws Exception + */ + private void triggerRollingRestartForRemoteMigration() throws Exception { + String docRepNode = internalCluster().startNode(); + Client client = internalCluster().client(docRepNode); + + // create index + client().admin().indices().prepareCreate("idx1").setSettings(indexSettings()).setMapping("field", "type=text").get(); + ensureGreen("idx1"); + + indexBulk("idx1", randomIntBetween(10, 100)); + refresh("idx1"); + + // Index some more docs + indexBulk("idx1", randomIntBetween(10, 100)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // add remote node in mixed mode cluster + addRemote = true; + String remoteNode = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + + updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // rolling restart + final Settings remoteNodeAttributes = remoteStoreClusterSettings( + REPOSITORY_NAME, + segmentRepoPath, + REPOSITORY_2_NAME, + translogRepoPath + ); + internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() { + // Update remote attributes + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + return remoteNodeAttributes; + } + }); + + ensureStableCluster(2); + ensureGreen("idx1"); + assertEquals(internalCluster().size(), 2); + + // Assert on remote uploads + Map shardStatsMap = internalCluster().client().admin().indices().prepareStats("idx1").get().asMap(); + DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes(); + shardStatsMap.forEach((shardRouting, shardStats) -> { + if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode()) { + RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getTotalUploadTime() > 0); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index a7b29314210df..831025dd2be69 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -506,6 +506,11 @@ public synchronized IndexShard createShard( } logger.info("DocRep shard {} is migrating to remote", shardId); seedRemote = true; + } else if (sourceNode == null && routing.primary()) { + // seeding when index metadata says so + // ToDo Check for this shard is seeded or not from index metadata + logger.info("DocRep shard {} is migrating to remote during restart ", shardId); + seedRemote = true; } remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory( RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()), diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 26dbbbcdee7c0..346690576a3ad 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3702,7 +3702,7 @@ public void startRecovery( RepositoriesService repositoriesService, Consumer mappingUpdateConsumer, IndicesService indicesService - ) { + ) throws IOException { // TODO: Create a proper object to encapsulate the recovery context // all of the current methods here follow a pattern of: // resolve context which isn't really dependent on the local shards and then async diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index f5e342d28fde1..5503264cdbe32 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -661,6 +661,13 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe indexShard.recoveryState().getIndex().setFileDetailsComplete(); } indexShard.openEngineAndRecoverFromTranslog(); + if (indexShard.shouldSeedRemoteStore()) { + indexShard.getThreadPool() + .executor(ThreadPool.Names.GENERIC) + .execute(() -> { indexShard.refresh("remote store migration"); }); + indexShard.waitForRemoteStoreSync(); + logger.info("Remote Store is now seeded via local recovery for {}", indexShard.shardId()); + } indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from shard_store");