Skip to content

Commit

Permalink
Rename "task" to "User task" in logging
Browse files Browse the repository at this point in the history
  • Loading branch information
allenxwang committed Oct 7, 2024
1 parent f0c0af8 commit c07695a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,9 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
} catch (Exception e) {
if (e instanceof OngoingExecutionException) {
LOG.info("Task {}: Broker removal operation aborted due to ongoing execution", uuid);
LOG.info("User task {}: Broker removal operation aborted due to ongoing execution", uuid);
} else {
LOG.error("User task {}: Broker removal operation failed due to exception", uuid, e);
}
processExecuteProposalsFailure();
throw e;
Expand Down Expand Up @@ -1141,13 +1143,13 @@ public synchronized void failGeneratingProposalsForExecution(String uuid) {
*/
public synchronized void userTriggeredStopExecution(boolean stopExternalAgent) {
if (stopExecution()) {
LOG.info("Task {}: User requested to stop the ongoing proposal execution.", _uuid);
LOG.info("User task {}: User requested to stop the ongoing proposal execution.", _uuid);
_numExecutionStoppedByUser.incrementAndGet();
_executionStoppedByUser.set(true);
}
if (stopExternalAgent) {
if (maybeStopExternalAgent()) {
LOG.info("Task {}: The request to stop ongoing external agent partition reassignment is submitted successfully.",
LOG.info("User task {}: The request to stop ongoing external agent partition reassignment is submitted successfully.",
_uuid);
}
}
Expand Down Expand Up @@ -1322,7 +1324,7 @@ private class ProposalExecutionRunnable implements Runnable {
_noOngoingExecutionSemaphore.release();
_stopSignal.set(NO_STOP_EXECUTION);
_executionStoppedByUser.set(false);
LOG.error("Task {}: Failed to initialize proposal execution.", _uuid);
LOG.error("User task {}: Failed to initialize proposal execution.", _uuid);
throw new IllegalStateException("User task manager cannot be null.");
}
if (_demotedBrokers != null) {
Expand Down Expand Up @@ -1358,13 +1360,13 @@ private class ProposalExecutionRunnable implements Runnable {
}

public void run() {
LOG.info("Task {}: Starting executing balancing proposals.", _uuid);
LOG.info("User task {}: Starting executing balancing proposals.", _uuid);
final long start = System.currentTimeMillis();
try {
UserTaskManager.UserTaskInfo userTaskInfo = initExecution();
execute(userTaskInfo);
} catch (Exception e) {
LOG.error("Task {}: ProposalExecutionRunnable got exception during run", _uuid, e);
LOG.error("User task {}: ProposalExecutionRunnable got exception during run", _uuid, e);
} finally {
final long duration = System.currentTimeMillis() - start;
_proposalExecutionTimer.update(duration, TimeUnit.MILLISECONDS);
Expand All @@ -1377,9 +1379,9 @@ public void run() {
duration
);
if (_executionException != null) {
LOG.info("Task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage());
LOG.info("User task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage());
} else {
LOG.info("Task {}: Execution succeeded: {}. ", _uuid, executionStatusString);
LOG.info("User task {}: Execution succeeded: {}. ", _uuid, executionStatusString);
}
// Clear completed execution.
clearCompletedExecution();
Expand Down Expand Up @@ -1490,7 +1492,7 @@ private void execute(UserTaskManager.UserTaskInfo userTaskInfo) {
updateOngoingExecutionState();
}
} catch (Throwable t) {
LOG.error("Task {}: Executor got exception during execution", _uuid, t);
LOG.error("User task {}: Executor got exception during execution", _uuid, t);
_executionException = t;
} finally {
notifyFinishedTask(userTaskInfo);
Expand Down Expand Up @@ -1608,14 +1610,14 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
LOG.info("Task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements);
LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements);

int partitionsToMove = numTotalPartitionMovements;
// Exhaust all the pending partition movements.
while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
// Get tasks to execute.
List<ExecutionTask> tasksToExecute = _executionTaskManager.getInterBrokerReplicaMovementTasks();
LOG.info("Task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());
LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());

AlterPartitionReassignmentsResult result = null;
if (!tasksToExecute.isEmpty()) {
Expand All @@ -1630,7 +1632,7 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements();
long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB();
updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataMovementInMB, System.currentTimeMillis() - startTime);
LOG.info("Task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.",
LOG.info("User task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.",
_uuid,
numFinishedPartitionMovements, numTotalPartitionMovements,
String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements),
Expand All @@ -1652,11 +1654,12 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc

// At this point it is guaranteed that there are no in execution tasks to wait -- i.e. all tasks are completed or dead.
if (_stopSignal.get() == NO_STOP_EXECUTION) {
LOG.info("Task {}: Inter-broker partition movements finished", _uuid);
LOG.info("User task {}: Inter-broker partition movements finished", _uuid);
} else {
ExecutionTasksSummary executionTasksSummary = _executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
Map<ExecutionTaskState, Integer> partitionMovementTasksByState = executionTasksSummary.taskStat().get(INTER_BROKER_REPLICA_ACTION);
LOG.info("Task {}: Inter-broker partition movements stopped. For inter-broker partition movements {} tasks cancelled, {} tasks in-progress, "
LOG.info("User task {}: Inter-broker partition movements stopped. For inter-broker partition movements {} tasks cancelled, "
+ "{} tasks in-progress, "
+ "{} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for intra-broker "
+ "partition movement {} tasks cancelled; for leadership movements {} tasks cancelled.",
_uuid,
Expand All @@ -1676,14 +1679,14 @@ private void intraBrokerMoveReplicas() {
int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements();
long totalDataToMoveInMB = _executionTaskManager.remainingIntraBrokerDataToMoveInMB();
long startTime = System.currentTimeMillis();
LOG.info("Task {}: Starting {} intra-broker partition movements.", _uuid, numTotalPartitionMovements);
LOG.info("User task {}: Starting {} intra-broker partition movements.", _uuid, numTotalPartitionMovements);

int partitionsToMove = numTotalPartitionMovements;
// Exhaust all the pending partition movements.
while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
// Get tasks to execute.
List<ExecutionTask> tasksToExecute = _executionTaskManager.getIntraBrokerReplicaMovementTasks();
LOG.info("Task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());
LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());

if (!tasksToExecute.isEmpty()) {
// Execute the tasks.
Expand All @@ -1696,7 +1699,7 @@ private void intraBrokerMoveReplicas() {
int numFinishedPartitionMovements = _executionTaskManager.numFinishedIntraBrokerPartitionMovements();
long finishedDataToMoveInMB = _executionTaskManager.finishedIntraBrokerDataToMoveInMB();
updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataToMoveInMB, System.currentTimeMillis() - startTime);
LOG.info("Task {}: {}/{} ({}%) intra-broker partition movements completed. {}/{} ({}%) MB have been moved.",
LOG.info("User task {}: {}/{} ({}%) intra-broker partition movements completed. {}/{} ({}%) MB have been moved.",
_uuid,
numFinishedPartitionMovements, numTotalPartitionMovements,
String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements),
Expand All @@ -1712,11 +1715,12 @@ private void intraBrokerMoveReplicas() {
inExecutionTasks = inExecutionTasks();
}
if (inExecutionTasks().isEmpty()) {
LOG.info("Task {}: Intra-broker partition movements finished.", _uuid);
LOG.info("User task {}: Intra-broker partition movements finished.", _uuid);
} else if (_stopSignal.get() != NO_STOP_EXECUTION) {
ExecutionTasksSummary executionTasksSummary = _executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
Map<ExecutionTaskState, Integer> partitionMovementTasksByState = executionTasksSummary.taskStat().get(INTRA_BROKER_REPLICA_ACTION);
LOG.info("Task {}: Intra-broker partition movements stopped. For intra-broker partition movements {} tasks cancelled, {} tasks in-progress, "
LOG.info("User task {}: Intra-broker partition movements stopped. For intra-broker partition movements {} tasks cancelled, "
+ "{} tasks in-progress, "
+ "{} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for leadership "
+ "movements {} tasks cancelled.",
_uuid,
Expand All @@ -1736,20 +1740,20 @@ private void intraBrokerMoveReplicas() {
*/
private void moveLeaderships() {
int numTotalLeadershipMovements = _executionTaskManager.numRemainingLeadershipMovements();
LOG.info("Task {}: Starting {} leadership movements.", _uuid, numTotalLeadershipMovements);
LOG.info("User task {}: Starting {} leadership movements.", _uuid, numTotalLeadershipMovements);
int numFinishedLeadershipMovements = 0;
while (_executionTaskManager.numRemainingLeadershipMovements() != 0 && _stopSignal.get() == NO_STOP_EXECUTION) {
updateOngoingExecutionState();
numFinishedLeadershipMovements += moveLeadershipInBatch();
LOG.info("Task {}: {}/{} ({}%) leadership movements completed.", _uuid, numFinishedLeadershipMovements,
LOG.info("User task {}: {}/{} ({}%) leadership movements completed.", _uuid, numFinishedLeadershipMovements,
numTotalLeadershipMovements, numFinishedLeadershipMovements * 100 / numTotalLeadershipMovements);
}
if (inExecutionTasks().isEmpty()) {
LOG.info("Leadership movements finished.");
} else if (_stopSignal.get() != NO_STOP_EXECUTION) {
Map<ExecutionTaskState, Integer> leadershipMovementTasksByState =
_executionTaskManager.getExecutionTasksSummary(Collections.emptySet()).taskStat().get(LEADER_ACTION);
LOG.info("Task {}: Leadership movements stopped. {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, "
LOG.info("User task {}: Leadership movements stopped. {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, "
+ "{} tasks dead, {} tasks completed.",
_uuid,
leadershipMovementTasksByState.get(ExecutionTaskState.PENDING),
Expand Down Expand Up @@ -1885,7 +1889,7 @@ private List<ExecutionTask> waitForInterBrokerReplicaTasksToFinish(AlterPartitio
}
} while (retry);

LOG.info("Task {}: Finished tasks: {}.{}{}{}", _uuid, finishedTasks,
LOG.info("User task {}: Finished tasks: {}.{}{}{}", _uuid, finishedTasks,
stoppedTaskIds.isEmpty() ? "" : String.format(". [Stopped: %s]", stoppedTaskIds),
deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds),
deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds));
Expand Down Expand Up @@ -1960,7 +1964,7 @@ private void waitForLeadershipTasksToFinish(ElectLeadersResult result) {
}
} while (retry);

LOG.info("Task {}: Finished tasks: {}.{}{}{}", _uuid, finishedTasks,
LOG.info("User task {}: Finished tasks: {}.{}{}{}", _uuid, finishedTasks,
stoppedTaskIds.isEmpty() ? "" : String.format(". [Stopped: %s]", stoppedTaskIds),
deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds),
deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds));
Expand Down Expand Up @@ -2003,7 +2007,7 @@ private void waitForIntraBrokerReplicaTasksToFinish() {
updateOngoingExecutionState();
} while (!inExecutionTasks().isEmpty() && finishedTasks.isEmpty());

LOG.info("Task {}: Finished tasks: {}.{}{}", _uuid, finishedTasks,
LOG.info("User task {}: Finished tasks: {}.{}{}", _uuid, finishedTasks,
deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds),
deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds));
}
Expand Down Expand Up @@ -2047,7 +2051,7 @@ private void handleDeadInterBrokerReplicaTasks(List<ExecutionTask> deadInterBrok

if (_stopSignal.get() == NO_STOP_EXECUTION) {
// If there are dead tasks, Cruise Control stops the execution.
LOG.info("Task {}: Stop the execution due to {} dead tasks: {}.", _uuid, tasksToCancel.size(), tasksToCancel);
LOG.info("User task {}: Stop the execution due to {} dead tasks: {}.", _uuid, tasksToCancel.size(), tasksToCancel);
stopExecution();
}

Expand All @@ -2066,7 +2070,7 @@ private void handleDeadInterBrokerReplicaTasks(List<ExecutionTask> deadInterBrok
break;
}
try {
LOG.info("Task {}: Waiting for the rollback of ongoing inter-broker replica reassignments for {}.",
LOG.info("User task {}: Waiting for the rollback of ongoing inter-broker replica reassignments for {}.",
_uuid, intersection);
Thread.sleep(executionProgressCheckIntervalMs());
} catch (InterruptedException e) {
Expand Down Expand Up @@ -2116,11 +2120,12 @@ private boolean maybeMarkTaskAsDead(Cluster cluster,
case LEADER_ACTION:
if (cluster.nodeById(task.proposal().newLeader().brokerId()) == null) {
_executionTaskManager.markTaskDead(task);
LOG.warn("Task {}: Killing execution for task {} because the target leader is down.", _uuid, task);
LOG.warn("User task {}: Killing execution for task {} because the target leader is down.", _uuid, task);
return true;
} else if (_time.milliseconds() > task.startTimeMs() + _leaderMovementTimeoutMs) {
_executionTaskManager.markTaskDead(task);
LOG.warn("Task {}: Killing execution for task {} because it took longer than {} to finish.", _uuid, task, _leaderMovementTimeoutMs);
LOG.warn("User task {}: Killing execution for task {} because it took longer than {} to finish.",
_uuid, task, _leaderMovementTimeoutMs);
return true;
}
break;
Expand All @@ -2130,7 +2135,7 @@ private boolean maybeMarkTaskAsDead(Cluster cluster,
if (cluster.nodeById(broker.brokerId()) == null
|| deadInterBrokerReassignments.contains(task.proposal().topicPartition())) {
_executionTaskManager.markTaskDead(task);
LOG.warn("Task {}: Killing execution for task {} because the new replica {} is down.", _uuid, task, broker);
LOG.warn("User task {}: Killing execution for task {} because the new replica {} is down.", _uuid, task, broker);
return true;
}
}
Expand All @@ -2139,7 +2144,7 @@ private boolean maybeMarkTaskAsDead(Cluster cluster,
case INTRA_BROKER_REPLICA_ACTION:
if (!logdirInfoByTask.containsKey(task)) {
_executionTaskManager.markTaskDead(task);
LOG.warn("Task {}: Killing execution for task {} because the destination disk is down.", _uuid, task);
LOG.warn("User task {}: Killing execution for task {} because the destination disk is down.", _uuid, task);
return true;
}
break;
Expand Down Expand Up @@ -2169,7 +2174,7 @@ private void maybeReexecuteInterBrokerReplicaTasks(Set<TopicPartition> deleted,
candidateInterBrokerReplicaTasksToReexecute);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
// This may indicate transient (e.g. network) issues.
LOG.warn("Task {}: Failed to retrieve partitions being reassigned. Skipping reexecution check for inter-broker replica actions.",
LOG.warn("User task {}: Failed to retrieve partitions being reassigned. Skipping reexecution check for inter-broker replica actions.",
_uuid, e);
tasksToReexecute = Collections.emptyList();
}
Expand Down Expand Up @@ -2200,7 +2205,7 @@ private void maybeReexecuteIntraBrokerReplicaTasks() {
}
});
if (!intraBrokerReplicaTasksToReexecute.isEmpty()) {
LOG.info("Task {}: Reexecuting tasks {}", _uuid, intraBrokerReplicaTasksToReexecute);
LOG.info("User task {}: Reexecuting tasks {}", _uuid, intraBrokerReplicaTasksToReexecute);
executeIntraBrokerReplicaMovements(intraBrokerReplicaTasksToReexecute, _adminClient, _executionTaskManager, _config);
}
}
Expand All @@ -2215,7 +2220,7 @@ private void maybeReexecuteIntraBrokerReplicaTasks() {
private void maybeReexecuteLeadershipTasks(Set<TopicPartition> deleted) {
List<ExecutionTask> leaderActionsToReexecute = new ArrayList<>(_executionTaskManager.inExecutionTasks(Collections.singleton(LEADER_ACTION)));
if (!leaderActionsToReexecute.isEmpty()) {
LOG.info("Task {}: Reexecuting tasks {}", _uuid, leaderActionsToReexecute);
LOG.info("User task {}: Reexecuting tasks {}", _uuid, leaderActionsToReexecute);
ElectLeadersResult electLeadersResult = ExecutionUtils.submitPreferredLeaderElection(_adminClient, leaderActionsToReexecute);
ExecutionUtils.processElectLeadersResult(electLeadersResult, deleted);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ protected abstract OptimizerResult workWithClusterModel()
protected void finish() {
if (_operationProgress != null) {
long totalTimeMs = _operationProgress.getCurrentTotalExecutionTimeMs();
LOG.info("Operation {} finished with uuid {}; total time: {}ms; steps: {}",
_operationProgress.getOperation(), _uuid, totalTimeMs, _operationProgress);
LOG.info("User task {}: {} finished with total time: {}ms; steps: {}",
_uuid, _operationProgress.getOperation(), totalTimeMs, _operationProgress);
}
_operationProgress = null;
_combinedCompletenessRequirements = null;
Expand Down
Loading

0 comments on commit c07695a

Please sign in to comment.