From 04f907252808bced7e49a1db10bff99747f7bfb7 Mon Sep 17 00:00:00 2001 From: Wink <809097465@qq.com> Date: Sun, 15 Dec 2024 17:50:02 +0800 Subject: [PATCH] [Optimization-4047][web] Add push task into DolphinScheduler (#4048) --- .../dinky/controller/SchedulerController.java | 3 - .../service/impl/SchedulerServiceImpl.java | 21 +- .../SqlTask/PushDolphin/constants.tsx | 90 +++++ .../SqlTask/PushDolphin/function.tsx | 55 +++ .../SqlTask/PushDolphin/index.tsx | 329 ++++++++++++++++++ .../CenterTabContent/SqlTask/index.tsx | 106 +++++- 6 files changed, 589 insertions(+), 15 deletions(-) create mode 100644 dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/constants.tsx create mode 100644 dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/function.tsx create mode 100644 dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/index.tsx diff --git a/dinky-admin/src/main/java/org/dinky/controller/SchedulerController.java b/dinky-admin/src/main/java/org/dinky/controller/SchedulerController.java index fe11a91657..cdcf932eb3 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/SchedulerController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/SchedulerController.java @@ -66,9 +66,6 @@ public class SchedulerController { example = "1") public Result getTaskDefinition(@ApiParam(value = "dinky任务id") @RequestParam Long dinkyTaskId) { TaskDefinition taskDefinitionInfo = schedulerService.getTaskDefinitionInfo(dinkyTaskId); - if (taskDefinitionInfo == null) { - return Result.failed(Status.DS_TASK_NOT_EXIST); - } return Result.succeed(taskDefinitionInfo); } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/SchedulerServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/SchedulerServiceImpl.java index 1a8d1ec26f..a9b03a7bd2 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/SchedulerServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/SchedulerServiceImpl.java @@ -43,6 +43,7 @@ import org.dinky.service.catalogue.CatalogueService; import org.dinky.utils.JsonUtils; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -97,7 +98,7 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) { // Get process from dolphin scheduler ProcessDefinition process = processClient.getProcessDefinitionInfo(projectCode, processName); - String taskName = catalogue.getName() + ":" + catalogue.getId(); + String taskName = catalogue.getName(); dinkyTaskRequest.setName(taskName); TaskRequest taskRequest = new TaskRequest(); @@ -180,7 +181,8 @@ private void updateProcessDefinition(ProcessDefinition process, Long taskCode, T } List processTaskRelationList = dagData.getProcessTaskRelationList(); List taskDefinitionList = dagData.getTaskDefinitionList(); - List locations = JsonUtils.toList(process.getLocations(), DagNodeLocation.class); + List locations = + new ArrayList<>(JsonUtils.toList(process.getLocations(), DagNodeLocation.class)); if (CollUtil.isNotEmpty(locations)) { boolean matched = locations.stream().anyMatch(location -> location.getTaskCode() == taskCode); @@ -307,8 +309,7 @@ public List getTaskMainInfos(long dinkyTaskId) { long projectCode = SystemInit.getProject().getCode(); List taskMainInfos = taskClient.getTaskMainInfos(projectCode, "", "", ""); // 去掉本身 - taskMainInfos.removeIf(taskMainInfo -> - (catalogue.getName() + ":" + catalogue.getId()).equalsIgnoreCase(taskMainInfo.getTaskName())); + taskMainInfos.removeIf(taskMainInfo -> (catalogue.getName()).equalsIgnoreCase(taskMainInfo.getTaskName())); return taskMainInfos; } @@ -331,18 +332,16 @@ public TaskDefinition getTaskDefinitionInfo(long dinkyTaskId) { long projectCode = dinkyProject.getCode(); String processName = getDinkyNames(catalogue, 0); - String taskName = catalogue.getName() + ":" + catalogue.getId(); + String taskName = catalogue.getName(); TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, TASK_TYPE); TaskDefinition taskDefinition; if (taskMainInfo == null) { - log.error(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST.getMessage(), processName, taskName); - throw new BusException(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST, processName, taskName); + return null; } taskDefinition = taskClient.getTaskDefinition(projectCode, taskMainInfo.getTaskCode()); if (taskDefinition == null) { - log.error(Status.DS_WORK_FLOW_NOT_SAVE.getMessage()); - throw new BusException(Status.DS_WORK_FLOW_NOT_SAVE); + return null; } taskDefinition.setProcessDefinitionCode(taskMainInfo.getProcessDefinitionCode()); @@ -380,12 +379,12 @@ private String getDinkyNames(Catalogue catalogue, int i) { throw new SchedulerException("Get Node List Error"); } - String name = i == 0 ? catalogue.getName() + ":" + catalogue.getId() : catalogue.getName(); + String name = i == 0 ? catalogue.getName() : catalogue.getName(); String next = getDinkyNames(catalogue, ++i); if (Strings.isNullOrEmpty(next)) { return name; } - return name + "_" + next; + return name + "/" + next; } } diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/constants.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/constants.tsx new file mode 100644 index 0000000000..bd06adc173 --- /dev/null +++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/constants.tsx @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { l } from '@/utils/intl'; +import { Badge, Space } from 'antd'; +import { CheckboxOptionType } from 'antd/es/checkbox/Group'; +import { DefaultOptionType } from 'antd/es/select'; + +/** + * priority list for select | 优先级列表 + */ +export const PriorityList: DefaultOptionType[] = [ + { + label: ( + + + Highest + + ), + value: 'HIGHEST', + key: 'HIGHEST' + }, + { + label: ( + + + High + + ), + value: 'HIGH', + key: 'HIGH' + }, + { + label: ( + + + Medium + + ), + value: 'MEDIUM', + key: 'MEDIUM' + }, + { + label: ( + + + Low + + ), + value: 'LOW', + key: 'LOW' + }, + { + label: ( + + + Lowest + + ), + value: 'LOWEST', + key: 'LOWEST' + } +]; + +export const TimeoutNotifyStrategy: CheckboxOptionType[] = [ + { + label: l('datastudio.header.pushdolphin.timeoutFlag.warn'), + value: 'WARN' + }, + { + label: l('datastudio.header.pushdolphin.timeoutFlag.failed'), + value: 'FAILED' + } +]; diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/function.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/function.tsx new file mode 100644 index 0000000000..91b3b4c215 --- /dev/null +++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/function.tsx @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { DolphinTaskDefinition, PushDolphinParams } from '@/types/Studio/data.d'; + +export const transformPushDolphinParams = ( + dolphinTaskDefinition: DolphinTaskDefinition, + pushDolphinParams: PushDolphinParams, + toFormValues: boolean +) => { + if (toFormValues && dolphinTaskDefinition) { + const transformValue: PushDolphinParams = { + ...pushDolphinParams, + description: dolphinTaskDefinition.description, + timeoutFlag: dolphinTaskDefinition.timeoutFlag === 'OPEN', + flag: dolphinTaskDefinition.flag === 'YES', + isCache: dolphinTaskDefinition.isCache === 'YES', + upstreamCodes: dolphinTaskDefinition.upstreamTaskMap + ? Object.keys(dolphinTaskDefinition.upstreamTaskMap) + : [], + timeoutNotifyStrategy: + dolphinTaskDefinition.timeoutNotifyStrategy === 'WARNFAILED' + ? ['WARN', 'FAILED'] + : [dolphinTaskDefinition.timeoutNotifyStrategy] + }; + return transformValue; + } else { + const falseTransformValue: DolphinTaskDefinition = { + ...dolphinTaskDefinition, + ...pushDolphinParams, + description: pushDolphinParams.description, + timeoutFlag: pushDolphinParams.timeoutFlag ? 'OPEN' : 'CLOSE', + flag: pushDolphinParams.flag ? 'YES' : 'NO', + isCache: pushDolphinParams.isCache ? 'YES' : 'NO', + timeoutNotifyStrategy: (pushDolphinParams.timeoutNotifyStrategy as string[]).join('') + }; + return falseTransformValue; + } +}; diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/index.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/index.tsx new file mode 100644 index 0000000000..70c959532c --- /dev/null +++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/index.tsx @@ -0,0 +1,329 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { FormContextValue } from '@/components/Context/FormContext'; +import { NORMAL_MODAL_OPTIONS, SWITCH_OPTIONS } from '@/services/constants'; +import { l } from '@/utils/intl'; +import { + ModalForm, + ProFormCheckbox, + ProFormDigit, + ProFormGroup, + ProFormSelect, + ProFormSwitch, + ProFormText, + ProFormTextArea +} from '@ant-design/pro-components'; + +import { + PriorityList, + TimeoutNotifyStrategy +} from '@/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/constants'; +import { transformPushDolphinParams } from '@/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin/function'; +import { + DolphinTaskDefinition, + DolphinTaskGroupInfo, + DolphinTaskMinInfo, + PushDolphinParams +} from '@/types/Studio/data.d'; +import { InitPushDolphinParams } from '@/types/Studio/init.d'; +import { Button, Form, Tag } from 'antd'; +import { DefaultOptionType } from 'antd/es/select'; +import React, { useEffect } from 'react'; +import {TaskState} from "@/pages/DataStudio/type"; + +type PushDolphinProps = { + onCancel: () => void; + dolphinTaskList: DolphinTaskMinInfo[]; + dolphinTaskGroup: DolphinTaskGroupInfo[]; + dolphinDefinitionTask: Partial; + modalVisible: boolean; + currentDinkyTaskValue: Partial; + loading: boolean; + onSubmit: (values: DolphinTaskDefinition) => void; + formValuesInfo: DolphinTaskDefinition; +}; + +export const PushDolphin: React.FC = (props) => { + const { + onCancel, + onSubmit, + modalVisible, + dolphinTaskList, + dolphinDefinitionTask, + dolphinTaskGroup, + currentDinkyTaskValue, + loading, + formValuesInfo + } = props; + + const [formValues, setFormValues] = React.useState( + transformPushDolphinParams( + dolphinDefinitionTask as DolphinTaskDefinition, + { ...InitPushDolphinParams, taskId: currentDinkyTaskValue?.taskId ?? '' }, + true + ) as PushDolphinParams + ); + useEffect(() => { + if (JSON.stringify(formValuesInfo) != '{}') { + const isWARN = formValuesInfo?.timeoutNotifyStrategy?.includes('WARN') ? 'WARN' : false; + const isFAILED = formValuesInfo?.timeoutNotifyStrategy?.includes('FAILED') ? 'FAILED' : false; + + const temp: any = { + ...formValuesInfo, + flag: formValuesInfo.flag == 'NO' ? false : true, + timeoutFlag: formValuesInfo.timeoutFlag == 'CLOSE' ? false : true, + isCache: formValuesInfo.isCache == 'YES' ? true : false, + timeoutNotifyStrategy: [isWARN, isFAILED].filter((item) => item) + }; + form.setFieldsValue(temp); + } + }, []); + /** + * init form + */ + const [form] = Form.useForm(); + + /** + * init form context + */ + const formContext = React.useMemo( + () => ({ + resetForm: () => form.resetFields() // 定义 resetForm 方法 + }), + [form] + ); + + /** + * cancel choose + */ + const handleCancel = () => { + onCancel(); + formContext.resetForm(); + }; + + const handlePushDolphinSubmit = async () => { + const values = form.validateFields(); + if (!values) { + return; + } + const transformPushDolphinParamsValue: DolphinTaskDefinition = transformPushDolphinParams( + dolphinDefinitionTask as DolphinTaskDefinition, + formValues, + false + ) as DolphinTaskDefinition; + + onSubmit(transformPushDolphinParamsValue); + handleCancel(); + }; + + const renderFooter = () => { + return [ + , + + ]; + }; + + const buildUpstreamTaskOptions = ( + data: DolphinTaskMinInfo[] | undefined + ): DefaultOptionType[] => { + if (data && data.length > 0) { + return data.map((item) => { + const label = ( + <> + + {l('datastudio.header.pushdolphin.taskName', '', { name: item.taskName })} + + + {l('datastudio.header.pushdolphin.taskNameExt', '', { + type: item.taskType, + processDefinitionName: item.processDefinitionName + })} + + + ); + return { + label: label, + value: item.taskCode.toString(), + key: item.taskCode + }; + }); + } + return []; + }; + + const handleValueChange = (changedValues: any, allValues: any) => { + if (allValues) { + setFormValues({ ...formValues, ...allValues }); + } + }; + + const pushDolphinForm = () => { + return ( + <> + + + + + + + + + + + + + + + + + + + + {/*如果是失败告警,则需要设置告警策略*/} + {formValues.timeoutFlag && ( + <> + + + + + + )} + + + + ); + }; + + return ( + + {...NORMAL_MODAL_OPTIONS} + title={l('datastudio.header.pushdolphin.title', '', { + name: currentDinkyTaskValue?.name ?? '' + })} + open={modalVisible} + form={form} + initialValues={formValues} + modalProps={{ + onCancel: handleCancel, + destroyOnClose: true + }} + submitter={{ render: () => [...renderFooter()] }} + onValuesChange={handleValueChange} + loading={loading} + > + {pushDolphinForm()} + + ); +}; + +export default PushDolphin; diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx index f65520db86..ab6e5d2953 100644 --- a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx +++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx @@ -81,7 +81,7 @@ import { import { l } from '@/utils/intl'; import { editor } from 'monaco-editor'; import { DataStudioActionType } from '@/pages/DataStudio/data.d'; -import { getDataByParams, handlePutDataJson, queryDataByParams } from '@/services/BusinessCrud'; +import {getDataByParams, handleOption, handlePutDataJson, queryDataByParams} from '@/services/BusinessCrud'; import { API_CONSTANTS } from '@/services/endpoints'; import { Jobs, LineageDetailInfo } from '@/types/DevOps/data'; import { lockTask, matchLanguage } from '@/pages/DataStudio/function'; @@ -98,6 +98,8 @@ import { ResourceInfo } from '@/types/RegCenter/data'; import { buildResourceTreeDataAtTreeForm } from '@/pages/RegCenter/Resource/components/FileTree/function'; import { ProFormDependency } from '@ant-design/pro-form'; import { SavePoint } from '@/pages/DataStudio/CenterTabContent/SqlTask/SavePoint'; +import {DolphinTaskDefinition, DolphinTaskGroupInfo, DolphinTaskMinInfo} from "@/types/Studio/data"; +import PushDolphin from "@/pages/DataStudio/CenterTabContent/SqlTask/PushDolphin"; export type FlinkSqlProps = { showDesc: boolean; @@ -184,6 +186,26 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { })); const [isRunning, setIsRunning] = useState(false); + const [pushDolphinState, setPushDolphinState] = useState<{ + modalVisible: boolean; + buttonLoading: boolean; + confirmLoading: boolean; + dolphinTaskList: DolphinTaskMinInfo[]; + dolphinTaskGroup: DolphinTaskGroupInfo[]; + dolphinDefinitionTask: Partial; + currentDinkyTaskValue: Partial; + formValuesInfo: any; + }>({ + modalVisible: false, + buttonLoading: false, + confirmLoading: false, + dolphinTaskList: [], + dolphinTaskGroup: [], + dolphinDefinitionTask: {}, + currentDinkyTaskValue: {}, + formValuesInfo: {} + }); + useEffect(() => { if (sqlForm.enable) { setSqlForm((prevState) => ({ @@ -439,6 +461,7 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { } }); }, [currentState, updateAction]); + const handleDAG = useCallback(async () => { const statement = currentState.dialect.toLowerCase() === DIALECT.FLINKJAR @@ -594,6 +617,7 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { const handleFormat = useCallback(async () => { editorInstance.current?.getAction('format')?.run(); }, [editorInstance.current]); + const handleLocation = useCallback(async () => { const key = Number(id.replace('project_', '')); updateAction({ @@ -604,6 +628,7 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { } }); }, [updateAction]); + const handleChangeJobLife = useCallback(async () => { if (JOB_LIFE_CYCLE.PUBLISH == currentState.step) { await changeTaskLife( @@ -624,6 +649,70 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { setCurrentState((prevState) => ({ ...prevState, step: currentState.step })); }, [handleSave, currentState.step, currentState.taskId]); + const handlePushDolphinOpen = async () => { + const dinkyTaskId = currentState.taskId; + const dolphinTaskList: DolphinTaskMinInfo[] | undefined = await queryDataByParams< + DolphinTaskMinInfo[] + >(API_CONSTANTS.SCHEDULER_QUERY_UPSTREAM_TASKS, { dinkyTaskId }); + const dolphinTaskDefinition: DolphinTaskDefinition | undefined = + await queryDataByParams( + API_CONSTANTS.SCHEDULER_QUERY_TASK_DEFINITION, + { + dinkyTaskId + } + ); + + let dolphinTaskGroup: DolphinTaskGroupInfo[] | undefined = undefined; + if (dolphinTaskDefinition?.projectCode){ + dolphinTaskGroup = await queryDataByParams< + DolphinTaskGroupInfo[] + >(API_CONSTANTS.SCHEDULER_QUERY_TASK_GROUP, { + projectCode: dolphinTaskDefinition?.projectCode || undefined + }); + } + + + const formValuesInfo = dolphinTaskDefinition + ? JSON.parse(JSON.stringify(dolphinTaskDefinition)) + : {}; + + setPushDolphinState((prevState) => ({ + ...prevState, + buttonLoading: true, + confirmLoading: false, + modalVisible: true, + dolphinTaskList: dolphinTaskList ?? [], + dolphinTaskGroup: dolphinTaskGroup ?? [], + dolphinDefinitionTask: dolphinTaskDefinition ?? {}, + currentDinkyTaskValue: currentState as TaskState, + formValuesInfo: formValuesInfo ?? {} + })); + }; + + const handlePushDolphinCancel = async () => { + setPushDolphinState((prevState) => ({ + ...prevState, + modalVisible: false, + buttonLoading: false, + dolphinTaskList: [], + dolphinTaskGroup: [], + confirmLoading: false, + dolphinDefinitionTask: {}, + currentDinkyTaskValue: {}, + formValuesInfo: {} + })); + }; + + const handlePushDolphinSubmit = async (value: DolphinTaskDefinition) => { + setPushDolphinState((prevState) => ({ ...prevState, loading: true })); + await handleOption( + API_CONSTANTS.SCHEDULER_CREATE_OR_UPDATE_TASK_DEFINITION, + `推送任务[${currentState.name}]至 DolphinScheduler`, + value + ); + await handlePushDolphinCancel(); + }; + return ( { hotKeyDesc: 'Ctrl+E', hotKeyHandle: (e: KeyboardEvent) => e.ctrlKey && e.key === 'E' }} + onClick={handlePushDolphinOpen} /> @@ -1049,6 +1139,20 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { + + {pushDolphinState.modalVisible && ( + handlePushDolphinCancel()} + currentDinkyTaskValue={pushDolphinState.currentDinkyTaskValue} + modalVisible={pushDolphinState.modalVisible} + loading={pushDolphinState.confirmLoading} + dolphinDefinitionTask={pushDolphinState.dolphinDefinitionTask} + dolphinTaskList={pushDolphinState.dolphinTaskList} + dolphinTaskGroup={pushDolphinState.dolphinTaskGroup} + onSubmit={(values) => handlePushDolphinSubmit(values)} + formValuesInfo={pushDolphinState.formValuesInfo} + /> + )} ); });