From 804a9f68cf87fef2fedcc2fb6ff5eae0a1de85a3 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 9 Oct 2022 19:44:48 -0700 Subject: [PATCH] updates --- build.gradle | 4 +- .../automator/TaskRunnerConfigurer3.java | 407 ------------------ .../client/grpc/GrpcTaskWorker3.java | 97 ----- .../client/grpc/GrpcWorkflowClient.java | 34 -- .../conductor/client/grpc/PooledPoller.java | 2 +- .../client/grpc/TaskPollObserver3.java | 145 ------- .../grpc/task/BiDirectionalTaskWorker.java | 118 ----- .../grpc/task/PollResponseObserver.java | 65 --- .../grpc/workflow/GrpcWorkflowClient.java | 68 +++ .../workflow/StartWorkflowResponseStream.java | 30 ++ .../client/http/OrkesWorkflowClient.java | 4 +- .../conductor/client/LoadTestWorker.java | 12 - .../java/io/orkes/conductor/client/Main.java | 4 +- 13 files changed, 105 insertions(+), 885 deletions(-) delete mode 100644 src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer3.java delete mode 100644 src/main/java/io/orkes/conductor/client/grpc/GrpcTaskWorker3.java delete mode 100644 src/main/java/io/orkes/conductor/client/grpc/GrpcWorkflowClient.java delete mode 100644 src/main/java/io/orkes/conductor/client/grpc/TaskPollObserver3.java delete mode 100644 src/main/java/io/orkes/conductor/client/grpc/task/BiDirectionalTaskWorker.java delete mode 100644 src/main/java/io/orkes/conductor/client/grpc/task/PollResponseObserver.java create mode 100644 src/main/java/io/orkes/conductor/client/grpc/workflow/GrpcWorkflowClient.java create mode 100644 src/main/java/io/orkes/conductor/client/grpc/workflow/StartWorkflowResponseStream.java diff --git a/build.gradle b/build.gradle index 51da3828..429b9e64 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ dependencies { implementation "org.slf4j:slf4j-api:${versions.slf4j}" - implementation 'io.orkes.conductor:orkes-conductor-common-protos:0.0.29' + implementation 'io.orkes.conductor:orkes-conductor-common-protos:0.0.33' //Grpc dependencies //implementation "io.grpc:grpc-okhttp:${versions.ioGRPC}" @@ -193,7 +193,7 @@ configurations.implementation { spotless { java { - googleJavaFormat().aosp() + googleJavaFormat().aosp().reflowLongStrings(false) removeUnusedImports() importOrder('java', 'javax', 'org', 'com.netflix', 'io.orkes','', '\\#com.netflix', '\\#') licenseHeaderFile("$rootDir/licenseheader.txt") diff --git a/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer3.java b/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer3.java deleted file mode 100644 index f990a4ab..00000000 --- a/src/main/java/io/orkes/conductor/client/automator/TaskRunnerConfigurer3.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Copyright 2022 Orkes, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package io.orkes.conductor.client.automator; - -import java.util.*; -import java.util.concurrent.*; -import java.util.stream.Collectors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.netflix.conductor.client.config.ConductorClientConfiguration; -import com.netflix.conductor.client.config.DefaultConductorClientConfiguration; -import com.netflix.conductor.client.config.PropertyFactory; -import com.netflix.conductor.client.worker.Worker; -import com.netflix.conductor.grpc.TaskServiceGrpc; -import com.netflix.discovery.EurekaClient; - -import io.orkes.conductor.client.ApiClient; -import io.orkes.conductor.client.TaskClient; -import io.orkes.conductor.client.grpc.*; -import io.orkes.conductor.client.http.OrkesTaskClient; -import io.orkes.grpc.service.TaskServiceStreamGrpc; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.grpc.ManagedChannel; - -import static io.orkes.conductor.client.automator.TaskRunner.ALL_WORKERS; -import static io.orkes.conductor.client.automator.TaskRunner.DOMAIN; -import static io.orkes.conductor.client.grpc.ChannelManager.getChannel; - -public class TaskRunnerConfigurer3 { - private static final Logger LOGGER = LoggerFactory.getLogger(TaskRunnerConfigurer3.class); - - private final EurekaClient eurekaClient; - private final TaskClient taskClient; - - private final ApiClient apiClient; - private final List workers; - private final int sleepWhenRetry; - private final int updateRetryCount; - private final int shutdownGracePeriodSeconds; - private final String workerNamePrefix; - private final Map taskToDomain; - private final Map taskToThreadCount; - private final Map taskPollTimeout; - - private final ConductorClientConfiguration conductorClientConfiguration; - private Integer defaultPollTimeout; - private final int threadCount; - - private final List taskRunners; - - private ScheduledExecutorService scheduledExecutorService; - - /** - * @see TaskRunnerConfigurer3.Builder - * @see TaskRunnerConfigurer3#init() - */ - private TaskRunnerConfigurer3(TaskRunnerConfigurer3.Builder builder) { - this.eurekaClient = builder.eurekaClient; - this.taskClient = builder.taskClient; - this.apiClient = ((OrkesTaskClient) builder.taskClient).getApiClient(); - this.sleepWhenRetry = builder.sleepWhenRetry; - this.updateRetryCount = builder.updateRetryCount; - this.workerNamePrefix = builder.workerNamePrefix; - this.taskToDomain = builder.taskToDomain; - this.taskToThreadCount = builder.taskToThreadCount; - this.taskPollTimeout = builder.taskPollTimeout; - this.defaultPollTimeout = builder.defaultPollTimeout; - this.shutdownGracePeriodSeconds = builder.shutdownGracePeriodSeconds; - this.conductorClientConfiguration = builder.conductorClientConfiguration; - this.workers = new LinkedList<>(); - this.threadCount = builder.threadCount; - builder.workers.forEach(this.workers::add); - taskRunners = new LinkedList<>(); - } - - /** Builder used to create the instances of TaskRunnerConfigurer */ - public static class Builder { - private String workerNamePrefix = "workflow-worker-%d"; - private int sleepWhenRetry = 500; - private int updateRetryCount = 3; - private int threadCount = -1; - private int shutdownGracePeriodSeconds = 10; - private int defaultPollTimeout = 100; - private final Iterable workers; - private EurekaClient eurekaClient; - private final TaskClient taskClient; - private Map taskToDomain = new HashMap<>(); - private Map taskToThreadCount = - new HashMap<>(); - private Map taskPollTimeout = - new HashMap<>(); - - private ConductorClientConfiguration conductorClientConfiguration = - new DefaultConductorClientConfiguration(); - - public Builder(TaskClient taskClient, Iterable workers) { - Preconditions.checkNotNull(taskClient, "TaskClient cannot be null"); - Preconditions.checkNotNull(workers, "Workers cannot be null"); - this.taskClient = taskClient; - this.workers = workers; - } - - /** - * @param workerNamePrefix prefix to be used for worker names, defaults to workflow-worker- - * if not supplied. - * @return Returns the current instance. - */ - public TaskRunnerConfigurer3.Builder withWorkerNamePrefix(String workerNamePrefix) { - this.workerNamePrefix = workerNamePrefix; - return this; - } - - /** - * @param sleepWhenRetry time in milliseconds, for which the thread should sleep when task - * update call fails, before retrying the operation. - * @return Returns the current instance. - */ - public TaskRunnerConfigurer3.Builder withSleepWhenRetry(int sleepWhenRetry) { - this.sleepWhenRetry = sleepWhenRetry; - return this; - } - - /** - * @param updateRetryCount number of times to retry the failed updateTask operation - * @return Builder instance - * @see #withSleepWhenRetry(int) - */ - public TaskRunnerConfigurer3.Builder withUpdateRetryCount(int updateRetryCount) { - this.updateRetryCount = updateRetryCount; - return this; - } - - /** - * @param conductorClientConfiguration client configuration to handle external payloads - * @return Builder instance - */ - public TaskRunnerConfigurer3.Builder withConductorClientConfiguration( - ConductorClientConfiguration conductorClientConfiguration) { - this.conductorClientConfiguration = conductorClientConfiguration; - return this; - } - - /** - * @param shutdownGracePeriodSeconds waiting seconds before forcing shutdown of your worker - * @return Builder instance - */ - public TaskRunnerConfigurer3.Builder withShutdownGracePeriodSeconds( - int shutdownGracePeriodSeconds) { - if (shutdownGracePeriodSeconds < 1) { - throw new IllegalArgumentException( - "Seconds of shutdownGracePeriod cannot be less than 1"); - } - this.shutdownGracePeriodSeconds = shutdownGracePeriodSeconds; - return this; - } - - /** - * @param eurekaClient Eureka client - used to identify if the server is in discovery or - * not. When the server goes out of discovery, the polling is terminated. If passed - * null, discovery check is not done. - * @return Builder instance - */ - public TaskRunnerConfigurer3.Builder withEurekaClient(EurekaClient eurekaClient) { - this.eurekaClient = eurekaClient; - return this; - } - - public TaskRunnerConfigurer3.Builder withTaskToDomain(Map taskToDomain) { - this.taskToDomain = taskToDomain; - return this; - } - - public TaskRunnerConfigurer3.Builder withTaskThreadCount( - Map taskToThreadCount) { - this.taskToThreadCount = taskToThreadCount; - return this; - } - - public TaskRunnerConfigurer3.Builder withTaskToThreadCount( - Map taskToThreadCount) { - this.taskToThreadCount = taskToThreadCount; - return this; - } - - public TaskRunnerConfigurer3.Builder withTaskPollTimeout( - Map taskPollTimeout) { - this.taskPollTimeout = taskPollTimeout; - return this; - } - - public TaskRunnerConfigurer3.Builder withTaskPollTimeout(Integer taskPollTimeout) { - this.defaultPollTimeout = taskPollTimeout; - return this; - } - - /** - * Builds an instance of the TaskRunnerConfigurer. - * - *

Please see {@link TaskRunnerConfigurer3#init()} method. The method must be called - * after this constructor for the polling to start. - * - * @return Builder instance - */ - public TaskRunnerConfigurer3 build() { - return new TaskRunnerConfigurer3(this); - } - - /** - * @param threadCount # of threads assigned to the workers. Should be at-least the size of - * taskWorkers to avoid starvation in a busy system. - * @return Builder instance - */ - public Builder withThreadCount(int threadCount) { - if (threadCount < 1) { - throw new IllegalArgumentException("No. of threads cannot be less than 1"); - } - this.threadCount = threadCount; - return this; - } - } - - /** - * @return seconds before forcing shutdown of worker - */ - public int getShutdownGracePeriodSeconds() { - return shutdownGracePeriodSeconds; - } - - /** - * @return sleep time in millisecond before task update retry is done when receiving error from - * the Conductor server - */ - public int getSleepWhenRetry() { - return sleepWhenRetry; - } - - /** - * @return Number of times updateTask should be retried when receiving error from Conductor - * server - */ - public int getUpdateRetryCount() { - return updateRetryCount; - } - - /** - * @return prefix used for worker names - */ - public String getWorkerNamePrefix() { - return workerNamePrefix; - } - - /** - * Starts the polling. Must be called after {@link TaskRunnerConfigurer3.Builder#build()} - * method. - */ - public synchronized void init() { - this.scheduledExecutorService = Executors.newScheduledThreadPool(workers.size()); - if (apiClient.isUseGRPC()) { - LOGGER.info("Using gRPC for task poll/update"); - - ManagedChannel channel = getChannel(apiClient); - - int totalThreads = - this.taskToThreadCount.values().stream() - .collect(Collectors.summingInt(Integer::intValue)); - if (totalThreads == 0) { - totalThreads = this.threadCount; - } - LOGGER.info("Using {} threads for grpc channels", totalThreads); - ExecutorService executor = getExecutor(totalThreads); - TaskServiceGrpc.TaskServiceStub asyncStub = - TaskServiceGrpc.newStub(channel) - .withInterceptors(new HeaderClientInterceptor(apiClient)) - .withExecutor(executor); - - TaskServiceStreamGrpc.TaskServiceStreamStub orkesTaskService = - TaskServiceStreamGrpc.newStub(channel) - .withInterceptors(new HeaderClientInterceptor(apiClient)) - .withExecutor(executor); - - TaskUpdateObserver taskUpdateObserver = new TaskUpdateObserver(); - LOGGER.info("Going to start {}", workers); - workers.forEach( - worker -> - scheduledExecutorService.submit( - () -> - this.startGRPCWorker( - worker, - orkesTaskService, - asyncStub, - taskUpdateObserver))); - } else { - workers.forEach( - worker -> scheduledExecutorService.submit(() -> this.startWorker(worker))); - } - } - - private ThreadPoolExecutor getExecutor(int threadPoolSize) { - return new ThreadPoolExecutor( - threadPoolSize, - threadPoolSize, - 0, - TimeUnit.SECONDS, - new ArrayBlockingQueue<>(threadPoolSize) { - @Override - public boolean offer(Runnable runnable) { - try { - return super.offer(runnable, 200, TimeUnit.MILLISECONDS); - } catch (InterruptedException ie) { - return false; - } - } - }, - new ThreadFactoryBuilder().setNameFormat("task-update-thread-%d").build()); - } - - /** - * Invoke this method within a PreDestroy block within your application to facilitate a graceful - * shutdown of your worker, during process termination. - */ - public void shutdown() { - this.taskRunners.forEach(taskRunner -> taskRunner.shutdown(shutdownGracePeriodSeconds)); - this.scheduledExecutorService.shutdown(); - } - - private void startWorker(Worker worker) { - LOGGER.info("Starting worker: {} with ", worker.getTaskDefName()); - final Integer threadCountForTask = - this.taskToThreadCount.getOrDefault(worker.getTaskDefName(), threadCount); - final Integer taskPollTimeout = - this.taskPollTimeout.getOrDefault(worker.getTaskDefName(), defaultPollTimeout); - final TaskRunner taskRunner = - new TaskRunner( - worker, - eurekaClient, - taskClient, - conductorClientConfiguration, - updateRetryCount, - taskToDomain, - workerNamePrefix, - threadCountForTask, - taskPollTimeout); - this.taskRunners.add(taskRunner); - taskRunner.pollAndExecute(); - } - - private void startGRPCWorker( - Worker worker, - TaskServiceStreamGrpc.TaskServiceStreamStub orkesTaskService, - TaskServiceGrpc.TaskServiceStub asyncStub, - TaskUpdateObserver taskUpdateObserver) { - - final Integer threadCountForTask = - this.taskToThreadCount.getOrDefault(worker.getTaskDefName(), threadCount); - final Integer taskPollTimeout = - this.taskPollTimeout.getOrDefault(worker.getTaskDefName(), defaultPollTimeout); - String taskType = worker.getTaskDefName(); - String domain = - Optional.ofNullable(PropertyFactory.getString(taskType, DOMAIN, null)) - .orElseGet( - () -> - Optional.ofNullable( - PropertyFactory.getString( - ALL_WORKERS, DOMAIN, null)) - .orElse(taskToDomain.get(taskType))); - - LOGGER.info( - "Starting gRPC worker: {} with {} threads", - worker.getTaskDefName(), - threadCountForTask); - ThreadPoolExecutor executor = - new ThreadPoolExecutor( - threadCountForTask, - threadCountForTask, - 1, - TimeUnit.MINUTES, - new ArrayBlockingQueue<>(threadCountForTask * 100)); - - TaskPollObserver3 taskPollObserver = - new TaskPollObserver3( - worker, orkesTaskService, executor, asyncStub, taskUpdateObserver); - GrpcTaskWorker3 grpcTaskWorker = - new GrpcTaskWorker3( - asyncStub, - taskPollObserver, - executor, - worker, - domain, - threadCountForTask, - taskPollTimeout); - grpcTaskWorker.init(); - } -} diff --git a/src/main/java/io/orkes/conductor/client/grpc/GrpcTaskWorker3.java b/src/main/java/io/orkes/conductor/client/grpc/GrpcTaskWorker3.java deleted file mode 100644 index 09a5db7f..00000000 --- a/src/main/java/io/orkes/conductor/client/grpc/GrpcTaskWorker3.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2022 Orkes, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package io.orkes.conductor.client.grpc; - -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import com.netflix.conductor.client.worker.Worker; -import com.netflix.conductor.grpc.TaskServiceGrpc; -import com.netflix.conductor.grpc.TaskServicePb; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class GrpcTaskWorker3 { - - private final TaskServiceGrpc.TaskServiceStub asyncStub; - - private final Worker worker; - - private final String domain; - - private final ThreadPoolExecutor executor; - - private final int threadCount; - - private final int pollTimeoutInMills; - - private final TaskPollObserver3 taskPollObserver; - - private final int bufferSize; - - public GrpcTaskWorker3( - TaskServiceGrpc.TaskServiceStub asyncStub, - TaskPollObserver3 taskPollObserver, - ThreadPoolExecutor executor, - Worker worker, - String domain, - int threadCount, - int pollTimeoutInMills) { - this.worker = worker; - this.domain = domain; - this.threadCount = threadCount; - this.pollTimeoutInMills = pollTimeoutInMills; - this.asyncStub = asyncStub; - this.taskPollObserver = taskPollObserver; - this.executor = executor; - this.bufferSize = this.threadCount * 2; - } - - public void init() { - Executors.newSingleThreadScheduledExecutor() - .scheduleAtFixedRate( - () -> _pollAndExecute(), - worker.getPollingInterval(), - worker.getPollingInterval(), - TimeUnit.MILLISECONDS); - } - - private void _pollAndExecute() { - int pollCount = getPollCount(); - if (pollCount < 1) { - return; - } - log.debug("Polling {} for {} tasks", worker.getTaskDefName(), pollCount); - TaskServicePb.BatchPollRequest request = buildPollRequest(pollCount, pollTimeoutInMills); - asyncStub.batchPoll(request, taskPollObserver); - } - - private TaskServicePb.BatchPollRequest buildPollRequest(int count, int timeoutInMillisecond) { - TaskServicePb.BatchPollRequest.Builder requestBuilder = - TaskServicePb.BatchPollRequest.newBuilder() - .setCount(count) - .setTaskType(worker.getTaskDefName()) - .setTimeout(timeoutInMillisecond) - .setWorkerId(worker.getIdentity()); - if (domain != null) { - requestBuilder = requestBuilder.setDomain(domain); - } - return requestBuilder.build(); - } - - private int getPollCount() { - return (this.bufferSize) - this.executor.getActiveCount(); - } -} diff --git a/src/main/java/io/orkes/conductor/client/grpc/GrpcWorkflowClient.java b/src/main/java/io/orkes/conductor/client/grpc/GrpcWorkflowClient.java deleted file mode 100644 index 02ba95e0..00000000 --- a/src/main/java/io/orkes/conductor/client/grpc/GrpcWorkflowClient.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2022 Orkes, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package io.orkes.conductor.client.grpc; - -import java.util.UUID; -import java.util.concurrent.CompletableFuture; - -import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; - -import io.orkes.conductor.client.ApiClient; -import io.orkes.conductor.client.model.WorkflowRun; - -public class GrpcWorkflowClient { - - public GrpcWorkflowClient(ApiClient apiClient) {} - - public CompletableFuture executeWorkflow(StartWorkflowRequest request) { - String requestId = UUID.randomUUID().toString(); - request.getInput().put("_X-request-id", requestId); - CompletableFuture future = new CompletableFuture<>(); - future.complete(new WorkflowRun()); - return future; - } -} diff --git a/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java b/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java index 01786156..0da997f5 100644 --- a/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java +++ b/src/main/java/io/orkes/conductor/client/grpc/PooledPoller.java @@ -236,7 +236,7 @@ public void onCompleted() { private void drain() { long didntGetMessageCount = lastAskedForMessageCount.get(); - if(didntGetMessageCount > 0) { + if (didntGetMessageCount > 0) { log.info("Didn't get {} messages from server as expected", didntGetMessageCount); for (int i = 0; i < didntGetMessageCount; i++) { this.saveTask(TaskPb.Task.newBuilder().setTaskId("NO_OP").build()); diff --git a/src/main/java/io/orkes/conductor/client/grpc/TaskPollObserver3.java b/src/main/java/io/orkes/conductor/client/grpc/TaskPollObserver3.java deleted file mode 100644 index c13d903b..00000000 --- a/src/main/java/io/orkes/conductor/client/grpc/TaskPollObserver3.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright 2022 Orkes, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package io.orkes.conductor.client.grpc; - -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; - -import com.netflix.conductor.client.worker.Worker; -import com.netflix.conductor.common.metadata.tasks.TaskResult; -import com.netflix.conductor.grpc.TaskServiceGrpc; -import com.netflix.conductor.grpc.TaskServicePb; -import com.netflix.conductor.proto.ProtoMappingHelper; -import com.netflix.conductor.proto.TaskPb; -import com.netflix.conductor.proto.TaskResultPb; - -import io.orkes.grpc.service.TaskServiceStreamGrpc; - -import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class TaskPollObserver3 implements StreamObserver { - - private final ProtoMappingHelper protoMapper = ProtoMappingHelper.INSTANCE; - - private final Worker worker; - - private final ThreadPoolExecutor executor; - - private final TaskUpdateObserver taskUpdateObserver; - - private final TaskServiceGrpc.TaskServiceStub asyncStub; - - private final TaskServiceStreamGrpc.TaskServiceStreamStub orkesTaskService; - - private StreamObserver taskUpdateStream; - - public TaskPollObserver3( - Worker worker, - TaskServiceStreamGrpc.TaskServiceStreamStub orkesTaskService, - ThreadPoolExecutor executor, - TaskServiceGrpc.TaskServiceStub asyncStub, - TaskUpdateObserver taskUpdateObserver) { - this.worker = worker; - this.executor = executor; - this.taskUpdateObserver = taskUpdateObserver; - this.orkesTaskService = orkesTaskService; - this.asyncStub = asyncStub; - - taskUpdateStream = orkesTaskService.updateTaskResult(new TaskUpdateObserver()); - } - - @Override - public void onNext(TaskPb.Task task) { - try { - log.debug("Executor size {}", executor.getActiveCount()); - executor.execute( - () -> { - try { - log.debug("Executing task {}", task.getTaskId()); - TaskResult result = worker.execute(protoMapper.fromProto(task)); - log.debug("Executed task {}", task.getTaskId()); - updateTaskAsync(result); - } catch (Exception e) { - log.error("Error executing task: {}", e.getMessage(), e); - } - }); - } catch (RejectedExecutionException ree) { - log.error(ree.getMessage(), ree); - } - } - - @Override - public void onError(Throwable t) { - log.error("Error {}", t.getMessage()); - t.printStackTrace(); - System.exit(1); - Status status = Status.fromThrowable(t); - Status.Code code = status.getCode(); - switch (code) { - case UNAVAILABLE: - log.trace("Server not available "); - break; - case UNAUTHENTICATED: - log.error("{} - Invalid or missing api key/secret", code); - break; - case CANCELLED: - case ABORTED: - case DATA_LOSS: - case DEADLINE_EXCEEDED: - break; - default: - log.error("Error from server when polling for the task {} - {}", code); - } - } - - @Override - public void onCompleted() {} - - public void updateTask(TaskResult taskResult) { - log.info("Updating task {}", taskResult.getTaskId()); - TaskServicePb.UpdateTaskRequest request = - TaskServicePb.UpdateTaskRequest.newBuilder() - .setResult(protoMapper.toProto(taskResult)) - .build(); - asyncStub.updateTask(request, taskUpdateObserver); - log.info("Updated task {}", taskResult.getTaskId()); - } - - public void updateTaskAsync(TaskResult taskResult) { - try { - log.info("Updating task async {}", taskResult.getTaskId()); - taskUpdateStream.onNext(protoMapper.toProto(taskResult)); - log.info("Updated task async {}", taskResult.getTaskId()); - } catch (Throwable e) { - log.error(e.getMessage(), e); - System.exit(1); - } - } - - private TaskServicePb.BatchPollRequest buildPollRequest( - String domain, int count, int timeoutInMillisecond) { - TaskServicePb.BatchPollRequest.Builder requestBuilder = - TaskServicePb.BatchPollRequest.newBuilder() - .setCount(count) - .setTaskType(worker.getTaskDefName()) - .setTimeout(timeoutInMillisecond) - .setWorkerId(worker.getIdentity()); - if (domain != null) { - requestBuilder = requestBuilder.setDomain(domain); - } - return requestBuilder.build(); - } -} diff --git a/src/main/java/io/orkes/conductor/client/grpc/task/BiDirectionalTaskWorker.java b/src/main/java/io/orkes/conductor/client/grpc/task/BiDirectionalTaskWorker.java deleted file mode 100644 index 79089ff6..00000000 --- a/src/main/java/io/orkes/conductor/client/grpc/task/BiDirectionalTaskWorker.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2022 Orkes, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package io.orkes.conductor.client.grpc.task; - -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import com.netflix.conductor.client.worker.Worker; -import com.netflix.conductor.grpc.TaskServiceGrpc; -import com.netflix.conductor.grpc.TaskServicePb; - -import io.orkes.conductor.client.grpc.TaskPollObserver; -import io.orkes.grpc.service.TaskServiceStreamGrpc; - -import io.grpc.stub.StreamObserver; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class BiDirectionalTaskWorker { - - private final TaskServiceGrpc.TaskServiceStub asyncStub; - - private final Worker worker; - - private final String domain; - - private final ThreadPoolExecutor executor; - - private final int threadCount; - - private final int pollTimeoutInMills; - - private final TaskPollObserver taskPollObserver; - - private StreamObserver requestObserver; - - private final PollResponseObserver pollResponseObserver; - - private final TaskServiceStreamGrpc.TaskServiceStreamStub bidiService; - - public BiDirectionalTaskWorker( - TaskServiceStreamGrpc.TaskServiceStreamStub bidiService, - TaskServiceGrpc.TaskServiceStub asyncStub, - TaskPollObserver taskPollObserver, - ThreadPoolExecutor executor, - Worker worker, - String domain, - int threadCount, - int pollTimeoutInMills) { - this.bidiService = bidiService; - this.worker = worker; - this.domain = domain; - this.threadCount = threadCount; - this.pollTimeoutInMills = pollTimeoutInMills; - this.asyncStub = asyncStub; - this.taskPollObserver = taskPollObserver; - this.executor = executor; - this.pollResponseObserver = new PollResponseObserver(); - } - - public void init() { - requestObserver = bidiService.taskPoll(pollResponseObserver); - pollResponseObserver.setReady(); - Executors.newSingleThreadScheduledExecutor() - .scheduleAtFixedRate( - () -> _pollAndExecute(), - worker.getPollingInterval(), - worker.getPollingInterval(), - TimeUnit.MILLISECONDS); - } - - private void _pollAndExecute() { - int pollCount = getAvailableWorkers(); - if (pollCount < 1) { - return; - } - - try { - if (!pollResponseObserver.isReady()) { - log.warn("Connection not ready yet..."); - requestObserver = bidiService.taskPoll(pollResponseObserver); - pollResponseObserver.setReady(); - return; - } - requestObserver.onNext(buildPollRequest(pollCount, pollTimeoutInMills)); - } catch (Throwable t) { - log.error("Error sending request {}", t.getMessage()); - } - } - - private TaskServicePb.BatchPollRequest buildPollRequest(int count, int timeoutInMillisecond) { - TaskServicePb.BatchPollRequest.Builder requestBuilder = - TaskServicePb.BatchPollRequest.newBuilder() - .setCount(count) - .setTaskType(worker.getTaskDefName()) - .setTimeout(timeoutInMillisecond) - .setWorkerId(worker.getIdentity()); - if (domain != null) { - requestBuilder = requestBuilder.setDomain(domain); - } - return requestBuilder.build(); - } - - private int getAvailableWorkers() { - return (this.threadCount * 2) - this.executor.getActiveCount(); - } -} diff --git a/src/main/java/io/orkes/conductor/client/grpc/task/PollResponseObserver.java b/src/main/java/io/orkes/conductor/client/grpc/task/PollResponseObserver.java deleted file mode 100644 index 1dabda90..00000000 --- a/src/main/java/io/orkes/conductor/client/grpc/task/PollResponseObserver.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2022 Orkes, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package io.orkes.conductor.client.grpc.task; - -import com.netflix.conductor.grpc.TaskServicePb; - -import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class PollResponseObserver implements StreamObserver { - - private boolean ready = true; - - @Override - public void onNext(TaskServicePb.PollResponse response) { - log.debug("got task to execute {}", response.getTask().getTaskId()); - // Execute the task here - // Send to another BiDi stream for update - } - - @Override - public void onError(Throwable t) { - Status status = Status.fromThrowable(t); - Status.Code code = status.getCode(); - switch (code) { - case UNAVAILABLE: - log.trace("Server not available "); - ready = false; - break; - case UNAUTHENTICATED: - log.error("{} - Invalid or missing api key/secret", code); - break; - case CANCELLED: - case ABORTED: - case DATA_LOSS: - case DEADLINE_EXCEEDED: - break; - default: - log.error("Error from server when polling for the task {} - {}", code); - } - } - - @Override - public void onCompleted() {} - - public boolean isReady() { - return ready; - } - - public void setReady() { - ready = true; - } -} diff --git a/src/main/java/io/orkes/conductor/client/grpc/workflow/GrpcWorkflowClient.java b/src/main/java/io/orkes/conductor/client/grpc/workflow/GrpcWorkflowClient.java new file mode 100644 index 00000000..c05dc2e7 --- /dev/null +++ b/src/main/java/io/orkes/conductor/client/grpc/workflow/GrpcWorkflowClient.java @@ -0,0 +1,68 @@ +/* + * Copyright 2022 Orkes, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.grpc.workflow; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; + +import io.orkes.conductor.client.ApiClient; +import io.orkes.conductor.client.grpc.HeaderClientInterceptor; +import io.orkes.conductor.client.model.WorkflowRun; +import io.orkes.grpc.service.OrkesWorkflowService; +import io.orkes.grpc.service.WorkflowServiceStreamGrpc; + +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; + +import static io.orkes.conductor.client.grpc.ChannelManager.getChannel; + +@Slf4j +public class GrpcWorkflowClient { + + private WorkflowServiceStreamGrpc.WorkflowServiceStreamStub stub; + + private StreamObserver requestStream; + + public GrpcWorkflowClient(ApiClient apiClient) { + stub = + WorkflowServiceStreamGrpc.newStub(getChannel(apiClient)) + .withInterceptors(new HeaderClientInterceptor(apiClient)); + requestStream = stub.startWorkflow(new StartWorkflowResponseStream()); + } + + public CompletableFuture executeWorkflow( + StartWorkflowRequest startWorkflowRequest, String waitUntilTask) { + String requestId = UUID.randomUUID().toString(); + startWorkflowRequest.getInput().put("_X-request-id", requestId); + + OrkesWorkflowService.StartWorkflowRequest.Builder requestBuilder = + OrkesWorkflowService.StartWorkflowRequest.newBuilder(); + requestBuilder + .setRequestId(requestId) + .setIdempotencyKey(requestId) + .setMonitor(true) + .setWaitUntilTask(waitUntilTask) + .build(); + try { + requestStream.onNext(requestBuilder.build()); + } catch (Throwable t) { + log.error("Error starting a workflow {}", t.getMessage(), t); + } + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new WorkflowRun()); + return future; + } +} diff --git a/src/main/java/io/orkes/conductor/client/grpc/workflow/StartWorkflowResponseStream.java b/src/main/java/io/orkes/conductor/client/grpc/workflow/StartWorkflowResponseStream.java new file mode 100644 index 00000000..c9c91bb9 --- /dev/null +++ b/src/main/java/io/orkes/conductor/client/grpc/workflow/StartWorkflowResponseStream.java @@ -0,0 +1,30 @@ +/* + * Copyright 2022 Orkes, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.client.grpc.workflow; + +import io.orkes.grpc.service.OrkesWorkflowService; + +import io.grpc.stub.StreamObserver; + +public class StartWorkflowResponseStream + implements StreamObserver { + + @Override + public void onNext(OrkesWorkflowService.StartWorkflowResponse value) {} + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() {} +} diff --git a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java index 560ce7b8..b24534e7 100644 --- a/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java +++ b/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java @@ -27,7 +27,7 @@ import io.orkes.conductor.client.ApiClient; import io.orkes.conductor.client.WorkflowClient; -import io.orkes.conductor.client.grpc.GrpcWorkflowClient; +import io.orkes.conductor.client.grpc.workflow.GrpcWorkflowClient; import io.orkes.conductor.client.http.api.WorkflowBulkResourceApi; import io.orkes.conductor.client.http.api.WorkflowResourceApi; import io.orkes.conductor.client.model.WorkflowRun; @@ -56,7 +56,7 @@ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) { } public CompletableFuture executeWorkflow(StartWorkflowRequest request) { - return grpcWorkflowClient.executeWorkflow(request); + return grpcWorkflowClient.executeWorkflow(request, null); } @Override diff --git a/src/test/java/io/orkes/conductor/client/LoadTestWorker.java b/src/test/java/io/orkes/conductor/client/LoadTestWorker.java index a778da67..91b5ee86 100644 --- a/src/test/java/io/orkes/conductor/client/LoadTestWorker.java +++ b/src/test/java/io/orkes/conductor/client/LoadTestWorker.java @@ -34,14 +34,6 @@ public LoadTestWorker(String name) { this.name = name; } - private static String generateRandomString() { - - Random random = new Random(); - int wordCount = Math.max(1, random.nextInt(5)); - StringBuilder sb = new StringBuilder(); - return sb.toString(); - } - @Override public String getTaskDefName() { return name; @@ -72,10 +64,6 @@ public TaskResult execute(Task task) { result.addOutputData("scheduledTime", task.getScheduledTime()); result.addOutputData("startTime", task.getStartTime()); - for (int i = 0; i < resultCount; i++) { - result.getOutputData().put("key" + i, generateRandomString()); - } - return result; } diff --git a/src/test/java/io/orkes/conductor/client/Main.java b/src/test/java/io/orkes/conductor/client/Main.java index f1d140af..0f12ba86 100644 --- a/src/test/java/io/orkes/conductor/client/Main.java +++ b/src/test/java/io/orkes/conductor/client/Main.java @@ -25,7 +25,7 @@ import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; import io.orkes.conductor.client.automator.TaskRunnerConfigurer; -import io.orkes.conductor.client.grpc.GrpcWorkflowClient; +import io.orkes.conductor.client.grpc.workflow.GrpcWorkflowClient; import io.orkes.conductor.client.model.WorkflowRun; public class Main { @@ -69,7 +69,7 @@ public static void main(String[] args) throws InterruptedException { } request.setInput(input); try { - CompletableFuture future = client.executeWorkflow(request); + CompletableFuture future = client.executeWorkflow(request, null); future.thenAccept( workflowRun -> { System.out.println(