+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.google.gson.annotations;
+
+public @interface SerializedName {
+ String value();
+}
diff --git a/src/main/java/com/netflix/conductor/client/config/ConductorClientConfiguration.java b/src/main/java/com/netflix/conductor/client/config/ConductorClientConfiguration.java
new file mode 100644
index 00000000..0aaad6fb
--- /dev/null
+++ b/src/main/java/com/netflix/conductor/client/config/ConductorClientConfiguration.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2018 Orkes, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.config;
+
+public interface ConductorClientConfiguration {
+
+ /**
+ * @return the workflow input payload size threshold in KB, beyond which the payload will be
+ * processed based on {@link
+ * ConductorClientConfiguration#isExternalPayloadStorageEnabled()}.
+ */
+ int getWorkflowInputPayloadThresholdKB();
+
+ /**
+ * @return the max value of workflow input payload size threshold in KB, beyond which the
+ * payload will be rejected regardless external payload storage is enabled.
+ */
+ int getWorkflowInputMaxPayloadThresholdKB();
+
+ /**
+ * @return the task output payload size threshold in KB, beyond which the payload will be
+ * processed based on {@link
+ * ConductorClientConfiguration#isExternalPayloadStorageEnabled()}.
+ */
+ int getTaskOutputPayloadThresholdKB();
+
+ /**
+ * @return the max value of task output payload size threshold in KB, beyond which the payload
+ * will be rejected regardless external payload storage is enabled.
+ */
+ int getTaskOutputMaxPayloadThresholdKB();
+
+ /**
+ * @return the flag which controls the use of external storage for storing workflow/task input
+ * and output JSON payloads with size greater than threshold. If it is set to true, the
+ * payload is stored in external location. If it is set to false, the payload is rejected
+ * and the task/workflow execution fails.
+ */
+ boolean isExternalPayloadStorageEnabled();
+}
diff --git a/src/main/java/com/netflix/conductor/client/config/PropertyFactory.java b/src/main/java/com/netflix/conductor/client/config/PropertyFactory.java
new file mode 100644
index 00000000..79d4a510
--- /dev/null
+++ b/src/main/java/com/netflix/conductor/client/config/PropertyFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2020 Orkes, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.config;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.netflix.config.DynamicProperty;
+
+/** Used to configure the Conductor workers using properties. */
+public class PropertyFactory {
+
+ private final DynamicProperty global;
+ private final DynamicProperty local;
+
+ private static final String PROPERTY_PREFIX = "conductor.worker";
+
+ private static final ConcurrentHashMap PROPERTY_FACTORY_MAP =
+ new ConcurrentHashMap<>();
+
+ private PropertyFactory(String prefix, String propName, String workerName) {
+ this.global = DynamicProperty.getInstance(prefix + "." + propName);
+ this.local = DynamicProperty.getInstance(prefix + "." + workerName + "." + propName);
+ }
+
+ /**
+ * @param defaultValue Default Value
+ * @return Returns the value as integer. If not value is set (either global or worker specific),
+ * then returns the default value.
+ */
+ public Integer getInteger(int defaultValue) {
+ Integer value = local.getInteger();
+ if (value == null) {
+ value = global.getInteger(defaultValue);
+ }
+ return value;
+ }
+
+ /**
+ * @param defaultValue Default Value
+ * @return Returns the value as String. If not value is set (either global or worker specific),
+ * then returns the default value.
+ */
+ public String getString(String defaultValue) {
+ String value = local.getString();
+ if (value == null) {
+ value = global.getString(defaultValue);
+ }
+ return value;
+ }
+
+ /**
+ * @param defaultValue Default Value
+ * @return Returns the value as Boolean. If not value is set (either global or worker specific),
+ * then returns the default value.
+ */
+ public Boolean getBoolean(Boolean defaultValue) {
+ Boolean value = local.getBoolean();
+ if (value == null) {
+ value = global.getBoolean(defaultValue);
+ }
+ return value;
+ }
+
+ public static Integer getInteger(String workerName, String property, Integer defaultValue) {
+ return getPropertyFactory(workerName, property).getInteger(defaultValue);
+ }
+
+ public static Boolean getBoolean(String workerName, String property, Boolean defaultValue) {
+ return getPropertyFactory(workerName, property).getBoolean(defaultValue);
+ }
+
+ public static String getString(String workerName, String property, String defaultValue) {
+ return getPropertyFactory(workerName, property).getString(defaultValue);
+ }
+
+ private static PropertyFactory getPropertyFactory(String workerName, String property) {
+ String key = property + "." + workerName;
+ return PROPERTY_FACTORY_MAP.computeIfAbsent(
+ key, t -> new PropertyFactory(PROPERTY_PREFIX, property, workerName));
+ }
+}
diff --git a/src/main/java/com/netflix/conductor/client/exception/ConductorClientException.java b/src/main/java/com/netflix/conductor/client/exception/ConductorClientException.java
new file mode 100644
index 00000000..2dc7c941
--- /dev/null
+++ b/src/main/java/com/netflix/conductor/client/exception/ConductorClientException.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2020 Orkes, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.exception;
+
+import java.util.List;
+
+import com.netflix.conductor.common.validation.ErrorResponse;
+import com.netflix.conductor.common.validation.ValidationError;
+
+/** Client exception thrown from Conductor api clients. */
+public class ConductorClientException extends RuntimeException {
+
+ private int status;
+ private String message;
+ private String instance;
+ private String code;
+ private boolean retryable;
+
+ public List getValidationErrors() {
+ return validationErrors;
+ }
+
+ public void setValidationErrors(List validationErrors) {
+ this.validationErrors = validationErrors;
+ }
+
+ private List validationErrors;
+
+ public ConductorClientException() {
+ super();
+ }
+
+ public ConductorClientException(String message) {
+ super(message);
+ this.message = message;
+ }
+
+ public ConductorClientException(String message, Throwable cause) {
+ super(message, cause);
+ this.message = message;
+ }
+
+ public ConductorClientException(int status, String message) {
+ super(message);
+ this.status = status;
+ this.message = message;
+ }
+
+ public ConductorClientException(int status, ErrorResponse errorResponse) {
+ super(errorResponse.getMessage());
+ this.status = status;
+ this.retryable = errorResponse.isRetryable();
+ this.message = errorResponse.getMessage();
+ this.code = errorResponse.getCode();
+ this.instance = errorResponse.getInstance();
+ this.validationErrors = errorResponse.getValidationErrors();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+
+ builder.append(getClass().getName()).append(": ");
+
+ if (this.message != null) {
+ builder.append(message);
+ }
+
+ if (status > 0) {
+ builder.append(" {status=").append(status);
+ if (this.code != null) {
+ builder.append(", code='").append(code).append("'");
+ }
+
+ builder.append(", retryable: ").append(retryable);
+ }
+
+ if (this.instance != null) {
+ builder.append(", instance: ").append(instance);
+ }
+
+ if (this.validationErrors != null) {
+ builder.append(", validationErrors: ").append(validationErrors.toString());
+ }
+
+ builder.append("}");
+ return builder.toString();
+ }
+
+ public String getCode() {
+ return code;
+ }
+
+ public void setCode(String code) {
+ this.code = code;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public String getInstance() {
+ return instance;
+ }
+
+ public void setInstance(String instance) {
+ this.instance = instance;
+ }
+
+ public boolean isRetryable() {
+ return retryable;
+ }
+
+ public void setRetryable(boolean retryable) {
+ this.retryable = retryable;
+ }
+
+ @Override
+ public String getMessage() {
+ return this.message;
+ }
+
+ public int getStatus() {
+ return this.status;
+ }
+}
diff --git a/src/main/java/com/netflix/conductor/client/http/EventClient.java b/src/main/java/com/netflix/conductor/client/http/EventClient.java
new file mode 100644
index 00000000..00c12d82
--- /dev/null
+++ b/src/main/java/com/netflix/conductor/client/http/EventClient.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.http;
+
+import java.util.List;
+
+import com.netflix.conductor.common.metadata.events.EventHandler;
+
+// Client class for all Event Handler operations
+public abstract class EventClient{
+
+ /** Creates a default metadata client */
+ public EventClient() {
+ }
+
+
+
+ /**
+ * Register an event handler with the server.
+ *
+ * @param eventHandler the eventHandler definition.
+ */
+ public abstract void registerEventHandler(EventHandler eventHandler);
+
+ /**
+ * Updates an event handler with the server.
+ *
+ * @param eventHandler the eventHandler definition.
+ */
+ public abstract void updateEventHandler(EventHandler eventHandler);
+
+ /**
+ * @param event name of the event.
+ * @param activeOnly if true, returns only the active handlers.
+ * @return Returns the list of all the event handlers for a given event.
+ */
+ public abstract List getEventHandlers(String event, boolean activeOnly);
+
+ /**
+ * Removes the event handler definition from the conductor server
+ *
+ * @param name the name of the event handler to be unregistered
+ */
+ public abstract void unregisterEventHandler(String name);
+}
diff --git a/src/main/java/com/netflix/conductor/client/http/MetadataClient.java b/src/main/java/com/netflix/conductor/client/http/MetadataClient.java
new file mode 100644
index 00000000..87947959
--- /dev/null
+++ b/src/main/java/com/netflix/conductor/client/http/MetadataClient.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2020 Orkes, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.http;
+
+import java.util.List;
+
+import com.netflix.conductor.common.metadata.tasks.TaskDef;
+import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
+
+public abstract class MetadataClient {
+
+ /** Creates a default metadata client */
+ public MetadataClient() {
+
+ }
+
+
+ // Workflow Metadata Operations
+
+ /**
+ * Register a workflow definition with the server
+ *
+ * @param workflowDef the workflow definition
+ */
+ public abstract void registerWorkflowDef(WorkflowDef workflowDef);
+
+ /**
+ * Updates a list of existing workflow definitions
+ *
+ * @param workflowDefs List of workflow definitions to be updated
+ */
+ public abstract void updateWorkflowDefs(List workflowDefs);
+
+ /**
+ * Retrieve the workflow definition
+ *
+ * @param name the name of the workflow
+ * @param version the version of the workflow def
+ * @return Workflow definition for the given workflow and version
+ */
+ public abstract WorkflowDef getWorkflowDef(String name, Integer version);
+
+ /**
+ * Removes the workflow definition of a workflow from the conductor server. It does not remove
+ * associated workflows. Use with caution.
+ *
+ * @param name Name of the workflow to be unregistered.
+ * @param version Version of the workflow definition to be unregistered.
+ */
+ public abstract void unregisterWorkflowDef(String name, Integer version);
+
+ // Task Metadata Operations
+
+ /**
+ * Registers a list of task types with the conductor server
+ *
+ * @param taskDefs List of task types to be registered.
+ */
+ public abstract void registerTaskDefs(List taskDefs);
+
+ /**
+ * Updates an existing task definition
+ *
+ * @param taskDef the task definition to be updated
+ */
+ public abstract void updateTaskDef(TaskDef taskDef);
+
+ /**
+ * Retrieve the task definition of a given task type
+ *
+ * @param taskType type of task for which to retrieve the definition
+ * @return Task Definition for the given task type
+ */
+ public abstract TaskDef getTaskDef(String taskType);
+
+ /**
+ * Removes the task definition of a task type from the conductor server. Use with caution.
+ *
+ * @param taskType Task type to be unregistered.
+ */
+ public abstract void unregisterTaskDef(String taskType);
+
+ /**
+ *
+ * @return All the registered task definitions
+ */
+ public abstract List getAllTaskDefs();
+}
diff --git a/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/src/main/java/com/netflix/conductor/client/http/TaskClient.java
new file mode 100644
index 00000000..84cdec8e
--- /dev/null
+++ b/src/main/java/com/netflix/conductor/client/http/TaskClient.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.http;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import com.netflix.conductor.common.metadata.tasks.PollData;
+import com.netflix.conductor.common.metadata.tasks.Task;
+import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
+import com.netflix.conductor.common.metadata.tasks.TaskResult;
+import com.netflix.conductor.common.run.SearchResult;
+import com.netflix.conductor.common.run.TaskSummary;
+import com.netflix.conductor.common.utils.ExternalPayloadStorage;
+
+/** Client for conductor task management including polling for task, updating task status etc. */
+public abstract class TaskClient {
+
+
+
+ /** Creates a default task client */
+ public TaskClient() {
+
+ }
+
+ /**
+ * Perform a poll for a task of a specific task type.
+ *
+ * @param taskType The taskType to poll for
+ * @param domain The domain of the task type
+ * @param workerId Name of the client worker. Used for logging.
+ * @return Task waiting to be executed.
+ */
+ public abstract Task pollTask(String taskType, String workerId, String domain);
+ /**
+ * Perform a batch poll for tasks by task type. Batch size is configurable by count.
+ *
+ * @param taskType Type of task to poll for
+ * @param workerId Name of the client worker. Used for logging.
+ * @param count Maximum number of tasks to be returned. Actual number of tasks returned can be
+ * less than this number.
+ * @param timeoutInMillisecond Long poll wait timeout.
+ * @return List of tasks awaiting to be executed.
+ */
+ public abstract List batchPollTasksByTaskType(
+ String taskType, String workerId, int count, int timeoutInMillisecond);
+
+ /**
+ * Batch poll for tasks in a domain. Batch size is configurable by count.
+ *
+ * @param taskType Type of task to poll for
+ * @param domain The domain of the task type
+ * @param workerId Name of the client worker. Used for logging.
+ * @param count Maximum number of tasks to be returned. Actual number of tasks returned can be
+ * less than this number.
+ * @param timeoutInMillisecond Long poll wait timeout.
+ * @return List of tasks awaiting to be executed.
+ */
+ public abstract List batchPollTasksInDomain(
+ String taskType, String domain, String workerId, int count, int timeoutInMillisecond);
+
+ /**
+
+
+ /**
+ * Updates the result of a task execution. If the size of the task output payload is bigger than
+ * {@link ExternalPayloadStorage}, if enabled, else the task is marked as
+ * FAILED_WITH_TERMINAL_ERROR.
+ *
+ * @param taskResult the {@link TaskResult} of the executed task to be updated.
+ */
+ public abstract void updateTask(TaskResult taskResult);
+
+ public abstract Optional evaluateAndUploadLargePayload(
+ Map taskOutputData, String taskType);
+
+ /**
+ * Ack for the task poll.
+ *
+ * @param taskId Id of the task to be polled
+ * @param workerId user identified worker.
+ * @return true if the task was found with the given ID and acknowledged. False otherwise. If
+ * the server returns false, the client should NOT attempt to ack again.
+ */
+ public abstract Boolean ack(String taskId, String workerId);
+
+ /**
+ * Log execution messages for a task.
+ *
+ * @param taskId id of the task
+ * @param logMessage the message to be logged
+ */
+ public abstract void logMessageForTask(String taskId, String logMessage);
+
+ /**
+ * Fetch execution logs for a task.
+ *
+ * @param taskId id of the task.
+ */
+ public abstract List getTaskLogs(String taskId);
+
+ /**
+ * Retrieve information about the task
+ *
+ * @param taskId ID of the task
+ * @return Task details
+ */
+ public abstract Task getTaskDetails(String taskId);
+
+ /**
+ * Removes a task from a taskType queue
+ *
+ * @param taskType the taskType to identify the queue
+ * @param taskId the id of the task to be removed
+ */
+ public abstract void removeTaskFromQueue(String taskType, String taskId);
+
+ public abstract int getQueueSizeForTask(String taskType);
+
+ public abstract int getQueueSizeForTask(
+ String taskType, String domain, String isolationGroupId, String executionNamespace);
+
+ /**
+ * Get last poll data for a given task type
+ *
+ * @param taskType the task type for which poll data is to be fetched
+ * @return returns the list of poll data for the task type
+ */
+ public abstract List getPollData(String taskType);
+
+ /**
+ * Get the last poll data for all task types
+ *
+ * @return returns a list of poll data for all task types
+ */
+ public abstract List getAllPollData();
+
+ /**
+ * Requeue pending tasks for all running workflows
+ *
+ * @return returns the number of tasks that have been requeued
+ */
+ public abstract String requeueAllPendingTasks();
+
+ /**
+ * Requeue pending tasks of a specific task type
+ *
+ * @return returns the number of tasks that have been requeued
+ */
+ public abstract String requeuePendingTasksByTaskType(String taskType);
+ /**
+ * Search for tasks based on payload
+ *
+ * @param query the search string
+ * @return returns the {@link SearchResult} containing the {@link TaskSummary} matching the
+ * query
+ */
+ public abstract SearchResult search(String query);
+
+ /**
+ * Search for tasks based on payload
+ *
+ * @param query the search string
+ * @return returns the {@link SearchResult} containing the {@link Task} matching the query
+ */
+ public abstract SearchResult searchV2(String query);
+
+ /**
+ * Paginated search for tasks based on payload
+ *
+ * @param start start value of page
+ * @param size number of tasks to be returned
+ * @param sort sort order
+ * @param freeText additional free text query
+ * @param query the search query
+ * @return the {@link SearchResult} containing the {@link TaskSummary} that match the query
+ */
+ public abstract SearchResult search(
+ Integer start, Integer size, String sort, String freeText, String query);
+
+ /**
+ * Paginated search for tasks based on payload
+ *
+ * @param start start value of page
+ * @param size number of tasks to be returned
+ * @param sort sort order
+ * @param freeText additional free text query
+ * @param query the search query
+ * @return the {@link SearchResult} containing the {@link Task} that match the query
+ */
+ public abstract SearchResult searchV2(
+ Integer start, Integer size, String sort, String freeText, String query);
+}
diff --git a/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java
new file mode 100644
index 00000000..914c08a9
--- /dev/null
+++ b/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2021 Orkes, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.http;
+
+import java.util.List;
+
+import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest;
+import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
+import com.netflix.conductor.common.model.BulkResponse;
+import com.netflix.conductor.common.run.SearchResult;
+import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.common.run.WorkflowSummary;
+import com.netflix.conductor.common.run.WorkflowTestRequest;
+import com.netflix.conductor.common.utils.ExternalPayloadStorage;
+
+public abstract class WorkflowClient {
+
+ /** Creates a default workflow client */
+ public WorkflowClient() {
+
+ }
+
+
+
+ /**
+ * Starts a workflow. If the size of the workflow input payload is bigger than {@link
+ * ExternalPayloadStorage}, if enabled, else the workflow is rejected.
+ *
+ * @param startWorkflowRequest the {@link StartWorkflowRequest} object to start the workflow
+ * @return the id of the workflow instance that can be used for tracking
+ */
+ public abstract String startWorkflow(StartWorkflowRequest startWorkflowRequest);
+
+ /**
+ * Retrieve a workflow by workflow id
+ *
+ * @param workflowId the id of the workflow
+ * @param includeTasks specify if the tasks in the workflow need to be returned
+ * @return the requested workflow
+ */
+ public abstract Workflow getWorkflow(String workflowId, boolean includeTasks);
+
+ /**
+ * Retrieve all workflows for a given correlation id and name
+ *
+ * @param name the name of the workflow
+ * @param correlationId the correlation id
+ * @param includeClosed specify if all workflows are to be returned or only running workflows
+ * @param includeTasks specify if the tasks in the workflow need to be returned
+ * @return list of workflows for the given correlation id and name
+ */
+ public abstract List getWorkflows(
+ String name, String correlationId, boolean includeClosed, boolean includeTasks);
+
+
+ /**
+ * Removes a workflow from the system
+ *
+ * @param workflowId the id of the workflow to be deleted
+ * @param archiveWorkflow flag to indicate if the workflow should be archived before deletion
+ */
+ public abstract void deleteWorkflow(String workflowId, boolean archiveWorkflow);
+
+ /**
+ * Terminates the execution of all given workflows instances
+ *
+ * @param workflowIds the ids of the workflows to be terminated
+ * @param reason the reason to be logged and displayed
+ * @return the {@link BulkResponse} contains bulkErrorResults and bulkSuccessfulResults
+ */
+ public abstract BulkResponse terminateWorkflows(List workflowIds, String reason);
+
+ /**
+ * Retrieve all running workflow instances for a given name and version
+ *
+ * @param workflowName the name of the workflow
+ * @param version the version of the wokflow definition. Defaults to 1.
+ * @return the list of running workflow instances
+ */
+ public abstract List getRunningWorkflow(String workflowName, Integer version);
+
+ /**
+ * Retrieve all workflow instances for a given workflow name between a specific time period
+ *
+ * @param workflowName the name of the workflow
+ * @param version the version of the workflow definition. Defaults to 1.
+ * @param startTime the start time of the period
+ * @param endTime the end time of the period
+ * @return returns a list of workflows created during the specified during the time period
+ */
+ public abstract List getWorkflowsByTimePeriod(
+ String workflowName, int version, Long startTime, Long endTime);
+
+ /**
+ * Starts the decision task for the given workflow instance
+ *
+ * @param workflowId the id of the workflow instance
+ */
+ public abstract void runDecider(String workflowId);
+
+ /**
+ * Pause a workflow by workflow id
+ *
+ * @param workflowId the workflow id of the workflow to be paused
+ */
+ public abstract void pauseWorkflow(String workflowId);
+
+ /**
+ * Resume a paused workflow by workflow id
+ *
+ * @param workflowId the workflow id of the paused workflow
+ */
+ public abstract void resumeWorkflow(String workflowId);
+
+ /**
+ * Skips a given task from a current RUNNING workflow
+ *
+ * @param workflowId the id of the workflow instance
+ * @param taskReferenceName the reference name of the task to be skipped
+ */
+ public abstract void skipTaskFromWorkflow(String workflowId, String taskReferenceName);
+
+ /**
+ * Reruns the workflow from a specific task
+ *
+ * @param workflowId the id of the workflow
+ * @param rerunWorkflowRequest the request containing the task to rerun from
+ * @return the id of the workflow
+ */
+ public abstract String rerunWorkflow(String workflowId, RerunWorkflowRequest rerunWorkflowRequest);
+
+ /**
+ * Restart a completed workflow
+ *
+ * @param workflowId the workflow id of the workflow to be restarted
+ * @param useLatestDefinitions if true, use the latest workflow and task definitions when
+ * restarting the workflow if false, use the workflow and task definitions embedded in the
+ * workflow execution when restarting the workflow
+ */
+ public abstract void restart(String workflowId, boolean useLatestDefinitions);
+
+ /**
+ * Retries the last failed task in a workflow
+ *
+ * @param workflowId the workflow id of the workflow with the failed task
+ */
+ public abstract void retryLastFailedTask(String workflowId);
+
+ /**
+ * Resets the callback times of all IN PROGRESS tasks to 0 for the given workflow
+ *
+ * @param workflowId the id of the workflow
+ */
+ public abstract void resetCallbacksForInProgressTasks(String workflowId);
+
+ /**
+ * Terminates the execution of the given workflow instance
+ *
+ * @param workflowId the id of the workflow to be terminated
+ * @param reason the reason to be logged and displayed
+ */
+ public abstract void terminateWorkflow(String workflowId, String reason);
+
+ /**
+ * Search for workflows based on payload
+ *
+ * @param query the search query
+ * @return the {@link SearchResult} containing the {@link WorkflowSummary} that match the query
+ */
+ public abstract SearchResult search(String query);
+
+ /**
+ * Search for workflows based on payload
+ *
+ * @param query the search query
+ * @return the {@link SearchResult} containing the {@link Workflow} that match the query
+ */
+ public abstract SearchResult searchV2(String query);
+
+ /**
+ * Paginated search for workflows based on payload
+ *
+ * @param start start value of page
+ * @param size number of workflows to be returned
+ * @param sort sort order
+ * @param freeText additional free text query
+ * @param query the search query
+ * @return the {@link SearchResult} containing the {@link WorkflowSummary} that match the query
+ */
+ public abstract SearchResult search(
+ Integer start, Integer size, String sort, String freeText, String query);
+
+ /**
+ * Paginated search for workflows based on payload
+ *
+ * @param start start value of page
+ * @param size number of workflows to be returned
+ * @param sort sort order
+ * @param freeText additional free text query
+ * @param query the search query
+ * @return the {@link SearchResult} containing the {@link Workflow} that match the query
+ */
+ public abstract SearchResult searchV2(
+ Integer start, Integer size, String sort, String freeText, String query);
+ public abstract Workflow testWorkflow(WorkflowTestRequest testRequest);
+}
diff --git a/src/main/java/com/netflix/conductor/client/telemetry/MetricsContainer.java b/src/main/java/com/netflix/conductor/client/telemetry/MetricsContainer.java
new file mode 100644
index 00000000..21bc27e1
--- /dev/null
+++ b/src/main/java/com/netflix/conductor/client/telemetry/MetricsContainer.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2020 Orkes, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.telemetry;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.netflix.spectator.api.*;
+import com.netflix.spectator.api.patterns.PolledMeter;
+
+public class MetricsContainer {
+
+ private static final String TASK_TYPE = "taskType";
+ private static final String WORKFLOW_TYPE = "workflowType";
+ private static final String WORKFLOW_VERSION = "version";
+ private static final String EXCEPTION = "exception";
+ private static final String ENTITY_NAME = "entityName";
+ private static final String OPERATION = "operation";
+ private static final String PAYLOAD_TYPE = "payload_type";
+
+ private static final String TASK_EXECUTION_QUEUE_FULL = "task_execution_queue_full";
+ private static final String TASK_POLL_ERROR = "task_poll_error";
+ private static final String TASK_PAUSED = "task_paused";
+ private static final String TASK_EXECUTE_ERROR = "task_execute_error";
+ private static final String TASK_ACK_FAILED = "task_ack_failed";
+ private static final String TASK_ACK_ERROR = "task_ack_error";
+ private static final String TASK_UPDATE_ERROR = "task_update_error";
+ private static final String TASK_POLL_COUNTER = "task_poll_counter";
+ private static final String TASK_EXECUTE_TIME = "task_execute_time";
+ private static final String TASK_POLL_TIME = "task_poll_time";
+ private static final String TASK_RESULT_SIZE = "task_result_size";
+ private static final String WORKFLOW_INPUT_SIZE = "workflow_input_size";
+ private static final String EXTERNAL_PAYLOAD_USED = "external_payload_used";
+ private static final String WORKFLOW_START_ERROR = "workflow_start_error";
+ private static final String THREAD_UNCAUGHT_EXCEPTION = "thread_uncaught_exceptions";
+
+ private static final Registry REGISTRY = Spectator.globalRegistry();
+ private static final Map TIMERS = new ConcurrentHashMap<>();
+ private static final Map COUNTERS = new ConcurrentHashMap<>();
+ private static final Map GAUGES = new ConcurrentHashMap<>();
+ private static final String CLASS_NAME = MetricsContainer.class.getSimpleName();
+
+ private MetricsContainer() {}
+
+ public static Timer getPollTimer(String taskType) {
+ return getTimer(TASK_POLL_TIME, TASK_TYPE, taskType);
+ }
+
+ public static Timer getExecutionTimer(String taskType) {
+ return getTimer(TASK_EXECUTE_TIME, TASK_TYPE, taskType);
+ }
+
+ private static Timer getTimer(String name, String... additionalTags) {
+ String key = CLASS_NAME + "." + name + "." + String.join(",", additionalTags);
+ return TIMERS.computeIfAbsent(
+ key,
+ k -> {
+ List tagList = getTags(additionalTags);
+ tagList.add(new BasicTag("unit", TimeUnit.MILLISECONDS.name()));
+ return REGISTRY.timer(name, tagList);
+ });
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private static List getTags(String[] additionalTags) {
+ List tagList = new ArrayList();
+ tagList.add(new BasicTag("class", CLASS_NAME));
+ for (int j = 0; j < additionalTags.length - 1; j++) {
+ tagList.add(new BasicTag(additionalTags[j], additionalTags[j + 1]));
+ j++;
+ }
+ return tagList;
+ }
+
+ private static void incrementCount(String name, String... additionalTags) {
+ getCounter(name, additionalTags).increment();
+ }
+
+ private static Counter getCounter(String name, String... additionalTags) {
+ String key = CLASS_NAME + "." + name + "." + String.join(",", additionalTags);
+ return COUNTERS.computeIfAbsent(
+ key,
+ k -> {
+ List tags = getTags(additionalTags);
+ return REGISTRY.counter(name, tags);
+ });
+ }
+
+ private static AtomicLong getGauge(String name, String... additionalTags) {
+ String key = CLASS_NAME + "." + name + "." + String.join(",", additionalTags);
+ return GAUGES.computeIfAbsent(
+ key,
+ pollTimer -> {
+ Id id = REGISTRY.createId(name, getTags(additionalTags));
+ return PolledMeter.using(REGISTRY).withId(id).monitorValue(new AtomicLong(0));
+ });
+ }
+
+ public static void incrementTaskExecutionQueueFullCount(String taskType) {
+ incrementCount(TASK_EXECUTION_QUEUE_FULL, TASK_TYPE, taskType);
+ }
+
+ public static void incrementUncaughtExceptionCount() {
+ incrementCount(THREAD_UNCAUGHT_EXCEPTION);
+ }
+
+ public static void incrementTaskPollErrorCount(String taskType, Exception e) {
+ incrementCount(
+ TASK_POLL_ERROR, TASK_TYPE, taskType, EXCEPTION, e.getClass().getSimpleName());
+ }
+
+ public static void incrementTaskPausedCount(String taskType) {
+ incrementCount(TASK_PAUSED, TASK_TYPE, taskType);
+ }
+
+ public static void incrementTaskExecutionErrorCount(String taskType, Throwable e) {
+ incrementCount(
+ TASK_EXECUTE_ERROR, TASK_TYPE, taskType, EXCEPTION, e.getClass().getSimpleName());
+ }
+
+ public static void incrementTaskAckFailedCount(String taskType) {
+ incrementCount(TASK_ACK_FAILED, TASK_TYPE, taskType);
+ }
+
+ public static void incrementTaskAckErrorCount(String taskType, Exception e) {
+ incrementCount(
+ TASK_ACK_ERROR, TASK_TYPE, taskType, EXCEPTION, e.getClass().getSimpleName());
+ }
+
+ public static void recordTaskResultPayloadSize(String taskType, long payloadSize) {
+ getGauge(TASK_RESULT_SIZE, TASK_TYPE, taskType).getAndSet(payloadSize);
+ }
+
+ public static void incrementTaskUpdateErrorCount(String taskType, Throwable t) {
+ incrementCount(
+ TASK_UPDATE_ERROR, TASK_TYPE, taskType, EXCEPTION, t.getClass().getSimpleName());
+ }
+
+ public static void incrementTaskPollCount(String taskType, int taskCount) {
+ getCounter(TASK_POLL_COUNTER, TASK_TYPE, taskType).increment(taskCount);
+ }
+
+ public static void recordWorkflowInputPayloadSize(
+ String workflowType, String version, long payloadSize) {
+ getGauge(WORKFLOW_INPUT_SIZE, WORKFLOW_TYPE, workflowType, WORKFLOW_VERSION, version)
+ .getAndSet(payloadSize);
+ }
+
+ public static void incrementExternalPayloadUsedCount(
+ String name, String operation, String payloadType) {
+ incrementCount(
+ EXTERNAL_PAYLOAD_USED,
+ ENTITY_NAME,
+ name,
+ OPERATION,
+ operation,
+ PAYLOAD_TYPE,
+ payloadType);
+ }
+
+ public static void incrementWorkflowStartErrorCount(String workflowType, Throwable t) {
+ incrementCount(
+ WORKFLOW_START_ERROR,
+ WORKFLOW_TYPE,
+ workflowType,
+ EXCEPTION,
+ t.getClass().getSimpleName());
+ }
+}
diff --git a/src/main/java/com/netflix/conductor/client/worker/Worker.java b/src/main/java/com/netflix/conductor/client/worker/Worker.java
new file mode 100644
index 00000000..72207087
--- /dev/null
+++ b/src/main/java/com/netflix/conductor/client/worker/Worker.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2021 Orkes, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *