Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete workflow state when template is deleted and no resources exist #689

Merged
merged 10 commits into from
Apr 30, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
- Silently ignore content on APIs that don't require it ([#639](https://github.com/opensearch-project/flow-framework/pull/639))
- Hide user and credential field from search response ([#680](https://github.com/opensearch-project/flow-framework/pull/680))
- Throw the correct error message in status API for WorkflowSteps ([#676](https://github.com/opensearch-project/flow-framework/pull/676))
- Delete workflow state when template is deleted and no resources exist ([#689](https://github.com/opensearch-project/flow-framework/pull/689))

### Infrastructure
- Switch macos runner to macos-13 from macos-latest since macos-latest is now arm64 ([#686](https://github.com/opensearch-project/flow-framework/pull/686))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
Expand Down Expand Up @@ -519,6 +521,51 @@ public <T> void getProvisioningProgress(
}
}

/**
* Check workflow provisioning state and resources to see if state can be deleted with template
*
* @param documentId document id
* @param canDeleteStateConsumer consumer function which will be true if NOT_STARTED or COMPLETED and no resources
* @param listener action listener from caller to fail on error
* @param <T> action listener response type
*/
public <T> void canDeleteWorkflowStateDoc(String documentId, Consumer<Boolean> canDeleteStateConsumer, ActionListener<T> listener) {
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();
if (!response.isExists()) {
// no need to delete if it's not there to start with
canDeleteStateConsumer.accept(Boolean.FALSE);
return;
}
try (
XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
canDeleteStateConsumer.accept(
workflowState.resourcesCreated().isEmpty()
&& !ProvisioningProgress.IN_PROGRESS.equals(
ProvisioningProgress.valueOf(workflowState.getProvisioningProgress())
)
);
} catch (Exception e) {
String errorMessage = "Failed to parse workflow state " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
}
}, exception -> {
logger.error("Failed to get workflow state for {} ", documentId);
canDeleteStateConsumer.accept(Boolean.FALSE);
}));
} catch (Exception e) {
String errorMessage = "Failed to retrieve workflow state to check provisioning status";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}

/**
* Updates a document in the workflow state index
* @param documentId the document ID
Expand All @@ -531,7 +578,7 @@ public void updateFlowFrameworkSystemIndexDoc(
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to update document for given workflow due to missing " + WORKFLOW_STATE_INDEX + " index";
String errorMessage = "Failed to update document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
Expand All @@ -551,6 +598,29 @@ public void updateFlowFrameworkSystemIndexDoc(
}
}

/**
* Deletes a document in the workflow state index
* @param documentId the document ID
* @param listener action listener
*/
public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener<DeleteResponse> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}
}

/**
* Updates a document in the workflow state index
* @param indexName the index that we will be updating a document of.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));

ActionListener<DeleteResponse> stateListener = ActionListener.wrap(response -> {
logger.info("Deleted workflow state doc: {}", workflowId);
}, exception -> { logger.info("Failed to delet workflow state doc: {}", workflowId, exception); });
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);
}
}, stateListener);
} else {
String errorMessage = "There are no templates in the global context";
logger.error(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,23 @@ private void updateWorkflowState(
) {
if (remainingResources.isEmpty()) {
// Successful deprovision, reset state to initial
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
workflowId,
getUserContext(client),
ActionListener.wrap(indexResponse -> {
logger.info("Reset workflow {} state to NOT_STARTED", workflowId);
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); })
);
// return workflow ID
listener.onResponse(new WorkflowResponse(workflowId));
flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> {
if (Boolean.TRUE.equals(templateExists)) {
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(
workflowId,
getUserContext(client),
ActionListener.wrap(indexResponse -> {
logger.info("Reset workflow {} state to NOT_STARTED", workflowId);
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); })
);
} else {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, ActionListener.wrap(deleteResponse -> {
logger.info("Deleted workflow {} state", workflowId);
}, exception -> { logger.error("Failed to delete workflow state for {}", workflowId, exception); }));
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
}
// return workflow ID
listener.onResponse(new WorkflowResponse(workflowId));
}, listener);
} else {
// Failed deprovision
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
package org.opensearch.flowframework.indices;

import org.opensearch.Version;
import org.opensearch.action.DocWriteResponse.Result;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
Expand All @@ -29,9 +34,11 @@
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowState;
Expand Down Expand Up @@ -285,6 +292,93 @@ public void testIsWorkflowProvisionedFailedParsing() {
assertTrue(exceptionCaptor.getValue().getMessage().contains("Failed to parse workflow state"));
}

public void testCanDeleteWorkflowStateDoc() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"NOT_STARTED",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
Collections.emptyList()
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
workFlowState.toXContent(builder, null);
BytesReference workflowBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertTrue(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocInProgress() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"IN_PROGRESS",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
Collections.emptyList()
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
workFlowState.toXContent(builder, null);
BytesReference workflowBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
}

public void testCanNotDeleteWorkflowStateDocResourcesExist() {
String documentId = randomAlphaOfLength(5);
@SuppressWarnings("unchecked")
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
"test",
"PROVISIONING",
"DONE",
Instant.now(),
Instant.now(),
TestHelpers.randomUser(),
Collections.emptyMap(),
List.of(new ResourceCreated("w", "x", "y", "z"))
);
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
workFlowState.toXContent(builder, null);
BytesReference workflowBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, documentId, 1, 1, 1, true, workflowBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, canDelete -> { assertFalse(canDelete); }, listener);
}

public void testDoesTemplateExist() {
String documentId = randomAlphaOfLength(5);
Consumer<Boolean> function = mock(Consumer.class);
Expand All @@ -302,4 +396,98 @@ public void testDoesTemplateExist() {
flowFrameworkIndicesHandler.doesTemplateExist(documentId, function, listener);
verify(function).accept(true);
}

public void testUpdateFlowFrameworkSystemIndexDoc() throws IOException {
ClusterState mockClusterState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);
when(clusterService.state()).thenReturn(mockClusterState);
when(mockClusterState.metadata()).thenReturn(mockMetaData);
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true);

@SuppressWarnings("unchecked")
ActionListener<UpdateResponse> listener = mock(ActionListener.class);

// test success
doAnswer(invocation -> {
ActionListener<UpdateResponse> responseListener = invocation.getArgument(1);
responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, Result.UPDATED));
return null;
}).when(client).update(any(UpdateRequest.class), any());

flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener);

ArgumentCaptor<UpdateResponse> responseCaptor = ArgumentCaptor.forClass(UpdateResponse.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
assertEquals(Result.UPDATED, responseCaptor.getValue().getResult());

// test failure
doAnswer(invocation -> {
ActionListener<UpdateResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to update state"));
return null;
}).when(client).update(any(UpdateRequest.class), any());

flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener);

ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Failed to update state", exceptionCaptor.getValue().getMessage());

// test no index
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc("1", Map.of("foo", "bar"), listener);

verify(listener, times(2)).onFailure(exceptionCaptor.capture());
assertEquals(
"Failed to update document 1 due to missing .plugins-flow-framework-state index",
exceptionCaptor.getValue().getMessage()
);
}

public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException {
ClusterState mockClusterState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);
when(clusterService.state()).thenReturn(mockClusterState);
when(mockClusterState.metadata()).thenReturn(mockMetaData);
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true);

@SuppressWarnings("unchecked")
ActionListener<DeleteResponse> listener = mock(ActionListener.class);

// test success
doAnswer(invocation -> {
ActionListener<DeleteResponse> responseListener = invocation.getArgument(1);
responseListener.onResponse(new DeleteResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, true));
return null;
}).when(client).delete(any(DeleteRequest.class), any());

flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);

ArgumentCaptor<DeleteResponse> responseCaptor = ArgumentCaptor.forClass(DeleteResponse.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
assertEquals(Result.DELETED, responseCaptor.getValue().getResult());

// test failure
doAnswer(invocation -> {
ActionListener<DeleteResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to delete state"));
return null;
}).when(client).delete(any(DeleteRequest.class), any());

flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);

ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Failed to delete state", exceptionCaptor.getValue().getMessage());

// test no index
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false);
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);

verify(listener, times(2)).onFailure(exceptionCaptor.capture());
assertEquals(
"Failed to delete document 1 due to missing .plugins-flow-framework-state index",
exceptionCaptor.getValue().getMessage()
);
}
}
Loading
Loading