From 920f41072725da295254e1c449afd2c8467c3686 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Wed, 5 Oct 2022 13:18:51 -0700 Subject: [PATCH] extra logging for debugging --- .../conductor/client/automator/TaskRunner.java | 9 +++++++++ .../client/automator/TaskRunnerConfigurer.java | 5 +++++ .../worker/LocalServerWorkflowExecutionTests.java | 15 ++++++++------- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/orkes/conductor/client/automator/TaskRunner.java b/src/main/java/io/orkes/conductor/client/automator/TaskRunner.java index 4eb0816f..fecdd868 100644 --- a/src/main/java/io/orkes/conductor/client/automator/TaskRunner.java +++ b/src/main/java/io/orkes/conductor/client/automator/TaskRunner.java @@ -143,6 +143,8 @@ private List pollTasksForWorker(Worker worker) { ALL_WORKERS, DOMAIN, null)) .orElse(taskToDomain.get(taskType))); LOGGER.trace("Polling task of type: {} in domain: '{}'", taskType, domain); + Stopwatch stopwatch = Stopwatch.createStarted(); + long now = System.currentTimeMillis(); List polledTasks = MetricsContainer.getPollTimer(taskType) .record( @@ -152,6 +154,9 @@ private List pollTasksForWorker(Worker worker) { worker.getIdentity(), domain, this.getAvailableWorkers())); + stopwatch.stop(); + LOGGER.info("Time taken to poll {} tassk with a batch size of {} is {} ms", taskType, getAvailableWorkers(), + stopwatch.elapsed(TimeUnit.MILLISECONDS)); for (Task task : polledTasks) { if (Objects.nonNull(task) && StringUtils.isNotBlank(task.getTaskId())) { LOGGER.trace( @@ -160,6 +165,7 @@ private List pollTasksForWorker(Worker worker) { taskType, domain, worker.getIdentity()); + LOGGER.info("Task {} stayed in the queue for {} ms", taskType, (now-task.getScheduledTime())); tasks.add(task); } } @@ -251,7 +257,10 @@ private void executeTask(Worker worker, Task task) { worker.getClass().getSimpleName(), worker.getIdentity(), result.getStatus()); + Stopwatch updateStopWatch = Stopwatch.createStarted(); updateTaskResult(updateRetryCount, task, result, worker); + updateStopWatch.stop(); + LOGGER.info("Time taken to update the {} {} ms", task.getTaskType(), updateStopWatch.elapsed(TimeUnit.MILLISECONDS)); } private void updateTaskResult(int count, Task task, TaskResult result, Worker worker) { diff --git a/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java b/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java index de95f3ff..3fdcca40 100644 --- a/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java +++ b/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java @@ -275,6 +275,7 @@ private void startWorker(Worker worker) { this.taskToThreadCount.getOrDefault(worker.getTaskDefName(), threadCount); final Integer taskPollTimeout = this.taskPollTimeout.getOrDefault(worker.getTaskDefName(), defaultPollTimeout); + final TaskRunner taskRunner = new TaskRunner( eurekaClient, @@ -286,6 +287,10 @@ private void startWorker(Worker worker) { threadCountForTask, taskPollTimeout); this.taskRunners.add(taskRunner); + + LOGGER.info("Starting task runner for {} with pollTimeout set to {} and polling interval set to {}", + worker.getTaskDefName(), taskPollTimeout, worker.getPollingInterval()); + this.scheduledExecutorService.scheduleWithFixedDelay( () -> taskRunner.poll(worker), 0, diff --git a/src/test/java/io/orkes/conductor/client/worker/LocalServerWorkflowExecutionTests.java b/src/test/java/io/orkes/conductor/client/worker/LocalServerWorkflowExecutionTests.java index 1ff2a342..7bb383ac 100644 --- a/src/test/java/io/orkes/conductor/client/worker/LocalServerWorkflowExecutionTests.java +++ b/src/test/java/io/orkes/conductor/client/worker/LocalServerWorkflowExecutionTests.java @@ -32,11 +32,11 @@ public static void main(String[] args) { TaskClient taskClient = new OrkesClients(apiClient).getTaskClient(); Iterable workers = Arrays.asList(new MyWorker()); Map taskToDomain = new HashMap<>(); - taskToDomain.put("simple_task_0", "viren"); TaskRunnerConfigurer configurer = new TaskRunnerConfigurer.Builder(taskClient, workers) .withSleepWhenRetry(100) .withThreadCount(10) + .withTaskPollTimeout(1) .withWorkerNamePrefix("Hello") .withTaskToDomain(taskToDomain) .build(); @@ -46,7 +46,7 @@ public static void main(String[] args) { private static class MyWorker implements Worker { @Override public String getTaskDefName() { - return "simple_task_0"; + return "sample_task_name_simple"; } @Override @@ -60,12 +60,13 @@ public TaskResult execute(Task task) { + new Date()); TaskResult result = new TaskResult(task); result.getOutputData().put("a", "b"); - if (task.getPollCount() < 2) { - result.setCallbackAfterSeconds(5); - } else { - result.setStatus(TaskResult.Status.COMPLETED); - } + result.setStatus(TaskResult.Status.COMPLETED); return result; } + + @Override + public int getPollingInterval() { + return 1000; + } } }