Skip to content

Commit

Permalink
addressed some comments
Browse files Browse the repository at this point in the history
Signed-off-by: Junwei Dai <[email protected]>
  • Loading branch information
Junwei Dai committed Jan 15, 2025
1 parent 257564a commit eac9355
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
}
}
String workflowId = request.getWorkflowId();
TimeValue waitForTimeCompletion;
if (request.getParams().containsKey(WAIT_FOR_COMPLETION_TIMEOUT)) {
waitForTimeCompletion = TimeValue.parseTimeValue(
request.getParams().get(WAIT_FOR_COMPLETION_TIMEOUT),
WAIT_FOR_COMPLETION_TIMEOUT
);
} else {
// default to minus one indicate async execution
waitForTimeCompletion = TimeValue.MINUS_ONE;
}
if (workflowId == null) {
// This is a new workflow (POST)
// Throttle incoming requests
Expand Down Expand Up @@ -249,14 +259,6 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
ActionListener.wrap(stateResponse -> {
logger.info("Creating state workflow doc: {}", globalContextResponse.getId());
if (request.isProvision()) {
// default to minus one indicate async execution
TimeValue waitForTimeCompletion = TimeValue.MINUS_ONE;
if (request.getParams().containsKey(WAIT_FOR_COMPLETION_TIMEOUT)) {
waitForTimeCompletion = TimeValue.parseTimeValue(
request.getParams().get(WAIT_FOR_COMPLETION_TIMEOUT),
WAIT_FOR_COMPLETION_TIMEOUT
);
}
WorkflowRequest workflowRequest = new WorkflowRequest(
globalContextResponse.getId(),
null,
Expand Down Expand Up @@ -363,14 +365,6 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
.build();

if (request.isReprovision()) {
// default to minus one indicate async execution
TimeValue waitForTimeCompletion = TimeValue.MINUS_ONE;
if (request.getParams().containsKey(WAIT_FOR_COMPLETION_TIMEOUT)) {
waitForTimeCompletion = TimeValue.parseTimeValue(
request.getParams().get(WAIT_FOR_COMPLETION_TIMEOUT),
WAIT_FOR_COMPLETION_TIMEOUT
);
}
// Reprovision request
ReprovisionWorkflowRequest reprovisionRequest = new ReprovisionWorkflowRequest(
getResponse.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ public ReprovisionWorkflowRequest(StreamInput in) throws IOException {
this.workflowId = in.readString();
this.originalTemplate = Template.parse(in.readString());
this.updatedTemplate = Template.parse(in.readString());
// todo:change to 2.19
if (in.getVersion().onOrAfter(Version.CURRENT)) {
if (in.getVersion().onOrAfter(Version.V_2_19_0)) {
this.waitForCompletionTimeout = in.readTimeValue();
}

Expand All @@ -83,8 +82,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(workflowId);
out.writeString(originalTemplate.toJson());
out.writeString(updatedTemplate.toJson());
// todo:change to 2.19
if (out.getVersion().onOrAfter(Version.CURRENT)) {
if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
out.writeTimeValue(waitForCompletionTimeout);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public class WorkflowRequest extends ActionRequest {

/**
* The timeout duration to wait for workflow completion.
* If null, the request will respond immediately with the workflowId.
* default set to -1, the request will respond immediately with the workflowId,
* indicating asynchronous execution.
*/
@Nullable
private TimeValue waitForCompletionTimeout;

/**
Expand Down Expand Up @@ -139,7 +139,7 @@ public WorkflowRequest(
* @param provisionOrUpdate provision or updateFields flag. Only one may be true, the presence of update_fields key in map indicates if updating fields, otherwise true means it's provisioning.
* @param params map of REST path params. If provisionOrUpdate is false, must be an empty map. If update_fields key is present, must be only key.
* @param reprovision flag to indicate if request is to reprovision
* @param waitForCompletionTimeout the timeout duration (in milliseconds) to wait for workflow completion
* @param waitForCompletionTimeout the timeout duration to wait for workflow completion
*/
public WorkflowRequest(
@Nullable String workflowId,
Expand Down Expand Up @@ -187,8 +187,7 @@ public WorkflowRequest(StreamInput in) throws IOException {
this.params = Collections.emptyMap();
}
this.reprovision = !provision && Boolean.parseBoolean(params.get(REPROVISION_WORKFLOW));
// todo:change to 2.19
if (in.getVersion().onOrAfter(Version.CURRENT)) {
if (in.getVersion().onOrAfter(Version.V_2_19_0)) {
this.waitForCompletionTimeout = in.readOptionalTimeValue();
}

Expand Down Expand Up @@ -275,8 +274,7 @@ public void writeTo(StreamOutput out) throws IOException {
} else if (reprovision) {
out.writeMap(Map.of(REPROVISION_WORKFLOW, "true"), StreamOutput::writeString, StreamOutput::writeString);
}
// todo: change to 2.19
if (out.getVersion().onOrAfter(Version.CURRENT)) {
if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
out.writeOptionalTimeValue(waitForCompletionTimeout);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public WorkflowResponse(String workflowId) {
public WorkflowResponse(StreamInput in) throws IOException {
super(in);
this.workflowId = in.readString();
// todo : change version to 2_19_0
if (in.getVersion().onOrAfter(Version.CURRENT)) {
if (in.getVersion().onOrAfter(Version.V_2_19_0)) {
this.workflowState = in.readOptionalWriteable(WorkflowState::new);
}

Expand Down Expand Up @@ -95,8 +94,7 @@ public WorkflowResponse(String workflowId, WorkflowState workflowState) {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(workflowId);
// todo : change version to 2_19_0
if (out.getVersion().onOrAfter(Version.CURRENT)) {
if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
out.writeOptionalWriteable(workflowState);
}
}
Expand Down

0 comments on commit eac9355

Please sign in to comment.