diff --git a/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java b/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java index c6fd22edeb..ecb0578a1c 100644 --- a/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java +++ b/dinky-admin/src/main/java/org/dinky/context/MetricsContextHolder.java @@ -78,6 +78,11 @@ private static synchronized void writeMetrics(List metricsList) { metrics.setDate(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); metrics.setContent(JSONUtil.toJsonStr(metrics.getContent())); }); - PaimonUtil.write(PaimonTableConstant.DINKY_METRICS, metricsList, MetricsVO.class); + try { + PaimonUtil.write(PaimonTableConstant.DINKY_METRICS, metricsList, MetricsVO.class); + } catch (Exception e) { + // todo 此处最好发个告警 + log.error("write metrics error", e); + } } } diff --git a/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java b/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java index 525efb84c6..49097993ce 100644 --- a/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java +++ b/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java @@ -33,10 +33,12 @@ import org.springframework.context.annotation.DependsOn; +import lombok.Data; import lombok.extern.slf4j.Slf4j; @DependsOn("springContextUtils") @Slf4j +@Data public class FlinkJobTask implements DaemonTask { private DaemonTaskConfig config; diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java index d77941770f..ba75779d0b 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java @@ -24,6 +24,7 @@ import org.dinky.data.model.ext.JobInfoDetail; import org.dinky.data.vo.MetricsVO; import org.dinky.utils.HttpUtils; +import org.dinky.utils.TimeUtil; import java.time.LocalDateTime; import java.util.Arrays; @@ -67,6 +68,7 @@ public static void writeFlinkMetrics(JobInfoDetail jobInfoDetail) { metricsVO.setContent(customMetricsList); metricsVO.setHeartTime(LocalDateTime.now()); metricsVO.setModel(jobId); + metricsVO.setDate(TimeUtil.nowStr("yyyy-MM-dd")); MetricsContextHolder.getInstances().sendAsync(metricsVO.getModel(), metricsVO); } diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java index 90062c1bca..b3c606f68d 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java @@ -22,7 +22,6 @@ import org.dinky.assertion.Asserts; import org.dinky.context.TenantContextHolder; import org.dinky.daemon.pool.DefaultThreadPool; -import org.dinky.daemon.task.DaemonFactory; import org.dinky.daemon.task.DaemonTask; import org.dinky.daemon.task.DaemonTaskConfig; import org.dinky.data.dto.ClusterConfigurationDTO; @@ -40,7 +39,6 @@ import org.dinky.explainer.lineage.LineageBuilder; import org.dinky.explainer.lineage.LineageResult; import org.dinky.job.FlinkJobTask; -import org.dinky.job.handler.JobRefreshHandler; import org.dinky.mapper.JobInstanceMapper; import org.dinky.mybatis.service.impl.SuperServiceImpl; import org.dinky.mybatis.util.ProTableUtil; @@ -54,6 +52,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import org.springframework.stereotype.Service; @@ -188,17 +187,23 @@ public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) { @Override public JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, boolean isForce) { - JobInfoDetail jobInfoDetail = getJobInfoDetail(jobInstanceId); - // Directly returns database data if the task has completed and is not a forced refresh - if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) && !isForce) { + DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId); + DaemonTask daemonTask = DefaultThreadPool.getInstance().getByTaskConfig(daemonTaskConfig); + + if (daemonTask != null) { + daemonTask.dealTask(); + return ((FlinkJobTask) daemonTask).getJobInfoDetail(); + } else if (isForce) { + daemonTask = DaemonTask.build(daemonTaskConfig); + daemonTask.dealTask(); + JobInfoDetail jobInfoDetail = ((FlinkJobTask) daemonTask).getJobInfoDetail(); + if (!JobStatus.isDone(jobInfoDetail.getInstance().getStatus())) { + DefaultThreadPool.getInstance().execute(daemonTask); + } return jobInfoDetail; + } else { + return getJobInfoDetail(jobInstanceId); } - boolean isDone = JobRefreshHandler.refreshJob(jobInfoDetail, true); - // If the task becomes incomplete after a forced refresh, it is re-queued to the task - if (!isDone) { - DaemonFactory.refeshOraddTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId)); - } - return jobInfoDetail; } @Override @@ -211,11 +216,11 @@ public boolean hookJobDone(String jobId, Integer taskId) { // returning true to prevent retry. return true; } + DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId()); - DaemonTask daemonTask = DefaultThreadPool.getInstance().dequeueByTask(config); - if (daemonTask == null) { - daemonTask = DaemonTask.build(config); - } + DaemonTask daemonTask = DefaultThreadPool.getInstance().removeByTaskConfig(config); + daemonTask = Optional.ofNullable(daemonTask).orElse(DaemonTask.build(config)); + boolean isDone = daemonTask.dealTask(); // If the task is not completed, it is re-queued if (!isDone) { @@ -228,6 +233,9 @@ public boolean hookJobDone(String jobId, Integer taskId) { public void refreshJobByTaskIds(Integer... taskIds) { for (Integer taskId : taskIds) { JobInstance instance = getJobInstanceByTaskId(taskId); + DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId()); + DefaultThreadPool.getInstance().removeByTaskConfig(daemonTaskConfig); + DefaultThreadPool.getInstance().execute(DaemonTask.build(daemonTaskConfig)); refreshJobInfoDetail(instance.getId(), false); } } diff --git a/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java b/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java index b4ede38421..1c20873d22 100644 --- a/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java +++ b/dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java @@ -69,6 +69,7 @@ import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.TimeInterval; import cn.hutool.core.map.MapUtil; +import cn.hutool.core.text.StrFormatter; import cn.hutool.core.util.ModifierUtil; import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.StrUtil; @@ -106,7 +107,7 @@ public static void write(String table, List dataList, Class clazz) { if (CollUtil.isEmpty(dataList)) { return; } - Table paimonTable = createOrGetTable(table, null); + Table paimonTable = createOrGetTable(table, clazz); BatchWriteBuilder writeBuilder = paimonTable.newBatchWriteBuilder(); // 2. Write records in distributed tasks @@ -120,19 +121,24 @@ public static void write(String table, List dataList, Class clazz) { DataField dataField = fields.get(i); DataType type = dataField.type(); String fieldName = StrUtil.toCamelCase(dataField.name()); - if (type.getTypeRoot() == DataTypeRoot.VARCHAR) { - BinaryWriter.write( - writer, - i, - BinaryString.fromString(JSONUtil.toJsonStr(ReflectUtil.getFieldValue(t, fieldName))), - type, - null); - } else if (type.getTypeRoot() == DataTypeRoot.TIME_WITHOUT_TIME_ZONE) { - Timestamp timestamp = - Timestamp.fromLocalDateTime((LocalDateTime) ReflectUtil.getFieldValue(t, fieldName)); - BinaryWriter.write(writer, i, timestamp, type, null); - } else { - BinaryWriter.write(writer, i, ReflectUtil.getFieldValue(t, fieldName), type, null); + Object fieldValue = ReflectUtil.getFieldValue(t, fieldName); + try { + if (type.getTypeRoot() == DataTypeRoot.VARCHAR) { + BinaryWriter.write( + writer, i, BinaryString.fromString(JSONUtil.toJsonStr(fieldValue)), type, null); + } else if (type.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { + Timestamp timestamp = Timestamp.fromLocalDateTime((LocalDateTime) fieldValue); + BinaryWriter.write(writer, i, timestamp, type, null); + } else { + BinaryWriter.write(writer, i, fieldValue, type, null); + } + } catch (Throwable e) { + String err = StrFormatter.format( + "write table: [{}], data filed [{}], value: [{}] error", + paimonTable.name(), + fieldName, + fieldValue); + throw new RuntimeException(err, e); } } write.write(row); diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java b/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java index c51eaddffd..c69001fbb3 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java @@ -22,7 +22,7 @@ import org.dinky.daemon.task.DaemonTask; import org.dinky.daemon.task.DaemonTaskConfig; -import java.util.LinkedList; +import java.util.ArrayList; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -31,20 +31,19 @@ @Slf4j public class TaskQueue { - private final LinkedList tasks = new LinkedList<>(); + private final ArrayList tasks = new ArrayList<>(); + private int _next_index = 0; private final Object lock = new Object(); - public void enqueue(T task) { + public void addTask(T task) { synchronized (lock) { + tasks.add(task); lock.notifyAll(); - // prevent duplicate additions - dequeueByTask(task.getConfig()); - tasks.addLast(task); } } - public T dequeue() { + public T getNext() { synchronized (lock) { while (tasks.isEmpty()) { try { @@ -53,11 +52,16 @@ public T dequeue() { log.error(e.getMessage(), e); } } - return tasks.removeFirst(); + if (_next_index >= tasks.size()) { + _next_index = 0; + } + T task = tasks.get(_next_index); + _next_index++; + return task; } } - public T dequeueByTask(DaemonTaskConfig task) { + public T getByTaskConfig(DaemonTaskConfig task) { synchronized (lock) { T find = null; for (T t : tasks) { @@ -65,6 +69,13 @@ public T dequeueByTask(DaemonTaskConfig task) { find = t; } } + return find; + } + } + + public T removeByTaskConfig(DaemonTaskConfig task) { + synchronized (lock) { + T find = getByTaskConfig(task); if (find != null) { tasks.remove(find); } @@ -72,6 +83,12 @@ public T dequeueByTask(DaemonTaskConfig task) { } } + public void removeByTask(T task) { + synchronized (lock) { + tasks.remove(task); + } + } + public int getTaskSize() { synchronized (lock) { return tasks.size(); diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskWorker.java b/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskWorker.java index ecef11e032..d7a0eff14d 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskWorker.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskWorker.java @@ -50,12 +50,12 @@ public TaskWorker(TaskQueue queue) { public void run() { log.debug("TaskWorker run:" + Thread.currentThread().getName()); while (running) { - DaemonTask daemonTask = queue.dequeue(); + DaemonTask daemonTask = queue.getNext(); if (daemonTask != null) { try { boolean done = daemonTask.dealTask(); - if (!done) { - queue.enqueue(daemonTask); + if (done) { + queue.removeByTask(daemonTask); } } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/pool/DefaultThreadPool.java b/dinky-daemon/src/main/java/org/dinky/daemon/pool/DefaultThreadPool.java index 9d0c91244c..b5fc17cbd5 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/pool/DefaultThreadPool.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/pool/DefaultThreadPool.java @@ -47,35 +47,25 @@ public class DefaultThreadPool implements ThreadPool { private final TaskQueue queue = new TaskQueue<>(); - private static DefaultThreadPool defaultThreadPool; - private DefaultThreadPool() { addWorkers(DEFAULT_WORKER_NUM); } + private static final class DefaultThreadPoolHolder { + private static final DefaultThreadPool defaultThreadPool = new DefaultThreadPool(); + } + public static DefaultThreadPool getInstance() { - if (defaultThreadPool == null) { - synchronized (DefaultThreadPool.class) { - if (defaultThreadPool == null) { - defaultThreadPool = new DefaultThreadPool(); - } - } - } - return defaultThreadPool; + return DefaultThreadPoolHolder.defaultThreadPool; } @Override public void execute(DaemonTask daemonTask) { if (daemonTask != null) { - queue.enqueue(daemonTask); + queue.addTask(daemonTask); } } - @Override - public DaemonTask dequeueByTask(DaemonTaskConfig daemonTask) { - return queue.dequeueByTask(daemonTask); - } - @Override public void addWorkers(int num) { synchronized (lock) { @@ -132,6 +122,14 @@ public int getTaskSize() { return queue.getTaskSize(); } + public DaemonTask getByTaskConfig(DaemonTaskConfig daemonTask) { + return queue.getByTaskConfig(daemonTask); + } + + public DaemonTask removeByTaskConfig(DaemonTaskConfig daemonTask) { + return queue.removeByTaskConfig(daemonTask); + } + public int getWorkCount() { synchronized (lock) { return this.workerNum.get(); diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/pool/ThreadPool.java b/dinky-daemon/src/main/java/org/dinky/daemon/pool/ThreadPool.java index d4ddd8c425..85cbf2b37e 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/pool/ThreadPool.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/pool/ThreadPool.java @@ -20,7 +20,6 @@ package org.dinky.daemon.pool; import org.dinky.daemon.task.DaemonTask; -import org.dinky.daemon.task.DaemonTaskConfig; /** * @operate @@ -31,8 +30,6 @@ public interface ThreadPool { // 执行任务 void execute(DaemonTask daemonTask); - DaemonTask dequeueByTask(DaemonTaskConfig daemonTask); - // 关闭连接池 void shutdown();