Skip to content

Commit

Permalink
Fix some npe (#2597)
Browse files Browse the repository at this point in the history
* fix alert some npe

* fix alert some npe
  • Loading branch information
gaoyan1998 authored Dec 4, 2023
1 parent 17ed721 commit b5b99f6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.dinky.job.JobConfig;
import org.dinky.utils.TimeUtil;

import java.time.LocalDateTime;
import java.util.Optional;

import com.fasterxml.jackson.annotation.JsonProperty;

import cn.hutool.core.text.StrFormatter;
Expand Down Expand Up @@ -144,6 +147,10 @@ private static String buildTaskUrl(JobInstance jobInstance) {
jobInstance.getTaskId());
}

private static String getTime(LocalDateTime time) {
return time == null ? "" : TimeUtil.convertTimeToString(time);
}

public static JobAlertData buildData(JobInfoDetail jobInfoDetail) {
JobAlertDataBuilder builder = JobAlertData.builder();
builder.alertTime(TimeUtil.nowStr());
Expand All @@ -163,12 +170,13 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) {
.taskUrl(buildTaskUrl(jobInstance))
.jobName(jobInstance.getName())
.jobId(jobInstance.getJid())
.duration(jobInstance.getDuration())
.jobStartTime(TimeUtil.convertTimeToString(jobInstance.getCreateTime()))
.jobEndTime(TimeUtil.convertTimeToString(jobInstance.getFinishTime()));
.duration(Optional.ofNullable(jobInstance.getDuration()).orElse(0L))
.jobStartTime(getTime(jobInstance.getCreateTime()))
.jobEndTime(getTime(jobInstance.getFinishTime()));
if (job != null) {
builder.batchModel(job.isBatchModel());
}

if (clusterInstance != null) {
builder.clusterName(clusterInstance.getName())
.clusterType(clusterInstance.getType())
Expand All @@ -180,7 +188,13 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) {
} else if (exceptions != null && ExceptionRule.isException(id, exceptions)) {
// The error message is too long to send an alarm,
// and only the first line of abnormal information is used
builder.isException(true).errorMsg(exceptions.getRootException().split("\n")[0]);
String err = Optional.ofNullable(exceptions.getRootException())
.orElse("dinky didn't get any ERROR!")
.split("\n")[0];
if (err.length() > 100) {
err = err.substring(0, 100) + "...";
}
builder.isException(true).errorMsg(err);
}

if (checkpoints != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.dinky.daemon.pool.FlinkJobThreadPool;
import org.dinky.data.dto.AlertRuleDTO;
import org.dinky.data.dto.TaskDTO;
import org.dinky.data.enums.JobLifeCycle;
import org.dinky.data.enums.Status;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.alert.AlertGroup;
import org.dinky.data.model.alert.AlertHistory;
import org.dinky.data.model.alert.AlertInstance;
Expand All @@ -45,6 +47,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import org.jeasy.rules.api.Facts;
Expand Down Expand Up @@ -117,7 +120,13 @@ public JobAlertHandler() {
public void check(JobInfoDetail jobInfoDetail) {
Facts ruleFacts = new Facts();
JobAlertData jobAlertData = JobAlertData.buildData(jobInfoDetail);
JsonUtils.toMap(jobAlertData).forEach(ruleFacts::put);
JsonUtils.toMap(jobAlertData).forEach((k, v) -> {
if (v == null) {
throw new DinkyException(StrFormatter.format(
"When deal alert job data, the key [{}] value is null, its maybe dinky bug,please report", k));
}
ruleFacts.put(k, v);
});
rulesEngine.fire(rules, ruleFacts);
}

Expand Down Expand Up @@ -171,6 +180,10 @@ private Rule buildRule(AlertRuleDTO alertRuleDTO) {
*/
private void executeAlertAction(Facts facts, AlertRuleDTO alertRuleDTO) {
TaskDTO task = taskService.getTaskInfoById(facts.get(JobAlertRuleOptions.FIELD_TASK_ID));
if (!Objects.equals(task.getStep(), JobLifeCycle.PUBLISH.getValue())) {
// Only publish job can be alerted
return;
}
Map<String, Object> dataModel = new HashMap<>(facts.asMap());
dataModel.put(JobAlertRuleOptions.OPTIONS_JOB_ALERT_RULE, alertRuleDTO);
String alertContent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;

/**
* The Job Metrics Handler class is used to process operations related to job metrics。
*/
@Slf4j
public class JobMetricsHandler {

/**
Expand All @@ -63,13 +65,17 @@ public static void writeFlinkMetrics(JobInfoDetail jobInfoDetail) {
() -> fetchFlinkMetrics(e.getKey(), e.getValue(), jobManagerUrls, jobId)))
.toArray(CompletableFuture[]::new);
// Wait for all Completable Future executions to finish
AsyncUtil.waitAll(array);
MetricsVO metricsVO = new MetricsVO();
metricsVO.setContent(customMetricsList);
metricsVO.setHeartTime(LocalDateTime.now());
metricsVO.setModel(jobId);
metricsVO.setDate(TimeUtil.nowStr("yyyy-MM-dd"));
MetricsContextHolder.getInstances().sendAsync(metricsVO.getModel(), metricsVO);
try {
AsyncUtil.waitAll(array);
MetricsVO metricsVO = new MetricsVO();
metricsVO.setContent(customMetricsList);
metricsVO.setHeartTime(LocalDateTime.now());
metricsVO.setModel(jobId);
metricsVO.setDate(TimeUtil.nowStr("yyyy-MM-dd"));
MetricsContextHolder.getInstances().sendAsync(metricsVO.getModel(), metricsVO);
} catch (Exception e) {
log.error("Get and save Flink metrics error", e);
}
}

/**
Expand Down

0 comments on commit b5b99f6

Please sign in to comment.