diff --git a/common/constants.ts b/common/constants.ts index 5e8062de..5d6e0678 100644 --- a/common/constants.ts +++ b/common/constants.ts @@ -31,6 +31,11 @@ export const ML_CONNECTOR_ROUTE_PREFIX = `${ML_API_ROUTE_PREFIX}/connectors`; export const ML_SEARCH_MODELS_ROUTE = `${ML_MODEL_ROUTE_PREFIX}/_search`; export const ML_SEARCH_CONNECTORS_ROUTE = `${ML_CONNECTOR_ROUTE_PREFIX}/_search`; +/** + * OpenSearch APIs + */ +export const SEARCH_PIPELINE_ROUTE = '/_search/pipeline'; + /** * NODE APIs */ @@ -40,10 +45,13 @@ export const BASE_NODE_API_PATH = '/api/flow_framework'; export const BASE_OPENSEARCH_NODE_API_PATH = `${BASE_NODE_API_PATH}/opensearch`; export const CAT_INDICES_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/catIndices`; export const GET_MAPPINGS_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/mappings`; +export const GET_INDEX_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/getIndex`; export const SEARCH_INDEX_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/search`; export const INGEST_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/ingest`; export const BULK_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/bulk`; export const SIMULATE_PIPELINE_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/simulatePipeline`; +export const INGEST_PIPELINE_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/getIngestPipeline`; +export const SEARCH_PIPELINE_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/getSearchPipeline`; // Flow Framework node APIs export const BASE_WORKFLOW_NODE_API_PATH = `${BASE_NODE_API_PATH}/workflow`; diff --git a/common/interfaces.ts b/common/interfaces.ts index baa08165..f9bda443 100644 --- a/common/interfaces.ts +++ b/common/interfaces.ts @@ -555,3 +555,18 @@ export type SimulateIngestPipelineResponse = { }; export type SearchHit = SimulateIngestPipelineDoc; + +export type IndexResponse = { + indexName: string; + indexDetails: IndexConfiguration; +}; + +export type IngestPipelineResponse = { + pipelineId: string; + ingestPipelineDetails: IngestPipelineConfig; +}; + +export type SearchPipelineResponse = { + pipelineId: string; + searchPipelineDetails: SearchPipelineConfig; +}; diff --git a/public/general_components/index.ts b/public/general_components/index.ts index 728fc38d..24c72304 100644 --- a/public/general_components/index.ts +++ b/public/general_components/index.ts @@ -5,4 +5,3 @@ export { MultiSelectFilter } from './multi_select_filter'; export { ProcessorsTitle } from './processors_title'; -export { ResourceList } from './resource_list'; diff --git a/public/general_components/resource_list.tsx b/public/general_components/resource_list.tsx deleted file mode 100644 index 3788f2ba..00000000 --- a/public/general_components/resource_list.tsx +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -import React, { useState, useEffect } from 'react'; -import { - EuiInMemoryTable, - Direction, - EuiFlexGroup, - EuiFlexItem, -} from '@elastic/eui'; -import { Workflow, WorkflowResource } from '../../common'; -import { columns } from '../pages/workflow_detail/tools/resources/columns'; - -interface ResourceListProps { - workflow?: Workflow; -} - -/** - * The searchable list of resources for a particular workflow. - */ -export function ResourceList(props: ResourceListProps) { - const [allResources, setAllResources] = useState([]); - - // Hook to initialize all resources. 
Reduce to unique IDs, since - // the backend resources may include the same resource multiple times - // (e.g., register and deploy steps persist the same model ID resource) - useEffect(() => { - if (props.workflow?.resourcesCreated) { - const resourcesMap = {} as { [id: string]: WorkflowResource }; - props.workflow.resourcesCreated.forEach((resource) => { - resourcesMap[resource.id] = resource; - }); - setAllResources(Object.values(resourcesMap)); - } - }, [props.workflow?.resourcesCreated]); - - const sorting = { - sort: { - field: 'id', - direction: 'asc' as Direction, - }, - }; - - return ( - - - - items={allResources} - rowHeader="id" - columns={columns} - sorting={sorting} - pagination={true} - message={'No existing resources found'} - /> - - - ); -} diff --git a/public/pages/workflow_detail/tools/resources/resource_list_with_flyout.tsx b/public/pages/workflow_detail/tools/resources/resource_list_with_flyout.tsx new file mode 100644 index 00000000..f2378c05 --- /dev/null +++ b/public/pages/workflow_detail/tools/resources/resource_list_with_flyout.tsx @@ -0,0 +1,219 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import React, { useState, useEffect } from 'react'; +import { + Direction, + EuiCodeBlock, + EuiFlexGroup, + EuiFlexItem, + EuiFlyout, + EuiFlyoutBody, + EuiFlyoutHeader, + EuiInMemoryTable, + EuiTitle, + EuiIcon, + EuiText, + EuiEmptyPrompt, + EuiLoadingSpinner, +} from '@elastic/eui'; +import { + Workflow, + WorkflowResource, + customStringify, +} from '../../../../../common'; +import { + AppState, + useAppDispatch, + getIndex, + getIngestPipeline, + getSearchPipeline, +} from '../../../../store'; +import { + extractIdsByStepType, + getDataSourceId, + getErrorMessageForStepType, +} from '../../../../utils'; +import { columns } from './columns'; +import { useSelector } from 'react-redux'; + +interface ResourceListFlyoutProps { + workflow?: Workflow; +} + +/** + * The searchable list of resources for a particular workflow. + */ +export function ResourceListWithFlyout(props: ResourceListFlyoutProps) { + const [allResources, setAllResources] = useState([]); + const dispatch = useAppDispatch(); + const dataSourceId = getDataSourceId(); + const [resourceDetails, setResourceDetails] = useState(null); + const [rowErrorMessage, setRowErrorMessage] = useState(null); + const { + loading, + getIndexErrorMessage, + getIngestPipelineErrorMessage, + getSearchPipelineErrorMessage, + indexDetails, + ingestPipelineDetails, + searchPipelineDetails, + } = useSelector((state: AppState) => state.opensearch); + + // Hook to initialize all resources. 
Reduce to unique IDs, since + // the backend resources may include the same resource multiple times + // (e.g., register and deploy steps persist the same model ID resource) + useEffect(() => { + if (props.workflow?.resourcesCreated) { + const resourcesMap = {} as { [id: string]: WorkflowResource }; + props.workflow.resourcesCreated.forEach((resource) => { + resourcesMap[resource.id] = resource; + }); + setAllResources(Object.values(resourcesMap)); + } + }, [props.workflow?.resourcesCreated]); + + useEffect(() => { + const { + indexIds, + ingestPipelineIds, + searchPipelineIds, + } = extractIdsByStepType(allResources); + + if (indexIds) { + try { + dispatch(getIndex({ index: indexIds, dataSourceId })); + } catch {} + } + + if (ingestPipelineIds) { + try { + dispatch( + getIngestPipeline({ pipelineId: ingestPipelineIds, dataSourceId }) + ); + } catch {} + } + + if (searchPipelineIds) { + try { + dispatch( + getSearchPipeline({ pipelineId: searchPipelineIds, dataSourceId }) + ); + } catch {} + } + }, [allResources]); + + const sorting = { + sort: { + field: 'id', + direction: 'asc' as Direction, + }, + }; + + const [isFlyoutVisible, setIsFlyoutVisible] = useState(false); + const [selectedRowData, setSelectedRowData] = useState< + WorkflowResource | undefined + >(undefined); + + // Opens the flyout and fetches resource details for the selected row. + const openFlyout = async (row: WorkflowResource) => { + setSelectedRowData(row); + setIsFlyoutVisible(true); + const value = + indexDetails[row.id] ?? + ingestPipelineDetails[row.id] ?? + searchPipelineDetails[row.id] ?? + ''; + setResourceDetails(customStringify({ [row.id]: value })); + const resourceDetailsErrorMessage = getErrorMessageForStepType( + row.stepType, + getIndexErrorMessage, + getIngestPipelineErrorMessage, + getSearchPipelineErrorMessage + ); + setRowErrorMessage(resourceDetailsErrorMessage); + }; + + // Closes the flyout and resets the selected resource data. + const closeFlyout = () => { + setIsFlyoutVisible(false); + setSelectedRowData(undefined); + setResourceDetails(null); + }; + + return ( + <> + + + + items={allResources} + rowHeader="id" + columns={[ + ...columns, + { + name: 'Actions', + width: '20%', + + render: (row: WorkflowResource) => ( + openFlyout(row)} + type="inspect" + size="m" + style={{ cursor: 'pointer' }} + /> + ), + }, + ]} + sorting={sorting} + pagination={true} + message={'No existing resources found'} + /> + + + {isFlyoutVisible && ( + + + +
+        <EuiFlyout onClose={closeFlyout}>
+          <EuiFlyoutHeader>
+            <EuiTitle>
+              <h2>{selectedRowData?.id}</h2>
+            </EuiTitle>
+          </EuiFlyoutHeader>
+          <EuiFlyoutBody>
+            <EuiFlexGroup direction="column">
+              <EuiFlexItem>
+                <EuiText size="m">
+                  <h4>Resource details</h4>
+                </EuiText>
+              </EuiFlexItem>
+              <EuiFlexItem>
+                {!rowErrorMessage && !loading ? (
+                  <EuiCodeBlock language="json" fontSize="m" isCopyable={true}>
+                    {resourceDetails}
+                  </EuiCodeBlock>
+                ) : loading ? (
+                  <EuiEmptyPrompt
+                    icon={<EuiLoadingSpinner size="xl" />}
+                    title={<h2>Loading</h2>}
+                  />
+                ) : (
+                  <EuiEmptyPrompt
+                    iconType="alert"
+                    iconColor="danger"
+                    title={<h2>Error loading resource details</h2>}
+                    body={<p>{rowErrorMessage}</p>}
+                  />
+                )}
+              </EuiFlexItem>
+            </EuiFlexGroup>
+          </EuiFlyoutBody>
+        </EuiFlyout>
+ )} + + ); +} diff --git a/public/pages/workflow_detail/tools/resources/resources.tsx b/public/pages/workflow_detail/tools/resources/resources.tsx index 4449d998..7ad2e80e 100644 --- a/public/pages/workflow_detail/tools/resources/resources.tsx +++ b/public/pages/workflow_detail/tools/resources/resources.tsx @@ -11,7 +11,7 @@ import { EuiText, } from '@elastic/eui'; import { Workflow } from '../../../../../common'; -import { ResourceList } from '../../../../general_components'; +import { ResourceListWithFlyout } from './resource_list_with_flyout'; interface ResourcesProps { workflow?: Workflow; @@ -29,7 +29,7 @@ export function Resources(props: ResourcesProps) { <> - + diff --git a/public/pages/workflows/workflow_list/resource_list.tsx b/public/pages/workflows/workflow_list/resource_list.tsx new file mode 100644 index 00000000..48421258 --- /dev/null +++ b/public/pages/workflows/workflow_list/resource_list.tsx @@ -0,0 +1,240 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import React, { useState, useEffect } from 'react'; +import { columns } from '../../workflow_detail/tools/resources/columns'; +import { + EuiCodeBlock, + EuiFlexGroup, + EuiFlexItem, + EuiBasicTable, + EuiButtonIcon, + RIGHT_ALIGNMENT, + EuiText, + Direction, + EuiEmptyPrompt, + EuiLoadingSpinner, +} from '@elastic/eui'; +import { + Workflow, + WorkflowResource, + customStringify, +} from '../../../../common'; +import { + AppState, + getIndex, + getIngestPipeline, + getSearchPipeline, + useAppDispatch, +} from '../../../store'; +import { + extractIdsByStepType, + getDataSourceId, + getErrorMessageForStepType, +} from '../../../utils'; +import { useSelector } from 'react-redux'; + +interface ResourceListProps { + workflow?: Workflow; +} + +/** + * The searchable list of resources for a particular workflow. + */ +export function ResourceList(props: ResourceListProps) { + const [allResources, setAllResources] = useState([]); + const dispatch = useAppDispatch(); + const dataSourceId = getDataSourceId(); + const [itemIdToExpandedRowMap, setItemIdToExpandedRowMap] = useState<{ + [key: string]: any; + }>({}); + const { + loading, + getIndexErrorMessage, + getIngestPipelineErrorMessage, + getSearchPipelineErrorMessage, + indexDetails, + ingestPipelineDetails, + searchPipelineDetails, + } = useSelector((state: AppState) => state.opensearch); + const [pageIndex, setPageIndex] = useState(0); + const [pageSize, setPageSize] = useState(10); + const [sortField, setSortField] = useState('id'); + const [sortDirection, setSortDirection] = useState('asc'); + + // Hook to initialize all resources. 
Reduce to unique IDs, since + // the backend resources may include the same resource multiple times + // (e.g., register and deploy steps persist the same model ID resource) + useEffect(() => { + if (props.workflow?.resourcesCreated) { + const resourcesMap: { [id: string]: WorkflowResource } = {}; + props.workflow.resourcesCreated.forEach((resource) => { + resourcesMap[resource.id] = resource; + }); + setAllResources(Object.values(resourcesMap)); + } + }, [props.workflow?.resourcesCreated]); + + useEffect(() => { + const { + indexIds, + ingestPipelineIds, + searchPipelineIds, + } = extractIdsByStepType(allResources); + + if (indexIds) { + try { + dispatch(getIndex({ index: indexIds, dataSourceId })); + } catch {} + } + + if (ingestPipelineIds) { + try { + dispatch( + getIngestPipeline({ pipelineId: ingestPipelineIds, dataSourceId }) + ); + } catch {} + } + + if (searchPipelineIds) { + try { + dispatch( + getSearchPipeline({ pipelineId: searchPipelineIds, dataSourceId }) + ); + } catch {} + } + }, [allResources]); + + // Renders the expanded row to show resource details in a code block. + const renderExpandedRow = ( + data: any, + resourceDetailsErrorMessage?: string + ) => ( + + + +
+    <EuiFlexGroup direction="column">
+      <EuiFlexItem>
+        <EuiText size="m">
+          <h4>Resource details</h4>
+        </EuiText>
+      </EuiFlexItem>
+      <EuiFlexItem>
+        {!resourceDetailsErrorMessage && !loading ? (
+          <EuiCodeBlock language="json" fontSize="m" isCopyable={true}>
+            {customStringify(data)}
+          </EuiCodeBlock>
+        ) : loading ? (
+          <EuiEmptyPrompt
+            icon={<EuiLoadingSpinner size="xl" />}
+            title={<h2>Loading</h2>}
+          />
+        ) : (
+          <EuiEmptyPrompt
+            iconType="alert"
+            iconColor="danger"
+            title={<h2>Error loading resource details</h2>}
+            body={<p>{resourceDetailsErrorMessage}</p>}
+          />
+        )}
+      </EuiFlexItem>
+    </EuiFlexGroup>
+ ); + + // Expands or collapses the details for a resource item. + const toggleDetails = async (item: WorkflowResource) => { + const updatedItemIdToExpandedRowMap = { ...itemIdToExpandedRowMap }; + + if (updatedItemIdToExpandedRowMap[item.id]) { + delete updatedItemIdToExpandedRowMap[item.id]; + setItemIdToExpandedRowMap(updatedItemIdToExpandedRowMap); + } else { + const value = + indexDetails[item.id] ?? + ingestPipelineDetails[item.id] ?? + searchPipelineDetails[item.id] ?? + ''; + const resourceDetailsErrorMessage = getErrorMessageForStepType( + item.stepType, + getIndexErrorMessage, + getIngestPipelineErrorMessage, + getSearchPipelineErrorMessage + ); + + const result = { [item.id]: value }; + if (result) { + setItemIdToExpandedRowMap((prevMap) => ({ + ...prevMap, + [item.id]: renderExpandedRow(result, resourceDetailsErrorMessage), + })); + } + } + }; + + // Handles table pagination and sorting. + const onTableChange = ({ + page = { index: pageIndex, size: pageSize }, + sort = { field: 'id', direction: 'asc' }, + }: any) => { + setPageIndex(page.index); + setPageSize(page.size); + setSortField(sort.field as keyof WorkflowResource); + setSortDirection(sort.direction); + }; + + const sortedItems = [...allResources].sort((a, b) => { + const multiplier = sortDirection === 'asc' ? 1 : -1; + return a[sortField] > b[sortField] + ? 1 * multiplier + : a[sortField] < b[sortField] + ? -1 * multiplier + : 0; + }); + + const pagination = { + pageIndex, + pageSize, + totalItemCount: allResources.length, + pageSizeOptions: [10, 25, 50], + }; + + return ( + + + ( + toggleDetails(item)} + aria-label={ + itemIdToExpandedRowMap[item.id] ? 'Collapse' : 'Expand' + } + iconType={ + itemIdToExpandedRowMap[item.id] ? 'arrowUp' : 'arrowDown' + } + /> + ), + }, + ]} + items={sortedItems} + noItemsMessage={'No existing resources found'} + /> + + + ); +} diff --git a/public/pages/workflows/workflow_list/workflow_list.test.tsx b/public/pages/workflows/workflow_list/workflow_list.test.tsx index f5f38645..a7f2d1b2 100644 --- a/public/pages/workflows/workflow_list/workflow_list.test.tsx +++ b/public/pages/workflows/workflow_list/workflow_list.test.tsx @@ -14,6 +14,7 @@ import { mockStore } from '../../../../test/utils'; import { WORKFLOW_TYPE } from '../../../../common'; import configureStore from 'redux-mock-store'; import { WorkflowInput } from '../../../../test/interfaces'; +import { INITIAL_OPENSEARCH_STATE } from '../../../store'; jest.mock('../../../services', () => { const { mockCoreServices } = require('../../../../test'); @@ -33,6 +34,7 @@ const workflowSet: WorkflowInput[] = Array.from({ length: 20 }, (_, index) => ({ const mockStore1 = configureStore([]); const initialState = { + opensearch: INITIAL_OPENSEARCH_STATE, workflows: { loading: false, errorMessage: '', diff --git a/public/pages/workflows/workflow_list/workflow_list.tsx b/public/pages/workflows/workflow_list/workflow_list.tsx index fe832f14..e6ae4881 100644 --- a/public/pages/workflows/workflow_list/workflow_list.tsx +++ b/public/pages/workflows/workflow_list/workflow_list.tsx @@ -28,9 +28,10 @@ import { getCharacterLimitedString, } from '../../../../common'; import { columns } from './columns'; -import { MultiSelectFilter, ResourceList } from '../../../general_components'; +import { MultiSelectFilter } from '../../../general_components'; import { WORKFLOWS_TAB } from '../workflows'; import { DeleteWorkflowModal } from './delete_workflow_modal'; +import { ResourceList } from './resource_list'; interface WorkflowListProps { setSelectedTabId: 
(tabId: WORKFLOWS_TAB) => void; diff --git a/public/route_service.ts b/public/route_service.ts index bd0b4c23..c093fc91 100644 --- a/public/route_service.ts +++ b/public/route_service.ts @@ -26,6 +26,9 @@ import { BASE_NODE_API_PATH, SEARCH_CONNECTORS_NODE_API_PATH, GET_MAPPINGS_NODE_API_PATH, + SEARCH_PIPELINE_NODE_API_PATH, + INGEST_PIPELINE_NODE_API_PATH, + GET_INDEX_NODE_API_PATH, } from '../common'; /** @@ -85,6 +88,10 @@ export interface RouteService { index: string, dataSourceId?: string ) => Promise; + getIndex: ( + index: string, + dataSourceId?: string + ) => Promise; searchIndex: ({ index, body, @@ -118,7 +125,6 @@ export interface RouteService { body: {}, dataSourceId?: string ) => Promise; - simulatePipeline: ( body: { pipeline: IngestPipelineConfig; @@ -126,6 +132,14 @@ export interface RouteService { }, dataSourceId?: string ) => Promise; + getIngestPipeline: ( + pipelineId: string, + dataSourceId?: string + ) => Promise; + getSearchPipeline: ( + pipelineId: string, + dataSourceId?: string + ) => Promise; } export function configureRoutes(core: CoreStart): RouteService { @@ -288,6 +302,19 @@ export function configureRoutes(core: CoreStart): RouteService { return e as HttpFetchError; } }, + getIndex: async (index: string, dataSourceId?: string) => { + try { + const url = dataSourceId + ? `${BASE_NODE_API_PATH}/${dataSourceId}/opensearch/getIndex` + : GET_INDEX_NODE_API_PATH; + const response = await core.http.get<{ respString: string }>( + `${url}/${index}` + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, searchIndex: async ({ index, body, @@ -398,5 +425,31 @@ export function configureRoutes(core: CoreStart): RouteService { return e as HttpFetchError; } }, + getSearchPipeline: async (pipelineId: string, dataSourceId?: string) => { + try { + const url = dataSourceId + ? `${BASE_NODE_API_PATH}/${dataSourceId}/opensearch/getSearchPipeline` + : SEARCH_PIPELINE_NODE_API_PATH; + const response = await core.http.get<{ respString: string }>( + `${url}/${pipelineId}` + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, + getIngestPipeline: async (pipelineId: string, dataSourceId?: string) => { + try { + const url = dataSourceId + ? 
`${BASE_NODE_API_PATH}/${dataSourceId}/opensearch/getIngestPipeline` + : INGEST_PIPELINE_NODE_API_PATH; + const response = await core.http.get<{ respString: string }>( + `${url}/${pipelineId}` + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, }; } diff --git a/public/store/reducers/opensearch_reducer.ts b/public/store/reducers/opensearch_reducer.ts index 4fe76b64..75d905eb 100644 --- a/public/store/reducers/opensearch_reducer.ts +++ b/public/store/reducers/opensearch_reducer.ts @@ -7,7 +7,12 @@ import { createAsyncThunk, createSlice } from '@reduxjs/toolkit'; import { getRouteService } from '../../services'; import { Index, + IndexConfiguration, + IndexResponse, IngestPipelineConfig, + IngestPipelineResponse, + SearchPipelineConfig, + SearchPipelineResponse, OMIT_SYSTEM_INDEX_PATTERN, SimulateIngestPipelineDoc, } from '../../../common'; @@ -16,7 +21,13 @@ import { HttpFetchError } from '../../../../../src/core/public'; export const INITIAL_OPENSEARCH_STATE = { loading: false, errorMessage: '', + getIndexErrorMessage: '', + getSearchPipelineErrorMessage: '', + getIngestPipelineErrorMessage: '', indices: {} as { [key: string]: Index }, + indexDetails: {} as { [key: string]: IndexConfiguration }, + ingestPipelineDetails: {} as { [key: string]: IngestPipelineConfig }, + searchPipelineDetails: {} as { [key: string]: SearchPipelineConfig }, }; const OPENSEARCH_PREFIX = 'opensearch'; @@ -26,6 +37,9 @@ const SEARCH_INDEX_ACTION = `${OPENSEARCH_PREFIX}/search`; const INGEST_ACTION = `${OPENSEARCH_PREFIX}/ingest`; const BULK_ACTION = `${OPENSEARCH_PREFIX}/bulk`; const SIMULATE_PIPELINE_ACTION = `${OPENSEARCH_PREFIX}/simulatePipeline`; +const GET_INGEST_PIPELINE_ACTION = `${OPENSEARCH_PREFIX}/getIngestPipeline`; +const GET_SEARCH_PIPELINE_ACTION = `${OPENSEARCH_PREFIX}/getSearchPipeline`; +const GET_INDEX_ACTION = `${OPENSEARCH_PREFIX}/getIndex`; export const catIndices = createAsyncThunk( CAT_INDICES_ACTION, @@ -69,6 +83,26 @@ export const getMappings = createAsyncThunk( } ); +export const getIndex = createAsyncThunk( + GET_INDEX_ACTION, + async ( + { index, dataSourceId }: { index: string; dataSourceId?: string }, + { rejectWithValue } + ) => { + const response: any | HttpFetchError = await getRouteService().getIndex( + index, + dataSourceId + ); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error getting index settings and mappings: ' + response.body.message + ); + } else { + return response; + } + } +); + export const searchIndex = createAsyncThunk( SEARCH_INDEX_ACTION, async ( @@ -181,6 +215,62 @@ export const simulatePipeline = createAsyncThunk( } ); +export const getSearchPipeline = createAsyncThunk( + GET_SEARCH_PIPELINE_ACTION, + async ( + { + pipelineId, + dataSourceId, + }: { + pipelineId: string; + dataSourceId?: string; + }, + { rejectWithValue } + ) => { + const response: + | any + | HttpFetchError = await getRouteService().getSearchPipeline( + pipelineId, + dataSourceId + ); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error fetching search pipeline: ' + response.body.message + ); + } else { + return response; + } + } +); + +export const getIngestPipeline = createAsyncThunk( + GET_INGEST_PIPELINE_ACTION, + async ( + { + pipelineId, + dataSourceId, + }: { + pipelineId: string; + dataSourceId?: string; + }, + { rejectWithValue } + ) => { + const response: + | any + | HttpFetchError = await getRouteService().getIngestPipeline( + pipelineId, + dataSourceId + ); + if (response instanceof HttpFetchError) 
{ + return rejectWithValue( + 'Error fetching ingest pipeline: ' + response.body.message + ); + } else { + return response; + } + } +); + const opensearchSlice = createSlice({ name: OPENSEARCH_PREFIX, initialState: INITIAL_OPENSEARCH_STATE, @@ -195,6 +285,18 @@ const opensearchSlice = createSlice({ state.loading = true; state.errorMessage = ''; }) + .addCase(getIndex.pending, (state, action) => { + state.loading = true; + state.getIndexErrorMessage = ''; + }) + .addCase(getIngestPipeline.pending, (state, action) => { + state.loading = true; + state.getIngestPipelineErrorMessage = ''; + }) + .addCase(getSearchPipeline.pending, (state, action) => { + state.loading = true; + state.getSearchPipelineErrorMessage = ''; + }) .addCase(searchIndex.pending, (state, action) => { state.loading = true; state.errorMessage = ''; @@ -216,6 +318,43 @@ const opensearchSlice = createSlice({ state.loading = false; state.errorMessage = ''; }) + .addCase(getIndex.fulfilled, (state, action) => { + const resourceDetailsMap = new Map(); + action.payload.forEach((index: IndexResponse) => { + resourceDetailsMap.set(index.indexName, index.indexDetails); + }); + state.indexDetails = Object.fromEntries(resourceDetailsMap.entries()); + state.loading = false; + state.getIndexErrorMessage = ''; + }) + .addCase(getSearchPipeline.fulfilled, (state, action) => { + const resourceDetailsMap = new Map(); + action.payload.forEach((pipeline: SearchPipelineResponse) => { + resourceDetailsMap.set( + pipeline.pipelineId, + pipeline.searchPipelineDetails + ); + }); + state.searchPipelineDetails = Object.fromEntries( + resourceDetailsMap.entries() + ); + state.loading = false; + state.getSearchPipelineErrorMessage = ''; + }) + .addCase(getIngestPipeline.fulfilled, (state, action) => { + const resourceDetailsMap = new Map(); + action.payload.forEach((pipeline: IngestPipelineResponse) => { + resourceDetailsMap.set( + pipeline.pipelineId, + pipeline.ingestPipelineDetails + ); + }); + state.ingestPipelineDetails = Object.fromEntries( + resourceDetailsMap.entries() + ); + state.loading = false; + state.getIngestPipelineErrorMessage = ''; + }) .addCase(searchIndex.fulfilled, (state, action) => { state.loading = false; state.errorMessage = ''; @@ -232,6 +371,18 @@ const opensearchSlice = createSlice({ state.errorMessage = action.payload as string; state.loading = false; }) + .addCase(getIndex.rejected, (state, action) => { + state.getIndexErrorMessage = action.payload as string; + state.loading = false; + }) + .addCase(getIngestPipeline.rejected, (state, action) => { + state.getIngestPipelineErrorMessage = action.payload as string; + state.loading = false; + }) + .addCase(getSearchPipeline.rejected, (state, action) => { + state.getSearchPipelineErrorMessage = action.payload as string; + state.loading = false; + }) .addCase(searchIndex.rejected, (state, action) => { state.errorMessage = action.payload as string; state.loading = false; diff --git a/public/utils/utils.ts b/public/utils/utils.ts index 411447b3..34970ce6 100644 --- a/public/utils/utils.ts +++ b/public/utils/utils.ts @@ -20,6 +20,7 @@ import { WORKFLOW_RESOURCE_TYPE, WORKFLOW_STEP_TYPE, Workflow, + WorkflowResource, } from '../../common'; import { getCore, getDataSourceEnabled } from '../services'; import { @@ -367,3 +368,57 @@ export const dataSourceFilterFn = ( ) ); }; + +export const extractIdsByStepType = (resources: WorkflowResource[]) => { + const ids = resources.reduce( + ( + acc: { + indexIds: string[]; + ingestPipelineIds: string[]; + searchPipelineIds: string[]; + }, + 
item: WorkflowResource + ) => { + switch (item.stepType) { + case WORKFLOW_STEP_TYPE.CREATE_INDEX_STEP_TYPE: + acc.indexIds.push(item.id); + break; + case WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE: + acc.ingestPipelineIds.push(item.id); + break; + case WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE: + acc.searchPipelineIds.push(item.id); + break; + } + return acc; + }, + { indexIds: [], ingestPipelineIds: [], searchPipelineIds: [] } + ); + + return { + indexIds: ids.indexIds.join(','), + ingestPipelineIds: ids.ingestPipelineIds.join(','), + searchPipelineIds: ids.searchPipelineIds.join(','), + }; +}; + +export const getErrorMessageForStepType = ( + stepType: WORKFLOW_STEP_TYPE, + getIndexErrorMessage: string, + getIngestPipelineErrorMessage: string, + getSearchPipelineErrorMessage: string +) => { + switch (stepType) { + case WORKFLOW_STEP_TYPE.CREATE_INDEX_STEP_TYPE: + return getIndexErrorMessage; + + case WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE: + return getIngestPipelineErrorMessage; + + case WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE: + return getSearchPipelineErrorMessage; + + default: + return ''; + } +}; diff --git a/server/cluster/core_plugin.ts b/server/cluster/core_plugin.ts new file mode 100644 index 00000000..e05eb40b --- /dev/null +++ b/server/cluster/core_plugin.ts @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { SEARCH_PIPELINE_ROUTE } from '../../common'; + +export function corePlugin(Client: any, config: any, components: any) { + const ca = components.clientAction.factory; + + Client.prototype.coreClient = components.clientAction.namespaceFactory(); + const coreClient = Client.prototype.coreClient.prototype; + + coreClient.getSearchPipeline = ca({ + url: { + fmt: `${SEARCH_PIPELINE_ROUTE}/<%=pipeline_id%>`, + req: { + pipeline_id: { + type: 'string', + required: true, + }, + }, + }, + method: 'GET', + }); +} diff --git a/server/cluster/index.ts b/server/cluster/index.ts index a29304ea..63b257e7 100644 --- a/server/cluster/index.ts +++ b/server/cluster/index.ts @@ -5,3 +5,4 @@ export * from './flow_framework_plugin'; export * from './ml_plugin'; +export * from './core_plugin'; diff --git a/server/plugin.ts b/server/plugin.ts index 98ae9f76..78305515 100644 --- a/server/plugin.ts +++ b/server/plugin.ts @@ -11,7 +11,7 @@ import { Logger, } from '../../../src/core/server'; import { first } from 'rxjs/operators'; -import { flowFrameworkPlugin, mlPlugin } from './cluster'; +import { flowFrameworkPlugin, mlPlugin, corePlugin } from './cluster'; import { FlowFrameworkDashboardsPluginSetup, FlowFrameworkDashboardsPluginStart, @@ -62,7 +62,7 @@ export class FlowFrameworkDashboardsPlugin const client: ILegacyClusterClient = core.opensearch.legacy.createClient( 'flow_framework', { - plugins: [flowFrameworkPlugin, mlPlugin], + plugins: [flowFrameworkPlugin, mlPlugin, corePlugin], ...globalConfig.opensearch, } ); @@ -71,6 +71,7 @@ export class FlowFrameworkDashboardsPlugin if (dataSourceEnabled) { dataSource.registerCustomApiSchema(flowFrameworkPlugin); dataSource.registerCustomApiSchema(mlPlugin); + dataSource.registerCustomApiSchema(corePlugin); } const opensearchRoutesService = new OpenSearchRoutesService( client, diff --git a/server/routes/opensearch_routes_service.ts b/server/routes/opensearch_routes_service.ts index 4f9f7c04..2ba07300 100644 --- a/server/routes/opensearch_routes_service.ts +++ b/server/routes/opensearch_routes_service.ts @@ -15,13 +15,19 @@ import { 
BASE_NODE_API_PATH, BULK_NODE_API_PATH, CAT_INDICES_NODE_API_PATH, + GET_INDEX_NODE_API_PATH, GET_MAPPINGS_NODE_API_PATH, INGEST_NODE_API_PATH, + INGEST_PIPELINE_NODE_API_PATH, Index, IndexMappings, + IndexResponse, IngestPipelineConfig, + IngestPipelineResponse, SEARCH_INDEX_NODE_API_PATH, + SEARCH_PIPELINE_NODE_API_PATH, SIMULATE_PIPELINE_NODE_API_PATH, + SearchPipelineResponse, SimulateIngestPipelineDoc, SimulateIngestPipelineResponse, } from '../../common'; @@ -82,6 +88,29 @@ export function registerOpenSearchRoutes( }, opensearchRoutesService.getMappings ); + router.get( + { + path: `${GET_INDEX_NODE_API_PATH}/{index}`, + validate: { + params: schema.object({ + index: schema.string(), + }), + }, + }, + opensearchRoutesService.getIndex + ); + router.get( + { + path: `${BASE_NODE_API_PATH}/{data_source_id}/opensearch/getIndex/{index}`, + validate: { + params: schema.object({ + index: schema.string(), + data_source_id: schema.string(), + }), + }, + }, + opensearchRoutesService.getIndex + ); router.post( { path: `${SEARCH_INDEX_NODE_API_PATH}/{index}`, @@ -232,6 +261,52 @@ export function registerOpenSearchRoutes( }, opensearchRoutesService.simulatePipeline ); + router.get( + { + path: `${INGEST_PIPELINE_NODE_API_PATH}/{pipeline_id}`, + validate: { + params: schema.object({ + pipeline_id: schema.string(), + }), + }, + }, + opensearchRoutesService.getIngestPipeline + ); + router.get( + { + path: `${BASE_NODE_API_PATH}/{data_source_id}/opensearch/getIngestPipeline/{pipeline_id}`, + validate: { + params: schema.object({ + pipeline_id: schema.string(), + data_source_id: schema.string(), + }), + }, + }, + opensearchRoutesService.getIngestPipeline + ); + router.get( + { + path: `${SEARCH_PIPELINE_NODE_API_PATH}/{pipeline_id}`, + validate: { + params: schema.object({ + pipeline_id: schema.string(), + }), + }, + }, + opensearchRoutesService.getSearchPipeline + ); + router.get( + { + path: `${BASE_NODE_API_PATH}/{data_source_id}/opensearch/getSearchPipeline/{pipeline_id}`, + validate: { + params: schema.object({ + pipeline_id: schema.string(), + data_source_id: schema.string(), + }), + }, + }, + opensearchRoutesService.getSearchPipeline + ); } export class OpenSearchRoutesService { @@ -308,6 +383,38 @@ export class OpenSearchRoutesService { } }; + getIndex = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { index } = req.params as { index: string }; + const { data_source_id = '' } = req.params as { data_source_id?: string }; + try { + const callWithRequest = getClientBasedOnDataSource( + context, + this.dataSourceEnabled, + req, + data_source_id, + this.client + ); + const response = await callWithRequest('indices.get', { + index, + }); + // re-formatting the results to match IndexResponse + const cleanedIndexDetails = Object.entries(response).map( + ([indexName, indexDetails]) => ({ + indexName, + indexDetails, + }) + ) as IndexResponse[]; + + return res.ok({ body: cleanedIndexDetails }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + searchIndex = async ( context: RequestHandlerContext, req: OpenSearchDashboardsRequest, @@ -429,4 +536,73 @@ export class OpenSearchRoutesService { return generateCustomError(res, err); } }; + + getIngestPipeline = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { pipeline_id } = req.params as { pipeline_id: string }; + const { data_source_id 
= '' } = req.params as { data_source_id?: string }; + + try { + const callWithRequest = getClientBasedOnDataSource( + context, + this.dataSourceEnabled, + req, + data_source_id, + this.client + ); + + const response = await callWithRequest('ingest.getPipeline', { + id: pipeline_id, + }); + // re-formatting the results to match IngestPipelineResponse + const cleanedIngestPipelineDetails = Object.entries(response).map( + ([pipelineId, ingestPipelineDetails]) => ({ + pipelineId, + ingestPipelineDetails, + }) + ) as IngestPipelineResponse[]; + + return res.ok({ body: cleanedIngestPipelineDetails }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + + getSearchPipeline = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { pipeline_id } = req.params as { pipeline_id: string }; + const { data_source_id = '' } = req.params as { data_source_id?: string }; + + try { + const callWithRequest = getClientBasedOnDataSource( + context, + this.dataSourceEnabled, + req, + data_source_id, + this.client + ); + + const response = await callWithRequest('coreClient.getSearchPipeline', { + pipeline_id: pipeline_id, + }); + + // re-formatting the results to match SearchPipelineResponse + const cleanedSearchPipelineDetails = Object.entries(response).map( + ([pipelineId, searchPipelineDetails]) => ({ + pipelineId, + searchPipelineDetails, + }) + ) as SearchPipelineResponse[]; + + return res.ok({ body: cleanedSearchPipelineDetails }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; }
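
For context, a minimal usage sketch (not part of the diff) of the new node APIs exposed through the route service, returning the re-formatted response shapes defined in common/interfaces.ts. The index name, pipeline ID, and import paths below are hypothetical.

// Usage sketch, assuming the route service and types added in this change.
import { getRouteService } from './services';
import { HttpFetchError } from '../../../src/core/public';
import { IndexResponse, IngestPipelineResponse } from '../common';

async function fetchResourceDetails(dataSourceId?: string) {
  // GET /api/flow_framework/opensearch/getIndex/{index}
  // (or /api/flow_framework/{dataSourceId}/opensearch/getIndex/{index} when a data source is set)
  const indexResp = await getRouteService().getIndex('my-index', dataSourceId);
  const indexDetails = !(indexResp instanceof HttpFetchError)
    ? // the server handler re-formats the raw `indices.get` output into IndexResponse[]
      (indexResp as IndexResponse[])
    : [];

  // GET /api/flow_framework/opensearch/getIngestPipeline/{pipeline_id}
  const pipelineResp = await getRouteService().getIngestPipeline(
    'my-ingest-pipeline',
    dataSourceId
  );
  const pipelineDetails = !(pipelineResp instanceof HttpFetchError)
    ? // [{ pipelineId: 'my-ingest-pipeline', ingestPipelineDetails: { ...pipeline config } }]
      (pipelineResp as IngestPipelineResponse[])
    : [];

  return { indexDetails, pipelineDetails };
}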