From ae6539b471cdfcfd41c168b28dc6fdeabd88c85e Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 23 Dec 2024 14:59:19 -0800 Subject: [PATCH] Migrate canDeleteWorkflowStateDoc to avoid repetition Signed-off-by: Daniel Widdis --- .../indices/FlowFrameworkIndicesHandler.java | 55 ------------------- .../DeleteWorkflowTransportAction.java | 2 +- .../FlowFrameworkIndicesHandlerTests.java | 8 +-- 3 files changed, 5 insertions(+), 60 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index ad2b3c94..8b910c1c 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -685,61 +685,6 @@ public void canDeleteWorkflowStateDoc( } } - /** - * Check workflow provisioning state and resources to see if state can be deleted with template - * - * @param documentId document id - * @param clearStatus if set true, always deletes the state document unless status is IN_PROGRESS - * @param canDeleteStateConsumer consumer function which will be true if workflow state is not IN_PROGRESS and either no resources or true clearStatus - * @param listener action listener from caller to fail on error - * @param action listener response type - * @deprecated TODO migrating all these to the tenant aware version above - */ - @Deprecated - public void canDeleteWorkflowStateDoc( - String documentId, - boolean clearStatus, - Consumer canDeleteStateConsumer, - ActionListener listener - ) { - GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId); - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - client.get(getRequest, ActionListener.wrap(response -> { - context.restore(); - if (!response.isExists()) { - // no need to delete if it's not there to start with - canDeleteStateConsumer.accept(Boolean.FALSE); - return; - } - try ( - XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef()) - ) { - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - WorkflowState workflowState = WorkflowState.parse(parser); - canDeleteStateConsumer.accept( - (clearStatus || workflowState.resourcesCreated().isEmpty()) - && !ProvisioningProgress.IN_PROGRESS.equals( - ProvisioningProgress.valueOf(workflowState.getProvisioningProgress()) - ) - ); - } catch (Exception e) { - String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage("Failed to parse workflow state {}", documentId) - .getFormattedMessage(); - ; - logger.error(errorMessage, e); - listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR)); - } - }, exception -> { - logger.error("Failed to get workflow state for {} ", documentId); - canDeleteStateConsumer.accept(Boolean.FALSE); - })); - } catch (Exception e) { - String errorMessage = "Failed to retrieve workflow state to check provisioning status"; - logger.error(errorMessage, e); - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); - } - } - /** * Updates a complete document in the workflow state index * @param documentId the document ID diff --git a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java index c3db98ea..8f3da2c6 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java @@ -166,7 +166,7 @@ private void executeDeleteRequest( ActionListener stateListener = ActionListener.wrap(response -> { logger.info("Deleted workflow state doc: {}", workflowId); }, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); }); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, clearStatus, canDelete -> { + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, tenantId, clearStatus, canDelete -> { if (Boolean.TRUE.equals(canDelete)) { flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener); } diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index 5054eb52..64b51f9e 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -371,7 +371,7 @@ public void testCanDeleteWorkflowStateDoc() { return null; }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertTrue(canDelete); }, listener); + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, null, false, canDelete -> { assertTrue(canDelete); }, listener); } public void testCanNotDeleteWorkflowStateDocInProgress() { @@ -401,7 +401,7 @@ public void testCanNotDeleteWorkflowStateDocInProgress() { return null; }).when(client).get(any(GetRequest.class), any()); - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertFalse(canDelete); }, listener); + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, null, true, canDelete -> { assertFalse(canDelete); }, listener); } public void testDeleteWorkflowStateDocResourcesExist() { @@ -432,10 +432,10 @@ public void testDeleteWorkflowStateDocResourcesExist() { }).when(client).get(any(GetRequest.class), any()); // Can't delete because resources exist - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, false, canDelete -> { assertFalse(canDelete); }, listener); + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, null, false, canDelete -> { assertFalse(canDelete); }, listener); // But can delete if clearStatus set true - flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, true, canDelete -> { assertTrue(canDelete); }, listener); + flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(documentId, null, true, canDelete -> { assertTrue(canDelete); }, listener); } public void testDoesTemplateExist() throws IOException, InterruptedException {