Skip to content

Commit

Permalink
[Feature-3939][studio] FlinkSQL Studio supports real-time update task…
Browse files Browse the repository at this point in the history
… status (#3941)
  • Loading branch information
aiwenmo authored Nov 22, 2024
1 parent 3b50876 commit 5664f55
Show file tree
Hide file tree
Showing 23 changed files with 206 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ public Result getOneById(@RequestBody ID id) {
required = true)
// Refresh (and return) the detail of one job instance.
// Fix: the span contained both the pre-change single-line return and the new
// null-guarded version (diff residue); only the guarded version is kept, so a
// missing instance yields a clean failure instead of an NPE on getTaskId().
public Result<JobInfoDetail> refreshJobInfoDetail(
        @RequestParam Integer id, @RequestParam(defaultValue = "false") boolean isForce) {
    JobInstance jobInstance = jobInstanceService.getById(id);
    if (jobInstance == null) {
        // Guard against stale ids from the front end: fail fast with a typed status.
        return Result.failed(Status.JOB_INSTANCE_NOT_EXIST);
    }
    return Result.succeed(jobInstanceService.refreshJobInfoDetail(id, jobInstance.getTaskId(), isForce));
}

/**
Expand Down
4 changes: 2 additions & 2 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,11 @@ private void initDaemon() {
List<JobInstance> jobInstances = jobInstanceService.listJobInstanceActive();
FlinkJobThreadPool flinkJobThreadPool = FlinkJobThreadPool.getInstance();
for (JobInstance jobInstance : jobInstances) {
DaemonTaskConfig config = new DaemonTaskConfig(FlinkJobTask.TYPE, jobInstance.getId());
DaemonTaskConfig config =
DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId(), jobInstance.getTaskId());
DaemonTask daemonTask = DaemonTask.build(config);
flinkJobThreadPool.execute(daemonTask);
}
// SseSessionContextHolder.init(schedule);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ public boolean success() {
: null)
.build();
jobHistoryService.save(jobHistory);
DaemonTaskConfig taskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId());
DaemonTaskConfig taskConfig =
DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId(), jobInstance.getTaskId());
FlinkJobThreadPool.getInstance().execute(DaemonTask.build(taskConfig));
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.cluster.FlinkClusterInfo;
import org.dinky.context.SpringContextUtils;
import org.dinky.context.TenantContextHolder;
import org.dinky.data.constant.FlinkRestResultConstant;
import org.dinky.data.dto.ClusterConfigurationDTO;
import org.dinky.data.dto.JobDataDto;
Expand Down Expand Up @@ -101,6 +102,10 @@ public class JobRefreshHandler {
* @return True if the job is done, false otherwise.
*/
public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave) {
if (Asserts.isNull(TenantContextHolder.get())) {
jobInstanceService.initTenantByJobInstanceId(
jobInfoDetail.getInstance().getId());
}
log.debug(
"Start to refresh job: {}->{}",
jobInfoDetail.getInstance().getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public interface JobInstanceService extends ISuperService<JobInstance> {
* @param jobInstanceId The ID of the job instance to refresh the job information detail for.
* @return A {@link JobInfoDetail} object representing the refreshed job information detail.
*/
JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, boolean isForce);
JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, Integer taskId, boolean isForce);

/**
* Hook the job done for the given job ID and task ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ public Result<Void> deleteCatalogueById(Integer catalogueId) {
if (currentJobInstance != null) {
// 获取前 先强制刷新一下, 避免获取任务信息状态不准确
JobInfoDetail jobInfoDetail =
jobInstanceService.refreshJobInfoDetail(task.getJobInstanceId(), true);
jobInstanceService.refreshJobInfoDetail(task.getJobInstanceId(), task.getId(), true);
if (jobInfoDetail.getInstance().getStatus().equals(JobStatus.RUNNING.getValue())) {
throw new BusException(Status.TASK_IS_RUNNING_CANNOT_DELETE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ public List<JobInstance> listJobInstanceActive() {

/**
 * Gets the {@link JobInfoDetail} for the given job instance id, first binding a
 * tenant to the current thread when none is present.
 */
@Override
public JobInfoDetail getJobInfoDetail(Integer id) {
// NOTE(review): daemon/websocket threads carry no tenant context; derive it from
// the instance id so the tenant-scoped lookups below resolve — confirm intended.
if (Asserts.isNull(TenantContextHolder.get())) {
initTenantByJobInstanceId(id);
}
return getJobInfoDetailInfo(getById(id));
}

Expand Down Expand Up @@ -199,8 +202,8 @@ public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) {
}

@Override
public JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, boolean isForce) {
DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId);
public JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, Integer taskId, boolean isForce) {
DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId, taskId);
DaemonTask daemonTask = FlinkJobThreadPool.getInstance().getByTaskConfig(daemonTaskConfig);

if (daemonTask != null && !isForce) {
Expand Down Expand Up @@ -234,7 +237,7 @@ public boolean hookJobDone(String jobId, Integer taskId) {
return true;
}

DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId());
DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId(), instance.getTaskId());
DaemonTask daemonTask = FlinkJobThreadPool.getInstance().removeByTaskConfig(config);
daemonTask = Optional.ofNullable(daemonTask).orElse(DaemonTask.build(config));

Expand Down Expand Up @@ -263,7 +266,7 @@ public boolean hookJobDoneByHistory(String jobId) {
return true;
}

DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId());
DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId(), instance.getTaskId());
DaemonTask daemonTask = FlinkJobThreadPool.getInstance().removeByTaskConfig(config);
daemonTask = Optional.ofNullable(daemonTask).orElse(DaemonTask.build(config));

Expand All @@ -279,10 +282,11 @@ public boolean hookJobDoneByHistory(String jobId) {
public void refreshJobByTaskIds(Integer... taskIds) {
for (Integer taskId : taskIds) {
JobInstance instance = getJobInstanceByTaskId(taskId);
DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId());
DaemonTaskConfig daemonTaskConfig =
DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId(), instance.getTaskId());
FlinkJobThreadPool.getInstance().removeByTaskConfig(daemonTaskConfig);
FlinkJobThreadPool.getInstance().execute(DaemonTask.build(daemonTaskConfig));
refreshJobInfoDetail(instance.getId(), false);
refreshJobInfoDetail(instance.getId(), instance.getTaskId(), false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ public JobResult restartTask(Integer id, String savePointPath) throws Exception
}
int count = 0;
while (true) {
JobInfoDetail jobInfoDetail = jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), false);
JobInfoDetail jobInfoDetail = jobInstanceService.refreshJobInfoDetail(
jobInstance.getId(), jobInstance.getTaskId(), false);
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus())) {
log.info(
"JobInstance [{}] status is [{}], ready to submit Job",
Expand Down Expand Up @@ -466,7 +467,7 @@ public boolean cancelTaskJob(TaskDTO task, boolean withSavePoint, boolean forceC
log.warn("Stop with savePoint failed: {}, will try normal rest api stop", e.getMessage());
isSuccess = jobManager.cancelNormal(jobInstance.getJid());
}
jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), true);
jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), jobInstance.getTaskId(), true);
return isSuccess;
}

Expand Down Expand Up @@ -605,7 +606,8 @@ public boolean changeTaskLifeRecyle(Integer taskId, JobLifeCycle lifeCycle) thro
if (Asserts.isNotNull(jobInstance)) {
jobInstance.setStep(lifeCycle.getValue());
boolean updatedJobInstance = jobInstanceService.updateById(jobInstance);
if (updatedJobInstance) jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), true);
if (updatedJobInstance)
jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), jobInstance.getTaskId(), true);
log.warn(
"JobInstance [{}] step change to [{}] ,Trigger Force Refresh",
jobInstance.getName(),
Expand Down Expand Up @@ -1088,10 +1090,10 @@ public List<TaskDTO> getUserTasks(Integer userId) {
private Boolean hasTaskOperatePermission(Integer firstLevelOwner, List<Integer> secondLevelOwners) {
boolean isFirstLevelOwner = firstLevelOwner != null && firstLevelOwner == StpUtil.getLoginIdAsInt();
if (TaskOwnerLockStrategyEnum.OWNER.equals(
SystemConfiguration.getInstances().GetTaskOwnerLockStrategyValue())) {
SystemConfiguration.getInstances().getTaskOwnerLockStrategy())) {
return isFirstLevelOwner;
} else if (TaskOwnerLockStrategyEnum.OWNER_AND_MAINTAINER.equals(
SystemConfiguration.getInstances().GetTaskOwnerLockStrategyValue())) {
SystemConfiguration.getInstances().getTaskOwnerLockStrategy())) {
return isFirstLevelOwner
|| (secondLevelOwners != null && secondLevelOwners.contains(StpUtil.getLoginIdAsInt()));
}
Expand Down
6 changes: 5 additions & 1 deletion dinky-admin/src/main/java/org/dinky/ws/GlobalWebSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.ws;

import org.dinky.assertion.Asserts;
import org.dinky.data.vo.SseDataVo;
import org.dinky.utils.JsonUtils;
import org.dinky.utils.ThreadUtil;
Expand Down Expand Up @@ -61,7 +62,10 @@ public GlobalWebSocket() {
executorService.execute(() -> {
while (isRunning) {
Set<String> params = getRequestParamMap().get(value);
sendTopic(value, params, value.getInstance().autoDataSend(params));
Map<String, Object> topicMap = value.getInstance().autoDataSend(params);
if (Asserts.isNotNullMap(topicMap)) {
sendTopic(value, params, topicMap);
}
ThreadUtil.sleep(value.getDelaySend());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.dinky.ws.topic.Metrics;
import org.dinky.ws.topic.PrintTable;
import org.dinky.ws.topic.ProcessConsole;
import org.dinky.ws.topic.TaskRunInstance;

import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -35,6 +36,7 @@ public enum GlobalWebSocketTopic {
PROCESS_CONSOLE("PROCESS_CONSOLE", ProcessConsole.INSTANCE, Integer.MAX_VALUE),
PRINT_TABLE("PRINT_TABLE", PrintTable.INSTANCE, Integer.MAX_VALUE),
METRICS("METRICS", Metrics.INSTANCE, Integer.MAX_VALUE),
TASK_RUN_INSTANCE("TASK_RUN_INSTANCE", TaskRunInstance.INSTANCE, 1000),
;
private final String topic;
private final BaseTopic instance;
Expand Down
7 changes: 0 additions & 7 deletions dinky-admin/src/main/java/org/dinky/ws/topic/BaseTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@
@AllArgsConstructor
public abstract class BaseTopic {
public static final String NONE_PARAMS = "none-params";
/**
*
* @return All subscription parameters
*/
// Set<String> allParams();

/**
* Data sending ideas, including data acquisition and sending
Expand All @@ -44,6 +39,4 @@ public abstract class BaseTopic {
* @return The data sent will be converted by JSON when it is finally sent
*/
public abstract Map<String, Object> firstDataSend(Set<String> allParams);

public void dataSend(Map<String, Object> data) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ProcessConsole extends BaseTopic {
private final Map<String, ProcessEntity> logPross = new ConcurrentHashMap<>();
public static final ProcessConsole INSTANCE = new ProcessConsole();

private ProcessConsole() {}
Expand Down
54 changes: 54 additions & 0 deletions dinky-admin/src/main/java/org/dinky/ws/topic/TaskRunInstance.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.ws.topic;

import org.dinky.daemon.pool.FlinkJobThreadPool;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import cn.hutool.core.collection.CollUtil;

/**
 * WebSocket topic that pushes the set of task ids currently monitored by the
 * Flink job daemon pool, so the front end can show which tasks are running.
 */
public class TaskRunInstance extends BaseTopic {
    public static final TaskRunInstance INSTANCE = new TaskRunInstance();

    // Last snapshot of monitored task ids, used to detect changes between polls.
    // NOTE(review): read/written from the topic's send loop; if multiple threads
    // ever poll this singleton, consider making it volatile — confirm threading.
    private Set<Integer> runningJobIds = CollUtil.newHashSet();

    private TaskRunInstance() {}

    /**
     * Sends the monitored task ids only when the set changed since the previous
     * poll; an empty map means "nothing to send" for this cycle.
     */
    @Override
    public Map<String, Object> autoDataSend(Set<String> allParams) {
        Set<Integer> currentMonitorTaskIds = FlinkJobThreadPool.getInstance().getCurrentMonitorTaskIds();
        if (!runningJobIds.equals(currentMonitorTaskIds)) {
            runningJobIds = currentMonitorTaskIds;
            Map<String, Object> result = new HashMap<>();
            // Fix: reuse the snapshot just compared and stored. The original
            // queried the pool a second time here, which could return a
            // different set and desynchronize runningJobIds from the payload.
            result.put("RunningTaskId", currentMonitorTaskIds);
            return result;
        }
        return new HashMap<>();
    }

    /**
     * Initial payload for a new subscriber: always the current task id set.
     */
    @Override
    public Map<String, Object> firstDataSend(Set<String> allParams) {
        Map<String, Object> result = new HashMap<>();
        result.put("RunningTaskId", FlinkJobThreadPool.getInstance().getCurrentMonitorTaskIds());
        return result;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,8 @@ public int getTaskSize() {
return tasks.size();
}
}

// Returns the queue's backing task list.
// NOTE(review): exposes the internal, mutable ArrayList directly; callers
// iterate it (e.g. FlinkJobThreadPool.getCurrentMonitorTaskIds) while worker
// threads may mutate the queue — consider returning a snapshot copy taken
// under the queue's lock if this proves racy. Confirm synchronization model.
public ArrayList<T> getTasks() {
return tasks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* @operate
Expand Down Expand Up @@ -132,4 +134,11 @@ public void removeWorker(int num) {
public DaemonTask getByTaskConfig(DaemonTaskConfig daemonTask) {
return queue.getByTaskConfig(daemonTask);
}

/**
 * Snapshot of the task ids for every daemon task currently held by the pool's queue.
 */
public Set<Integer> getCurrentMonitorTaskIds() {
    return queue.getTasks().stream()
            .map(daemonTask -> daemonTask.getConfig().getTaskId())
            .collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,20 @@ public class DaemonTaskConfig {

private final String type;
private Integer id;
private Integer taskId;

public DaemonTaskConfig(String type, Integer id) {
private DaemonTaskConfig(String type, Integer id, Integer taskId) {
this.type = type;
this.id = id;
this.taskId = taskId;
}

public DaemonTaskConfig(String type) {
this.type = type;
}

public static DaemonTaskConfig build(String type, Integer id) {
return new DaemonTaskConfig(type, id);
public static DaemonTaskConfig build(String type, Integer id, Integer taskId) {
return new DaemonTaskConfig(type, id, taskId);
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion dinky-web/src/models/UseWebSocketModel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ export enum Topic {
JVM_INFO = 'JVM_INFO',
PROCESS_CONSOLE = 'PROCESS_CONSOLE',
PRINT_TABLE = 'PRINT_TABLE',
METRICS = 'METRICS'
METRICS = 'METRICS',
TASK_RUN_INSTANCE = 'TASK_RUN_INSTANCE',
}

export type SubscriberData = {
Expand Down
Loading

0 comments on commit 5664f55

Please sign in to comment.