diff --git a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java index 3de1d198..b106b05f 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java @@ -151,9 +151,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli // Ensure wait_for_completion is not set unless reprovision or provision is true if (waitForCompletionTimeout != TimeValue.MINUS_ONE && !(reprovision || provision)) { FlowFrameworkException ffe = new FlowFrameworkException( - "Request parameters " - + request.consumedParams() - + " are not allowed unless the 'provision' or 'reprovision' parameter is set to true.", + "Request parameters 'wait_for_completion_timeout' are not allowed unless the 'provision' or 'reprovision' parameter is set to true.", RestStatus.BAD_REQUEST ); return processError(ffe, params, request); diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index 317ae0b2..a7fa3b7d 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -250,13 +250,19 @@ private void createExecute(WorkflowRequest request, User user, ActionListener { logger.info("Creating state workflow doc: {}", globalContextResponse.getId()); if (request.isProvision()) { - String waitForTimeCompletion = request.getParams() - .getOrDefault(WAIT_FOR_COMPLETION_TIMEOUT, TimeValue.MINUS_ONE.toString()); + // default to minus one indicate async execution + TimeValue waitForTimeCompletion = TimeValue.MINUS_ONE; + if (request.getParams().containsKey(WAIT_FOR_COMPLETION_TIMEOUT)) { + waitForTimeCompletion = TimeValue.parseTimeValue( + request.getParams().get(WAIT_FOR_COMPLETION_TIMEOUT), + WAIT_FOR_COMPLETION_TIMEOUT + ); + } WorkflowRequest workflowRequest = new WorkflowRequest( globalContextResponse.getId(), null, request.getParams(), - TimeValue.parseTimeValue(waitForTimeCompletion, PROVISION_TIMEOUT_FIELD) + waitForTimeCompletion ); logger.info( "Provisioning parameter is set, continuing to provision workflow {}", @@ -358,14 +364,20 @@ private void createExecute(WorkflowRequest request, User user, ActionListener scheduleTimeoutHandler( AtomicBoolean isResponseSent ) { // Ensure timeout is within the valid range (non-negative) - long adjustedTimeout = Math.max(timeout, MIN_TIMEOUT_MILLIS.millis()); + long adjustedTimeout = Math.max(timeout, TimeValue.timeValueMillis(0).millis()); Scheduler.ScheduledCancellable scheduledCancellable = threadPool.schedule( new WorkflowTimeoutListener(client, workflowId, listener, isResponseSent), TimeValue.timeValueMillis(adjustedTimeout), @@ -83,6 +82,7 @@ private static class WorkflowTimeoutListener implements Runnable { @Override public void run() { + // This AtomicBoolean ensures that the timeout logic is executed only once, preventing duplicate responses. if (isResponseSent.compareAndSet(false, true)) { logger.warn("Workflow execution timed out for workflowId: {}", workflowId); fetchWorkflowStateAfterTimeout(client, workflowId, listener); @@ -107,6 +107,7 @@ public static ActionListener wrapWithTimeoutCancellationLis return new ActionListener<>() { @Override public void onResponse(Response response) { + // Cancel the timeout task if the response is successfully sent. if (isResponseSent.compareAndSet(false, true)) { scheduledCancellable.cancel(); } @@ -115,6 +116,7 @@ public void onResponse(Response response) { @Override public void onFailure(Exception e) { + // Cancel the timeout task if an error occurs and the failure is reported. if (isResponseSent.compareAndSet(false, true)) { scheduledCancellable.cancel(); } @@ -137,6 +139,7 @@ public static void handleResponse( AtomicBoolean isResponseSent, ActionListener listener ) { + // Check if the response has already been sent, and send it only if it hasn't been sent yet. if (isResponseSent.compareAndSet(false, true)) { listener.onResponse(new WorkflowResponse(workflowResponse.getWorkflowId(), workflowResponse.getWorkflowState())); } else { @@ -158,6 +161,7 @@ public static void handleFailure( AtomicBoolean isResponseSent, ActionListener listener ) { + // Check if the failure has already been reported, and report it only if it hasn't been reported yet. if (isResponseSent.compareAndSet(false, true)) { FlowFrameworkException exception = new FlowFrameworkException( "Failed to execute workflow " + workflowId,