diff --git a/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java b/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java index 134e87b3d3..9b2345bb5a 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java @@ -167,7 +167,11 @@ public Result getOneById(@RequestBody ID id) { required = true) public Result refreshJobInfoDetail( @RequestParam Integer id, @RequestParam(defaultValue = "false") boolean isForce) { - return Result.succeed(jobInstanceService.refreshJobInfoDetail(id, isForce)); + JobInstance jobInstance = jobInstanceService.getById(id); + if (jobInstance == null) { + return Result.failed(Status.JOB_INSTANCE_NOT_EXIST); + } + return Result.succeed(jobInstanceService.refreshJobInfoDetail(id, jobInstance.getTaskId(), isForce)); } /** diff --git a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java index a9cff578a1..2ac2720263 100644 --- a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java +++ b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java @@ -152,11 +152,11 @@ private void initDaemon() { List jobInstances = jobInstanceService.listJobInstanceActive(); FlinkJobThreadPool flinkJobThreadPool = FlinkJobThreadPool.getInstance(); for (JobInstance jobInstance : jobInstances) { - DaemonTaskConfig config = new DaemonTaskConfig(FlinkJobTask.TYPE, jobInstance.getId()); + DaemonTaskConfig config = + DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId(), jobInstance.getTaskId()); DaemonTask daemonTask = DaemonTask.build(config); flinkJobThreadPool.execute(daemonTask); } - // SseSessionContextHolder.init(schedule); } /** diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java index 7050bb48cd..515fb8a0ad 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java @@ -216,7 +216,8 @@ public boolean success() { : null) .build(); jobHistoryService.save(jobHistory); - DaemonTaskConfig taskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId()); + DaemonTaskConfig taskConfig = + DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId(), jobInstance.getTaskId()); FlinkJobThreadPool.getInstance().execute(DaemonTask.build(taskConfig)); return true; } diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java index fdbd834399..348c88cdf1 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java @@ -25,6 +25,7 @@ import org.dinky.assertion.Asserts; import org.dinky.cluster.FlinkClusterInfo; import org.dinky.context.SpringContextUtils; +import org.dinky.context.TenantContextHolder; import org.dinky.data.constant.FlinkRestResultConstant; import org.dinky.data.dto.ClusterConfigurationDTO; import org.dinky.data.dto.JobDataDto; @@ -101,6 +102,10 @@ public class JobRefreshHandler { * @return True if the job is done, false otherwise. */ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave) { + if (Asserts.isNull(TenantContextHolder.get())) { + jobInstanceService.initTenantByJobInstanceId( + jobInfoDetail.getInstance().getId()); + } log.debug( "Start to refresh job: {}->{}", jobInfoDetail.getInstance().getId(), diff --git a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java index 1542364bfa..4993fbccac 100644 --- a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java +++ b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java @@ -83,7 +83,7 @@ public interface JobInstanceService extends ISuperService { * @param jobInstanceId The ID of the job instance to refresh the job information detail for. * @return A {@link JobInfoDetail} object representing the refreshed job information detail. */ - JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, boolean isForce); + JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, Integer taskId, boolean isForce); /** * Hook the job done for the given job ID and task ID. diff --git a/dinky-admin/src/main/java/org/dinky/service/catalogue/impl/CatalogueServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/catalogue/impl/CatalogueServiceImpl.java index 8b78802696..1f8f4e63b0 100644 --- a/dinky-admin/src/main/java/org/dinky/service/catalogue/impl/CatalogueServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/catalogue/impl/CatalogueServiceImpl.java @@ -519,7 +519,7 @@ public Result deleteCatalogueById(Integer catalogueId) { if (currentJobInstance != null) { // 获取前 先强制刷新一下, 避免获取任务信息状态不准确 JobInfoDetail jobInfoDetail = - jobInstanceService.refreshJobInfoDetail(task.getJobInstanceId(), true); + jobInstanceService.refreshJobInfoDetail(task.getJobInstanceId(), task.getId(), true); if (jobInfoDetail.getInstance().getStatus().equals(JobStatus.RUNNING.getValue())) { throw new BusException(Status.TASK_IS_RUNNING_CANNOT_DELETE); } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java index 75454bb37e..17a1b73c83 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java @@ -151,6 +151,9 @@ public List listJobInstanceActive() { @Override public JobInfoDetail getJobInfoDetail(Integer id) { + if (Asserts.isNull(TenantContextHolder.get())) { + initTenantByJobInstanceId(id); + } return getJobInfoDetailInfo(getById(id)); } @@ -199,8 +202,8 @@ public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) { } @Override - public JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, boolean isForce) { - DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId); + public JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, Integer taskId, boolean isForce) { + DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId, taskId); DaemonTask daemonTask = FlinkJobThreadPool.getInstance().getByTaskConfig(daemonTaskConfig); if (daemonTask != null && !isForce) { @@ -234,7 +237,7 @@ public boolean hookJobDone(String jobId, Integer taskId) { return true; } - DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId()); + DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId(), instance.getTaskId()); DaemonTask daemonTask = FlinkJobThreadPool.getInstance().removeByTaskConfig(config); daemonTask = Optional.ofNullable(daemonTask).orElse(DaemonTask.build(config)); @@ -263,7 +266,7 @@ public boolean hookJobDoneByHistory(String jobId) { return true; } - DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId()); + DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId(), instance.getTaskId()); DaemonTask daemonTask = FlinkJobThreadPool.getInstance().removeByTaskConfig(config); daemonTask = Optional.ofNullable(daemonTask).orElse(DaemonTask.build(config)); @@ -279,10 +282,11 @@ public boolean hookJobDoneByHistory(String jobId) { public void refreshJobByTaskIds(Integer... taskIds) { for (Integer taskId : taskIds) { JobInstance instance = getJobInstanceByTaskId(taskId); - DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId()); + DaemonTaskConfig daemonTaskConfig = + DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId(), instance.getTaskId()); FlinkJobThreadPool.getInstance().removeByTaskConfig(daemonTaskConfig); FlinkJobThreadPool.getInstance().execute(DaemonTask.build(daemonTaskConfig)); - refreshJobInfoDetail(instance.getId(), false); + refreshJobInfoDetail(instance.getId(), instance.getTaskId(), false); } } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index a99ee03c3e..56f206396f 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -407,7 +407,8 @@ public JobResult restartTask(Integer id, String savePointPath) throws Exception } int count = 0; while (true) { - JobInfoDetail jobInfoDetail = jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), false); + JobInfoDetail jobInfoDetail = jobInstanceService.refreshJobInfoDetail( + jobInstance.getId(), jobInstance.getTaskId(), false); if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus())) { log.info( "JobInstance [{}] status is [{}], ready to submit Job", @@ -466,7 +467,7 @@ public boolean cancelTaskJob(TaskDTO task, boolean withSavePoint, boolean forceC log.warn("Stop with savePoint failed: {}, will try normal rest api stop", e.getMessage()); isSuccess = jobManager.cancelNormal(jobInstance.getJid()); } - jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), true); + jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), jobInstance.getTaskId(), true); return isSuccess; } @@ -605,7 +606,8 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro if (Asserts.isNotNull(jobInstance)) { jobInstance.setStep(lifeCycle.getValue()); boolean updatedJobInstance = jobInstanceService.updateById(jobInstance); - if (updatedJobInstance) jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), true); + if (updatedJobInstance) + jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), jobInstance.getTaskId(), true); log.warn( "JobInstance [{}] step change to [{}] ,Trigger Force Refresh", jobInstance.getName(), @@ -1088,10 +1090,10 @@ public List getUserTasks(Integer userId) { private Boolean hasTaskOperatePermission(Integer firstLevelOwner, List secondLevelOwners) { boolean isFirstLevelOwner = firstLevelOwner != null && firstLevelOwner == StpUtil.getLoginIdAsInt(); if (TaskOwnerLockStrategyEnum.OWNER.equals( - SystemConfiguration.getInstances().GetTaskOwnerLockStrategyValue())) { + SystemConfiguration.getInstances().getTaskOwnerLockStrategy())) { return isFirstLevelOwner; } else if (TaskOwnerLockStrategyEnum.OWNER_AND_MAINTAINER.equals( - SystemConfiguration.getInstances().GetTaskOwnerLockStrategyValue())) { + SystemConfiguration.getInstances().getTaskOwnerLockStrategy())) { return isFirstLevelOwner || (secondLevelOwners != null && secondLevelOwners.contains(StpUtil.getLoginIdAsInt())); } diff --git a/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocket.java b/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocket.java index 2e6eb66030..9d94b3872f 100644 --- a/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocket.java +++ b/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocket.java @@ -19,6 +19,7 @@ package org.dinky.ws; +import org.dinky.assertion.Asserts; import org.dinky.data.vo.SseDataVo; import org.dinky.utils.JsonUtils; import org.dinky.utils.ThreadUtil; @@ -61,7 +62,10 @@ public GlobalWebSocket() { executorService.execute(() -> { while (isRunning) { Set params = getRequestParamMap().get(value); - sendTopic(value, params, value.getInstance().autoDataSend(params)); + Map topicMap = value.getInstance().autoDataSend(params); + if (Asserts.isNotNullMap(topicMap)) { + sendTopic(value, params, topicMap); + } ThreadUtil.sleep(value.getDelaySend()); } }); diff --git a/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocketTopic.java b/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocketTopic.java index da5d6eb5b9..8a6ddf9378 100644 --- a/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocketTopic.java +++ b/dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocketTopic.java @@ -24,6 +24,7 @@ import org.dinky.ws.topic.Metrics; import org.dinky.ws.topic.PrintTable; import org.dinky.ws.topic.ProcessConsole; +import org.dinky.ws.topic.TaskRunInstance; import lombok.AllArgsConstructor; import lombok.Getter; @@ -35,6 +36,7 @@ public enum GlobalWebSocketTopic { PROCESS_CONSOLE("PROCESS_CONSOLE", ProcessConsole.INSTANCE, Integer.MAX_VALUE), PRINT_TABLE("PRINT_TABLE", PrintTable.INSTANCE, Integer.MAX_VALUE), METRICS("METRICS", Metrics.INSTANCE, Integer.MAX_VALUE), + TASK_RUN_INSTANCE("TASK_RUN_INSTANCE", TaskRunInstance.INSTANCE, 1000), ; private final String topic; private final BaseTopic instance; diff --git a/dinky-admin/src/main/java/org/dinky/ws/topic/BaseTopic.java b/dinky-admin/src/main/java/org/dinky/ws/topic/BaseTopic.java index 40e1905212..78e6f985ed 100644 --- a/dinky-admin/src/main/java/org/dinky/ws/topic/BaseTopic.java +++ b/dinky-admin/src/main/java/org/dinky/ws/topic/BaseTopic.java @@ -27,11 +27,6 @@ @AllArgsConstructor public abstract class BaseTopic { public static final String NONE_PARAMS = "none-params"; - /** - * - * @return All subscription parameters - */ - // Set allParams(); /** * Data sending ideas, including data acquisition and sending @@ -44,6 +39,4 @@ public abstract class BaseTopic { * @return The data sent will be converted by JSON when it is finally sent */ public abstract Map firstDataSend(Set allParams); - - public void dataSend(Map data) {} } diff --git a/dinky-admin/src/main/java/org/dinky/ws/topic/ProcessConsole.java b/dinky-admin/src/main/java/org/dinky/ws/topic/ProcessConsole.java index a5a0c3e527..3024b47131 100644 --- a/dinky-admin/src/main/java/org/dinky/ws/topic/ProcessConsole.java +++ b/dinky-admin/src/main/java/org/dinky/ws/topic/ProcessConsole.java @@ -25,13 +25,11 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; @Slf4j public class ProcessConsole extends BaseTopic { - private final Map logPross = new ConcurrentHashMap<>(); public static final ProcessConsole INSTANCE = new ProcessConsole(); private ProcessConsole() {} diff --git a/dinky-admin/src/main/java/org/dinky/ws/topic/TaskRunInstance.java b/dinky-admin/src/main/java/org/dinky/ws/topic/TaskRunInstance.java new file mode 100644 index 0000000000..7225418b55 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/ws/topic/TaskRunInstance.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.ws.topic; + +import org.dinky.daemon.pool.FlinkJobThreadPool; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import cn.hutool.core.collection.CollUtil; + +public class TaskRunInstance extends BaseTopic { + public static final TaskRunInstance INSTANCE = new TaskRunInstance(); + private Set runningJobIds = CollUtil.newHashSet(); + + private TaskRunInstance() {} + + @Override + public Map autoDataSend(Set allParams) { + Set currentMonitorTaskIds = FlinkJobThreadPool.getInstance().getCurrentMonitorTaskIds(); + if (!runningJobIds.equals(currentMonitorTaskIds)) { + runningJobIds = currentMonitorTaskIds; + Map result = new HashMap<>(); + result.put("RunningTaskId", FlinkJobThreadPool.getInstance().getCurrentMonitorTaskIds()); + return result; + } + return new HashMap<>(); + } + + @Override + public Map firstDataSend(Set allParams) { + Map result = new HashMap<>(); + result.put("RunningTaskId", FlinkJobThreadPool.getInstance().getCurrentMonitorTaskIds()); + return result; + } +} diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java b/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java index c69001fbb3..edd11b7e8e 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java @@ -94,4 +94,8 @@ public int getTaskSize() { return tasks.size(); } } + + public ArrayList getTasks() { + return tasks; + } } diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/pool/FlinkJobThreadPool.java b/dinky-daemon/src/main/java/org/dinky/daemon/pool/FlinkJobThreadPool.java index 9cd21a57f3..13f54448af 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/pool/FlinkJobThreadPool.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/pool/FlinkJobThreadPool.java @@ -27,7 +27,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * @operate @@ -132,4 +134,11 @@ public void removeWorker(int num) { public DaemonTask getByTaskConfig(DaemonTaskConfig daemonTask) { return queue.getByTaskConfig(daemonTask); } + + public Set getCurrentMonitorTaskIds() { + return queue.getTasks().stream() + .map(DaemonTask::getConfig) + .map(DaemonTaskConfig::getTaskId) + .collect(Collectors.toSet()); + } } diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java index cfe1087e7e..9cb06c36e8 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java @@ -28,18 +28,20 @@ public class DaemonTaskConfig { private final String type; private Integer id; + private Integer taskId; - public DaemonTaskConfig(String type, Integer id) { + private DaemonTaskConfig(String type, Integer id, Integer taskId) { this.type = type; this.id = id; + this.taskId = taskId; } public DaemonTaskConfig(String type) { this.type = type; } - public static DaemonTaskConfig build(String type, Integer id) { - return new DaemonTaskConfig(type, id); + public static DaemonTaskConfig build(String type, Integer id, Integer taskId) { + return new DaemonTaskConfig(type, id, taskId); } @Override diff --git a/dinky-web/src/models/UseWebSocketModel.tsx b/dinky-web/src/models/UseWebSocketModel.tsx index edc8465833..3ceda5dbba 100644 --- a/dinky-web/src/models/UseWebSocketModel.tsx +++ b/dinky-web/src/models/UseWebSocketModel.tsx @@ -32,7 +32,8 @@ export enum Topic { JVM_INFO = 'JVM_INFO', PROCESS_CONSOLE = 'PROCESS_CONSOLE', PRINT_TABLE = 'PRINT_TABLE', - METRICS = 'METRICS' + METRICS = 'METRICS', + TASK_RUN_INSTANCE = 'TASK_RUN_INSTANCE', } export type SubscriberData = { diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx index fc72b06133..781a3b5c7b 100644 --- a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx +++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/index.tsx @@ -78,6 +78,7 @@ import CodeEdit from '@/components/CustomEditor/CodeEdit'; import DiffModal from '@/pages/DataStudio/CenterTabContent/SqlTask/DiffModal'; import TaskConfig from '@/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig'; import SelectDb from '@/pages/DataStudio/CenterTabContent/RunToolbar/SelectDb'; +import {SseData, Topic} from "@/models/UseWebSocketModel"; export type FlinkSqlProps = { showDesc: boolean; @@ -147,13 +148,10 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { const [isFullscreen, { enterFullscreen, exitFullscreen }] = useFullscreen(containerRef); const { initialState } = useModel('@@initialState'); - const [refreshTaskStatusDelay, setRefreshTaskStatusDelay] = useState( - undefined - ); - useRafInterval(async () => { - const taskDetail = (await getTaskDetails(params.taskId))!!; - setCurrentState((prevState) => ({ ...prevState, status: taskDetail.status })); - }, refreshTaskStatusDelay); + const { subscribeTopic } = useModel('UseWebSocketModel', (model: any) => ({ + subscribeTopic: model.subscribeTopic + })); + const [isRunning, setIsRunning] = useState(false); useAsyncEffect(async () => { const taskDetail = await getTaskDetails(params.taskId); @@ -177,14 +175,14 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { } setLoading(false); }, []); - // 定时刷新作业状态 + useEffect(() => { - if (isStatusDone(currentState.status)) { - setRefreshTaskStatusDelay(undefined); - } else { - setRefreshTaskStatusDelay(3000); - } - }, [currentState.status]); + return subscribeTopic(Topic.TASK_RUN_INSTANCE, null, (data: SseData) => { + if (data?.data?.RunningTaskId) { + setIsRunning(data?.data?.RunningTaskId.includes(params.taskId)); + } + }); + }, []); // 数据初始化 useEffect(() => { @@ -548,12 +546,11 @@ export const SqlTask = memo((props: FlinkSqlProps & any) => { submitter={false} layout='horizontal' variant={'filled'} - disabled={currentState?.step === JOB_LIFE_CYCLE.PUBLISH || isLockTask} // 当该任务处于发布状态时 表单禁用 不允许修改 | when this job is publishing, the form is disabled , and it is not allowed to modify + disabled={currentState?.step === JOB_LIFE_CYCLE.PUBLISH || isLockTask} // when this job is publishing, the form is disabled , and it is not allowed to modify onValuesChange={debounce(onValuesChange, 500)} syncToInitialValues > - {/* 运行工具栏*/} { { /> { /> { { * @param currentUser * @param taskOwnerLockingStrategy * @param users + * @param currentRunningTaskIds * @returns {any} */ @@ -230,13 +231,13 @@ export const buildProjectTree = ( path: string[] = [], currentUser: UserBaseInfo.User, taskOwnerLockingStrategy: TaskOwnerLockingStrategy, - users: UserBaseInfo.User[] = [] + users: UserBaseInfo.User[] = [], + currentRunningTaskIds: number[] = [], ): any => data ? data.map((item: Catalogue) => { const currentPath = path ? [...path, item.name] : [item.name]; - // 总渲染 title const renderTitle = ( {searchTreeNode(item.name, searchValue)} @@ -255,6 +256,7 @@ export const buildProjectTree = ( // 渲染后缀图标 const renderSuffixIcon = ( <> + {currentRunningTaskIds.includes(item.taskId)?:undefined} {lockTask( item?.task?.firstLevelOwner, item?.task?.secondLevelOwners, @@ -308,7 +310,8 @@ export const buildProjectTree = ( currentPath, currentUser, taskOwnerLockingStrategy, - users + users, + currentRunningTaskIds ) }; }) diff --git a/dinky-web/src/pages/DataStudio/Toolbar/Project/index.tsx b/dinky-web/src/pages/DataStudio/Toolbar/Project/index.tsx index b0dea09060..b6fba6a9b2 100644 --- a/dinky-web/src/pages/DataStudio/Toolbar/Project/index.tsx +++ b/dinky-web/src/pages/DataStudio/Toolbar/Project/index.tsx @@ -44,6 +44,7 @@ import { useRightContext } from '@/pages/DataStudio/Toolbar/Project/RightContext import { TreeVo } from '@/pages/DataStudio/type'; import FolderModal from '@/pages/DataStudio/Toolbar/Project/FolderModal'; import { getTaskSortTypeData } from '@/pages/DataStudio/service'; +import {SseData, Topic} from "@/models/UseWebSocketModel"; export const Project = (props: any) => { const { @@ -91,6 +92,19 @@ export const Project = (props: any) => { data: { ...selectCatalogueSortTypeData }, method: 'post' }); + const { subscribeTopic } = useModel('UseWebSocketModel', (model: any) => ({ + subscribeTopic: model.subscribeTopic + })); + const [currentRunningTaskIds, setCurrentRunningTaskIds] = useState([]); + + useEffect(() => { + subscribeTopic(Topic.TASK_RUN_INSTANCE, null, (data: SseData) => { + if (data?.data?.RunningTaskId) { + setCurrentRunningTaskIds(data?.data?.RunningTaskId); + } + }); + }, []); + useEffect(() => { if (initDid) { setInitDid(loading); @@ -148,7 +162,6 @@ export const Project = (props: any) => { } }, [actionType, params]); - // tree数据初始化 useAsyncEffect(async () => { if (data) { setTreeData( @@ -158,27 +171,28 @@ export const Project = (props: any) => { [], initialState?.currentUser?.user, taskOwnerLockingStrategy, - users + users, + currentRunningTaskIds ) ); - // 这里需要再次设置expandKeys,因为网络延迟问题,导致第一次设置expandKeys无效 + // We need to set expandKeys again here because of network latency issues, which caused the first time setting expandKeys to be invalid. updateProject({ expandKeys: [...expandKeys] }); } - }, [data, searchValue]); + }, [data, searchValue, currentRunningTaskIds]); + useEffect(() => { if (data) { refresh(); } }, [selectCatalogueSortTypeData]); - // 数据初始化 useEffect(() => { getTaskSortTypeData().then(setSortData); - // 监控布局宽度高度变化,重新计算树的高度 + // Monitor layout width and height changes, recalculate tree height. const element = ref.current!!; const observer = new ResizeObserver((entries) => { if (entries?.length === 1) { - // 这里节点理应为一个,减去的高度是为搜索栏的高度 + // The node here should be one, and the height subtracted is the height of the search bar. setTreeHeight(entries[0].contentRect.height - 52); } }); diff --git a/dinky-web/src/pages/DataStudio/Toolbar/Service/index.tsx b/dinky-web/src/pages/DataStudio/Toolbar/Service/index.tsx index f73dc4cea0..df478e97f9 100644 --- a/dinky-web/src/pages/DataStudio/Toolbar/Service/index.tsx +++ b/dinky-web/src/pages/DataStudio/Toolbar/Service/index.tsx @@ -19,7 +19,7 @@ import { CenterTab, DataStudioState } from '@/pages/DataStudio/model'; import { mapDispatchToProps } from '@/pages/DataStudio/DvaFunction'; -import { Flex, Tabs, TabsProps, TreeDataNode } from 'antd'; +import { Flex, Space, Tabs, TabsProps, TreeDataNode } from 'antd'; import { Panel, PanelGroup } from 'react-resizable-panels'; import DirectoryTree from 'antd/es/tree/DirectoryTree'; import './index.less'; @@ -29,6 +29,7 @@ import { ArrowsAltOutlined, AuditOutlined, CodeOutlined, + FireOutlined, HistoryOutlined, MonitorOutlined, PartitionOutlined, @@ -54,6 +55,8 @@ import { l } from '@/utils/intl'; import { assert } from '@/pages/DataStudio/utils'; import { connect } from '@umijs/max'; import { Lineage } from '@/pages/DataStudio/Toolbar/Service/Lineage'; +import { useModel } from '@umijs/max'; +import {SseData, Topic} from "@/models/UseWebSocketModel"; const Service = (props: { showDesc: boolean; tabs: CenterTab[]; action: any }) => { const { @@ -66,6 +69,18 @@ const Service = (props: { showDesc: boolean; tabs: CenterTab[]; action: any }) = const [tabActiveKey, setTabActiveKey] = useState>({}); const [treeData, setTreeData] = useState([]); const [expandKeys, setExpandKeys] = useState([]); + const { subscribeTopic } = useModel('UseWebSocketModel', (model: any) => ({ + subscribeTopic: model.subscribeTopic + })); + const [currentRunningTaskIds, setCurrentRunningTaskIds] = useState([]); + + useEffect(() => { + return subscribeTopic(Topic.TASK_RUN_INSTANCE, null, (data: SseData) => { + if (data?.data?.RunningTaskId) { + setCurrentRunningTaskIds(data?.data?.RunningTaskId); + } + }); + }, []); const getAllNodeKeys = (data: TreeDataNode[], keys: Key[] = []) => { data.forEach((item) => { @@ -159,8 +174,8 @@ const Service = (props: { showDesc: boolean; tabs: CenterTab[]; action: any }) = } }, [props.action]); - useEffect(() => { - const treeData: TreeDataNode[] = [ + useAsyncEffect(async () => { + const newTreeData: TreeDataNode[] = [ { title: 'Task', key: 'Task', @@ -174,7 +189,7 @@ const Service = (props: { showDesc: boolean; tabs: CenterTab[]; action: any }) = // 2. 查找到对应的FlinkSql // 3. 查找到对应的task - treeData.forEach((node) => { + newTreeData.forEach((node) => { const dialect = tab.params.dialect; if ( assert(dialect, [DIALECT.FLINK_SQL, DIALECT.FLINKJAR], true, 'includes') || @@ -192,7 +207,10 @@ const Service = (props: { showDesc: boolean; tabs: CenterTab[]; action: any }) = ) as TreeDataNode; } currentDialectTree.children!!.push({ - title: tab.title, + title: ( + {tab.title} + {currentRunningTaskIds.includes(tab.params.taskId)?:undefined} + ), key: tab.params.taskId, icon: icon, isLeaf: true @@ -202,11 +220,11 @@ const Service = (props: { showDesc: boolean; tabs: CenterTab[]; action: any }) = }); } }); - setTreeData(treeData); - setExpandKeys(getAllNodeKeys(treeData)); - }, [tabs]); + setTreeData(newTreeData); + expandKeys.length == 0 && setExpandKeys(getAllNodeKeys(newTreeData)); + }, [tabs, currentRunningTaskIds]); - const renderContent = () => { + const renderContent = useMemo(() => { if (selectedKey.length === 1) { const taskId = selectedKey[0] as number; const taskParams = tabs.find((tab) => tab.params.taskId === taskId)?.params; @@ -242,7 +260,7 @@ const Service = (props: { showDesc: boolean; tabs: CenterTab[]; action: any }) = key: 'history', label: l('menu.datastudio.history'), icon: , - children: + children: }); } if (assert(taskParams?.dialect, [DIALECT.FLINK_SQL], true, 'includes')) { @@ -270,7 +288,7 @@ const Service = (props: { showDesc: boolean; tabs: CenterTab[]; action: any }) = /> ); } - }; + },[tabs, selectedKey, props.action, tabActiveKey]); return ( @@ -311,7 +329,7 @@ const Service = (props: { showDesc: boolean; tabs: CenterTab[]; action: any }) = - {renderContent()} + {renderContent} diff --git a/dinky-web/src/pages/DevOps/JobList/index.tsx b/dinky-web/src/pages/DevOps/JobList/index.tsx index 6a58905cf6..d1625390e9 100644 --- a/dinky-web/src/pages/DevOps/JobList/index.tsx +++ b/dinky-web/src/pages/DevOps/JobList/index.tsx @@ -54,6 +54,7 @@ import { buildProjectTree } from '@/pages/DataStudio/Toolbar/Project/function'; import { showFirstLevelOwner, showSecondLevelOwners } from '@/pages/DataStudio/function'; import { generateList, getLeafKeyList, searchInTree } from '@/utils/treeUtils'; import { mapDispatchToProps } from '@/pages/DataStudio/DvaFunction'; +import {SseData, Topic} from "@/models/UseWebSocketModel"; const { DirectoryTree } = Tree; @@ -75,6 +76,18 @@ const JobList = (props: connect) => { method: 'post' }); const [projectData, setProjectData] = useState([]); + const { subscribeTopic } = useModel('UseWebSocketModel', (model: any) => ({ + subscribeTopic: model.subscribeTopic + })); + const [currentRunningTaskIds, setCurrentRunningTaskIds] = useState([]); + + useEffect(() => { + return subscribeTopic(Topic.TASK_RUN_INSTANCE, null, (data: SseData) => { + if (data?.data?.RunningTaskId) { + setCurrentRunningTaskIds(data?.data?.RunningTaskId); + } + }); + }, []); useEffect(() => { setProjectData( @@ -84,13 +97,14 @@ const JobList = (props: connect) => { [], initialState?.currentUser?.user, taskOwnerLockingStrategy, - users + users, + currentRunningTaskIds ) ); if (searchValue === '' || searchValue === undefined) { - setExpandedKeys([]); + expandedKeys.length == 0 && setExpandedKeys([]); } - }, [searchValue, taskOwnerLockingStrategy, data]); + }, [searchValue, taskOwnerLockingStrategy, data, currentRunningTaskIds]); const jobListColumns: ProColumns[] = [ { diff --git a/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/index.tsx b/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/index.tsx index 23eb6632c6..051765f0e7 100644 --- a/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/index.tsx +++ b/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/index.tsx @@ -42,7 +42,6 @@ const ConfigurationForm: React.FC = (props) => { url: API_CONSTANTS.FLINK_CONF_CONFIG_OPTIONS, method: 'get' }); - console.log(data); const renderAllForm = () => { return (