diff --git a/gradle.properties b/gradle.properties index fb790a40..c633c4d9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=3.0.33 \ No newline at end of file +version=3.0.36 \ No newline at end of file diff --git a/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java b/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java index e1c056f3..92d1f603 100644 --- a/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java +++ b/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer.java @@ -55,8 +55,11 @@ public class TaskRunnerConfigurer { private final Map taskToThreadCount; private final Map taskPollTimeout; + private final Map taskPollCount; + private final ConductorClientConfiguration conductorClientConfiguration; private Integer defaultPollTimeout; + private Integer defaultPollCount; private final int threadCount; private final List taskRunners; @@ -77,7 +80,9 @@ private TaskRunnerConfigurer(TaskRunnerConfigurer.Builder builder) { this.taskToDomain = builder.taskToDomain; this.taskToThreadCount = builder.taskToThreadCount; this.taskPollTimeout = builder.taskPollTimeout; + this.taskPollCount = builder.taskPollCount; this.defaultPollTimeout = builder.defaultPollTimeout; + this.defaultPollCount = builder.defaultPollCount; this.shutdownGracePeriodSeconds = builder.shutdownGracePeriodSeconds; this.conductorClientConfiguration = builder.conductorClientConfiguration; this.workers = new LinkedList<>(); @@ -94,14 +99,17 @@ public static class Builder { private int threadCount = -1; private int shutdownGracePeriodSeconds = 10; private int defaultPollTimeout = 100; + + private int defaultPollCount = 20; private final Iterable workers; private EurekaClient eurekaClient; private final TaskClient taskClient; private Map taskToDomain = new HashMap<>(); private Map taskToThreadCount = new HashMap<>(); - private Map taskPollTimeout = - new HashMap<>(); + private Map taskPollTimeout = new HashMap<>(); + + private Map taskPollCount = new HashMap<>(); private ConductorClientConfiguration conductorClientConfiguration = new DefaultConductorClientConfiguration(); @@ -206,6 +214,16 @@ public TaskRunnerConfigurer.Builder withTaskPollTimeout(Integer taskPollTimeout) return this; } + public TaskRunnerConfigurer.Builder withTaskPollCount(Map taskPollCount) { + this.taskPollCount = taskPollCount; + return this; + } + + public TaskRunnerConfigurer.Builder withTaskPollCount(int defaultPollCount) { + this.defaultPollCount = defaultPollCount; + return this; + } + /** * Builds an instance of the TaskRunnerConfigurer. * @@ -346,6 +364,7 @@ private void startPooledGRPCWorker(Worker worker, TaskServiceGrpc.TaskServiceStu final Integer threadCountForTask = this.taskToThreadCount.getOrDefault(worker.getTaskDefName(), threadCount); final Integer taskPollTimeout = this.taskPollTimeout.getOrDefault(worker.getTaskDefName(), defaultPollTimeout); + final Integer taskPollcount = this.taskPollCount.getOrDefault(worker.getTaskDefName(), defaultPollCount); String taskType = worker.getTaskDefName(); String domain = Optional.ofNullable(PropertyFactory.getString(taskType, DOMAIN, null)) @@ -366,6 +385,7 @@ private void startPooledGRPCWorker(Worker worker, TaskServiceGrpc.TaskServiceStu blockingStub, worker, domain, + taskPollcount, taskPollTimeout, executor, threadCountForTask, diff --git a/src/main/java/io/orkes/conductor/client/grpc/PoolWorker.java b/src/main/java/io/orkes/conductor/client/grpc/PoolWorker.java index e1211723..46be5fa2 100644 --- a/src/main/java/io/orkes/conductor/client/grpc/PoolWorker.java +++ b/src/main/java/io/orkes/conductor/client/grpc/PoolWorker.java @@ -99,11 +99,11 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo } private void _updateTask(TaskResult taskResult) { - log.debug("Updating task {}", taskResult.getTaskId()); + log.trace("Updating task {}", taskResult.getTaskId()); taskResult.getOutputData().put("_clientSendTime", System.currentTimeMillis()); TaskServicePb.UpdateTaskRequest request = TaskServicePb.UpdateTaskRequest.newBuilder().setResult(protoMapper.toProto(taskResult)).build(); blockingStub.updateTask(request); - log.debug("Updated task {}", taskResult.getTaskId()); + log.trace("Updated task {}", taskResult.getTaskId()); } private R retryOperation(Function operation, int count, T input, String opName) { diff --git a/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java b/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java index ddcd8318..fb617614 100644 --- a/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java +++ b/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java @@ -51,11 +51,14 @@ public class PooledPoller implements StreamObserver { private final AtomicLong lastAskedForMessageCount = new AtomicLong(0); private final Semaphore semaphore; + private final int taskPollCount; + public PooledPoller( TaskServiceGrpc.TaskServiceStub asyncStub, TaskServiceGrpc.TaskServiceBlockingStub blockingStub, Worker worker, String domain, + int taskPollCount, Integer taskPollTimeout, ThreadPoolExecutor executor, Integer threadCountForTask, @@ -68,6 +71,7 @@ public PooledPoller( this.executor = executor; this.threadCountForTask = threadCountForTask; this.semaphore = semaphore; + this.taskPollCount = taskPollCount; } public void start() { @@ -164,15 +168,15 @@ public void runAccumulatedRequests() { if (currentPending <= 0) { return; } - if(currentPending > 20) { - currentPending = 20; + if(currentPending > taskPollCount) { + currentPending = taskPollCount; } // Make GRPC call for these many // Observe for results, add them to local queue if (callAgain.get()) { callAgain.set(false); lastAskedForMessageCount.set(currentPending); - log.debug("Accumulated {} for {}", currentPending, worker.getTaskDefName()); + log.trace("Polling {} for {} tasks", worker.getTaskDefName(), currentPending); TaskServicePb.BatchPollRequest request = buildPollRequest(currentPending, 1); asyncStub.batchPoll(request, this); } diff --git a/src/test/java/io/orkes/conductor/client/LocalWorkerTest.java b/src/test/java/io/orkes/conductor/client/LocalWorkerTest.java index b8c966b3..18f870e0 100644 --- a/src/test/java/io/orkes/conductor/client/LocalWorkerTest.java +++ b/src/test/java/io/orkes/conductor/client/LocalWorkerTest.java @@ -46,6 +46,7 @@ public static void main(String[] args) { .withSleepWhenRetry(10) .withTaskThreadCount(taskThreadCount) .withTaskPollTimeout(10) + .withTaskPollCount(5) .build(); configurer.init();