From 11cab8c5b180815defc7c619a0db6f53ca3b8128 Mon Sep 17 00:00:00 2001
From: Rishab Nahata
Date: Wed, 19 Jun 2024 13:37:36 +0530
Subject: [PATCH] Skip allocating more shards when cluster limit breached

Signed-off-by: Rishab Nahata
---
Review notes (this section is not part of the commit message):
 - RerouteBenchmark: explicit JMH imports instead of a wildcard import,
   Map<String, String> instead of a raw Map, Integer.parseInt instead of
   Integer.valueOf, and the commented-out warm-up code removed together
   with the local variable and import that only it used.
 - LocalShardsBalancer: the "Skipping shard iteration" message is logged
   at debug level; its text is unchanged.
 - Javadoc added or tightened on the new methods.
 - The "index" lines below still carry the blob ids of the original
   revision of this patch; they were not recomputed.

 .../routing/allocation/RerouteBenchmark.java  | 126 ++++++++++++++++++
 .../cluster/routing/RoutingNodes.java         |   9 +
 .../allocator/LocalShardsBalancer.java        |   8 +
 .../allocation/decider/AllocationDecider.java |   8 +
 .../decider/AllocationDeciders.java           |  23 +++
 ...ConcurrentRecoveriesAllocationDecider.java |  30 ++++
 6 files changed, 204 insertions(+)
 create mode 100644 benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/RerouteBenchmark.java

diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/RerouteBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/RerouteBenchmark.java
new file mode 100644
index 0000000000000..908f7f3e43474
--- /dev/null
+++ b/benchmarks/src/main/java/org/opensearch/benchmark/routing/allocation/RerouteBenchmark.java
@@ -0,0 +1,126 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.benchmark.routing.allocation;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import org.opensearch.Version;
+import org.opensearch.cluster.ClusterName;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.routing.RoutingTable;
+import org.opensearch.cluster.routing.allocation.AllocationService;
+import org.opensearch.common.logging.LogConfigurator;
+import org.opensearch.common.settings.Settings;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Measures a single {@code AllocationService.reroute} call on the cluster state built in
+ * {@link #setUp()}; the setup itself never runs a reroute.
+ */
+@Fork(1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+@SuppressWarnings("unused") // invoked by benchmarking framework
+public class RerouteBenchmark {
+    @Param({
+        // indices| nodes
+        " 5000| 500|",
+    })
+    public String indicesNodes = "1|1";
+    public int numIndices;
+    public int numNodes;
+    public int numShards = 9;
+    public int numReplicas = 5;
+
+    private AllocationService allocationService;
+    private ClusterState initialClusterState;
+
+    /**
+     * Parses {@link #indicesNodes} ("indices|nodes") and builds the allocation service and the
+     * initial cluster state from index metadata, a routing table and the cluster nodes.
+     */
+    @Setup
+    public void setUp() throws Exception {
+        LogConfigurator.setNodeName("test");
+        final String[] params = indicesNodes.split("\\|");
+        numIndices = toInt(params[0]);
+        numNodes = toInt(params[1]);
+
+        allocationService = Allocators.createAllocationService(
+            Settings.builder()
+                .put("cluster.routing.allocation.awareness.attributes", "zone")
+                .put("cluster.routing.allocation.load_awareness.provisioned_capacity", numNodes)
+                .put("cluster.routing.allocation.load_awareness.skew_factor", "50")
+                .put("cluster.routing.allocation.node_concurrent_recoveries", "2")
+                .build()
+        );
+        Metadata.Builder mb = Metadata.builder();
+        for (int i = 1; i <= numIndices; i++) {
+            mb.put(
+                IndexMetadata.builder("test_" + i)
+                    .settings(Settings.builder().put("index.version.created", Version.CURRENT))
+                    .numberOfShards(numShards)
+                    .numberOfReplicas(numReplicas)
+            );
+        }
+
+        Metadata metadata = mb.build();
+        RoutingTable.Builder rb = RoutingTable.builder();
+        for (int i = 1; i <= numIndices; i++) {
+            rb.addAsNew(metadata.index("test_" + i));
+        }
+        RoutingTable routingTable = rb.build();
+        initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metadata(metadata)
+            .routingTable(routingTable)
+            .nodes(setUpClusterNodes(numNodes))
+            .build();
+    }
+
+    /** Runs one reroute against the cluster state prepared in {@link #setUp()} and returns its result. */
+    @Benchmark
+    public ClusterState measureShardAllocationEmptyCluster() throws Exception {
+        return allocationService.reroute(initialClusterState, "reroute");
+    }
+
+    /** Parses a decimal integer, ignoring surrounding whitespace. */
+    private int toInt(String v) {
+        return Integer.parseInt(v.trim());
+    }
+
+    /** Builds {@code nodes} nodes with ids {@code node_0_<i>}, each tagged {@code zone=zone_<i % 3>}. */
+    private DiscoveryNodes.Builder setUpClusterNodes(int nodes) {
+        DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
+        for (int i = 1; i <= nodes; i++) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put("zone", "zone_" + (i % 3));
+            nb.add(Allocators.newNode("node_0_" + i, attributes));
+        }
+        return nb;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
index ab455f52c4195..e944b06763eb0 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
@@ -334,6 +334,15 @@ public int getRelocatingShardCount() {
         return relocatingShards;
     }
 
+    /**
+     * Returns the current value of the {@code inactiveShardCount} counter.
+     * NOTE(review): callers use this as the number of initializing shards; confirm that the
+     * counter is maintained with exactly that meaning.
+     */
+    public int getInitializingShardCount() {
+        return inactiveShardCount;
+    }
+
     /**
      * Returns all shards that are not in the state UNASSIGNED with the same shard
      * ID as the given shard.
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java
index ec25d041bda43..15a74f4232387 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java
@@ -793,6 +793,14 @@ void allocateUnassigned() {
         int primaryLength = primary.length;
         ArrayUtil.timSort(primary, comparator);
         do {
+            // Cluster-wide throttle: leave the method instead of asking the deciders about every shard.
+            if (allocation.deciders().canAllocateAnyShard(allocation).type() == Decision.Type.THROTTLE) {
+                logger.debug(
+                    "Cannot allocate any shard in the cluster due to cluster concurrent recoveries getting breached"
+                        + ". Skipping shard iteration"
+                );
+                return;
+            }
             for (int i = 0; i < primaryLength; i++) {
                 ShardRouting shard = primary[i];
                 final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard);
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecider.java
index 19faacc3a3ae1..dcf81a9243a90 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecider.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDecider.java
@@ -152,6 +152,14 @@ public Decision canMoveAnyShard(RoutingAllocation allocation) {
         return Decision.ALWAYS;
     }
 
+    /**
+     * Returns a {@link Decision} whether any shard at all can be allocated in the cluster
+     * described by the given {@link RoutingAllocation}. The default is {@link Decision#ALWAYS}.
+     */
+    public Decision canAllocateAnyShard(RoutingAllocation allocation) {
+        return Decision.ALWAYS;
+    }
+
     /**
      * Returns a {@link Decision} whether any shard on the given
      * {@link RoutingNode}} can be allocated The default is {@link Decision#ALWAYS}.
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java
index 1263efd19ac46..97c6515dfded2 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java
@@ -314,6 +314,29 @@ public Decision canMoveAnyShard(RoutingAllocation allocation) {
         return ret;
     }
 
+    /**
+     * Asks every decider whether any shard can be allocated in the cluster and combines the
+     * answers into one {@link Decision}.
+     */
+    @Override
+    public Decision canAllocateAnyShard(RoutingAllocation allocation) {
+        Decision.Multi ret = new Decision.Multi();
+        for (AllocationDecider decider : allocations) {
+            Decision decision = decider.canAllocateAnyShard(allocation);
+            if (decision.type().canPreemptivelyReturn()) {
+                // Outside debug mode such a decision is returned at once; in debug mode it is collected.
+                if (allocation.debugDecision() == false) {
+                    return decision;
+                } else {
+                    ret.add(decision);
+                }
+            } else {
+                addDecision(ret, decision, allocation);
+            }
+        }
+        return ret;
+    }
+
     private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) {
         // We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES).
         if (decision != Decision.ALWAYS
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ConcurrentRecoveriesAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ConcurrentRecoveriesAllocationDecider.java
index 62276c470a86b..8874c170fa2fb 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ConcurrentRecoveriesAllocationDecider.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ConcurrentRecoveriesAllocationDecider.java
@@ -90,6 +90,36 @@ public Decision canMoveAnyShard(RoutingAllocation allocation) {
         );
     }
 
+    /**
+     * Returns THROTTLE once the initializing shard count reported by the routing nodes reaches
+     * {@code clusterConcurrentRecoveries}, and YES otherwise or when that limit is -1.
+     */
+    @Override
+    public Decision canAllocateAnyShard(RoutingAllocation allocation) {
+        if (clusterConcurrentRecoveries == -1) {
+            return allocation.decision(Decision.YES, NAME, "undefined cluster concurrent recoveries");
+        }
+        int initializingShards = allocation.routingNodes().getInitializingShardCount();
+        if (initializingShards >= clusterConcurrentRecoveries) {
+            return allocation.decision(
+                Decision.THROTTLE,
+                NAME,
+                "too many shards are concurrently initializing [%d], limit: [%d] cluster setting [%s=%d]",
+                initializingShards,
+                clusterConcurrentRecoveries,
+                CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING.getKey(),
+                clusterConcurrentRecoveries
+            );
+        }
+        return allocation.decision(
+            Decision.YES,
+            NAME,
+            "below threshold [%d] for concurrent recoveries, current initializing shard count [%d]",
+            clusterConcurrentRecoveries,
+            initializingShards
+        );
+    }
+
     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         return canMoveAnyShard(allocation);