From c07695ab33ad2e9a3b7eafdc40c616aea20f1ba0 Mon Sep 17 00:00:00 2001 From: Allen Wang Date: Mon, 7 Oct 2024 15:49:27 -0700 Subject: [PATCH] Rename "task" to "User task" in logging --- .../cruisecontrol/executor/Executor.java | 73 ++++++++++--------- .../runnable/GoalBasedOperationRunnable.java | 4 +- .../async/runnable/RemoveBrokersRunnable.java | 12 +-- 3 files changed, 48 insertions(+), 41 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index 828033efb..9be07542b 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -833,7 +833,9 @@ public synchronized void executeProposals(Collection proposal startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest); } catch (Exception e) { if (e instanceof OngoingExecutionException) { - LOG.info("Task {}: Broker removal operation aborted due to ongoing execution", uuid); + LOG.info("User task {}: Broker removal operation aborted due to ongoing execution", uuid); + } else { + LOG.error("User task {}: Broker removal operation failed due to exception", uuid, e); } processExecuteProposalsFailure(); throw e; @@ -1141,13 +1143,13 @@ public synchronized void failGeneratingProposalsForExecution(String uuid) { */ public synchronized void userTriggeredStopExecution(boolean stopExternalAgent) { if (stopExecution()) { - LOG.info("Task {}: User requested to stop the ongoing proposal execution.", _uuid); + LOG.info("User task {}: User requested to stop the ongoing proposal execution.", _uuid); _numExecutionStoppedByUser.incrementAndGet(); _executionStoppedByUser.set(true); } if (stopExternalAgent) { if (maybeStopExternalAgent()) { - LOG.info("Task {}: The request to stop ongoing external agent partition reassignment is submitted successfully.", + LOG.info("User task {}: The request to stop ongoing external agent partition reassignment is submitted successfully.", _uuid); } } @@ -1322,7 +1324,7 @@ private class ProposalExecutionRunnable implements Runnable { _noOngoingExecutionSemaphore.release(); _stopSignal.set(NO_STOP_EXECUTION); _executionStoppedByUser.set(false); - LOG.error("Task {}: Failed to initialize proposal execution.", _uuid); + LOG.error("User task {}: Failed to initialize proposal execution.", _uuid); throw new IllegalStateException("User task manager cannot be null."); } if (_demotedBrokers != null) { @@ -1358,13 +1360,13 @@ private class ProposalExecutionRunnable implements Runnable { } public void run() { - LOG.info("Task {}: Starting executing balancing proposals.", _uuid); + LOG.info("User task {}: Starting executing balancing proposals.", _uuid); final long start = System.currentTimeMillis(); try { UserTaskManager.UserTaskInfo userTaskInfo = initExecution(); execute(userTaskInfo); } catch (Exception e) { - LOG.error("Task {}: ProposalExecutionRunnable got exception during run", _uuid, e); + LOG.error("User task {}: ProposalExecutionRunnable got exception during run", _uuid, e); } finally { final long duration = System.currentTimeMillis() - start; _proposalExecutionTimer.update(duration, TimeUnit.MILLISECONDS); @@ -1377,9 +1379,9 @@ public void run() { duration ); if (_executionException != null) { - LOG.info("Task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage()); + LOG.info("User task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage()); } else { - LOG.info("Task {}: Execution succeeded: {}. ", _uuid, executionStatusString); + LOG.info("User task {}: Execution succeeded: {}. ", _uuid, executionStatusString); } // Clear completed execution. clearCompletedExecution(); @@ -1490,7 +1492,7 @@ private void execute(UserTaskManager.UserTaskInfo userTaskInfo) { updateOngoingExecutionState(); } } catch (Throwable t) { - LOG.error("Task {}: Executor got exception during execution", _uuid, t); + LOG.error("User task {}: Executor got exception during execution", _uuid, t); _executionException = t; } finally { notifyFinishedTask(userTaskInfo); @@ -1608,14 +1610,14 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements(); long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); - LOG.info("Task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); + LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements); int partitionsToMove = numTotalPartitionMovements; // Exhaust all the pending partition movements. while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) { // Get tasks to execute. List tasksToExecute = _executionTaskManager.getInterBrokerReplicaMovementTasks(); - LOG.info("Task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size()); + LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size()); AlterPartitionReassignmentsResult result = null; if (!tasksToExecute.isEmpty()) { @@ -1630,7 +1632,7 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements(); long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB(); updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataMovementInMB, System.currentTimeMillis() - startTime); - LOG.info("Task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.", + LOG.info("User task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.", _uuid, numFinishedPartitionMovements, numTotalPartitionMovements, String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements), @@ -1652,11 +1654,12 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc // At this point it is guaranteed that there are no in execution tasks to wait -- i.e. all tasks are completed or dead. if (_stopSignal.get() == NO_STOP_EXECUTION) { - LOG.info("Task {}: Inter-broker partition movements finished", _uuid); + LOG.info("User task {}: Inter-broker partition movements finished", _uuid); } else { ExecutionTasksSummary executionTasksSummary = _executionTaskManager.getExecutionTasksSummary(Collections.emptySet()); Map partitionMovementTasksByState = executionTasksSummary.taskStat().get(INTER_BROKER_REPLICA_ACTION); - LOG.info("Task {}: Inter-broker partition movements stopped. For inter-broker partition movements {} tasks cancelled, {} tasks in-progress, " + LOG.info("User task {}: Inter-broker partition movements stopped. For inter-broker partition movements {} tasks cancelled, " + + "{} tasks in-progress, " + "{} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for intra-broker " + "partition movement {} tasks cancelled; for leadership movements {} tasks cancelled.", _uuid, @@ -1676,14 +1679,14 @@ private void intraBrokerMoveReplicas() { int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements(); long totalDataToMoveInMB = _executionTaskManager.remainingIntraBrokerDataToMoveInMB(); long startTime = System.currentTimeMillis(); - LOG.info("Task {}: Starting {} intra-broker partition movements.", _uuid, numTotalPartitionMovements); + LOG.info("User task {}: Starting {} intra-broker partition movements.", _uuid, numTotalPartitionMovements); int partitionsToMove = numTotalPartitionMovements; // Exhaust all the pending partition movements. while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) { // Get tasks to execute. List tasksToExecute = _executionTaskManager.getIntraBrokerReplicaMovementTasks(); - LOG.info("Task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size()); + LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size()); if (!tasksToExecute.isEmpty()) { // Execute the tasks. @@ -1696,7 +1699,7 @@ private void intraBrokerMoveReplicas() { int numFinishedPartitionMovements = _executionTaskManager.numFinishedIntraBrokerPartitionMovements(); long finishedDataToMoveInMB = _executionTaskManager.finishedIntraBrokerDataToMoveInMB(); updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataToMoveInMB, System.currentTimeMillis() - startTime); - LOG.info("Task {}: {}/{} ({}%) intra-broker partition movements completed. {}/{} ({}%) MB have been moved.", + LOG.info("User task {}: {}/{} ({}%) intra-broker partition movements completed. {}/{} ({}%) MB have been moved.", _uuid, numFinishedPartitionMovements, numTotalPartitionMovements, String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements), @@ -1712,11 +1715,12 @@ private void intraBrokerMoveReplicas() { inExecutionTasks = inExecutionTasks(); } if (inExecutionTasks().isEmpty()) { - LOG.info("Task {}: Intra-broker partition movements finished.", _uuid); + LOG.info("User task {}: Intra-broker partition movements finished.", _uuid); } else if (_stopSignal.get() != NO_STOP_EXECUTION) { ExecutionTasksSummary executionTasksSummary = _executionTaskManager.getExecutionTasksSummary(Collections.emptySet()); Map partitionMovementTasksByState = executionTasksSummary.taskStat().get(INTRA_BROKER_REPLICA_ACTION); - LOG.info("Task {}: Intra-broker partition movements stopped. For intra-broker partition movements {} tasks cancelled, {} tasks in-progress, " + LOG.info("User task {}: Intra-broker partition movements stopped. For intra-broker partition movements {} tasks cancelled, " + + "{} tasks in-progress, " + "{} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for leadership " + "movements {} tasks cancelled.", _uuid, @@ -1736,12 +1740,12 @@ private void intraBrokerMoveReplicas() { */ private void moveLeaderships() { int numTotalLeadershipMovements = _executionTaskManager.numRemainingLeadershipMovements(); - LOG.info("Task {}: Starting {} leadership movements.", _uuid, numTotalLeadershipMovements); + LOG.info("User task {}: Starting {} leadership movements.", _uuid, numTotalLeadershipMovements); int numFinishedLeadershipMovements = 0; while (_executionTaskManager.numRemainingLeadershipMovements() != 0 && _stopSignal.get() == NO_STOP_EXECUTION) { updateOngoingExecutionState(); numFinishedLeadershipMovements += moveLeadershipInBatch(); - LOG.info("Task {}: {}/{} ({}%) leadership movements completed.", _uuid, numFinishedLeadershipMovements, + LOG.info("User task {}: {}/{} ({}%) leadership movements completed.", _uuid, numFinishedLeadershipMovements, numTotalLeadershipMovements, numFinishedLeadershipMovements * 100 / numTotalLeadershipMovements); } if (inExecutionTasks().isEmpty()) { @@ -1749,7 +1753,7 @@ private void moveLeaderships() { } else if (_stopSignal.get() != NO_STOP_EXECUTION) { Map leadershipMovementTasksByState = _executionTaskManager.getExecutionTasksSummary(Collections.emptySet()).taskStat().get(LEADER_ACTION); - LOG.info("Task {}: Leadership movements stopped. {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, " + LOG.info("User task {}: Leadership movements stopped. {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, " + "{} tasks dead, {} tasks completed.", _uuid, leadershipMovementTasksByState.get(ExecutionTaskState.PENDING), @@ -1885,7 +1889,7 @@ private List waitForInterBrokerReplicaTasksToFinish(AlterPartitio } } while (retry); - LOG.info("Task {}: Finished tasks: {}.{}{}{}", _uuid, finishedTasks, + LOG.info("User task {}: Finished tasks: {}.{}{}{}", _uuid, finishedTasks, stoppedTaskIds.isEmpty() ? "" : String.format(". [Stopped: %s]", stoppedTaskIds), deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds), deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds)); @@ -1960,7 +1964,7 @@ private void waitForLeadershipTasksToFinish(ElectLeadersResult result) { } } while (retry); - LOG.info("Task {}: Finished tasks: {}.{}{}{}", _uuid, finishedTasks, + LOG.info("User task {}: Finished tasks: {}.{}{}{}", _uuid, finishedTasks, stoppedTaskIds.isEmpty() ? "" : String.format(". [Stopped: %s]", stoppedTaskIds), deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds), deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds)); @@ -2003,7 +2007,7 @@ private void waitForIntraBrokerReplicaTasksToFinish() { updateOngoingExecutionState(); } while (!inExecutionTasks().isEmpty() && finishedTasks.isEmpty()); - LOG.info("Task {}: Finished tasks: {}.{}{}", _uuid, finishedTasks, + LOG.info("User task {}: Finished tasks: {}.{}{}", _uuid, finishedTasks, deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds), deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds)); } @@ -2047,7 +2051,7 @@ private void handleDeadInterBrokerReplicaTasks(List deadInterBrok if (_stopSignal.get() == NO_STOP_EXECUTION) { // If there are dead tasks, Cruise Control stops the execution. - LOG.info("Task {}: Stop the execution due to {} dead tasks: {}.", _uuid, tasksToCancel.size(), tasksToCancel); + LOG.info("User task {}: Stop the execution due to {} dead tasks: {}.", _uuid, tasksToCancel.size(), tasksToCancel); stopExecution(); } @@ -2066,7 +2070,7 @@ private void handleDeadInterBrokerReplicaTasks(List deadInterBrok break; } try { - LOG.info("Task {}: Waiting for the rollback of ongoing inter-broker replica reassignments for {}.", + LOG.info("User task {}: Waiting for the rollback of ongoing inter-broker replica reassignments for {}.", _uuid, intersection); Thread.sleep(executionProgressCheckIntervalMs()); } catch (InterruptedException e) { @@ -2116,11 +2120,12 @@ private boolean maybeMarkTaskAsDead(Cluster cluster, case LEADER_ACTION: if (cluster.nodeById(task.proposal().newLeader().brokerId()) == null) { _executionTaskManager.markTaskDead(task); - LOG.warn("Task {}: Killing execution for task {} because the target leader is down.", _uuid, task); + LOG.warn("User task {}: Killing execution for task {} because the target leader is down.", _uuid, task); return true; } else if (_time.milliseconds() > task.startTimeMs() + _leaderMovementTimeoutMs) { _executionTaskManager.markTaskDead(task); - LOG.warn("Task {}: Killing execution for task {} because it took longer than {} to finish.", _uuid, task, _leaderMovementTimeoutMs); + LOG.warn("User task {}: Killing execution for task {} because it took longer than {} to finish.", + _uuid, task, _leaderMovementTimeoutMs); return true; } break; @@ -2130,7 +2135,7 @@ private boolean maybeMarkTaskAsDead(Cluster cluster, if (cluster.nodeById(broker.brokerId()) == null || deadInterBrokerReassignments.contains(task.proposal().topicPartition())) { _executionTaskManager.markTaskDead(task); - LOG.warn("Task {}: Killing execution for task {} because the new replica {} is down.", _uuid, task, broker); + LOG.warn("User task {}: Killing execution for task {} because the new replica {} is down.", _uuid, task, broker); return true; } } @@ -2139,7 +2144,7 @@ private boolean maybeMarkTaskAsDead(Cluster cluster, case INTRA_BROKER_REPLICA_ACTION: if (!logdirInfoByTask.containsKey(task)) { _executionTaskManager.markTaskDead(task); - LOG.warn("Task {}: Killing execution for task {} because the destination disk is down.", _uuid, task); + LOG.warn("User task {}: Killing execution for task {} because the destination disk is down.", _uuid, task); return true; } break; @@ -2169,7 +2174,7 @@ private void maybeReexecuteInterBrokerReplicaTasks(Set deleted, candidateInterBrokerReplicaTasksToReexecute); } catch (TimeoutException | InterruptedException | ExecutionException e) { // This may indicate transient (e.g. network) issues. - LOG.warn("Task {}: Failed to retrieve partitions being reassigned. Skipping reexecution check for inter-broker replica actions.", + LOG.warn("User task {}: Failed to retrieve partitions being reassigned. Skipping reexecution check for inter-broker replica actions.", _uuid, e); tasksToReexecute = Collections.emptyList(); } @@ -2200,7 +2205,7 @@ private void maybeReexecuteIntraBrokerReplicaTasks() { } }); if (!intraBrokerReplicaTasksToReexecute.isEmpty()) { - LOG.info("Task {}: Reexecuting tasks {}", _uuid, intraBrokerReplicaTasksToReexecute); + LOG.info("User task {}: Reexecuting tasks {}", _uuid, intraBrokerReplicaTasksToReexecute); executeIntraBrokerReplicaMovements(intraBrokerReplicaTasksToReexecute, _adminClient, _executionTaskManager, _config); } } @@ -2215,7 +2220,7 @@ private void maybeReexecuteIntraBrokerReplicaTasks() { private void maybeReexecuteLeadershipTasks(Set deleted) { List leaderActionsToReexecute = new ArrayList<>(_executionTaskManager.inExecutionTasks(Collections.singleton(LEADER_ACTION))); if (!leaderActionsToReexecute.isEmpty()) { - LOG.info("Task {}: Reexecuting tasks {}", _uuid, leaderActionsToReexecute); + LOG.info("User task {}: Reexecuting tasks {}", _uuid, leaderActionsToReexecute); ElectLeadersResult electLeadersResult = ExecutionUtils.submitPreferredLeaderElection(_adminClient, leaderActionsToReexecute); ExecutionUtils.processElectLeadersResult(electLeadersResult, deleted); } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/GoalBasedOperationRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/GoalBasedOperationRunnable.java index 1a75063e0..66fd1a727 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/GoalBasedOperationRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/GoalBasedOperationRunnable.java @@ -212,8 +212,8 @@ protected abstract OptimizerResult workWithClusterModel() protected void finish() { if (_operationProgress != null) { long totalTimeMs = _operationProgress.getCurrentTotalExecutionTimeMs(); - LOG.info("Operation {} finished with uuid {}; total time: {}ms; steps: {}", - _operationProgress.getOperation(), _uuid, totalTimeMs, _operationProgress); + LOG.info("User task {}: {} finished with total time: {}ms; steps: {}", + _uuid, _operationProgress.getOperation(), totalTimeMs, _operationProgress); } _operationProgress = null; _combinedCompletenessRequirements = null; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java index 4941705e9..16e05b96e 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java @@ -101,6 +101,7 @@ protected OptimizationResult getResult() throws Exception { @Override protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlException, TimeoutException, NotEnoughValidWindowsException { + long start = System.currentTimeMillis(); ClusterModel clusterModel = _kafkaCruiseControl.clusterModel(_combinedCompletenessRequirements, _allowCapacityEstimation, _operationProgress); sanityCheckBrokersHavingOfflineReplicasOnBadDisks(_goals, clusterModel); _removedBrokerIds.forEach(id -> clusterModel.setBrokerState(id, Broker.State.DEAD)); @@ -110,7 +111,6 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept if (!_destinationBrokerIds.isEmpty()) { _kafkaCruiseControl.sanityCheckBrokerPresence(_destinationBrokerIds); } - OptimizationOptions optimizationOptions = computeOptimizationOptions(clusterModel, false, _kafkaCruiseControl, @@ -122,13 +122,15 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept _destinationBrokerIds, false, _fastMode); - LOG.info("Task {}: Optimization options: {}", _uuid, optimizationOptions); + LOG.info("User task {}: Optimization options: {}", _uuid, optimizationOptions); OptimizerResult result = _kafkaCruiseControl.optimizations(clusterModel, _goalsByPriority, _operationProgress, null, optimizationOptions); - LOG.info("Task {}: Optimization result: {}", _uuid, result.getProposalSummary()); + long goalProposalGenerationTimeMs = System.currentTimeMillis() - start; + LOG.info("User task {}: Time in proposals generation: {}ms; Optimization result: {}", _uuid, + goalProposalGenerationTimeMs, result.getProposalSummary()); Set goalProposals = result.goalProposals(); - OPERATION_LOG.info("Task {}: Goal proposals: {}", _uuid, goalProposals); + OPERATION_LOG.info("User task {}: Goal proposals: {}", _uuid, goalProposals); if (!_dryRun) { - LOG.info("Task {}: Execute broker removal. throttleRemovedBrokers={}, " + LOG.info("User task {}: Execute broker removal. throttleRemovedBrokers={}, " + "removedBrokerIds={}, " + "concurrentInterBrokerPartitionMovements={}, " + "maxInterBrokerPartitionMovements={}, "