Skip to content

Commit

Permalink
Add comments to explain AtomicBoolean usage in WorkflowTimeoutUtility…
Browse files Browse the repository at this point in the history
…, update error message

Signed-off-by: Junwei Dai <[email protected]>
  • Loading branch information
Junwei Dai committed Jan 15, 2025
1 parent 18a1dbb commit 6e8f147
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
// Ensure wait_for_completion is not set unless reprovision or provision is true
if (waitForCompletionTimeout != TimeValue.MINUS_ONE && !(reprovision || provision)) {
FlowFrameworkException ffe = new FlowFrameworkException(
"Request parameters "
+ request.consumedParams()
+ " are not allowed unless the 'provision' or 'reprovision' parameter is set to true.",
"Request parameters 'wait_for_completion_timeout' are not allowed unless the 'provision' or 'reprovision' parameter is set to true.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,19 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
ActionListener.wrap(stateResponse -> {
logger.info("Creating state workflow doc: {}", globalContextResponse.getId());
if (request.isProvision()) {
String waitForTimeCompletion = request.getParams()
.getOrDefault(WAIT_FOR_COMPLETION_TIMEOUT, TimeValue.MINUS_ONE.toString());
// default to minus one indicate async execution
TimeValue waitForTimeCompletion = TimeValue.MINUS_ONE;
if (request.getParams().containsKey(WAIT_FOR_COMPLETION_TIMEOUT)) {
waitForTimeCompletion = TimeValue.parseTimeValue(
request.getParams().get(WAIT_FOR_COMPLETION_TIMEOUT),
WAIT_FOR_COMPLETION_TIMEOUT
);
}
WorkflowRequest workflowRequest = new WorkflowRequest(
globalContextResponse.getId(),
null,
request.getParams(),
TimeValue.parseTimeValue(waitForTimeCompletion, PROVISION_TIMEOUT_FIELD)
waitForTimeCompletion
);
logger.info(
"Provisioning parameter is set, continuing to provision workflow {}",
Expand Down Expand Up @@ -358,14 +364,20 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
.build();

if (request.isReprovision()) {
String waitForTimeCompletion = request.getParams()
.getOrDefault(WAIT_FOR_COMPLETION_TIMEOUT, TimeValue.MINUS_ONE.toString());
// default to minus one indicate async execution
TimeValue waitForTimeCompletion = TimeValue.MINUS_ONE;
if (request.getParams().containsKey(WAIT_FOR_COMPLETION_TIMEOUT)) {
waitForTimeCompletion = TimeValue.parseTimeValue(
request.getParams().get(WAIT_FOR_COMPLETION_TIMEOUT),
WAIT_FOR_COMPLETION_TIMEOUT
);
}
// Reprovision request
ReprovisionWorkflowRequest reprovisionRequest = new ReprovisionWorkflowRequest(
getResponse.getId(),
existingTemplate,
template,
TimeValue.parseTimeValue(waitForTimeCompletion, PROVISION_TIMEOUT_FIELD)
waitForTimeCompletion
);
logger.info("Reprovisioning parameter is set, continuing to reprovision workflow {}", getResponse.getId());
client.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
public class WorkflowTimeoutUtility {

private static final Logger logger = LogManager.getLogger(WorkflowTimeoutUtility.class);
private static final TimeValue MIN_TIMEOUT_MILLIS = TimeValue.timeValueSeconds(0);

/**
* Schedules a timeout task for a workflow execution.
Expand All @@ -55,7 +54,7 @@ public static ActionListener<WorkflowResponse> scheduleTimeoutHandler(
AtomicBoolean isResponseSent
) {
// Ensure timeout is within the valid range (non-negative)
long adjustedTimeout = Math.max(timeout, MIN_TIMEOUT_MILLIS.millis());
long adjustedTimeout = Math.max(timeout, TimeValue.timeValueMillis(0).millis());
Scheduler.ScheduledCancellable scheduledCancellable = threadPool.schedule(
new WorkflowTimeoutListener(client, workflowId, listener, isResponseSent),
TimeValue.timeValueMillis(adjustedTimeout),
Expand Down Expand Up @@ -83,6 +82,7 @@ private static class WorkflowTimeoutListener implements Runnable {

@Override
public void run() {
// This AtomicBoolean ensures that the timeout logic is executed only once, preventing duplicate responses.
if (isResponseSent.compareAndSet(false, true)) {
logger.warn("Workflow execution timed out for workflowId: {}", workflowId);
fetchWorkflowStateAfterTimeout(client, workflowId, listener);
Expand All @@ -107,6 +107,7 @@ public static <Response> ActionListener<Response> wrapWithTimeoutCancellationLis
return new ActionListener<>() {
@Override
public void onResponse(Response response) {
// Cancel the timeout task if the response is successfully sent.
if (isResponseSent.compareAndSet(false, true)) {
scheduledCancellable.cancel();
}
Expand All @@ -115,6 +116,7 @@ public void onResponse(Response response) {

@Override
public void onFailure(Exception e) {
// Cancel the timeout task if an error occurs and the failure is reported.
if (isResponseSent.compareAndSet(false, true)) {
scheduledCancellable.cancel();
}
Expand All @@ -137,6 +139,7 @@ public static void handleResponse(
AtomicBoolean isResponseSent,
ActionListener<WorkflowResponse> listener
) {
// Check if the response has already been sent, and send it only if it hasn't been sent yet.
if (isResponseSent.compareAndSet(false, true)) {
listener.onResponse(new WorkflowResponse(workflowResponse.getWorkflowId(), workflowResponse.getWorkflowState()));
} else {
Expand All @@ -158,6 +161,7 @@ public static void handleFailure(
AtomicBoolean isResponseSent,
ActionListener<WorkflowResponse> listener
) {
// Check if the failure has already been reported, and report it only if it hasn't been reported yet.
if (isResponseSent.compareAndSet(false, true)) {
FlowFrameworkException exception = new FlowFrameworkException(
"Failed to execute workflow " + workflowId,
Expand Down

0 comments on commit 6e8f147

Please sign in to comment.