Skip to content

Commit

Permalink
Merge pull request #39 from orkes-io/sync_apis
Browse files Browse the repository at this point in the history
APIs for synchronous workflows
  • Loading branch information
v1r3n authored Oct 3, 2022
2 parents 784e630 + 890a669 commit 9c008d5
Show file tree
Hide file tree
Showing 8 changed files with 647 additions and 24 deletions.
51 changes: 49 additions & 2 deletions src/main/java/io/orkes/conductor/client/ApiClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@

import javax.net.ssl.*;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.LocalDate;
Expand All @@ -53,6 +51,7 @@
import io.orkes.conductor.client.http.auth.OAuth;
import io.orkes.conductor.client.model.GenerateTokenRequest;

import com.google.gson.GsonBuilder;
import com.squareup.okhttp.*;
import com.squareup.okhttp.internal.http.HttpMethod;
import okio.BufferedSink;
Expand Down Expand Up @@ -162,6 +161,10 @@ public ApiClient setHttpClient(OkHttpClient httpClient) {
return this;
}

public void shutdown() {
this.httpClient.getDispatcher().getExecutorService().shutdown();
}

/**
* Get JSON
*
Expand Down Expand Up @@ -889,6 +892,50 @@ public <T> T handleResponse(Response response, Type returnType) throws ApiExcept
}
}

/**
* {@link #executeAsync(Call, Type, ApiCallback)}
*
* @param <T> Type
* @param call An instance of the Call object
* @param callback ApiCallback&lt;T&gt;
*/
public <T> void executeAsync(Call call, ApiCallback<T> callback) {
executeAsync(call, null, callback);
}

/**
* Execute HTTP call asynchronously.
*
* @see #execute(Call, Type)
* @param <T> Type
* @param call The callback to be executed when the API call finishes
* @param returnType Return type
* @param callback ApiCallback
*/
@SuppressWarnings("unchecked")
public <T> void executeAsync(Call call, final Type returnType, final ApiCallback<T> callback) {
call.enqueue(
new Callback() {
@Override
public void onFailure(Request request, IOException e) {
callback.onFailure(new ApiException(e), 0, null);
}

@Override
public void onResponse(Response response) throws IOException {
T result;
try {
result = (T) handleResponse(response, returnType);
} catch (ApiException e) {
callback.onFailure(e, response.code(), response.headers().toMultimap());
return;
}
callback.onSuccess(
result, response.code(), response.headers().toMultimap());
}
});
}

/**
* Build HTTP call with the given options.
*
Expand Down
168 changes: 161 additions & 7 deletions src/main/java/io/orkes/conductor/client/WorkflowClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package io.orkes.conductor.client;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
Expand All @@ -21,55 +22,211 @@
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;

import io.orkes.conductor.client.model.WorkflowRun;
import io.orkes.conductor.client.model.WorkflowStatus;

public interface WorkflowClient {
/**
* Start a new workflow execution
*
* @param startWorkflowRequest start workflow request
* @return Id of the workflow that was started. {@link #getWorkflow(String, boolean)} can be
* used to get the status of this workflow execution
*/
String startWorkflow(StartWorkflowRequest startWorkflowRequest);

/**
* Starts a new workflow and waits until it completes.
*
* @param startWorkflowRequest start workflow request
* @param waitUntilTaskRef Wait until the task identified by the reference name completes and
* return back the results. Useful when part of the workflow needs to be synchronous and
* rest can be monitored asynchronously.
* @return CompletableFuture of a WorkflowRun for the started workflow
*/
CompletableFuture<WorkflowRun> executeWorkflow(
StartWorkflowRequest startWorkflowRequest, String waitUntilTaskRef);

/**
* Get the workflow execution
*
* @param workflowId Id of the workflow
* @param includeTasks if false, no tasks are returned only input, output and variables.
* @return Workflow if a workflow exists with given workflowId
*/
Workflow getWorkflow(String workflowId, boolean includeTasks);

/**
* Return workflows matching the correlation id
*
* @param name workflow name
* @param correlationId correlation id
* @param includeCompleted if the result should include
* @param includeTasks
* @return List of Workflows matching name and correlationId
*/
List<Workflow> getWorkflows(
String name, String correlationId, boolean includeClosed, boolean includeTasks);

void populateWorkflowOutput(Workflow workflow);

String name, String correlationId, boolean includeCompleted, boolean includeTasks);

/**
* Removes the workflow from the system permanently
*
* @param workflowId
* @param archiveWorkflow
*/
void deleteWorkflow(String workflowId, boolean archiveWorkflow);

/**
* Terminates the workflows
*
* @param workflowIds
* @param reason
* @return BulkResponse of terminated workflows
*/
BulkResponse terminateWorkflows(List<String> workflowIds, String reason);

/**
* Return currently running workflow ids
*
* @param workflowName
* @param version
* @return List of running workflows matching workflowName and version
*/
List<String> getRunningWorkflow(String workflowName, Integer version);

/**
* Return the ids of the workflow that were started in the given time period
*
* @param workflowName
* @param version
* @param startTime
* @param endTime
* @return List of workflowIds
*/
List<String> getWorkflowsByTimePeriod(
String workflowName, int version, Long startTime, Long endTime);

/**
* Forces workflow state evaluation. Use with Caution, meant only for advanced use cases.
*
* @param workflowId
*/
void runDecider(String workflowId);

/**
* Pause a running workflow. No New tasks will be scheduled until resume is called
*
* @param workflowId
*/
void pauseWorkflow(String workflowId);

/**
* Resume a paused workflow
*
* @param workflowId
*/
void resumeWorkflow(String workflowId);

/**
* Skip one Task from a Workflow. If there exists a taskReferenceName for given workflowId.
*
* @param workflowId
* @param taskReferenceName
*/
void skipTaskFromWorkflow(String workflowId, String taskReferenceName);

/**
* Rerun a Workflow. If there exists a workflowId, rerun it given a RerunWorkflowRequest
*
* @param workflowId
* @param rerunWorkflowRequest
* @return WorkflowId of started Workflow
*/
String rerunWorkflow(String workflowId, RerunWorkflowRequest rerunWorkflowRequest);

/**
* Restart a workflow with given a workflowId
*
* @param workflowId
* @param useLatestDefinitions
*/
void restart(String workflowId, boolean useLatestDefinitions);

/**
* Retry last failed task from a workflow given a workflowId
*
* @param workflowId
*/
void retryLastFailedTask(String workflowId);

/**
* Reset the Callback for tasks in progress given a workflowId
*
* @param workflowId
*/
void resetCallbacksForInProgressTasks(String workflowId);

/**
* Terminate a Workflow given workflowId and set a reason
*
* @param workflowId
* @param reason
*/
void terminateWorkflow(String workflowId, String reason);

/**
* Search for workflows matching query
*
* @param query
* @return SearchResult of WorkflowSummary
*/
SearchResult<WorkflowSummary> search(String query);

/**
* Search for workflows matching query
*
* @param query
* @return SearchResult of Workflow
*/
SearchResult<Workflow> searchV2(String query);

/**
* Search for workflows matching query
*
* @param start
* @param size
* @param sort
* @param freeText
* @param query
* @return SearchResult of WorkflowSummary
*/
SearchResult<WorkflowSummary> search(
Integer start, Integer size, String sort, String freeText, String query);

/**
* Search for workflows matching query
*
* @param start
* @param size
* @param sort
* @param freeText
* @param query
* @return SearchResult of Workflow
*/
SearchResult<Workflow> searchV2(
Integer start, Integer size, String sort, String freeText, String query);

/**
* Get the WorkflowStatus of a Workflow given workflowId
*
* @param workflowId
* @param includeOutput
* @param includeVariables
* @return WorkflowStatus of the workflow
*/
WorkflowStatus getWorkflowStatusSummary(
String workflowId, Boolean includeOutput, Boolean includeVariables);

// Bulk operations
BulkResponse pauseWorkflow(List<String> workflowIds);

Expand All @@ -80,7 +237,4 @@ SearchResult<Workflow> searchV2(
BulkResponse retryWorkflow(List<String> workflowIds);

BulkResponse terminateWorkflow(List<String> workflowIds, String reason);

WorkflowStatus getWorkflowStatusSummary(
String workflowId, Boolean includeOutput, Boolean includeVariables);
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ public String getResponseBody() {

@Override
public String getMessage() {
return getCode() + ":" + responseBody;
return getCode() + ":" + responseBody != null ? responseBody : super.getMessage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package io.orkes.conductor.client.http;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.apache.commons.lang.StringUtils;

Expand All @@ -26,8 +28,10 @@

import io.orkes.conductor.client.ApiClient;
import io.orkes.conductor.client.WorkflowClient;
import io.orkes.conductor.client.http.api.AsyncApiCallback;
import io.orkes.conductor.client.http.api.WorkflowBulkResourceApi;
import io.orkes.conductor.client.http.api.WorkflowResourceApi;
import io.orkes.conductor.client.model.WorkflowRun;
import io.orkes.conductor.client.model.WorkflowStatus;

import com.google.common.base.Preconditions;
Expand All @@ -49,6 +53,22 @@ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) {
return httpClient.startWorkflow(startWorkflowRequest);
}

@Override
public CompletableFuture<WorkflowRun> executeWorkflow(
StartWorkflowRequest startWorkflowRequest, String waitUntilTaskRef) {
CompletableFuture<WorkflowRun> future = new CompletableFuture<>();
AsyncApiCallback<WorkflowRun> callback = new AsyncApiCallback<>(future);
String requestId = UUID.randomUUID().toString();
httpClient.executeWorkflowAsync(
startWorkflowRequest,
startWorkflowRequest.getName(),
startWorkflowRequest.getVersion(),
waitUntilTaskRef,
requestId,
callback);
return future;
}

@Override
public Workflow getWorkflow(String workflowId, boolean includeTasks) {
return httpClient.getExecutionStatus(workflowId, includeTasks);
Expand All @@ -61,11 +81,6 @@ public List<Workflow> getWorkflows(
name, correlationId, includeClosed, includeTasks);
}

@Override
public void populateWorkflowOutput(Workflow workflow) {
throw new UnsupportedOperationException("External storage upload is not supported yet!");
}

@Override
public void deleteWorkflow(String workflowId, boolean archiveWorkflow) {
httpClient.delete(workflowId, archiveWorkflow);
Expand Down
Loading

0 comments on commit 9c008d5

Please sign in to comment.