Skip to content

Commit

Permalink
Make the priority of the follow-up reroute scheduled after an allocator timeout configurable (default NORMAL, previously hard-coded HIGH)
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN authored and Arpit-Bandejiya committed Oct 25, 2024
1 parent ca40ba4 commit fd983f3
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static org.opensearch.cluster.action.shard.ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
Expand Down Expand Up @@ -191,6 +193,32 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Setting.Property.Dynamic
);

/**
* Priority of the follow-up reroute task that is scheduled when the current allocation round times out.
* The default is NORMAL, which is appropriate for a healthy cluster. If the cluster is in a degraded state
* in which NORMAL priority tasks are being starved, this may need to be raised so that shards still get allocated.
* Values are parsed by {@code parseReroutePriority}, which accepts NORMAL, HIGH and URGENT (case-insensitive).
* The setting is node-scoped and can be updated dynamically.
* NOTE(review): this file also statically imports ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING; this
* class-level field has the same simple name and appears to shadow that import - confirm the import is still needed.
*/
public static final Setting<Priority> FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>(
"cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority",
Priority.NORMAL.toString(),
BalancedShardsAllocator::parseReroutePriority,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
 * Parses the raw value of the follow-up reroute priority setting.
 * The text is upper-cased with the ROOT locale before it is resolved, so matching is case-insensitive.
 *
 * @param priorityString raw setting value
 * @return the resolved priority when it is NORMAL, HIGH or URGENT
 * @throws IllegalArgumentException if the value resolves to any other priority; a name that matches no
 *         constant at all is rejected by {@code Priority.valueOf} itself
 */
private static Priority parseReroutePriority(String priorityString) {
    final Priority parsed = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT));
    // Only these three levels are allowed for the follow-up reroute task.
    if (parsed == Priority.NORMAL || parsed == Priority.HIGH || parsed == Priority.URGENT) {
        return parsed;
    }
    final String settingKey = FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey();
    throw new IllegalArgumentException("priority [" + parsed + "] not supported for [" + settingKey + "]");
}

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

Expand All @@ -204,6 +232,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {

private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
private volatile Priority followUpRerouteTaskPriority;
private long startTime;
private RerouteService rerouteService;

Expand All @@ -223,6 +252,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings));
setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
Expand All @@ -233,6 +263,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore);
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority);
}

@Override
Expand Down Expand Up @@ -321,6 +352,10 @@ private void setAllocatorTimeout(TimeValue allocatorTimeout) {
this.allocatorTimeout = allocatorTimeout;
}

/**
* Stores the priority passed to the reroute service when a follow-up reroute is scheduled after the
* allocator times out. Called from the constructor with the initial setting value and registered as the
* update consumer for FOLLOW_UP_REROUTE_PRIORITY_SETTING; the backing field is declared volatile.
*
* @param followUpRerouteTaskPriority priority to use for follow-up reroutes scheduled from now on
*/
private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) {
this.followUpRerouteTaskPriority = followUpRerouteTaskPriority;
}

protected boolean allocatorTimedOut() {
if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) {
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -417,10 +452,13 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {

private void scheduleRerouteIfAllocatorTimedOut() {
if (allocatorTimedOut()) {
assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out";
if (rerouteService == null) {
logger.info("RerouteService not set to schedule reroute after allocator time out");
return;
}
rerouteService.reroute(
"reroute after balanced shards allocator timed out",
Priority.HIGH,
followUpRerouteTaskPriority,
ActionListener.wrap(
r -> logger.trace("reroute after balanced shards allocator timed out completed"),
e -> logger.debug("reroute after balanced shards allocator timed out failed", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.THRESHOLD_SETTING,
BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE,
BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING,
BalancedShardsAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down Expand Up @@ -353,6 +354,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING,
ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING,
ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING,
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -82,6 +83,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {

private TimeValue primaryShardsBatchGatewayAllocatorTimeout;
private TimeValue replicaShardsBatchGatewayAllocatorTimeout;
private volatile Priority followUpRerouteTaskPriority;
public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);
private final ClusterManagerMetrics clusterManagerMetrics;

Expand Down Expand Up @@ -145,6 +147,32 @@ public void validate(TimeValue timeValue) {
Setting.Property.Dynamic
);

/**
* Priority of the follow-up reroute task that is scheduled when the current round of existing-shards
* allocation times out. The default is NORMAL, which is appropriate for a healthy cluster. If the cluster is
* in a degraded state in which NORMAL priority tasks are being starved, this may need to be raised so that
* existing shards still get allocated.
* Values are parsed by {@code parseReroutePriority}, which accepts NORMAL, HIGH and URGENT (case-insensitive).
* The setting is node-scoped and can be updated dynamically.
*/
public static final Setting<Priority> FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>(
"cluster.routing.allocation.shards_batch_gateway_allocator.schedule_reroute.priority",
Priority.NORMAL.toString(),
ShardsBatchGatewayAllocator::parseReroutePriority,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
 * Converts the configured follow-up reroute priority text into a {@code Priority}.
 * The input is upper-cased using the ROOT locale first, so the value is matched case-insensitively.
 *
 * @param priorityString raw setting value
 * @return the resolved priority; only NORMAL, HIGH and URGENT are returned
 * @throws IllegalArgumentException if the value resolves to a priority outside that set; a name that
 *         matches no constant at all is rejected by {@code Priority.valueOf} itself
 */
private static Priority parseReroutePriority(String priorityString) {
    final String normalized = priorityString.toUpperCase(Locale.ROOT);
    final Priority requested = Priority.valueOf(normalized);
    final boolean supported = requested == Priority.NORMAL || requested == Priority.HIGH || requested == Priority.URGENT;
    if (supported == false) {
        // Reject every other level for this setting.
        throw new IllegalArgumentException(
            "priority [" + requested + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]"
        );
    }
    return requested;
}

private final RerouteService rerouteService;
private final PrimaryShardBatchAllocator primaryShardBatchAllocator;
private final ReplicaShardBatchAllocator replicaShardBatchAllocator;
Expand Down Expand Up @@ -179,6 +207,8 @@ public ShardsBatchGatewayAllocator(
this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setReplicaBatchAllocatorTimeout);
this.clusterManagerMetrics = clusterManagerMetrics;
setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority);
}

@Override
Expand Down Expand Up @@ -308,8 +338,8 @@ public void onComplete() {
logger.trace("scheduling reroute after existing shards allocator timed out for primary shards");
assert rerouteService != null;
rerouteService.reroute(
"reroute after existing shards allocator timed out",
Priority.HIGH,
"reroute after existing shards allocator [P] timed out",
followUpRerouteTaskPriority,
ActionListener.wrap(
r -> logger.trace("reroute after existing shards allocator timed out completed"),
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
Expand Down Expand Up @@ -343,8 +373,8 @@ public void onComplete() {
logger.trace("scheduling reroute after existing shards allocator timed out for replica shards");
assert rerouteService != null;
rerouteService.reroute(
"reroute after existing shards allocator timed out",
Priority.HIGH,
"reroute after existing shards allocator [R] timed out",
followUpRerouteTaskPriority,
ActionListener.wrap(
r -> logger.trace("reroute after existing shards allocator timed out completed"),
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
Expand Down Expand Up @@ -920,4 +950,8 @@ protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatew
/**
* Stores the timeout applied to the replica shards batch gateway allocator. Registered in the constructor
* as the update consumer for REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.
* NOTE(review): the backing field is not declared volatile, unlike followUpRerouteTaskPriority - confirm
* that visibility of dynamic updates across threads is not a concern here.
*
* @param replicaShardsBatchGatewayAllocatorTimeout new timeout value for replica batch allocation
*/
protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) {
this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout;
}

/**
* Stores the priority passed to the reroute service when a follow-up reroute is scheduled after the
* existing shards allocator times out for primary or replica shards. Called from the constructor with the
* initial setting value and registered as the update consumer for FOLLOW_UP_REROUTE_PRIORITY_SETTING;
* the backing field is declared volatile.
*
* @param followUpRerouteTaskPriority priority to use for follow-up reroutes scheduled from now on
*/
protected void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) {
this.followUpRerouteTaskPriority = followUpRerouteTaskPriority;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOutAndRerouteNotScheduled(
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -143,6 +143,49 @@ public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduled() {
System.nanoTime()
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
allocator.allocate(allocation);
List<ShardRouting> initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId());
int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId());
int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId());
assertEquals(0, initializingShards.size());
assertEquals(totalShardCount, allocation.routingNodes().unassigned().ignored().size());
assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries);
assertTrue(rerouteScheduled.get());
}

public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduledWithHighPriority() {
int numberOfIndices = 2;
int numberOfShards = 5;
int numberOfReplicas = 1;
int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1));
Settings.Builder settings = Settings.builder()
.put("cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority", "high");
// passing 0 for timed out latch such that all shard times out
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(0));
Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas);
RoutingTable routingTable = buildRoutingTable(metadata);
setupStateAndService(metadata, routingTable);
RoutingAllocation allocation = new RoutingAllocation(
yesAllocationDeciders(),
new RoutingNodes(state, false),
state,
ClusterInfo.EMPTY,
null,
System.nanoTime()
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
Expand Down Expand Up @@ -193,7 +236,7 @@ public void testAllocatePartialPrimaryShardsUntilTimedOutAndRerouteScheduled() {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -237,7 +280,7 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOutAndR
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -284,7 +327,7 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotSched
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -326,7 +369,7 @@ public void testNoShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled()
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -371,7 +414,7 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteSchedul
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -416,7 +459,7 @@ public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -462,7 +505,7 @@ public void testClusterNotRebalancedWhenTimedOutAndRerouteScheduled() {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -522,7 +565,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down
Loading

0 comments on commit fd983f3

Please sign in to comment.