diff --git a/CHANGELOG.md b/CHANGELOG.md index 985a507c9..b7c653e8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) - Silently ignore content on APIs that don't require it ([#639](https://github.com/opensearch-project/flow-framework/pull/639)) - Hide user and credential field from search response ([#680](https://github.com/opensearch-project/flow-framework/pull/680)) - Throw the correct error message in status API for WorkflowSteps ([#676](https://github.com/opensearch-project/flow-framework/pull/676)) +- Delete workflow state when template is deleted and no resources exist ([#689](https://github.com/opensearch-project/flow-framework/pull/689)) ### Infrastructure - Switch macos runner to macos-13 from macos-latest since macos-latest is now arm64 ([#686](https://github.com/opensearch-project/flow-framework/pull/686)) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 9e7dd9433..34cdf2b56 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -15,6 +15,8 @@ import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; @@ -519,6 +521,51 @@ public void getProvisioningProgress( } } + /** + * Check workflow provisioning state and resources to see if state can be deleted with template + * + * @param documentId document id + * @param canDeleteStateConsumer consumer function which will be true if NOT_STARTED or COMPLETED and no resources + * @param listener action listener from caller to fail on error + * @param action listener response type + */ + public void canDeleteWorkflowStateDoc(String documentId, Consumer canDeleteStateConsumer, ActionListener listener) { + GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId); + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + client.get(getRequest, ActionListener.wrap(response -> { + context.restore(); + if (!response.isExists()) { + // no need to delete if it's not there to start with + canDeleteStateConsumer.accept(Boolean.FALSE); + return; + } + try ( + XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef()) + ) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + WorkflowState workflowState = WorkflowState.parse(parser); + canDeleteStateConsumer.accept( + workflowState.resourcesCreated().isEmpty() + && !ProvisioningProgress.IN_PROGRESS.equals( + ProvisioningProgress.valueOf(workflowState.getProvisioningProgress()) + ) + ); + } catch (Exception e) { + String errorMessage = "Failed to parse workflow state " + documentId; + logger.error(errorMessage, e); + listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR)); + } + }, exception -> { + logger.error("Failed to get workflow state for {} ", documentId); + canDeleteStateConsumer.accept(Boolean.FALSE); + })); + } catch (Exception e) { + String errorMessage = "Failed to retrieve workflow state to check provisioning status"; + logger.error(errorMessage, e); + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + } + } + /** * Updates a document in the workflow state index * @param documentId the document ID @@ -531,7 +578,7 @@ public void updateFlowFrameworkSystemIndexDoc( ActionListener listener ) { if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { - String errorMessage = "Failed to update document for given workflow due to missing " + WORKFLOW_STATE_INDEX + " index"; + String errorMessage = "Failed to update document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index"; logger.error(errorMessage); listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); } else { @@ -551,6 +598,29 @@ public void updateFlowFrameworkSystemIndexDoc( } } + /** + * Deletes a document in the workflow state index + * @param documentId the document ID + * @param listener action listener + */ + public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener listener) { + if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { + String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index"; + logger.error(errorMessage); + listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); + } else { + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId); + deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore)); + } catch (Exception e) { + String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId; + logger.error(errorMessage, e); + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + } + } + } + /** * Updates a document in the workflow state index * @param indexName the index that we will be updating a document of. diff --git a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java index 84f629948..0e4699f20 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java @@ -64,6 +64,15 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener stateListener = ActionListener.wrap(response -> { + logger.info("Deleted workflow state doc: {}", workflowId); + }, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); }); + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, canDelete -> { + if (Boolean.TRUE.equals(canDelete)) { + flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener); + } + }, stateListener); } else { String errorMessage = "There are no templates in the global context"; logger.error(errorMessage); diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index f0ca288a2..98f93aa07 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -226,15 +226,23 @@ private void updateWorkflowState( ) { if (remainingResources.isEmpty()) { // Successful deprovision, reset state to initial - flowFrameworkIndicesHandler.putInitialStateToWorkflowState( - workflowId, - getUserContext(client), - ActionListener.wrap(indexResponse -> { - logger.info("Reset workflow {} state to NOT_STARTED", workflowId); - }, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); }) - ); - // return workflow ID - listener.onResponse(new WorkflowResponse(workflowId)); + flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> { + if (Boolean.TRUE.equals(templateExists)) { + flowFrameworkIndicesHandler.putInitialStateToWorkflowState( + workflowId, + getUserContext(client), + ActionListener.wrap(indexResponse -> { + logger.info("Reset workflow {} state to NOT_STARTED", workflowId); + }, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); }) + ); + } else { + flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, ActionListener.wrap(deleteResponse -> { + logger.info("Deleted workflow {} state", workflowId); + }, exception -> { logger.error("Failed to delete workflow state for {}", workflowId, exception); })); + } + // return workflow ID + listener.onResponse(new WorkflowResponse(workflowId)); + }, listener); } else { // Failed deprovision flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index 645b7c5c8..ad2ebca0e 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -9,12 +9,17 @@ package org.opensearch.flowframework.indices; import org.opensearch.Version; +import org.opensearch.action.DocWriteResponse.Result; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.client.IndicesAdminClient; @@ -29,9 +34,11 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.TestHelpers; import org.opensearch.flowframework.model.ProvisioningProgress; +import org.opensearch.flowframework.model.ResourceCreated; import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.Workflow; import org.opensearch.flowframework.model.WorkflowState; @@ -285,6 +292,93 @@ public void testIsWorkflowProvisionedFailedParsing() { assertTrue(exceptionCaptor.getValue().getMessage().contains("Failed to parse workflow state")); } + public void testCanDeleteWorkflowStateDoc() { + String documentId = randomAlphaOfLength(5); + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + WorkflowState workFlowState = new WorkflowState( + documentId, + "test", + "PROVISIONING", + "NOT_STARTED", + Instant.now(), + Instant.now(), + TestHelpers.randomUser(), + Collections.emptyMap(), + Collections.emptyList() + ); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + workFlowState.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertTrue(canDelete); }, listener); + } + + public void testCanNotDeleteWorkflowStateDocInProgress() { + String documentId = randomAlphaOfLength(5); + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + WorkflowState workFlowState = new WorkflowState( + documentId, + "test", + "PROVISIONING", + "IN_PROGRESS", + Instant.now(), + Instant.now(), + TestHelpers.randomUser(), + Collections.emptyMap(), + Collections.emptyList() + ); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + workFlowState.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener); + } + + public void testCanNotDeleteWorkflowStateDocResourcesExist() { + String documentId = randomAlphaOfLength(5); + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + WorkflowState workFlowState = new WorkflowState( + documentId, + "test", + "PROVISIONING", + "DONE", + Instant.now(), + Instant.now(), + TestHelpers.randomUser(), + Collections.emptyMap(), + List.of(new ResourceCreated("w", "x", "y", "z")) + ); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + workFlowState.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener); + } + public void testDoesTemplateExist() { String documentId = randomAlphaOfLength(5); Consumer function = mock(Consumer.class); @@ -302,4 +396,98 @@ public void testDoesTemplateExist() { flowFrameworkIndicesHandler.doesTemplateExist(documentId, function, listener); verify(function).accept(true); } + + public void testUpdateFlowFrameworkSystemIndexDoc() throws IOException { + ClusterState mockClusterState = mock(ClusterState.class); + Metadata mockMetaData = mock(Metadata.class); + when(clusterService.state()).thenReturn(mockClusterState); + when(mockClusterState.metadata()).thenReturn(mockMetaData); + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true); + + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + + // test success + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, Result.UPDATED)); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(UpdateResponse.class); + verify(listener, times(1)).onResponse(responseCaptor.capture()); + assertEquals(Result.UPDATED, responseCaptor.getValue().getResult()); + + // test failure + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(new Exception("Failed to update state")); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals("Failed to update state", exceptionCaptor.getValue().getMessage()); + + // test no index + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false); + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener); + + verify(listener, times(2)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to update document 1 due to missing .plugins-flow-framework-state index", + exceptionCaptor.getValue().getMessage() + ); + } + + public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException { + ClusterState mockClusterState = mock(ClusterState.class); + Metadata mockMetaData = mock(Metadata.class); + when(clusterService.state()).thenReturn(mockClusterState); + when(mockClusterState.metadata()).thenReturn(mockMetaData); + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true); + + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + + // test success + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(new DeleteResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, true)); + return null; + }).when(client).delete(any(DeleteRequest.class), any()); + + flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(DeleteResponse.class); + verify(listener, times(1)).onResponse(responseCaptor.capture()); + assertEquals(Result.DELETED, responseCaptor.getValue().getResult()); + + // test failure + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(new Exception("Failed to delete state")); + return null; + }).when(client).delete(any(DeleteRequest.class), any()); + + flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals("Failed to delete state", exceptionCaptor.getValue().getMessage()); + + // test no index + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false); + flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener); + + verify(listener, times(2)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to delete document 1 due to missing .plugins-flow-framework-state index", + exceptionCaptor.getValue().getMessage() + ); + } } diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java index 44a15d960..40efaff83 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java @@ -129,13 +129,21 @@ public void testCreateProvisionDeprovisionWorkflowWithFullAccess() throws Except response = getWorkflowStatus(fullAccessClient(), workflowId, false); assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); + // Invoke delete API while state still exists + response = deleteWorkflow(fullAccessClient(), workflowId); + assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); + + // Invoke status API + response = getWorkflowStatus(fullAccessClient(), workflowId, false); + assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); + // Invoke deprovision API response = deprovisionWorkflow(fullAccessClient(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); - // Invoke delete API - response = deleteWorkflow(fullAccessClient(), workflowId); - assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); + // Invoke status API with failure + ResponseException exception = expectThrows(ResponseException.class, () -> getWorkflowStatus(fullAccessClient(), workflowId, false)); + assertEquals(RestStatus.NOT_FOUND.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); } public void testGetWorkflowStepWithFullAccess() throws Exception { diff --git a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java index 80c1ed0b8..9265c4c63 100644 --- a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.mockito.ArgumentCaptor; @@ -122,6 +123,12 @@ public void testDeprovisionWorkflow() throws Exception { return null; }).when(client).execute(any(GetWorkflowStateAction.class), any(GetWorkflowStateRequest.class), any()); + doAnswer(invocation -> { + Consumer booleanConsumer = invocation.getArgument(1); + booleanConsumer.accept(Boolean.TRUE); + return null; + }).when(flowFrameworkIndicesHandler).doesTemplateExist(anyString(), any(), any()); + PlainActionFuture future = PlainActionFuture.newFuture(); future.onResponse(WorkflowData.EMPTY); when(this.deleteConnectorStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap())).thenReturn(future);