Skip to content

Commit

Permalink
client side improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed Oct 20, 2022
1 parent 34083d8 commit 690d869
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 8 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=3.0.33
version=3.0.36
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ public class TaskRunnerConfigurer {
private final Map<String /* taskType */, Integer /* threadCount */> taskToThreadCount;
private final Map<String /* taskType */, Integer /* timeoutInMillisecond */> taskPollTimeout;

private final Map<String /* taskType */, Integer /* timeoutInMillisecond */> taskPollCount;

private final ConductorClientConfiguration conductorClientConfiguration;
private Integer defaultPollTimeout;
private Integer defaultPollCount;
private final int threadCount;

private final List<TaskRunner> taskRunners;
Expand All @@ -77,7 +80,9 @@ private TaskRunnerConfigurer(TaskRunnerConfigurer.Builder builder) {
this.taskToDomain = builder.taskToDomain;
this.taskToThreadCount = builder.taskToThreadCount;
this.taskPollTimeout = builder.taskPollTimeout;
this.taskPollCount = builder.taskPollCount;
this.defaultPollTimeout = builder.defaultPollTimeout;
this.defaultPollCount = builder.defaultPollCount;
this.shutdownGracePeriodSeconds = builder.shutdownGracePeriodSeconds;
this.conductorClientConfiguration = builder.conductorClientConfiguration;
this.workers = new LinkedList<>();
Expand All @@ -94,14 +99,17 @@ public static class Builder {
private int threadCount = -1;
private int shutdownGracePeriodSeconds = 10;
private int defaultPollTimeout = 100;

private int defaultPollCount = 20;
private final Iterable<Worker> workers;
private EurekaClient eurekaClient;
private final TaskClient taskClient;
private Map<String /* taskType */, String /* domain */> taskToDomain = new HashMap<>();
private Map<String /* taskType */, Integer /* threadCount */> taskToThreadCount =
new HashMap<>();
private Map<String /* taskType */, Integer /* timeoutInMillisecond */> taskPollTimeout =
new HashMap<>();
private Map<String /* taskType */, Integer /* timeoutInMillisecond */> taskPollTimeout = new HashMap<>();

private Map<String /* taskType */, Integer /* timeoutInMillisecond */> taskPollCount = new HashMap<>();

private ConductorClientConfiguration conductorClientConfiguration =
new DefaultConductorClientConfiguration();
Expand Down Expand Up @@ -206,6 +214,16 @@ public TaskRunnerConfigurer.Builder withTaskPollTimeout(Integer taskPollTimeout)
return this;
}

public TaskRunnerConfigurer.Builder withTaskPollCount(Map<String, Integer> taskPollCount) {
this.taskPollCount = taskPollCount;
return this;
}

public TaskRunnerConfigurer.Builder withTaskPollCount(int defaultPollCount) {
this.defaultPollCount = defaultPollCount;
return this;
}

/**
* Builds an instance of the TaskRunnerConfigurer.
*
Expand Down Expand Up @@ -346,6 +364,7 @@ private void startPooledGRPCWorker(Worker worker, TaskServiceGrpc.TaskServiceStu

final Integer threadCountForTask = this.taskToThreadCount.getOrDefault(worker.getTaskDefName(), threadCount);
final Integer taskPollTimeout = this.taskPollTimeout.getOrDefault(worker.getTaskDefName(), defaultPollTimeout);
final Integer taskPollcount = this.taskPollCount.getOrDefault(worker.getTaskDefName(), defaultPollCount);
String taskType = worker.getTaskDefName();
String domain =
Optional.ofNullable(PropertyFactory.getString(taskType, DOMAIN, null))
Expand All @@ -366,6 +385,7 @@ private void startPooledGRPCWorker(Worker worker, TaskServiceGrpc.TaskServiceStu
blockingStub,
worker,
domain,
taskPollcount,
taskPollTimeout,
executor,
threadCountForTask,
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/orkes/conductor/client/grpc/PoolWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo
}

private void _updateTask(TaskResult taskResult) {
log.debug("Updating task {}", taskResult.getTaskId());
log.trace("Updating task {}", taskResult.getTaskId());
taskResult.getOutputData().put("_clientSendTime", System.currentTimeMillis());
TaskServicePb.UpdateTaskRequest request = TaskServicePb.UpdateTaskRequest.newBuilder().setResult(protoMapper.toProto(taskResult)).build();
blockingStub.updateTask(request);
log.debug("Updated task {}", taskResult.getTaskId());
log.trace("Updated task {}", taskResult.getTaskId());
}

private <T, R> R retryOperation(Function<T, R> operation, int count, T input, String opName) {
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@ public class PooledPoller implements StreamObserver<TaskPb.Task> {
private final AtomicLong lastAskedForMessageCount = new AtomicLong(0);
private final Semaphore semaphore;

private final int taskPollCount;

public PooledPoller(
TaskServiceGrpc.TaskServiceStub asyncStub,
TaskServiceGrpc.TaskServiceBlockingStub blockingStub,
Worker worker,
String domain,
int taskPollCount,
Integer taskPollTimeout,
ThreadPoolExecutor executor,
Integer threadCountForTask,
Expand All @@ -68,6 +71,7 @@ public PooledPoller(
this.executor = executor;
this.threadCountForTask = threadCountForTask;
this.semaphore = semaphore;
this.taskPollCount = taskPollCount;
}

public void start() {
Expand Down Expand Up @@ -164,15 +168,15 @@ public void runAccumulatedRequests() {
if (currentPending <= 0) {
return;
}
if(currentPending > 20) {
currentPending = 20;
if(currentPending > taskPollCount) {
currentPending = taskPollCount;
}
// Make GRPC call for these many
// Observe for results, add them to local queue
if (callAgain.get()) {
callAgain.set(false);
lastAskedForMessageCount.set(currentPending);
log.debug("Accumulated {} for {}", currentPending, worker.getTaskDefName());
log.trace("Polling {} for {} tasks", worker.getTaskDefName(), currentPending);
TaskServicePb.BatchPollRequest request = buildPollRequest(currentPending, 1);
asyncStub.batchPoll(request, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public static void main(String[] args) {
.withSleepWhenRetry(10)
.withTaskThreadCount(taskThreadCount)
.withTaskPollTimeout(10)
.withTaskPollCount(5)
.build();
configurer.init();

Expand Down

0 comments on commit 690d869

Please sign in to comment.