Skip to content

Commit

Permalink
Migrate state document deletion to metadata client
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Dec 26, 2024
1 parent 2d79e13 commit c5a01c5
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
Expand Down Expand Up @@ -50,6 +49,7 @@
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.remote.metadata.client.DeleteDataObjectRequest;
import org.opensearch.remote.metadata.client.GetDataObjectRequest;
import org.opensearch.remote.metadata.client.PutDataObjectRequest;
import org.opensearch.remote.metadata.client.SdkClient;
Expand All @@ -68,6 +68,7 @@
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.META;
Expand Down Expand Up @@ -778,9 +779,10 @@ public void updateFlowFrameworkSystemIndexDoc(
/**
* Deletes a document in the workflow state index
* @param documentId the document ID
* @param tenantId the tenant Id
* @param listener action listener
*/
public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener<DeleteResponse> listener) {
public void deleteFlowFrameworkSystemIndexDoc(String documentId, String tenantId, ActionListener<DeleteResponse> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to delete document {} due to missing {} index",
Expand All @@ -791,17 +793,36 @@ public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener<
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to delete {} entry : {}",
WORKFLOW_STATE_INDEX,
documentId
).getFormattedMessage();
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
DeleteDataObjectRequest deleteRequest = DeleteDataObjectRequest.builder()
.index(WORKFLOW_STATE_INDEX)
.id(documentId)
.tenantId(tenantId)
.build();
sdkClient.deleteDataObjectAsync(deleteRequest, client.threadPool().executor(DEPROVISION_WORKFLOW_THREAD_POOL))
.whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
try {
DeleteResponse response = DeleteResponse.fromXContent(r.parser());
logger.info("Deleted workflow state doc: {}", documentId);
listener.onResponse(response);
} catch (Exception e) {
logger.error("Failed to parse delete response", e);
listener.onFailure(
new FlowFrameworkException("Failed to parse delete response", RestStatus.INTERNAL_SERVER_ERROR)
);
}
} else {
Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable);
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to delete {} entry : {}",
WORKFLOW_STATE_INDEX,
documentId
).getFormattedMessage();
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private void executeDeleteRequest(
}, exception -> { logger.info("Failed to delete workflow state doc: {}", workflowId, exception); });
flowFrameworkIndicesHandler.canDeleteWorkflowStateDoc(workflowId, tenantId, clearStatus, canDelete -> {
if (Boolean.TRUE.equals(canDelete)) {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, stateListener);
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, tenantId, stateListener);
}
}, stateListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,13 @@ private void updateWorkflowState(
}, exception -> { logger.error("Failed to reset to initial workflow state for {}", workflowId, exception); })
);
} else {
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(workflowId, ActionListener.wrap(deleteResponse -> {
logger.info("Deleted workflow {} state", workflowId);
}, exception -> { logger.error("Failed to delete workflow state for {}", workflowId, exception); }));
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc(
workflowId,
tenantId,
ActionListener.wrap(deleteResponse -> {
logger.info("Deleted workflow {} state", workflowId);
}, exception -> { logger.error("Failed to delete workflow state for {}", workflowId, exception); })
);
}
// return workflow ID
listener.onResponse(new WorkflowResponse(workflowId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
Expand Down Expand Up @@ -79,6 +78,7 @@
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
Expand All @@ -102,6 +102,13 @@ public class FlowFrameworkIndicesHandlerTests extends OpenSearchTestCase {
Math.max(2, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
DEPROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(2, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL
)
);

Expand All @@ -115,7 +122,6 @@ public class FlowFrameworkIndicesHandlerTests extends OpenSearchTestCase {
private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private AdminClient adminClient;
private IndicesAdminClient indicesAdminClient;
private ThreadContext threadContext;
@Mock
protected ClusterService clusterService;
@Mock
Expand All @@ -135,8 +141,6 @@ public void setUp() throws Exception {
super.setUp();
MockitoAnnotations.openMocks(this);

Settings settings = Settings.builder().build();
threadContext = new ThreadContext(settings);
when(client.threadPool()).thenReturn(testThreadPool);
sdkClient = SdkClientFactory.createSdkClient(client, namedXContentRegistry, Collections.emptyMap());
flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(
Expand Down Expand Up @@ -228,7 +232,7 @@ public void testFailedUpdateTemplateInGlobalContextNotExisting() throws IOExcept

CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<IndexResponse> latchedActionListener = new LatchedActionListener<>(listener, latch);
flowFrameworkIndicesHandler.updateTemplateInGlobalContext("1", template, listener);
flowFrameworkIndicesHandler.updateTemplateInGlobalContext("1", template, latchedActionListener);
latch.await(1, TimeUnit.SECONDS);

ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
Expand Down Expand Up @@ -564,7 +568,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
);
}

public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException {
public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException, InterruptedException {
ClusterState mockClusterState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);
when(clusterService.state()).thenReturn(mockClusterState);
Expand All @@ -574,35 +578,35 @@ public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException {
@SuppressWarnings("unchecked")
ActionListener<DeleteResponse> listener = mock(ActionListener.class);

// test success
doAnswer(invocation -> {
ActionListener<DeleteResponse> responseListener = invocation.getArgument(1);
responseListener.onResponse(new DeleteResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, true));
return null;
}).when(client).delete(any(DeleteRequest.class), any());
PlainActionFuture<DeleteResponse> future = PlainActionFuture.newFuture();
future.onResponse(new DeleteResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, true));
when(client.delete(any(DeleteRequest.class))).thenReturn(future);

flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);
CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<DeleteResponse> latchedActionListener = new LatchedActionListener<>(listener, latch);
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", null, latchedActionListener);
latch.await(1, TimeUnit.SECONDS);

ArgumentCaptor<DeleteResponse> responseCaptor = ArgumentCaptor.forClass(DeleteResponse.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
assertEquals(Result.DELETED, responseCaptor.getValue().getResult());

// test failure
doAnswer(invocation -> {
ActionListener<DeleteResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to delete state"));
return null;
}).when(client).delete(any(DeleteRequest.class), any());
future = PlainActionFuture.newFuture();
future.onFailure(new Exception("Failed to delete state"));
when(client.delete(any(DeleteRequest.class))).thenReturn(future);

flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);
latch = new CountDownLatch(1);
latchedActionListener = new LatchedActionListener<>(listener, latch);
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", null, latchedActionListener);
latch.await(1, TimeUnit.SECONDS);

ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("Failed to delete state", exceptionCaptor.getValue().getMessage());
assertEquals("Failed to delete .plugins-flow-framework-state entry : 1", exceptionCaptor.getValue().getMessage());

// test no index
when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false);
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", listener);
flowFrameworkIndicesHandler.deleteFlowFrameworkSystemIndexDoc("1", null, listener);

verify(listener, times(2)).onFailure(exceptionCaptor.capture());
assertEquals(
Expand Down

0 comments on commit c5a01c5

Please sign in to comment.