From 8cb4bb4c1776aaee3c4065c5a6b43e86a1e52daa Mon Sep 17 00:00:00 2001 From: Sudarshan baliga Date: Fri, 2 Feb 2024 15:02:42 +0530 Subject: [PATCH] Added transport action for bulk async shard fetch for primary shards (#8218) * Added transport action for bulk async shard fetch for primary shards Signed-off-by: sudarshan baliga Co-authored-by: Shivansh Arora --- .../gateway/GatewayRecoveryTestUtils.java | 77 ++++ .../gateway/RecoveryFromGatewayIT.java | 99 +++++ .../org/opensearch/gateway/GatewayModule.java | 1 + ...ansportNodesGatewayStartedShardHelper.java | 114 +++++ ...ransportNodesListGatewayStartedShards.java | 86 +--- ...ortNodesListGatewayStartedShardsBatch.java | 401 ++++++++++++++++++ 6 files changed, 708 insertions(+), 70 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java create mode 100644 server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java create mode 100644 server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java b/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java new file mode 100644 index 0000000000000..2b6a5b4ee6867 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.indices.store.ShardAttributes; + +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.opensearch.test.OpenSearchIntegTestCase.client; +import static org.opensearch.test.OpenSearchIntegTestCase.internalCluster; +import static org.opensearch.test.OpenSearchIntegTestCase.resolveIndex; + +public class GatewayRecoveryTestUtils { + + public static DiscoveryNode[] getDiscoveryNodes() throws ExecutionException, InterruptedException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(clusterStateResponse.getState().nodes().getDataNodes().values()); + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + return disNodesArr; + } + + public static Map prepareRequestMap(String[] indices, int primaryShardCount) { + Map shardIdShardAttributesMap = new HashMap<>(); + for (String indexName : indices) { + final Index index = resolveIndex(indexName); + final String customDataPath = IndexMetadata.INDEX_DATA_PATH_SETTING.get( + client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName) + ); + for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) { + final ShardId shardId = new ShardId(index, shardIdNum); + shardIdShardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); + } + } + return shardIdShardAttributesMap; + } + + public static void corruptShard(String nodeName, ShardId shardId) throws IOException, InterruptedException { + for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) { + final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME); + if (Files.exists(indexPath)) { // multi data path might only have one path in use + try (DirectoryStream stream = Files.newDirectoryStream(indexPath)) { + for (Path item : stream) { + if (item.getFileName().toString().startsWith("segments_")) { + Files.delete(item); + } + } + } + } + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 229cd7bffad2f..9da1336642a64 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -36,6 +36,8 @@ import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.admin.indices.stats.IndexStats; import org.opensearch.action.admin.indices.stats.ShardStats; @@ -60,6 +62,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; +import org.opensearch.indices.store.ShardAttributes; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.InternalTestCluster.RestartCallback; @@ -85,6 +88,9 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.gateway.GatewayRecoveryTestUtils.corruptShard; +import static org.opensearch.gateway.GatewayRecoveryTestUtils.getDiscoveryNodes; +import static org.opensearch.gateway.GatewayRecoveryTestUtils.prepareRequestMap; import static org.opensearch.gateway.GatewayService.RECOVER_AFTER_NODES_SETTING; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.termQuery; @@ -734,4 +740,97 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception { internalCluster().fullRestart(); ensureGreen("test"); } + + public void testSingleShardFetchUsingBatchAction() { + String indexName = "test"; + int numOfShards = 1; + prepareIndex(indexName, numOfShards); + Map shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName }, numOfShards); + + ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get(); + + TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response; + response = ActionTestUtils.executeBlocking( + internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class), + new TransportNodesListGatewayStartedShardsBatch.Request(searchShardsResponse.getNodes(), shardIdShardAttributesMap) + ); + final Index index = resolveIndex(indexName); + final ShardId shardId = new ShardId(index, 0); + TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap() + .get(searchShardsResponse.getNodes()[0].getId()) + .getNodeGatewayStartedShardsBatch() + .get(shardId); + assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards); + } + + public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() { + // start node + internalCluster().startNode(); + String indexName1 = "test1"; + String indexName2 = "test2"; + int numShards = internalCluster().numDataNodes(); + // assign one primary shard each to the data nodes + prepareIndex(indexName1, numShards); + prepareIndex(indexName2, numShards); + Map shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName1, indexName2 }, numShards); + ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get(); + assertEquals(internalCluster().numDataNodes(), searchShardsResponse.getNodes().length); + TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response; + response = ActionTestUtils.executeBlocking( + internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class), + new TransportNodesListGatewayStartedShardsBatch.Request(searchShardsResponse.getNodes(), shardIdShardAttributesMap) + ); + for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) { + ShardId shardId = clusterSearchShardsGroup.getShardId(); + assertEquals(1, clusterSearchShardsGroup.getShards().length); + String nodeId = clusterSearchShardsGroup.getShards()[0].currentNodeId(); + TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap() + .get(nodeId) + .getNodeGatewayStartedShardsBatch() + .get(shardId); + assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards); + } + } + + public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception { + String indexName = "test"; + int numOfShards = 1; + prepareIndex(indexName, numOfShards); + Map shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName }, numOfShards); + ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get(); + final Index index = resolveIndex(indexName); + final ShardId shardId = new ShardId(index, 0); + corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId); + TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response; + internalCluster().restartNode(searchShardsResponse.getNodes()[0].getName()); + response = ActionTestUtils.executeBlocking( + internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class), + new TransportNodesListGatewayStartedShardsBatch.Request(getDiscoveryNodes(), shardIdShardAttributesMap) + ); + DiscoveryNode[] discoveryNodes = getDiscoveryNodes(); + TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap() + .get(discoveryNodes[0].getId()) + .getNodeGatewayStartedShardsBatch() + .get(shardId); + assertNotNull(nodeGatewayStartedShards.storeException()); + assertNotNull(nodeGatewayStartedShards.allocationId()); + assertTrue(nodeGatewayStartedShards.primary()); + } + + private void assertNodeGatewayStartedShardsHappyCase( + TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards + ) { + assertNull(nodeGatewayStartedShards.storeException()); + assertNotNull(nodeGatewayStartedShards.allocationId()); + assertTrue(nodeGatewayStartedShards.primary()); + } + + private void prepareIndex(String indexName, int numberOfPrimaryShards) { + createIndex( + indexName, + Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + index(indexName, "type", "1", Collections.emptyMap()); + flush(indexName); + } } diff --git a/server/src/main/java/org/opensearch/gateway/GatewayModule.java b/server/src/main/java/org/opensearch/gateway/GatewayModule.java index 59ec0243c88c9..847ba01737332 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayModule.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayModule.java @@ -47,6 +47,7 @@ protected void configure() { bind(GatewayService.class).asEagerSingleton(); bind(TransportNodesListGatewayMetaState.class).asEagerSingleton(); bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton(); + bind(TransportNodesListGatewayStartedShardsBatch.class).asEagerSingleton(); bind(LocalAllocateDangledIndices.class).asEagerSingleton(); } } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java new file mode 100644 index 0000000000000..403e3e96fa209 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java @@ -0,0 +1,114 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.shard.ShardStateMetadata; +import org.opensearch.index.store.Store; +import org.opensearch.indices.IndicesService; + +import java.io.IOException; + +/** + * This class has the common code used in {@link TransportNodesListGatewayStartedShards} and + * {@link TransportNodesListGatewayStartedShardsBatch} to get the shard info on the local node. + *

+ * This class should not be used to add more functions and will be removed when the + * {@link TransportNodesListGatewayStartedShards} will be deprecated and all the code will be moved to + * {@link TransportNodesListGatewayStartedShardsBatch} + * + * @opensearch.internal + */ +public class TransportNodesGatewayStartedShardHelper { + public static TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard getShardInfoOnLocalNode( + Logger logger, + final ShardId shardId, + NamedXContentRegistry namedXContentRegistry, + NodeEnvironment nodeEnv, + IndicesService indicesService, + String shardDataPathInRequest, + Settings settings, + ClusterService clusterService + ) throws IOException { + logger.trace("{} loading local shard state info", shardId); + ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( + logger, + namedXContentRegistry, + nodeEnv.availableShardPaths(shardId) + ); + if (shardStateMetadata != null) { + if (indicesService.getShardOrNull(shardId) == null + && shardStateMetadata.indexDataLocation == ShardStateMetadata.IndexDataLocation.LOCAL) { + final String customDataPath; + if (shardDataPathInRequest != null) { + customDataPath = shardDataPathInRequest; + } else { + // TODO: Fallback for BWC with older OpenSearch versions. + // Remove once request.getCustomDataPath() always returns non-null + final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); + if (metadata != null) { + customDataPath = new IndexSettings(metadata, settings).customDataPath(); + } else { + logger.trace("{} node doesn't have meta data for the requests index", shardId); + throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); + } + } + // we don't have an open shard on the store, validate the files on disk are openable + ShardPath shardPath = null; + try { + shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); + if (shardPath == null) { + throw new IllegalStateException(shardId + " no shard path found"); + } + Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger); + } catch (Exception exception) { + final ShardPath finalShardPath = shardPath; + logger.trace( + () -> new ParameterizedMessage( + "{} can't open index for shard [{}] in path [{}]", + shardId, + shardStateMetadata, + (finalShardPath != null) ? finalShardPath.resolveIndex() : "" + ), + exception + ); + String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; + return new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard( + allocationId, + shardStateMetadata.primary, + null, + exception + ); + } + } + + logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); + String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; + final IndexShard shard = indicesService.getShardOrNull(shardId); + return new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard( + allocationId, + shardStateMetadata.primary, + shard != null ? shard.getLatestReplicationCheckpoint() : null + ); + } + logger.trace("{} no local shard info found", shardId); + return new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(null, false, null); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 601a5c671d67c..0ba872aab9974 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -32,7 +32,6 @@ package org.opensearch.gateway; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionType; @@ -43,7 +42,6 @@ import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.action.support.nodes.TransportNodesAction; import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -55,11 +53,6 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.ShardPath; -import org.opensearch.index.shard.ShardStateMetadata; -import org.opensearch.index.store.Store; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.store.ShardAttributes; @@ -72,6 +65,8 @@ import java.util.Map; import java.util.Objects; +import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.getShardInfoOnLocalNode; + /** * This transport action is used to fetch the shard version from each node during primary allocation in {@link GatewayAllocator}. * We use this to find out which node holds the latest shard version and which of them used to be a primary in order to allocate @@ -159,72 +154,23 @@ protected NodesGatewayStartedShards newResponse( @Override protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { try { - final ShardId shardId = request.getShardId(); - logger.trace("{} loading local shard state info", shardId); - ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( + TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard shardInfo = getShardInfoOnLocalNode( logger, + request.getShardId(), namedXContentRegistry, - nodeEnv.availableShardPaths(request.shardId) + nodeEnv, + indicesService, + request.getCustomDataPath(), + settings, + clusterService + ); + return new NodeGatewayStartedShards( + clusterService.localNode(), + shardInfo.allocationId(), + shardInfo.primary(), + shardInfo.replicationCheckpoint(), + shardInfo.storeException() ); - if (shardStateMetadata != null) { - if (indicesService.getShardOrNull(shardId) == null - && shardStateMetadata.indexDataLocation == ShardStateMetadata.IndexDataLocation.LOCAL) { - final String customDataPath; - if (request.getCustomDataPath() != null) { - customDataPath = request.getCustomDataPath(); - } else { - // TODO: Fallback for BWC with older OpenSearch versions. - // Remove once request.getCustomDataPath() always returns non-null - final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); - if (metadata != null) { - customDataPath = new IndexSettings(metadata, settings).customDataPath(); - } else { - logger.trace("{} node doesn't have meta data for the requests index", shardId); - throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); - } - } - // we don't have an open shard on the store, validate the files on disk are openable - ShardPath shardPath = null; - try { - shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); - if (shardPath == null) { - throw new IllegalStateException(shardId + " no shard path found"); - } - Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger); - } catch (Exception exception) { - final ShardPath finalShardPath = shardPath; - logger.trace( - () -> new ParameterizedMessage( - "{} can't open index for shard [{}] in path [{}]", - shardId, - shardStateMetadata, - (finalShardPath != null) ? finalShardPath.resolveIndex() : "" - ), - exception - ); - String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; - return new NodeGatewayStartedShards( - clusterService.localNode(), - allocationId, - shardStateMetadata.primary, - null, - exception - ); - } - } - - logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); - String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; - final IndexShard shard = indicesService.getShardOrNull(shardId); - return new NodeGatewayStartedShards( - clusterService.localNode(), - allocationId, - shardStateMetadata.primary, - shard != null ? shard.getLatestReplicationCheckpoint() : null - ); - } - logger.trace("{} no local shard info found", shardId); - return new NodeGatewayStartedShards(clusterService.localNode(), null, false, null); } catch (Exception e) { throw new OpenSearchException("failed to load started shards", e); } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java new file mode 100644 index 0000000000000..bc327c1b85748 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java @@ -0,0 +1,401 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionType; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.store.ShardAttributes; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.getShardInfoOnLocalNode; + +/** + * This transport action is used to fetch batch of unassigned shard version from each node during primary allocation in {@link GatewayAllocator}. + * We use this to find out which node holds the latest shard version and which of them used to be a primary in order to allocate + * shards after node or cluster restarts. + * + * @opensearch.internal + */ +public class TransportNodesListGatewayStartedShardsBatch extends TransportNodesAction< + TransportNodesListGatewayStartedShardsBatch.Request, + TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch, + TransportNodesListGatewayStartedShardsBatch.NodeRequest, + TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> + implements + AsyncShardFetch.Lister< + TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch, + TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> { + + public static final String ACTION_NAME = "internal:gateway/local/started_shards_batch"; + public static final ActionType TYPE = new ActionType<>( + ACTION_NAME, + NodesGatewayStartedShardsBatch::new + ); + + private final Settings settings; + private final NodeEnvironment nodeEnv; + private final IndicesService indicesService; + private final NamedXContentRegistry namedXContentRegistry; + + @Inject + public TransportNodesListGatewayStartedShardsBatch( + Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + NodeEnvironment env, + IndicesService indicesService, + NamedXContentRegistry namedXContentRegistry + ) { + super( + ACTION_NAME, + threadPool, + clusterService, + transportService, + actionFilters, + Request::new, + NodeRequest::new, + ThreadPool.Names.FETCH_SHARD_STARTED, + NodeGatewayStartedShardsBatch.class + ); + this.settings = settings; + this.nodeEnv = env; + this.indicesService = indicesService; + this.namedXContentRegistry = namedXContentRegistry; + } + + @Override + public void list( + Map shardAttributesMap, + DiscoveryNode[] nodes, + ActionListener listener + ) { + execute(new Request(nodes, shardAttributesMap), listener); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request); + } + + @Override + protected NodeGatewayStartedShardsBatch newNodeResponse(StreamInput in) throws IOException { + return new NodeGatewayStartedShardsBatch(in); + } + + @Override + protected NodesGatewayStartedShardsBatch newResponse( + Request request, + List responses, + List failures + ) { + return new NodesGatewayStartedShardsBatch(clusterService.getClusterName(), responses, failures); + } + + /** + * This function is similar to nodeOperation method of {@link TransportNodesListGatewayStartedShards} we loop over + * the shards here and populate the data about the shards held by the local node. + * + * @param request Request containing the map shardIdsWithCustomDataPath. + * @return NodeGatewayStartedShardsBatch contains the data about the primary shards held by the local node + */ + @Override + protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) { + Map shardsOnNode = new HashMap<>(); + for (ShardAttributes shardAttr : request.shardAttributes.values()) { + final ShardId shardId = shardAttr.getShardId(); + try { + shardsOnNode.put( + shardId, + getShardInfoOnLocalNode( + logger, + shardId, + namedXContentRegistry, + nodeEnv, + indicesService, + shardAttr.getCustomDataPath(), + settings, + clusterService + ) + ); + } catch (Exception e) { + shardsOnNode.put( + shardId, + new NodeGatewayStartedShard(null, false, null, new OpenSearchException("failed to load started shards", e)) + ); + } + } + return new NodeGatewayStartedShardsBatch(clusterService.localNode(), shardsOnNode); + } + + /** + * This is used in constructing the request for making the transport request to set of other node. + * Refer {@link TransportNodesAction} class start method. + * + * @opensearch.internal + */ + public static class Request extends BaseNodesRequest { + private final Map shardAttributes; + + public Request(StreamInput in) throws IOException { + super(in); + shardAttributes = in.readMap(ShardId::new, ShardAttributes::new); + } + + public Request(DiscoveryNode[] nodes, Map shardAttributes) { + super(nodes); + this.shardAttributes = Objects.requireNonNull(shardAttributes); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardAttributes, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + } + + public Map getShardAttributes() { + return shardAttributes; + } + } + + /** + * Responses received from set of other nodes is clubbed into this class and sent back to the caller + * of this transport request. Refer {@link TransportNodesAction} + * + * @opensearch.internal + */ + public static class NodesGatewayStartedShardsBatch extends BaseNodesResponse { + + public NodesGatewayStartedShardsBatch(StreamInput in) throws IOException { + super(in); + } + + public NodesGatewayStartedShardsBatch( + ClusterName clusterName, + List nodes, + List failures + ) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeGatewayStartedShardsBatch::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + } + + /** + * NodeRequest class is for deserializing the request received by this node from other node for this transport action. + * This is used in {@link TransportNodesAction} + * + * @opensearch.internal + */ + public static class NodeRequest extends TransportRequest { + private final Map shardAttributes; + + public NodeRequest(StreamInput in) throws IOException { + super(in); + shardAttributes = in.readMap(ShardId::new, ShardAttributes::new); + } + + public NodeRequest(Request request) { + this.shardAttributes = Objects.requireNonNull(request.getShardAttributes()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardAttributes, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + } + } + + /** + * This class encapsulates the metadata about a started shard that needs to be persisted or sent between nodes. + * This is used in {@link NodeGatewayStartedShardsBatch} to construct the response for each node, instead of + * {@link TransportNodesListGatewayStartedShards.NodeGatewayStartedShards} because we don't need to save an extra + * {@link DiscoveryNode} object like in {@link TransportNodesListGatewayStartedShards.NodeGatewayStartedShards} + * which reduces memory footprint of its objects. + * + * @opensearch.internal + */ + public static class NodeGatewayStartedShard { + private final String allocationId; + private final boolean primary; + private final Exception storeException; + private final ReplicationCheckpoint replicationCheckpoint; + + public NodeGatewayStartedShard(StreamInput in) throws IOException { + allocationId = in.readOptionalString(); + primary = in.readBoolean(); + if (in.readBoolean()) { + storeException = in.readException(); + } else { + storeException = null; + } + if (in.readBoolean()) { + replicationCheckpoint = new ReplicationCheckpoint(in); + } else { + replicationCheckpoint = null; + } + } + + public NodeGatewayStartedShard(String allocationId, boolean primary, ReplicationCheckpoint replicationCheckpoint) { + this(allocationId, primary, replicationCheckpoint, null); + } + + public NodeGatewayStartedShard( + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint, + Exception storeException + ) { + this.allocationId = allocationId; + this.primary = primary; + this.replicationCheckpoint = replicationCheckpoint; + this.storeException = storeException; + } + + public String allocationId() { + return this.allocationId; + } + + public boolean primary() { + return this.primary; + } + + public ReplicationCheckpoint replicationCheckpoint() { + return this.replicationCheckpoint; + } + + public Exception storeException() { + return this.storeException; + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(allocationId); + out.writeBoolean(primary); + if (storeException != null) { + out.writeBoolean(true); + out.writeException(storeException); + } else { + out.writeBoolean(false); + } + if (replicationCheckpoint != null) { + out.writeBoolean(true); + replicationCheckpoint.writeTo(out); + } else { + out.writeBoolean(false); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + NodeGatewayStartedShard that = (NodeGatewayStartedShard) o; + + return primary == that.primary + && Objects.equals(allocationId, that.allocationId) + && Objects.equals(storeException, that.storeException) + && Objects.equals(replicationCheckpoint, that.replicationCheckpoint); + } + + @Override + public int hashCode() { + int result = (allocationId != null ? allocationId.hashCode() : 0); + result = 31 * result + (primary ? 1 : 0); + result = 31 * result + (storeException != null ? storeException.hashCode() : 0); + result = 31 * result + (replicationCheckpoint != null ? replicationCheckpoint.hashCode() : 0); + return result; + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("NodeGatewayStartedShards[").append("allocationId=").append(allocationId).append(",primary=").append(primary); + if (storeException != null) { + buf.append(",storeException=").append(storeException); + } + if (replicationCheckpoint != null) { + buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); + } + buf.append("]"); + return buf.toString(); + } + } + + /** + * This is the response from a single node, this is used in {@link NodesGatewayStartedShardsBatch} for creating + * node to its response mapping for this transport request. + * Refer {@link TransportNodesAction} start method + * + * @opensearch.internal + */ + public static class NodeGatewayStartedShardsBatch extends BaseNodeResponse { + private final Map nodeGatewayStartedShardsBatch; + + public Map getNodeGatewayStartedShardsBatch() { + return nodeGatewayStartedShardsBatch; + } + + public NodeGatewayStartedShardsBatch(StreamInput in) throws IOException { + super(in); + this.nodeGatewayStartedShardsBatch = in.readMap(ShardId::new, NodeGatewayStartedShard::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(nodeGatewayStartedShardsBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + } + + public NodeGatewayStartedShardsBatch(DiscoveryNode node, Map nodeGatewayStartedShardsBatch) { + super(node); + this.nodeGatewayStartedShardsBatch = nodeGatewayStartedShardsBatch; + } + } +}