diff --git a/CHANGELOG.md b/CHANGELOG.md index 2fa7d0d69..bbe94207d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,3 +26,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Maintenance ### Refactoring +- Update workflow state without using painless script ([#894](https://github.com/opensearch-project/flow-framework/pull/894)) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 02ef8a825..cb2dee56f 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -34,6 +34,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.flowframework.exception.FlowFrameworkException; @@ -45,12 +46,13 @@ import org.opensearch.flowframework.util.EncryptorUtils; import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.flowframework.workflow.WorkflowData; -import org.opensearch.script.Script; -import org.opensearch.script.ScriptType; +import org.opensearch.index.engine.VersionConflictEngineException; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -80,6 +82,8 @@ public class FlowFrameworkIndicesHandler { private static final Map indexMappingUpdated = new HashMap<>(); private static final Map indexSettings = Map.of("index.auto_expand_replicas", "0-1"); private final NamedXContentRegistry xContentRegistry; + // Retries in case of simultaneous updates + private static final int RETRIES = 5; /** * constructor @@ -576,14 +580,14 @@ public void canDeleteWorkflowStateDoc( } /** - * Updates a document in the workflow state index + * Updates a complete document in the workflow state index * @param documentId the document ID - * @param updatedFields the fields to update the global state index with + * @param updatedDocument a complete document to update the global state index with * @param listener action listener */ public void updateFlowFrameworkSystemIndexDoc( String documentId, - Map updatedFields, + ToXContentObject updatedDocument, ActionListener listener ) { if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { @@ -593,11 +597,11 @@ public void updateFlowFrameworkSystemIndexDoc( } else { try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId); - Map updatedContent = new HashMap<>(updatedFields); - updateRequest.doc(updatedContent); + XContentBuilder builder = XContentFactory.jsonBuilder(); + updatedDocument.toXContent(builder, null); + updateRequest.doc(builder); updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - updateRequest.retryOnConflict(5); - // TODO: decide what condition can be considered as an update conflict and add retry strategy + updateRequest.retryOnConflict(RETRIES); client.update(updateRequest, ActionListener.runBefore(listener, context::restore)); } catch (Exception e) { String errorMessage = "Failed to update " + WORKFLOW_STATE_INDEX + " entry : " + documentId; @@ -608,22 +612,31 @@ public void updateFlowFrameworkSystemIndexDoc( } /** - * Deletes a document in the workflow state index + * Updates a partial document in the workflow state index * @param documentId the document ID + * @param updatedFields the fields to update the global state index with * @param listener action listener */ - public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener listener) { + public void updateFlowFrameworkSystemIndexDoc( + String documentId, + Map updatedFields, + ActionListener listener + ) { if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { - String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index"; + String errorMessage = "Failed to update document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index"; logger.error(errorMessage); listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); } else { try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId); - deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore)); + UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId); + Map updatedContent = new HashMap<>(updatedFields); + updateRequest.doc(updatedContent); + updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + updateRequest.retryOnConflict(RETRIES); + // TODO: decide what condition can be considered as an update conflict and add retry strategy + client.update(updateRequest, ActionListener.runBefore(listener, context::restore)); } catch (Exception e) { - String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId; + String errorMessage = "Failed to update " + WORKFLOW_STATE_INDEX + " entry : " + documentId; logger.error(errorMessage, e); listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); } @@ -631,76 +644,28 @@ public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener< } /** - * Updates a document in the workflow state index - * @param indexName the index that we will be updating a document of. + * Deletes a document in the workflow state index * @param documentId the document ID - * @param script the given script to update doc * @param listener action listener */ - public void updateFlowFrameworkSystemIndexDocWithScript( - String indexName, - String documentId, - Script script, - ActionListener listener - ) { - if (!doesIndexExist(indexName)) { - String errorMessage = "Failed to update document for given workflow due to missing " + indexName + " index"; + public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener listener) { + if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { + String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index"; logger.error(errorMessage); - listener.onFailure(new Exception(errorMessage)); + listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); } else { try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - UpdateRequest updateRequest = new UpdateRequest(indexName, documentId); - // TODO: Also add ability to change other fields at the same time when adding detailed provision progress - updateRequest.script(script); - updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - updateRequest.retryOnConflict(3); - // TODO: Implement our own concurrency control to improve on retry mechanism - client.update(updateRequest, ActionListener.runBefore(listener, context::restore)); + DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId); + deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore)); } catch (Exception e) { - String errorMessage = "Failed to update " + indexName + " entry : " + documentId; + String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId; logger.error(errorMessage, e); listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); } } } - /** - * Creates a new ResourceCreated object and a script to update the state index - * @param workflowId workflowId for the relevant step - * @param nodeId current process node (workflow step) id - * @param workflowStepName the workflowstep name that created the resource - * @param resourceId the id of the newly created resource - * @param listener the ActionListener for this step to handle completing the future after update - * @throws IOException if parsing fails on new resource - */ - private void updateResourceInStateIndex( - String workflowId, - String nodeId, - String workflowStepName, - String resourceId, - ActionListener listener - ) throws IOException { - ResourceCreated newResource = new ResourceCreated( - workflowStepName, - nodeId, - getResourceByWorkflowStep(workflowStepName), - resourceId - ); - - // The script to append a new object to the resources_created array - Script script = new Script( - ScriptType.INLINE, - "painless", - "ctx._source.resources_created.add(params.newResource);", - Collections.singletonMap("newResource", newResource.resourceMap()) - ); - - updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> { - logger.info("updated resources created of {}", workflowId); - listener.onResponse(updateResponse); - }, listener::onFailure)); - } - /** * Adds a resource to the state index, including common exception handling * @param currentNodeInputs Inputs to the current node @@ -716,26 +681,93 @@ public void addResourceToStateIndex( String resourceId, ActionListener listener ) { + String workflowId = currentNodeInputs.getWorkflowId(); String resourceName = getResourceByWorkflowStep(workflowStepName); - try { - updateResourceInStateIndex( - currentNodeInputs.getWorkflowId(), - nodeId, - workflowStepName, - resourceId, - ActionListener.wrap(updateResponse -> { - logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); - listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), currentNodeInputs.getWorkflowId(), nodeId)); - }, exception -> { - String errorMessage = "Failed to update new created " + nodeId + " resource " + workflowStepName + " id " + resourceId; - logger.error(errorMessage, exception); - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); - }) + ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceName, resourceId); + if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { + String errorMessage = "Failed to update state for " + workflowId + " due to missing " + WORKFLOW_STATE_INDEX + " index"; + logger.error(errorMessage); + listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND)); + } else { + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + getAndUpdateResourceInStateDocumentWithRetries( + workflowId, + newResource, + RETRIES, + ActionListener.runBefore(listener, context::restore) + ); + } + } + } + + /** + * Performs a get and update of a State Index document adding a new resource with strong consistency and retries + * @param workflowId The document id to update + * @param newResource The resource to add to the resources created list + * @param retries The number of retries on update version conflicts + * @param listener The listener to complete on success or failure + */ + private void getAndUpdateResourceInStateDocumentWithRetries( + String workflowId, + ResourceCreated newResource, + int retries, + ActionListener listener + ) { + GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, workflowId); + client.get(getRequest, ActionListener.wrap(getResponse -> { + if (!getResponse.isExists()) { + listener.onFailure(new FlowFrameworkException("Workflow state not found for " + workflowId, RestStatus.NOT_FOUND)); + return; + } + WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString()); + List resourcesCreated = new ArrayList<>(currentState.resourcesCreated()); + resourcesCreated.add(newResource); + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build(); + newState.toXContent(builder, null); + UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, workflowId).doc(builder) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .setIfSeqNo(getResponse.getSeqNo()) + .setIfPrimaryTerm(getResponse.getPrimaryTerm()); + client.update( + updateRequest, + ActionListener.wrap( + r -> handleStateUpdateSuccess(workflowId, newResource, listener), + e -> handleStateUpdateException(workflowId, newResource, retries, listener, e) + ) ); - } catch (Exception e) { - String errorMessage = "Failed to parse and update new created resource"; - logger.error(errorMessage, e); - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + }, ex -> handleStateUpdateException(workflowId, newResource, 0, listener, ex))); + } + + private void handleStateUpdateSuccess(String workflowId, ResourceCreated newResource, ActionListener listener) { + String resourceName = newResource.resourceType(); + String resourceId = newResource.resourceId(); + String nodeId = newResource.workflowStepId(); + logger.info("Updated resources created for {} on step {} with {} {}", workflowId, nodeId, resourceName, resourceId); + listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), workflowId, nodeId)); + } + + private void handleStateUpdateException( + String workflowId, + ResourceCreated newResource, + int retries, + ActionListener listener, + Exception e + ) { + if (e instanceof VersionConflictEngineException && retries > 0) { + // Retry if we haven't exhausted retries + getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, retries - 1, listener); + return; } + String errorMessage = "Failed to update workflow state for " + + workflowId + + " on step " + + newResource.workflowStepId() + + " with " + + newResource.resourceType() + + " " + + newResource.resourceId(); + logger.error(errorMessage, e); + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); } } diff --git a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java index 8d024d180..54f6a332c 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java @@ -31,13 +31,12 @@ import org.opensearch.flowframework.model.State; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.Workflow; +import org.opensearch.flowframework.model.WorkflowState; import org.opensearch.flowframework.util.EncryptorUtils; import org.opensearch.flowframework.workflow.ProcessNode; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.plugins.PluginsService; -import org.opensearch.script.Script; -import org.opensearch.script.ScriptType; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -58,7 +57,6 @@ import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD; import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD; -import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES; import static org.opensearch.flowframework.util.ParseUtils.getUserContext; import static org.opensearch.flowframework.util.ParseUtils.resolveUserAndExecute; @@ -210,24 +208,14 @@ private void executeReprovisionRequest( // Remove error field if any prior to subsequent execution if (response.getWorkflowState().getError() != null) { - Script script = new Script( - ScriptType.INLINE, - "painless", - "if(ctx._source.containsKey('error')){ctx._source.remove('error')}", - Collections.emptyMap() - ); - flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDocWithScript( - WORKFLOW_STATE_INDEX, - workflowId, - script, - ActionListener.wrap(updateResponse -> { - - }, exception -> { - String errorMessage = "Failed to update workflow state: " + workflowId; - logger.error(errorMessage, exception); - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); - }) - ); + WorkflowState newState = WorkflowState.builder(response.getWorkflowState()).error(null).build(); + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(workflowId, newState, ActionListener.wrap(updateResponse -> { + + }, exception -> { + String errorMessage = "Failed to update workflow state: " + workflowId; + logger.error(errorMessage, exception); + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + })); } // Update State Index, maintain resources created for subsequent execution diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index ecaec46b5..3c6c4846b 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -35,6 +35,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.TestHelpers; import org.opensearch.flowframework.common.WorkflowResources; @@ -47,6 +48,7 @@ import org.opensearch.flowframework.workflow.CreateConnectorStep; import org.opensearch.flowframework.workflow.CreateIndexStep; import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.index.get.GetResult; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; @@ -445,6 +447,63 @@ public void testUpdateFlowFrameworkSystemIndexDoc() throws IOException { ); } + public void testUpdateFlowFrameworkSystemIndexFullDoc() throws IOException { + ClusterState mockClusterState = mock(ClusterState.class); + Metadata mockMetaData = mock(Metadata.class); + when(clusterService.state()).thenReturn(mockClusterState); + when(mockClusterState.metadata()).thenReturn(mockMetaData); + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true); + + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + + // test success + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, Result.UPDATED)); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + ToXContentObject fooBar = new ToXContentObject() { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject(); + xContentBuilder.field("foo", "bar"); + xContentBuilder.endObject(); + return builder; + } + }; + + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", fooBar, listener); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(UpdateResponse.class); + verify(listener, times(1)).onResponse(responseCaptor.capture()); + assertEquals(Result.UPDATED, responseCaptor.getValue().getResult()); + + // test failure + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(new Exception("Failed to update state")); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", fooBar, listener); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals("Failed to update state", exceptionCaptor.getValue().getMessage()); + + // test no index + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false); + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", fooBar, listener); + + verify(listener, times(2)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to update document 1 due to missing .plugins-flow-framework-state index", + exceptionCaptor.getValue().getMessage() + ); + } + public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); @@ -502,6 +561,16 @@ public void testAddResourceToStateIndex() throws IOException { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); // test success + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState state = WorkflowState.builder().build(); + state.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(1); responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED)); @@ -509,7 +578,7 @@ public void testAddResourceToStateIndex() throws IOException { }).when(client).update(any(UpdateRequest.class), any()); flowFrameworkIndicesHandler.addResourceToStateIndex( - new WorkflowData(Collections.emptyMap(), null, null), + new WorkflowData(Collections.emptyMap(), "this_id", null), "node_id", CreateConnectorStep.NAME, "this_id", @@ -528,7 +597,7 @@ public void testAddResourceToStateIndex() throws IOException { }).when(client).update(any(UpdateRequest.class), any()); flowFrameworkIndicesHandler.addResourceToStateIndex( - new WorkflowData(Collections.emptyMap(), null, null), + new WorkflowData(Collections.emptyMap(), "this_id", null), "node_id", CreateConnectorStep.NAME, "this_id", @@ -537,6 +606,147 @@ public void testAddResourceToStateIndex() throws IOException { ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); - assertEquals("Failed to update new created node_id resource create_connector id this_id", exceptionCaptor.getValue().getMessage()); + assertEquals( + "Failed to update workflow state for this_id on step node_id with connector_id this_id", + exceptionCaptor.getValue().getMessage() + ); + + // test document not found + @SuppressWarnings("unchecked") + ActionListener notFoundListener = mock(ActionListener.class); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", -2, 0, 1, false, null, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + flowFrameworkIndicesHandler.addResourceToStateIndex( + new WorkflowData(Collections.emptyMap(), "this_id", null), + "node_id", + CreateConnectorStep.NAME, + "this_id", + notFoundListener + ); + + exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(notFoundListener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals("Workflow state not found for this_id", exceptionCaptor.getValue().getMessage()); + + // test index not found + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false); + @SuppressWarnings("unchecked") + ActionListener indexNotFoundListener = mock(ActionListener.class); + flowFrameworkIndicesHandler.addResourceToStateIndex( + new WorkflowData(Collections.emptyMap(), "this_id", null), + "node_id", + CreateConnectorStep.NAME, + "this_id", + indexNotFoundListener + ); + + exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(indexNotFoundListener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to update state for this_id due to missing .plugins-flow-framework-state index", + exceptionCaptor.getValue().getMessage() + ); + } + + public void testAddResourceToStateIndexWithRetries() throws IOException { + ClusterState mockClusterState = mock(ClusterState.class); + Metadata mockMetaData = mock(Metadata.class); + when(clusterService.state()).thenReturn(mockClusterState); + when(mockClusterState.metadata()).thenReturn(mockMetaData); + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true); + VersionConflictEngineException conflictException = new VersionConflictEngineException( + new ShardId(WORKFLOW_STATE_INDEX, "", 1), + "this_id", + null + ); + UpdateResponse updateResponse = new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState state = WorkflowState.builder().build(); + state.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + + // test success on retry + @SuppressWarnings("unchecked") + ActionListener retryListener = mock(ActionListener.class); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(updateResponse); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.addResourceToStateIndex( + new WorkflowData(Collections.emptyMap(), "this_id", null), + "node_id", + CreateConnectorStep.NAME, + "this_id", + retryListener + ); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); + verify(retryListener, times(1)).onResponse(responseCaptor.capture()); + assertEquals("this_id", responseCaptor.getValue().getContent().get(WorkflowResources.CONNECTOR_ID)); + + // test failure on 6th after 5 retries even if 7th would have been success + @SuppressWarnings("unchecked") + ActionListener threeRetryListener = mock(ActionListener.class); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + // we'll never get here + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(updateResponse); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.addResourceToStateIndex( + new WorkflowData(Collections.emptyMap(), "this_id", null), + "node_id", + CreateConnectorStep.NAME, + "this_id", + threeRetryListener + ); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(threeRetryListener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to update workflow state for this_id on step node_id with connector_id this_id", + exceptionCaptor.getValue().getMessage() + ); } } diff --git a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java index 86499e3d8..52238871e 100644 --- a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java @@ -71,6 +71,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -770,7 +771,7 @@ public void testUpdateWorkflow() throws IOException { ActionListener updateResponseListener = invocation.getArgument(2); updateResponseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, UPDATED)); return null; - }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(anyString(), any(), any()); + }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(anyString(), anyMap(), any()); createWorkflowTransportAction.doExecute(mock(Task.class), updateWorkflow, listener); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); diff --git a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java index a6eacc069..623270a27 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java @@ -54,6 +54,7 @@ import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -164,7 +165,7 @@ public void testProvisionWorkflow() { ActionListener actionListener = invocation.getArgument(2); actionListener.onResponse(mock(UpdateResponse.class)); return null; - }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), any(), any()); + }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), anyMap(), any()); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(2); @@ -211,7 +212,7 @@ public void testProvisionWorkflowTwice() { ActionListener actionListener = invocation.getArgument(2); actionListener.onResponse(mock(UpdateResponse.class)); return null; - }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), any(), any()); + }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), anyMap(), any()); provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); diff --git a/src/test/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportActionTests.java index e654b0482..6e1e65d3b 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportActionTests.java @@ -44,6 +44,7 @@ import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -147,7 +148,7 @@ public void testReprovisionWorkflow() throws Exception { ActionListener actionListener = invocation.getArgument(2); actionListener.onResponse(mock(UpdateResponse.class)); return null; - }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), any(), any()); + }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), anyMap(), any()); @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); @@ -275,7 +276,7 @@ public void testFailedStateUpdate() throws Exception { ActionListener actionListener = invocation.getArgument(2); actionListener.onFailure(new Exception("failed")); return null; - }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), any(), any()); + }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), anyMap(), any()); @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class);