Skip to content

Commit

Permalink
Fix damemon, Paimon , Metrics some bug (#2532)
Browse files Browse the repository at this point in the history
* Optimize the process

* fix some bug
  • Loading branch information
gaoyan1998 authored Nov 16, 2023
1 parent 39775d7 commit ca88bac
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ private static synchronized void writeMetrics(List<MetricsVO> metricsList) {
metrics.setDate(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
metrics.setContent(JSONUtil.toJsonStr(metrics.getContent()));
});
PaimonUtil.write(PaimonTableConstant.DINKY_METRICS, metricsList, MetricsVO.class);
try {
PaimonUtil.write(PaimonTableConstant.DINKY_METRICS, metricsList, MetricsVO.class);
} catch (Exception e) {
// todo 此处最好发个告警
log.error("write metrics error", e);
}
}
}
2 changes: 2 additions & 0 deletions dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@

import org.springframework.context.annotation.DependsOn;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@DependsOn("springContextUtils")
@Slf4j
@Data
public class FlinkJobTask implements DaemonTask {

private DaemonTaskConfig config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.dinky.data.model.ext.JobInfoDetail;
import org.dinky.data.vo.MetricsVO;
import org.dinky.utils.HttpUtils;
import org.dinky.utils.TimeUtil;

import java.time.LocalDateTime;
import java.util.Arrays;
Expand Down Expand Up @@ -67,6 +68,7 @@ public static void writeFlinkMetrics(JobInfoDetail jobInfoDetail) {
metricsVO.setContent(customMetricsList);
metricsVO.setHeartTime(LocalDateTime.now());
metricsVO.setModel(jobId);
metricsVO.setDate(TimeUtil.nowStr("yyyy-MM-dd"));
MetricsContextHolder.getInstances().sendAsync(metricsVO.getModel(), metricsVO);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.dinky.assertion.Asserts;
import org.dinky.context.TenantContextHolder;
import org.dinky.daemon.pool.DefaultThreadPool;
import org.dinky.daemon.task.DaemonFactory;
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;
import org.dinky.data.dto.ClusterConfigurationDTO;
Expand All @@ -40,7 +39,6 @@
import org.dinky.explainer.lineage.LineageBuilder;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.job.FlinkJobTask;
import org.dinky.job.handler.JobRefreshHandler;
import org.dinky.mapper.JobInstanceMapper;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.mybatis.util.ProTableUtil;
Expand All @@ -54,6 +52,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -188,17 +187,23 @@ public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) {

@Override
public JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, boolean isForce) {
JobInfoDetail jobInfoDetail = getJobInfoDetail(jobInstanceId);
// Directly returns database data if the task has completed and is not a forced refresh
if (JobStatus.isDone(jobInfoDetail.getInstance().getStatus()) && !isForce) {
DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId);
DaemonTask daemonTask = DefaultThreadPool.getInstance().getByTaskConfig(daemonTaskConfig);

if (daemonTask != null) {
daemonTask.dealTask();
return ((FlinkJobTask) daemonTask).getJobInfoDetail();
} else if (isForce) {
daemonTask = DaemonTask.build(daemonTaskConfig);
daemonTask.dealTask();
JobInfoDetail jobInfoDetail = ((FlinkJobTask) daemonTask).getJobInfoDetail();
if (!JobStatus.isDone(jobInfoDetail.getInstance().getStatus())) {
DefaultThreadPool.getInstance().execute(daemonTask);
}
return jobInfoDetail;
} else {
return getJobInfoDetail(jobInstanceId);
}
boolean isDone = JobRefreshHandler.refreshJob(jobInfoDetail, true);
// If the task becomes incomplete after a forced refresh, it is re-queued to the task
if (!isDone) {
DaemonFactory.refeshOraddTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId));
}
return jobInfoDetail;
}

@Override
Expand All @@ -211,11 +216,11 @@ public boolean hookJobDone(String jobId, Integer taskId) {
// returning true to prevent retry.
return true;
}

DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId());
DaemonTask daemonTask = DefaultThreadPool.getInstance().dequeueByTask(config);
if (daemonTask == null) {
daemonTask = DaemonTask.build(config);
}
DaemonTask daemonTask = DefaultThreadPool.getInstance().removeByTaskConfig(config);
daemonTask = Optional.ofNullable(daemonTask).orElse(DaemonTask.build(config));

boolean isDone = daemonTask.dealTask();
// If the task is not completed, it is re-queued
if (!isDone) {
Expand All @@ -228,6 +233,9 @@ public boolean hookJobDone(String jobId, Integer taskId) {
public void refreshJobByTaskIds(Integer... taskIds) {
for (Integer taskId : taskIds) {
JobInstance instance = getJobInstanceByTaskId(taskId);
DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId());
DefaultThreadPool.getInstance().removeByTaskConfig(daemonTaskConfig);
DefaultThreadPool.getInstance().execute(DaemonTask.build(daemonTaskConfig));
refreshJobInfoDetail(instance.getId(), false);
}
}
Expand Down
34 changes: 20 additions & 14 deletions dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.ModifierUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
Expand Down Expand Up @@ -106,7 +107,7 @@ public static <T> void write(String table, List<T> dataList, Class<?> clazz) {
if (CollUtil.isEmpty(dataList)) {
return;
}
Table paimonTable = createOrGetTable(table, null);
Table paimonTable = createOrGetTable(table, clazz);
BatchWriteBuilder writeBuilder = paimonTable.newBatchWriteBuilder();

// 2. Write records in distributed tasks
Expand All @@ -120,19 +121,24 @@ public static <T> void write(String table, List<T> dataList, Class<?> clazz) {
DataField dataField = fields.get(i);
DataType type = dataField.type();
String fieldName = StrUtil.toCamelCase(dataField.name());
if (type.getTypeRoot() == DataTypeRoot.VARCHAR) {
BinaryWriter.write(
writer,
i,
BinaryString.fromString(JSONUtil.toJsonStr(ReflectUtil.getFieldValue(t, fieldName))),
type,
null);
} else if (type.getTypeRoot() == DataTypeRoot.TIME_WITHOUT_TIME_ZONE) {
Timestamp timestamp =
Timestamp.fromLocalDateTime((LocalDateTime) ReflectUtil.getFieldValue(t, fieldName));
BinaryWriter.write(writer, i, timestamp, type, null);
} else {
BinaryWriter.write(writer, i, ReflectUtil.getFieldValue(t, fieldName), type, null);
Object fieldValue = ReflectUtil.getFieldValue(t, fieldName);
try {
if (type.getTypeRoot() == DataTypeRoot.VARCHAR) {
BinaryWriter.write(
writer, i, BinaryString.fromString(JSONUtil.toJsonStr(fieldValue)), type, null);
} else if (type.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
Timestamp timestamp = Timestamp.fromLocalDateTime((LocalDateTime) fieldValue);
BinaryWriter.write(writer, i, timestamp, type, null);
} else {
BinaryWriter.write(writer, i, fieldValue, type, null);
}
} catch (Throwable e) {
String err = StrFormatter.format(
"write table: [{}], data filed [{}], value: [{}] error",
paimonTable.name(),
fieldName,
fieldValue);
throw new RuntimeException(err, e);
}
}
write.write(row);
Expand Down
35 changes: 26 additions & 9 deletions dinky-daemon/src/main/java/org/dinky/daemon/entity/TaskQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;

import java.util.LinkedList;
import java.util.ArrayList;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -31,20 +31,19 @@
@Slf4j
public class TaskQueue<T extends DaemonTask> {

private final LinkedList<T> tasks = new LinkedList<>();
private final ArrayList<T> tasks = new ArrayList<>();
private int _next_index = 0;

private final Object lock = new Object();

public void enqueue(T task) {
public void addTask(T task) {
synchronized (lock) {
tasks.add(task);
lock.notifyAll();
// prevent duplicate additions
dequeueByTask(task.getConfig());
tasks.addLast(task);
}
}

public T dequeue() {
public T getNext() {
synchronized (lock) {
while (tasks.isEmpty()) {
try {
Expand All @@ -53,25 +52,43 @@ public T dequeue() {
log.error(e.getMessage(), e);
}
}
return tasks.removeFirst();
if (_next_index >= tasks.size()) {
_next_index = 0;
}
T task = tasks.get(_next_index);
_next_index++;
return task;
}
}

public T dequeueByTask(DaemonTaskConfig task) {
public T getByTaskConfig(DaemonTaskConfig task) {
synchronized (lock) {
T find = null;
for (T t : tasks) {
if (t.getConfig().equals(task)) {
find = t;
}
}
return find;
}
}

public T removeByTaskConfig(DaemonTaskConfig task) {
synchronized (lock) {
T find = getByTaskConfig(task);
if (find != null) {
tasks.remove(find);
}
return find;
}
}

public void removeByTask(T task) {
synchronized (lock) {
tasks.remove(task);
}
}

public int getTaskSize() {
synchronized (lock) {
return tasks.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ public TaskWorker(TaskQueue<DaemonTask> queue) {
public void run() {
log.debug("TaskWorker run:" + Thread.currentThread().getName());
while (running) {
DaemonTask daemonTask = queue.dequeue();
DaemonTask daemonTask = queue.getNext();
if (daemonTask != null) {
try {
boolean done = daemonTask.dealTask();
if (!done) {
queue.enqueue(daemonTask);
if (done) {
queue.removeByTask(daemonTask);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,35 +47,25 @@ public class DefaultThreadPool implements ThreadPool {

private final TaskQueue<DaemonTask> queue = new TaskQueue<>();

private static DefaultThreadPool defaultThreadPool;

private DefaultThreadPool() {
addWorkers(DEFAULT_WORKER_NUM);
}

private static final class DefaultThreadPoolHolder {
private static final DefaultThreadPool defaultThreadPool = new DefaultThreadPool();
}

public static DefaultThreadPool getInstance() {
if (defaultThreadPool == null) {
synchronized (DefaultThreadPool.class) {
if (defaultThreadPool == null) {
defaultThreadPool = new DefaultThreadPool();
}
}
}
return defaultThreadPool;
return DefaultThreadPoolHolder.defaultThreadPool;
}

@Override
public void execute(DaemonTask daemonTask) {
if (daemonTask != null) {
queue.enqueue(daemonTask);
queue.addTask(daemonTask);
}
}

@Override
public DaemonTask dequeueByTask(DaemonTaskConfig daemonTask) {
return queue.dequeueByTask(daemonTask);
}

@Override
public void addWorkers(int num) {
synchronized (lock) {
Expand Down Expand Up @@ -132,6 +122,14 @@ public int getTaskSize() {
return queue.getTaskSize();
}

public DaemonTask getByTaskConfig(DaemonTaskConfig daemonTask) {
return queue.getByTaskConfig(daemonTask);
}

public DaemonTask removeByTaskConfig(DaemonTaskConfig daemonTask) {
return queue.removeByTaskConfig(daemonTask);
}

public int getWorkCount() {
synchronized (lock) {
return this.workerNum.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.dinky.daemon.pool;

import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;

/**
* @operate
Expand All @@ -31,8 +30,6 @@ public interface ThreadPool {
// 执行任务
void execute(DaemonTask daemonTask);

DaemonTask dequeueByTask(DaemonTaskConfig daemonTask);

// 关闭连接池
void shutdown();

Expand Down

0 comments on commit ca88bac

Please sign in to comment.