From f9a0511bb1cfe239b6d3af3b66035561aa6317fe Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Wed, 29 Jan 2025 14:20:56 -0800 Subject: [PATCH 1/2] Integrate with synchronous provisioning feature Signed-off-by: Tyler Ohlsen --- common/constants.ts | 2 +- .../workflow_inputs/workflow_inputs.tsx | 2 -- public/utils/utils.ts | 2 +- server/cluster/flow_framework_plugin.ts | 21 +++++++++++++++--- .../routes/flow_framework_routes_service.ts | 22 +++++++++++++------ 5 files changed, 35 insertions(+), 14 deletions(-) diff --git a/common/constants.ts b/common/constants.ts index a64c1d97..d4719221 100644 --- a/common/constants.ts +++ b/common/constants.ts @@ -654,6 +654,7 @@ export const MAX_BYTES_FORMATTED = '1,048,576'; export const MAX_WORKFLOW_NAME_TO_DISPLAY = 40; export const WORKFLOW_NAME_REGEXP = RegExp('^[a-zA-Z0-9_-]*$'); export const INDEX_NAME_REGEXP = WORKFLOW_NAME_REGEXP; +export const PROVISION_TIMEOUT = '10s'; // the timeout config for synchronous provisioning. https://github.com/opensearch-project/flow-framework/pull/990 export const EMPTY_MAP_ENTRY = { key: '', value: '' } as MapEntry; export const EMPTY_INPUT_MAP_ENTRY = { key: '', @@ -662,7 +663,6 @@ export const EMPTY_INPUT_MAP_ENTRY = { value: '', }, } as InputMapEntry; - export const EMPTY_OUTPUT_MAP_ENTRY = { ...EMPTY_INPUT_MAP_ENTRY, value: { diff --git a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx index ed5f5acb..4ac17c99 100644 --- a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx @@ -400,8 +400,6 @@ export function WorkflowInputs(props: WorkflowInputsProps) { ) .unwrap() .then(async (result) => { - await sleep(1000); - await dispatch( getWorkflow({ workflowId: updatedWorkflow.id as string, diff --git a/public/utils/utils.ts b/public/utils/utils.ts index 618c7bf8..6b75db94 100644 --- a/public/utils/utils.ts +++ b/public/utils/utils.ts @@ -626,7 +626,7 @@ export function getEmbeddingModelDimensions( // so we check for that first. if (connector?.parameters?.dimensions !== undefined) { return connector.parameters?.dimensions; - } else if (connector.parameters?.model !== undefined) { + } else if (connector?.parameters?.model !== undefined) { return ( // @ts-ignore COHERE_CONFIGS[connector.parameters?.model]?.dimension || diff --git a/server/cluster/flow_framework_plugin.ts b/server/cluster/flow_framework_plugin.ts index c01f81b7..78e10066 100644 --- a/server/cluster/flow_framework_plugin.ts +++ b/server/cluster/flow_framework_plugin.ts @@ -7,6 +7,7 @@ import { FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE, FLOW_FRAMEWORK_SEARCH_WORKFLOW_STATE_ROUTE, FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX, + PROVISION_TIMEOUT, } from '../../common'; /** @@ -75,7 +76,7 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) { flowFramework.updateWorkflow = ca({ url: { - fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>&reprovision=<%=reprovision%>`, + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>`, req: { workflow_id: { type: 'string', @@ -85,7 +86,21 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) { type: 'boolean', required: true, }, - reprovision: { + }, + }, + needBody: true, + method: 'PUT', + }); + + flowFramework.updateAndReprovisionWorkflow = ca({ + url: { + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>&reprovision=true&wait_for_completion_timeout=${PROVISION_TIMEOUT}`, + req: { + workflow_id: { + type: 'string', + required: true, + }, + update_fields: { type: 'boolean', required: true, }, @@ -97,7 +112,7 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) { flowFramework.provisionWorkflow = ca({ url: { - fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_provision`, + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_provision?wait_for_completion_timeout=${PROVISION_TIMEOUT}`, req: { workflow_id: { type: 'string', diff --git a/server/routes/flow_framework_routes_service.ts b/server/routes/flow_framework_routes_service.ts index 522373a4..4897868e 100644 --- a/server/routes/flow_framework_routes_service.ts +++ b/server/routes/flow_framework_routes_service.ts @@ -475,13 +475,21 @@ export class FlowFrameworkRoutesService { data_source_id, this.client ); - await callWithRequest('flowFramework.updateWorkflow', { - workflow_id, - // default update_fields to false if not explicitly set otherwise - update_fields: update_fields, - reprovision: reprovision, - body: workflowTemplate, - }); + if (reprovision) { + await callWithRequest('flowFramework.updateAndReprovisionWorkflow', { + workflow_id, + // default update_fields to false if not explicitly set otherwise + update_fields, + body: workflowTemplate, + }); + } else { + await callWithRequest('flowFramework.updateWorkflow', { + workflow_id, + // default update_fields to false if not explicitly set otherwise + update_fields, + body: workflowTemplate, + }); + } return res.ok({ body: { workflowId: workflow_id, workflowTemplate } }); } catch (err: any) { From 8cc3f6e4d9646278655a63b277a13afea5a23dfa Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Wed, 29 Jan 2025 14:26:34 -0800 Subject: [PATCH 2/2] Remove remaining sleeps Signed-off-by: Tyler Ohlsen --- .../workflow_detail/workflow_inputs/workflow_inputs.tsx | 6 ------ 1 file changed, 6 deletions(-) diff --git a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx index 4ac17c99..71e10484 100644 --- a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx @@ -49,7 +49,6 @@ import { hasProvisionedIngestResources, hasProvisionedSearchResources, generateId, - sleep, getResourcesToBeForceDeleted, getDataSourceId, } from '../../../utils'; @@ -341,7 +340,6 @@ export function WorkflowInputs(props: WorkflowInputsProps) { ) .unwrap() .then(async (result) => { - await sleep(1000); props.setUnsavedIngestProcessors(false); props.setUnsavedSearchProcessors(false); success = true; @@ -386,13 +384,9 @@ export function WorkflowInputs(props: WorkflowInputsProps) { ) .unwrap() .then(async (result) => { - await sleep(1000); props.setUnsavedIngestProcessors(false); props.setUnsavedSearchProcessors(false); await dispatch( - // TODO: update to be synchronous provisioning, to prevent - // having to wait/sleep before performing next actions. - // https://github.com/opensearch-project/flow-framework/pull/1009 provisionWorkflow({ workflowId: updatedWorkflow.id as string, dataSourceId,