From 941b1b73c4e1224ff35ade11a5195af542fbed83 Mon Sep 17 00:00:00 2001 From: RS146BIJAY Date: Mon, 15 Jul 2024 14:51:58 +0530 Subject: [PATCH] Optmising AwarenessAllocationDecider for hashmap.get call Signed-off-by: RS146BIJAY --- .../allocation/AwarenessAllocationIT.java | 68 ++ .../cluster/node/DiscoveryNode.java | 15 + .../decider/AwarenessAllocationDecider.java | 277 ++++- .../common/settings/ClusterSettings.java | 1 + ...enessAllocationWithZoneOptimisedTests.java | 1038 +++++++++++++++++ 5 files changed, 1368 insertions(+), 31 deletions(-) create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationWithZoneOptimisedTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java index 522d63b22a0da..61f389a848ac4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/AwarenessAllocationIT.java @@ -139,6 +139,74 @@ public void testSimpleAwareness() throws Exception { }, 10, TimeUnit.SECONDS); } + public void testSimpleAwarenessWithZoneOptimised() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.zone_optimised", true) + .build(); + + logger.info("--> starting 2 nodes on the same rack"); + internalCluster().startNodes(2, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + + Settings settings = Settings.builder() + .put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.getKey(), false) + .build(); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(settings); + + createIndex("test1"); + createIndex("test2"); + + NumShards test1 = getNumShards("test1"); + NumShards test2 = getNumShards("test2"); + // no replicas will be allocated as both indices end up on a single node + final int totalPrimaries = test1.numPrimaries + test2.numPrimaries; + + ensureGreen(); + + final List indicesToClose = randomSubsetOf(Arrays.asList("test1", "test2")); + indicesToClose.forEach(indexToClose -> assertAcked(client().admin().indices().prepareClose(indexToClose).get())); + + logger.info("--> starting 1 node on a different rack"); + final String node3 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); + + // On slow machines the initial relocation might be delayed + assertBusy(() -> { + logger.info("--> waiting for no relocation"); + ClusterHealthResponse clusterHealth = client().admin() + .cluster() + .prepareHealth() + .setIndices("test1", "test2") + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes("3") + .setWaitForNoRelocatingShards(true) + .get(); + + assertThat("Cluster health request timed out", clusterHealth.isTimedOut(), equalTo(false)); + + logger.info("--> checking current state"); + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + // check that closed indices are effectively closed + final List notClosedIndices = indicesToClose.stream() + .filter(index -> clusterState.metadata().index(index).getState() != State.CLOSE) + .collect(Collectors.toList()); + assertThat("Some indices not closed", notClosedIndices, empty()); + + // verify that we have all the primaries on node3 + final Map counts = new HashMap<>(); + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum); + } + } + } + assertThat(counts.get(node3), equalTo(totalPrimaries)); + }, 10, TimeUnit.SECONDS); + } + public void testAwarenessZones() { Settings commonSettings = Settings.builder() .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b") diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 653f81830ed17..ab29248fb5b48 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -143,6 +143,8 @@ public static boolean isDedicatedSearchNode(Settings settings) { private final Map attributes; private final Version version; private final SortedSet roles; + private final String zoneValue; + private final boolean hasZoneAttribute; /** * Creates a new {@link DiscoveryNode} @@ -268,6 +270,8 @@ public DiscoveryNode( this.version = version; } this.attributes = Collections.unmodifiableMap(attributes); + this.zoneValue = this.attributes.get("zone"); + this.hasZoneAttribute = attributes.containsKey("zone"); // verify that no node roles are being provided as attributes Predicate> predicate = (attrs) -> { boolean success = true; @@ -329,6 +333,9 @@ public DiscoveryNode(StreamInput in) throws IOException { for (int i = 0; i < size; i++) { this.attributes.put(in.readString(), in.readString()); } + + this.zoneValue = this.attributes.get("zone"); + this.hasZoneAttribute = attributes.containsKey("zone"); int rolesSize = in.readVInt(); final Set roles = new HashSet<>(rolesSize); for (int i = 0; i < rolesSize; i++) { @@ -458,6 +465,14 @@ public boolean isRemoteClusterClient() { return roles.contains(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE); } + public String getZoneValue() { + return zoneValue; + } + + public boolean hasZoneAttribute() { + return hasZoneAttribute; + } + /** * Returns whether the node is dedicated to provide search capability. * diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index 5344d95b217a7..4e3e1d724ee9b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -110,13 +110,23 @@ public class AwarenessAllocationDecider extends AllocationDecider { Property.NodeScope ); + public static final Setting CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED = Setting.boolSetting( + "cluster.routing.allocation.awareness.zone_optimised", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private volatile List awarenessAttributes; + private volatile boolean isAllocationZoneOptimised; private volatile Map> forcedAwarenessAttributes; public AwarenessAllocationDecider(Settings settings, ClusterSettings clusterSettings) { this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes); + this.isAllocationZoneOptimised = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED, this::setAllocationZoneOptimised); setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer( CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, @@ -140,6 +150,10 @@ private void setAwarenessAttributes(List awarenessAttributes) { this.awarenessAttributes = awarenessAttributes; } + private void setAllocationZoneOptimised(boolean isAllocationZoneOptimised) { + this.isAllocationZoneOptimised = isAllocationZoneOptimised; + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return underCapacity(shardRouting, node, allocation, true); @@ -150,6 +164,95 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl return underCapacity(shardRouting, node, allocation, false); } + // private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) { + // if (awarenessAttributes.isEmpty()) { + // return allocation.decision( + // Decision.YES, + // NAME, + // "allocation awareness is not enabled, set cluster setting [%s] to enable it", + // CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey() + // ); + // } + // + // IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); + // int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary + // for (String awarenessAttribute : awarenessAttributes) { + // // the node the shard exists on must be associated with an awareness attribute + // if (node.node().getAttributes().containsKey(awarenessAttribute) == false) { + // return allocation.decision( + // Decision.NO, + // NAME, + // "node does not contain the awareness attribute [%s]; required attributes cluster setting [%s=%s]", + // awarenessAttribute, + // CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), + // allocation.debugDecision() ? Strings.collectionToCommaDelimitedString(awarenessAttributes) : null + // ); + // } + // + // // build attr_value -> nodes map + // Set nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute); + // + // // build the count of shards per attribute value + // Map shardPerAttribute = new HashMap<>(); + // for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) { + // if (assignedShard.started() || assignedShard.initializing()) { + // // Note: this also counts relocation targets as that will be the new location of the shard. + // // Relocation sources should not be counted as the shard is moving away + // RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId()); + // shardPerAttribute.merge(routingNode.node().getAttributes().get(awarenessAttribute), 1, Integer::sum); + // } + // } + // + // if (moveToNode) { + // if (shardRouting.assignedToNode()) { + // String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); + // if (node.nodeId().equals(nodeId) == false) { + // // we work on different nodes, move counts around + // shardPerAttribute.compute( + // allocation.routingNodes().node(nodeId).node().getAttributes().get(awarenessAttribute), + // (k, v) -> (v == null) ? 0 : v - 1 + // ); + // shardPerAttribute.merge(node.node().getAttributes().get(awarenessAttribute), 1, Integer::sum); + // } + // } else { + // shardPerAttribute.merge(node.node().getAttributes().get(awarenessAttribute), 1, Integer::sum); + // } + // } + // + // int numberOfAttributes = nodesPerAttribute.size(); + // List fullValues = forcedAwarenessAttributes.get(awarenessAttribute); + // + // if (fullValues != null) { + // // If forced awareness is enabled, numberOfAttributes = count(distinct((union(discovered_attributes, forced_attributes))) + // Set attributesSet = new HashSet<>(fullValues); + // for (String stringObjectCursor : nodesPerAttribute) { + // attributesSet.add(stringObjectCursor); + // } + // numberOfAttributes = attributesSet.size(); + // } + // // TODO should we remove ones that are not part of full list? + // + // final int currentNodeCount = shardPerAttribute.get(node.node().getAttributes().get(awarenessAttribute)); + // final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes) + // if (currentNodeCount > maximumNodeCount) { + // return allocation.decision( + // Decision.NO, + // NAME, + // "there are too many copies of the shard allocated to nodes with attribute [%s], there are [%d] total configured " + // + "shard copies for this shard id and [%d] total attribute values, expected the allocated shard count per " + // + "attribute [%d] to be less than or equal to the upper bound of the required number of shards per attribute [%d]", + // awarenessAttribute, + // shardCount, + // numberOfAttributes, + // currentNodeCount, + // maximumNodeCount + // ); + // } + // } + // + // return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements"); + // } + private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) { if (awarenessAttributes.isEmpty()) { return allocation.decision( @@ -164,7 +267,7 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary for (String awarenessAttribute : awarenessAttributes) { // the node the shard exists on must be associated with an awareness attribute - if (node.node().getAttributes().containsKey(awarenessAttribute) == false) { + if (isAwarenessAttributeAssociatedWithNode(node, awarenessAttribute) == false) { return allocation.decision( Decision.NO, NAME, @@ -175,36 +278,10 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout ); } + int currentNodeCount = getCurrentNodeCountForAttribute(shardRouting, node, allocation, moveToNode, awarenessAttribute); + // build attr_value -> nodes map Set nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute); - - // build the count of shards per attribute value - Map shardPerAttribute = new HashMap<>(); - for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) { - if (assignedShard.started() || assignedShard.initializing()) { - // Note: this also counts relocation targets as that will be the new location of the shard. - // Relocation sources should not be counted as the shard is moving away - RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId()); - shardPerAttribute.merge(routingNode.node().getAttributes().get(awarenessAttribute), 1, Integer::sum); - } - } - - if (moveToNode) { - if (shardRouting.assignedToNode()) { - String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); - if (node.nodeId().equals(nodeId) == false) { - // we work on different nodes, move counts around - shardPerAttribute.compute( - allocation.routingNodes().node(nodeId).node().getAttributes().get(awarenessAttribute), - (k, v) -> (v == null) ? 0 : v - 1 - ); - shardPerAttribute.merge(node.node().getAttributes().get(awarenessAttribute), 1, Integer::sum); - } - } else { - shardPerAttribute.merge(node.node().getAttributes().get(awarenessAttribute), 1, Integer::sum); - } - } - int numberOfAttributes = nodesPerAttribute.size(); List fullValues = forcedAwarenessAttributes.get(awarenessAttribute); @@ -216,9 +293,8 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout } numberOfAttributes = attributesSet.size(); } - // TODO should we remove ones that are not part of full list? - final int currentNodeCount = shardPerAttribute.get(node.node().getAttributes().get(awarenessAttribute)); + // TODO should we remove ones that are not part of full list? final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes) if (currentNodeCount > maximumNodeCount) { return allocation.decision( @@ -238,4 +314,143 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements"); } + + private int getCurrentNodeCountForAttribute( + ShardRouting shardRouting, + RoutingNode node, + RoutingAllocation allocation, + boolean moveToNode, + String awarenessAttribute + ) { + // build the count of shards per attribute value + final String shardAttributeForNode = getAttributeValueForNode(node, awarenessAttribute); + int currentNodeCount = 0; + final List assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId()); + for (ShardRouting assignedShard : assignedShards) { + if (assignedShard.started() || assignedShard.initializing()) { + // Note: this also counts relocation targets as that will be the new location of the shard. + // Relocation sources should not be counted as the shard is moving away + RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId()); + // Increase node count when + if (getAttributeValueForNode(routingNode, awarenessAttribute).equals(shardAttributeForNode)) { + ++currentNodeCount; + } + } + } + + if (moveToNode) { + if (shardRouting.assignedToNode()) { + String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); + if (node.nodeId().equals(nodeId) == false) { + // we work on different nodes, move counts around + if (getAttributeValueForNode(allocation.routingNodes().node(nodeId), awarenessAttribute).equals(shardAttributeForNode) + && currentNodeCount > 0) { + --currentNodeCount; + } + + ++currentNodeCount; + } + } else { + ++currentNodeCount; + } + } + + return currentNodeCount; + } + + private boolean isAwarenessAttributeAssociatedWithNode(RoutingNode node, String awarenessAttribute) { + if (isAllocationZoneOptimised) { + return node.node().hasZoneAttribute(); + } else { + return node.node().getAttributes().containsKey(awarenessAttribute); + } + } + + private String getAttributeValueForNode(final RoutingNode node, final String awarenessAttribute) { + if (isAllocationZoneOptimised) { + return node.node().getZoneValue(); + } else { + return node.node().getAttributes().get(awarenessAttribute); + } + } + + private Decision underCapacityOptimised(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) { + if (awarenessAttributes.isEmpty()) { + return allocation.decision( + Decision.YES, + NAME, + "allocation awareness is not enabled, set cluster setting [%s] to enable it", + CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey() + ); + } + + IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); + int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary + // the node the shard exists on must be associated with an awareness attribute + if (node.node().hasZoneAttribute() == false) { + return allocation.decision( + Decision.NO, + NAME, + "node does not contain the awareness attribute [%s]; required attributes cluster setting [%s=%s]", + "zone", + CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), + allocation.debugDecision() ? Strings.collectionToCommaDelimitedString(awarenessAttributes) : null + ); + } + + // build attr_value -> nodes map + Set nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts("zone"); + + final String shardAttributeForNode = node.node().getZoneValue(); + int currentNodeCount = 0; + // build the count of shards per attribute value + final List assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId()); + for (ShardRouting assignedShard : assignedShards) { + if (assignedShard.started() || assignedShard.initializing()) { + // Note: this also counts relocation targets as that will be the new location of the shard. + // Relocation sources should not be counted as the shard is moving away + RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId()); + if (routingNode.node().getZoneValue().equals(shardAttributeForNode)) { + ++currentNodeCount; + } + } + } + + if (moveToNode) { + if (shardRouting.assignedToNode()) { + String nodeId = shardRouting.relocating() ? shardRouting.relocatingNodeId() : shardRouting.currentNodeId(); + if (node.nodeId().equals(nodeId) == false) { + // we work on different nodes, move counts around + if (allocation.routingNodes().node(nodeId).node().getZoneValue().equals(shardAttributeForNode) + && currentNodeCount > 0) { + --currentNodeCount; + } + + ++currentNodeCount; + } + } else { + ++currentNodeCount; + } + } + + int numberOfAttributes = nodesPerAttribute.size(); + List fullValues = forcedAwarenessAttributes.get("zone"); + + if (fullValues != null) { + // If forced awareness is enabled, numberOfAttributes = count(distinct((union(discovered_attributes, forced_attributes))) + Set attributesSet = new HashSet<>(fullValues); + for (String stringObjectCursor : nodesPerAttribute) { + attributesSet.add(stringObjectCursor); + } + numberOfAttributes = attributesSet.size(); + } + // TODO should we remove ones that are not part of full list? + final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes) + if (currentNodeCount > maximumNodeCount) { + return allocation.decision(Decision.NO, NAME, "there are too many copies of the shard allocated to nodes"); + } + + return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements"); + } + } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5dcf23ae52294..23c05365959be 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -257,6 +257,7 @@ public void apply(Settings value, Settings current, Settings previous) { new HashSet<>( Arrays.asList( AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, + AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationWithZoneOptimisedTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationWithZoneOptimisedTests.java new file mode 100644 index 0000000000000..5c254fae85934 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationWithZoneOptimisedTests.java @@ -0,0 +1,1038 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.command.AllocationCommands; +import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.sameInstance; + +public class AwarenessAllocationWithZoneOptimisedTests extends OpenSearchAllocationTestCase { + + private final Logger logger = LogManager.getLogger(AwarenessAllocationTests.class); + + public void testMoveShardOnceNewNodeWithAttributeAdded1() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .build() + ); + + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded1'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder().add(newNode("node1", singletonMap("zone", "a"))).add(newNode("node2", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3", singletonMap("zone", "b")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat( + clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), + equalTo("node3") + ); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, make sure nothing moves"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4", singletonMap("node.attr.zone", "c")))) + .build(); + ClusterState newState = strategy.reroute(clusterState, "reroute"); + assertThat(newState, equalTo(clusterState)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + } + + public void testMoveShardOnceNewNodeWithAttributeAdded2() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .build() + ); + + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded2'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", singletonMap("zone", "a"))) + .add(newNode("node2", singletonMap("zone", "a"))) + .add(newNode("node3", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4", singletonMap("zone", "b")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat( + clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), + equalTo("node4") + ); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, make sure nothing moves"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node5", singletonMap("zone", "c")))) + .build(); + ClusterState newState = strategy.reroute(clusterState, "reroute"); + assertThat(newState, equalTo(clusterState)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + } + + public void testMoveShardOnceNewNodeWithAttributeAdded3() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.balance.index", 0.0f) + .put("cluster.routing.allocation.balance.replica", 1.0f) + .put("cluster.routing.allocation.balance.primary", 0.0f) + .build() + ); + + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded3'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder().add(newNode("node1", singletonMap("zone", "a"))).add(newNode("node2", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + logger.info("Initializing shards: {}", clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + logger.info("Started shards: {}", clusterState.getRoutingNodes().shardsWithState(STARTED)); + logger.info("Relocating shards: {}", clusterState.getRoutingNodes().shardsWithState(RELOCATING)); + logger.info("Unassigned shards: {}", clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)); + + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3", singletonMap("zone", "b")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(5)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(5)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(5)); + assertThat( + clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), + equalTo("node3") + ); + + logger.info("--> complete initializing"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> run it again, since we still might have relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, some more relocation should happen"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4", singletonMap("zone", "c")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(RELOCATING).size(), greaterThan(0)); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + } + + public void testMoveShardOnceNewNodeWithAttributeAdded4() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build() + ); + + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded4'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1)) + .put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test1")) + .addAsNew(metadata.index("test2")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder().add(newNode("node1", singletonMap("zone", "a"))).add(newNode("node2", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(10)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3", singletonMap("zone", "b")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(10)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(10)); + assertThat( + clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), + equalTo("node3") + ); + + logger.info("--> complete initializing"); + for (int i = 0; i < 2; i++) { + logger.info("--> complete initializing round: [{}]", i); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + } + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(10)); + assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(5)); + assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(5)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, some more relocation should happen"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4", singletonMap("zone", "c")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(RELOCATING).size(), greaterThan(0)); + + logger.info("--> complete relocation"); + for (int i = 0; i < 2; i++) { + logger.info("--> complete initializing round: [{}]", i); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + } + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(5)); + assertThat(clusterState.getRoutingNodes().node("node4").size(), equalTo(5)); + assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(5)); + assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(5)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + } + + public void testMoveShardOnceNewNodeWithAttributeAdded5() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build() + ); + + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded5'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder().add(newNode("node1", singletonMap("zone", "a"))).add(newNode("node2", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3", singletonMap("zone", "b")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node3")); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, we will have another relocation"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4", singletonMap("zone", "c")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat( + clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), + equalTo("node4") + ); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3)); + + logger.info("--> make sure another reroute does not move things"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + } + + public void testMoveShardOnceNewNodeWithAttributeAdded6() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build() + ); + + logger.info("Building initial routing table for 'moveShardOnceNewNodeWithAttributeAdded6'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", singletonMap("zone", "a"))) + .add(newNode("node2", singletonMap("zone", "a"))) + .add(newNode("node3", singletonMap("zone", "a"))) + .add(newNode("node4", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> start the shards (replicas)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node5", singletonMap("zone", "b")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat( + clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), + equalTo("node5") + ); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, we will have another relocation"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node6", singletonMap("zone", "c")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat( + clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).get(0).relocatingNodeId(), + equalTo("node6") + ); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); + + logger.info("--> make sure another reroute does not move things"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + } + + public void testFullAwareness1() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build() + ); + + logger.info("Building initial routing table for 'fullAwareness1'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder().add(newNode("node1", singletonMap("zone", "a"))).add(newNode("node2", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replica will not start because we have only one rack value"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3", singletonMap("zone", "b")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node3")); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, make sure nothing moves"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4", singletonMap("zone", "c")))) + .build(); + ClusterState newState = strategy.reroute(clusterState, "reroute"); + assertThat(newState, equalTo(clusterState)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + } + + public void testFullAwareness2() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build() + ); + + logger.info("Building initial routing table for 'fullAwareness2'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", singletonMap("zone", "a"))) + .add(newNode("node2", singletonMap("zone", "a"))) + .add(newNode("node3", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> replica will not start because we have only one rack value"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4", singletonMap("zone", "b")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node4")); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(2)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, make sure nothing moves"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node5", singletonMap("zone", "c")))) + .build(); + ClusterState newState = strategy.reroute(clusterState, "reroute"); + assertThat(newState, equalTo(clusterState)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + } + + public void testFullAwareness3() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.balance.index", 0.0f) + .put("cluster.routing.allocation.balance.replica", 1.0f) + .put("cluster.routing.allocation.balance.primary", 0.0f) + .build() + ); + + logger.info("Building initial routing table for 'fullAwareness3'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1)) + .put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metadata.index("test1")) + .addAsNew(metadata.index("test2")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes on same rack and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder().add(newNode("node1", singletonMap("zone", "a"))).add(newNode("node2", singletonMap("zone", "a"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(10)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + + logger.info("--> add a new node with a new rack and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3", singletonMap("zone", "b")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(10)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("node3")); + + logger.info("--> complete initializing"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("--> run it again, since we still might have relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + + logger.info("--> add another node with a new rack, some more relocation should happen"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4", singletonMap("zone", "c")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(RELOCATING).size(), greaterThan(0)); + + logger.info("--> complete relocation"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); + + logger.info("--> do another reroute, make sure nothing moves"); + assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); + } + + public void testUnbalancedZones() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .build() + ); + + logger.info("Building initial routing table for 'testUnbalancedZones'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes in different zones and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("A-0", singletonMap("zone", "a"))).add(newNode("B-0", singletonMap("zone", "b")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(5)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + logger.info("--> all replicas are allocated and started since we have on node in each zone"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(10)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + + logger.info("--> add a new node in zone 'a' and reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("A-1", singletonMap("zone", "a")))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo("A-1")); + logger.info("--> starting initializing shards on the new node"); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(10)); + assertThat(clusterState.getRoutingNodes().node("A-1").size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().node("A-0").size(), equalTo(3)); + assertThat(clusterState.getRoutingNodes().node("B-0").size(), equalTo(5)); + } + + public void testUnassignedShardsWithUnbalancedZones() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .build() + ); + + logger.info("Building initial routing table for 'testUnassignedShardsWithUnbalancedZones'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(4)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding 5 nodes in different zones and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("A-0", singletonMap("zone", "a"))) + .add(newNode("A-1", singletonMap("zone", "a"))) + .add(newNode("A-2", singletonMap("zone", "a"))) + .add(newNode("A-3", singletonMap("zone", "a"))) + .add(newNode("A-4", singletonMap("zone", "a"))) + .add(newNode("B-0", singletonMap("zone", "b"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shard (primary)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(3)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1)); // Unassigned shard is expected. + + // Cancel all initializing shards and move started primary to another node. + AllocationCommands commands = new AllocationCommands(); + String primaryNode = null; + for (ShardRouting routing : clusterState.routingTable().allShards()) { + if (routing.primary()) { + primaryNode = routing.currentNodeId(); + } else if (routing.initializing()) { + commands.add(new CancelAllocationCommand(routing.shardId().getIndexName(), routing.id(), routing.currentNodeId(), false)); + } + } + commands.add(new MoveAllocationCommand("test", 0, primaryNode, "A-4")); + + clusterState = strategy.reroute(clusterState, commands, false, false).getClusterState(); + + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0)); + assertThat(clusterState.getRoutingNodes().shardsWithState(RELOCATING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(4)); // +1 for relocating shard. + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1)); // Still 1 unassigned. + } + + public void testMultipleAwarenessAttributes() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone, rack") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a, b") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "rack.values", "c, d") + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .build() + ); + + logger.info("Building initial routing table for 'testUnbalancedZones'"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding two nodes in different zones and do rerouting"); + Map nodeAAttributes = new HashMap<>(); + nodeAAttributes.put("zone", "a"); + nodeAAttributes.put("rack", "c"); + Map nodeBAttributes = new HashMap<>(); + nodeBAttributes.put("zone", "b"); + nodeBAttributes.put("rack", "d"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("A-0", nodeAAttributes)).add(newNode("B-0", nodeBAttributes))) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shards (primaries)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + logger.info("--> all replicas are allocated and started since we have one node in each zone and rack"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); + } + + public void testAllocationExplainForUnassignedShardsWithUnbalancedZones() { + Settings settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ZONE_OPTIMISED.getKey(), true) + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + AllocationService strategy = createAllocationService(settings); + + logger.info("Building initial routing table for 'testAllocationExplainForUnassignedShardsWithUnbalancedZones'"); + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding 3 nodes in different zones and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("A-0", singletonMap("zone", "a"))) + .add(newNode("A-1", singletonMap("zone", "a"))) + .add(newNode("B-0", singletonMap("zone", "b"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shard (primary)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + // One Shard is unassigned due to forced zone awareness + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1)); + + List unassignedShards = clusterState.getRoutingTable().shardsWithState(UNASSIGNED); + + ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + // Add a new node in zone c + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("C-0", singletonMap("zone", "c")))) + .build(); + + final AwarenessAllocationDecider decider = new AwarenessAllocationDecider(settings, EMPTY_CLUSTER_SETTINGS); + + final RoutingAllocation allocation = new RoutingAllocation( + new AllocationDeciders(Collections.singleton(decider)), + clusterState.getRoutingNodes(), + clusterState, + null, + null, + 0L + ); + allocation.debugDecision(true); + + Decision decision = null; + RoutingNodes nodes = clusterState.getRoutingNodes(); + + for (RoutingNode node : nodes) { + // Try to allocate unassigned shard to A-0, fails because of forced zone awareness + if (node.nodeId().equals("A-0")) { + decision = decider.canAllocate(unassignedShards.get(0), node, allocation); + assertEquals(Decision.Type.NO, decision.type()); + assertEquals( + decision.getExplanation(), + "there are too many copies of the shard allocated to nodes with attribute" + + " [zone], there are [3] total configured shard copies for this shard id and [3]" + + " total attribute values, expected the allocated shard count per attribute [2] to" + + " be less than or equal to the upper bound of the required number of shards per attribute [1]" + ); + } + + } + } +}