Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support intra-broker throttling (replica.alter.log.dirs.io.max.bytes.per.second) #2145

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,8 @@ private static boolean hasProposalsToExecute(Collection<ExecutionProposal> propo
* (if null, use default.replica.movement.strategies).
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* when executing proposals (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* when executing proposals (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
* @param uuid UUID of the execution.
* @param skipInterBrokerReplicaConcurrencyAdjustment {@code true} to skip auto adjusting concurrency of inter-broker
Expand All @@ -667,14 +669,15 @@ public void executeProposals(Set<ExecutionProposal> proposals,
Long executionProgressCheckIntervalMs,
ReplicaMovementStrategy replicaMovementStrategy,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest,
String uuid,
boolean skipInterBrokerReplicaConcurrencyAdjustment) throws OngoingExecutionException {
if (hasProposalsToExecute(proposals, uuid)) {
_executor.executeProposals(proposals, unthrottledBrokers, null, _loadMonitor, concurrentInterBrokerPartitionMovements,
maxInterBrokerPartitionMovements, concurrentIntraBrokerPartitionMovements, clusterConcurrentLeaderMovements,
brokerConcurrentLeaderMovements, executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle,
isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipInterBrokerReplicaConcurrencyAdjustment);
logDirThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipInterBrokerReplicaConcurrencyAdjustment);
} else {
failGeneratingProposalsForExecution(uuid);
}
Expand All @@ -700,6 +703,8 @@ public void executeProposals(Set<ExecutionProposal> proposals,
* (if null, use default.replica.movement.strategies).
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* when executing remove operations (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* when executing remove operations (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
* @param uuid UUID of the execution.
*/
Expand All @@ -714,13 +719,14 @@ public void executeRemoval(Set<ExecutionProposal> proposals,
Long executionProgressCheckIntervalMs,
ReplicaMovementStrategy replicaMovementStrategy,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest,
String uuid) throws OngoingExecutionException {
if (hasProposalsToExecute(proposals, uuid)) {
_executor.executeProposals(proposals, throttleDecommissionedBroker ? Collections.emptySet() : removedBrokers, removedBrokers,
_loadMonitor, concurrentInterBrokerPartitionMovements, maxInterBrokerPartitionMovements, 0,
clusterLeaderMovementConcurrency, brokerLeaderMovementConcurrency,
executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle,
executionProgressCheckIntervalMs, replicaMovementStrategy, replicationThrottle, logDirThrottle,
isTriggeredByUserRequest, uuid, isKafkaAssignerMode, false);
} else {
failGeneratingProposalsForExecution(uuid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ public final class ExecutorConfig {
public static final String DEFAULT_REPLICATION_THROTTLE_DOC = "The replication throttle applied to replicas being "
+ "moved, in bytes per second.";

/**
* <code>default.log.dir.throttle</code>
*/
public static final String DEFAULT_LOG_DIR_THROTTLE_CONFIG = "default.log.dir.throttle";
public static final Long DEFAULT_DEFAULT_LOG_DIR_THROTTLE = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be more reasonable to use Long.MAX_VALUE rather than null here? I'm a bit concerned about having a null default value for a LONG type, and I'm not entirely sure what will happen by default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind. I just realized that the default replication throttle is also null, so this is fine.

public static final String DEFAULT_LOG_DIR_THROTTLE_DOC = "The throttle applied to replicas being moved between "
+ "the log dirs, in bytes per second.";

/**
* <code>replica.movement.strategies</code>
*/
Expand Down Expand Up @@ -741,6 +749,11 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_DEFAULT_REPLICATION_THROTTLE,
ConfigDef.Importance.MEDIUM,
DEFAULT_REPLICATION_THROTTLE_DOC)
.define(DEFAULT_LOG_DIR_THROTTLE_CONFIG,
ConfigDef.Type.LONG,
DEFAULT_DEFAULT_LOG_DIR_THROTTLE,
ConfigDef.Importance.MEDIUM,
DEFAULT_LOG_DIR_THROTTLE_DOC)
.define(REPLICA_MOVEMENT_STRATEGIES_CONFIG,
ConfigDef.Type.LIST,
DEFAULT_REPLICA_MOVEMENT_STRATEGIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AtomicDouble;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.TopicMinIsrCache;
Expand Down Expand Up @@ -800,6 +801,8 @@ public boolean setConcurrencyAdjusterMinIsrCheck(boolean isMinIsrBasedConcurrenc
* @param replicaMovementStrategy The strategy used to determine the execution order of generated replica movement tasks.
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* when executing a proposal (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* when executing a proposal (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
* @param uuid UUID of the execution.
* @param isKafkaAssignerMode {@code true} if kafka assigner mode, {@code false} otherwise.
Expand All @@ -818,6 +821,7 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
Long requestedExecutionProgressCheckIntervalMs,
ReplicaMovementStrategy replicaMovementStrategy,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest,
String uuid,
boolean isKafkaAssignerMode,
Expand All @@ -830,7 +834,7 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency,
requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy,
isTriggeredByUserRequest, loadMonitor);
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, logDirThrottle, isTriggeredByUserRequest);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: would it make more sense to have a separate method for intra-broker throttling, since this parameter only applies to inter-broker movement?

} catch (Exception e) {
processExecuteProposalsFailure();
throw e;
Expand Down Expand Up @@ -918,7 +922,7 @@ public synchronized void executeDemoteProposals(Collection<ExecutionProposal> pr
initProposalExecution(proposals, demotedBrokers, concurrentSwaps, null, 0,
requestedClusterLeadershipMovementConcurrency, requestedBrokerLeadershipMovementConcurrency,
requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor);
startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, isTriggeredByUserRequest);
startExecution(loadMonitor, demotedBrokers, null, replicationThrottle, replicationThrottle, isTriggeredByUserRequest);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the implication of setting the log dir throttle to the replication throttle here?

} catch (Exception e) {
processExecuteProposalsFailure();
throw e;
Expand Down Expand Up @@ -999,12 +1003,15 @@ private int numExecutionStartedInNonKafkaAssignerMode() {
* @param removedBrokers Brokers to be removed, null if no broker has been removed.
* @param replicationThrottle The replication throttle (bytes/second) to apply to both leaders and followers
* while moving partitions (if null, no throttling is applied).
* @param logDirThrottle The throttle (bytes/second) to apply to replicas being moved between the log dirs
* while moving partitions (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
*/
private void startExecution(LoadMonitor loadMonitor,
Collection<Integer> demotedBrokers,
Collection<Integer> removedBrokers,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest) throws OngoingExecutionException {
_executionStoppedByUser.set(false);
sanityCheckOngoingMovement();
Expand Down Expand Up @@ -1040,7 +1047,7 @@ private void startExecution(LoadMonitor loadMonitor,
_numExecutionStartedInNonKafkaAssignerMode.incrementAndGet();
}
_proposalExecutor.execute(
new ProposalExecutionRunnable(loadMonitor, demotedBrokers, removedBrokers, replicationThrottle, isTriggeredByUserRequest));
new ProposalExecutionRunnable(loadMonitor, demotedBrokers, removedBrokers, replicationThrottle, logDirThrottle, isTriggeredByUserRequest));
}

/**
Expand Down Expand Up @@ -1293,6 +1300,7 @@ private class ProposalExecutionRunnable implements Runnable {
private final Set<Integer> _recentlyDemotedBrokers;
private final Set<Integer> _recentlyRemovedBrokers;
private final Long _replicationThrottle;
private final Long _logDirThrottle;
private Throwable _executionException;
private final boolean _isTriggeredByUserRequest;
private long _lastSlowTaskReportingTimeMs;
Expand All @@ -1307,6 +1315,7 @@ private class ProposalExecutionRunnable implements Runnable {
Collection<Integer> demotedBrokers,
Collection<Integer> removedBrokers,
Long replicationThrottle,
Long logDirThrottle,
boolean isTriggeredByUserRequest) {
_loadMonitor = loadMonitor;
_demotedBrokers = demotedBrokers;
Expand Down Expand Up @@ -1342,6 +1351,7 @@ private class ProposalExecutionRunnable implements Runnable {
_recentlyDemotedBrokers = recentlyDemotedBrokers();
_recentlyRemovedBrokers = recentlyRemovedBrokers();
_replicationThrottle = replicationThrottle;
_logDirThrottle = logDirThrottle;
_isTriggeredByUserRequest = isTriggeredByUserRequest;
_lastSlowTaskReportingTimeMs = -1L;
if (_removedBrokers != null && !_removedBrokers.isEmpty()) {
Expand Down Expand Up @@ -1602,8 +1612,7 @@ private void updateOngoingExecutionState() {

private void interBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException {
Set<Integer> currentDeadBrokersWithReplicas = _loadMonitor.deadBrokersWithReplicas(MAX_METADATA_WAIT_MS);
ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, _replicationThrottle,
currentDeadBrokersWithReplicas);
ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient, currentDeadBrokersWithReplicas);
int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
Expand All @@ -1618,7 +1627,10 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc

AlterPartitionReassignmentsResult result = null;
if (!tasksToExecute.isEmpty()) {
throttleHelper.setThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()));
if (_replicationThrottle != null) {
throttleHelper.setReplicationThrottles(tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
_replicationThrottle);
}
// Execute the tasks.
_executionTaskManager.markTasksInProgress(tasksToExecute);
result = ExecutionUtils.submitReplicaReassignmentTasks(_adminClient, tasksToExecute);
Expand All @@ -1640,7 +1652,9 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
.collect(Collectors.toList());
inProgressTasks.addAll(inExecutionTasks());

throttleHelper.clearThrottles(completedTasks, inProgressTasks);
if (_replicationThrottle != null) {
throttleHelper.clearInterBrokerThrottles(completedTasks, inProgressTasks);
}
}

// Currently, _executionProgressCheckIntervalMs is only runtime adjusted for inter broker move tasks, not
Expand Down Expand Up @@ -1669,20 +1683,28 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
}
}

private void intraBrokerMoveReplicas() {
private void intraBrokerMoveReplicas() throws InterruptedException, ExecutionException, TimeoutException {
ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(_adminClient);
int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements();
long totalDataToMoveInMB = _executionTaskManager.remainingIntraBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
LOG.info("Starting {} intra-broker partition movements.", numTotalPartitionMovements);

int partitionsToMove = numTotalPartitionMovements;
Set<Integer> participatingBrokers = Sets.newHashSet();
// Exhaust all the pending partition movements.
while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
// Get tasks to execute.
List<ExecutionTask> tasksToExecute = _executionTaskManager.getIntraBrokerReplicaMovementTasks();
LOG.info("Executor will execute {} task(s)", tasksToExecute.size());

if (!tasksToExecute.isEmpty()) {
if (_logDirThrottle != null) {
participatingBrokers = throttleHelper.setLogDirThrottles(
tasksToExecute.stream().map(ExecutionTask::proposal).collect(Collectors.toList()),
_logDirThrottle
);
}
// Execute the tasks.
_executionTaskManager.markTasksInProgress(tasksToExecute);
executeIntraBrokerReplicaMovements(tasksToExecute, _adminClient, _executionTaskManager, _config);
Expand All @@ -1707,6 +1729,11 @@ private void intraBrokerMoveReplicas() {
waitForIntraBrokerReplicaTasksToFinish();
inExecutionTasks = inExecutionTasks();
}

if (_logDirThrottle != null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, why the underscore in the variable names here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with _replicationThrottle.

throttleHelper.clearIntraBrokerThrottles(participatingBrokers);
}

if (inExecutionTasks().isEmpty()) {
LOG.info("Intra-broker partition movements finished.");
} else if (_stopSignal.get() != NO_STOP_EXECUTION) {
Expand Down
Loading