Skip to content

Commit

Permalink
extra logging for debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed Oct 5, 2022
1 parent 799a54c commit 920f410
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ private List<Task> pollTasksForWorker(Worker worker) {
ALL_WORKERS, DOMAIN, null))
.orElse(taskToDomain.get(taskType)));
LOGGER.trace("Polling task of type: {} in domain: '{}'", taskType, domain);
Stopwatch stopwatch = Stopwatch.createStarted();
long now = System.currentTimeMillis();
List<Task> polledTasks =
MetricsContainer.getPollTimer(taskType)
.record(
Expand All @@ -152,6 +154,9 @@ private List<Task> pollTasksForWorker(Worker worker) {
worker.getIdentity(),
domain,
this.getAvailableWorkers()));
stopwatch.stop();
LOGGER.info("Time taken to poll {} tassk with a batch size of {} is {} ms", taskType, getAvailableWorkers(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
for (Task task : polledTasks) {
if (Objects.nonNull(task) && StringUtils.isNotBlank(task.getTaskId())) {
LOGGER.trace(
Expand All @@ -160,6 +165,7 @@ private List<Task> pollTasksForWorker(Worker worker) {
taskType,
domain,
worker.getIdentity());
LOGGER.info("Task {} stayed in the queue for {} ms", taskType, (now-task.getScheduledTime()));
tasks.add(task);
}
}
Expand Down Expand Up @@ -251,7 +257,10 @@ private void executeTask(Worker worker, Task task) {
worker.getClass().getSimpleName(),
worker.getIdentity(),
result.getStatus());
Stopwatch updateStopWatch = Stopwatch.createStarted();
updateTaskResult(updateRetryCount, task, result, worker);
updateStopWatch.stop();
LOGGER.info("Time taken to update the {} {} ms", task.getTaskType(), updateStopWatch.elapsed(TimeUnit.MILLISECONDS));
}

private void updateTaskResult(int count, Task task, TaskResult result, Worker worker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ private void startWorker(Worker worker) {
this.taskToThreadCount.getOrDefault(worker.getTaskDefName(), threadCount);
final Integer taskPollTimeout =
this.taskPollTimeout.getOrDefault(worker.getTaskDefName(), defaultPollTimeout);

final TaskRunner taskRunner =
new TaskRunner(
eurekaClient,
Expand All @@ -286,6 +287,10 @@ private void startWorker(Worker worker) {
threadCountForTask,
taskPollTimeout);
this.taskRunners.add(taskRunner);

LOGGER.info("Starting task runner for {} with pollTimeout set to {} and polling interval set to {}",
worker.getTaskDefName(), taskPollTimeout, worker.getPollingInterval());

this.scheduledExecutorService.scheduleWithFixedDelay(
() -> taskRunner.poll(worker),
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public static void main(String[] args) {
TaskClient taskClient = new OrkesClients(apiClient).getTaskClient();
Iterable<Worker> workers = Arrays.asList(new MyWorker());
Map<String, String> taskToDomain = new HashMap<>();
taskToDomain.put("simple_task_0", "viren");
TaskRunnerConfigurer configurer =
new TaskRunnerConfigurer.Builder(taskClient, workers)
.withSleepWhenRetry(100)
.withThreadCount(10)
.withTaskPollTimeout(1)
.withWorkerNamePrefix("Hello")
.withTaskToDomain(taskToDomain)
.build();
Expand All @@ -46,7 +46,7 @@ public static void main(String[] args) {
private static class MyWorker implements Worker {
@Override
public String getTaskDefName() {
return "simple_task_0";
return "sample_task_name_simple";
}

@Override
Expand All @@ -60,12 +60,13 @@ public TaskResult execute(Task task) {
+ new Date());
TaskResult result = new TaskResult(task);
result.getOutputData().put("a", "b");
if (task.getPollCount() < 2) {
result.setCallbackAfterSeconds(5);
} else {
result.setStatus(TaskResult.Status.COMPLETED);
}
result.setStatus(TaskResult.Status.COMPLETED);
return result;
}

@Override
public int getPollingInterval() {
return 1000;
}
}
}

0 comments on commit 920f410

Please sign in to comment.