From b5b99f67b0ce56f6266bc54aab57ac1de8104738 Mon Sep 17 00:00:00 2001 From: gaoyan Date: Mon, 4 Dec 2023 18:21:23 +0800 Subject: [PATCH] Fix some npe (#2597) * fix alert some npe * fix alert some npe --- .../dinky/data/model/ext/JobAlertData.java | 22 +++++++++++++++---- .../dinky/job/handler/JobAlertHandler.java | 15 ++++++++++++- .../dinky/job/handler/JobMetricsHandler.java | 20 +++++++++++------ 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/data/model/ext/JobAlertData.java b/dinky-admin/src/main/java/org/dinky/data/model/ext/JobAlertData.java index bc3ba5a980..c261056282 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/ext/JobAlertData.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/ext/JobAlertData.java @@ -31,6 +31,9 @@ import org.dinky.job.JobConfig; import org.dinky.utils.TimeUtil; +import java.time.LocalDateTime; +import java.util.Optional; + import com.fasterxml.jackson.annotation.JsonProperty; import cn.hutool.core.text.StrFormatter; @@ -144,6 +147,10 @@ private static String buildTaskUrl(JobInstance jobInstance) { jobInstance.getTaskId()); } + private static String getTime(LocalDateTime time) { + return time == null ? "" : TimeUtil.convertTimeToString(time); + } + public static JobAlertData buildData(JobInfoDetail jobInfoDetail) { JobAlertDataBuilder builder = JobAlertData.builder(); builder.alertTime(TimeUtil.nowStr()); @@ -163,12 +170,13 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) { .taskUrl(buildTaskUrl(jobInstance)) .jobName(jobInstance.getName()) .jobId(jobInstance.getJid()) - .duration(jobInstance.getDuration()) - .jobStartTime(TimeUtil.convertTimeToString(jobInstance.getCreateTime())) - .jobEndTime(TimeUtil.convertTimeToString(jobInstance.getFinishTime())); + .duration(Optional.ofNullable(jobInstance.getDuration()).orElse(0L)) + .jobStartTime(getTime(jobInstance.getCreateTime())) + .jobEndTime(getTime(jobInstance.getFinishTime())); if (job != null) { builder.batchModel(job.isBatchModel()); } + if (clusterInstance != null) { builder.clusterName(clusterInstance.getName()) .clusterType(clusterInstance.getType()) @@ -180,7 +188,13 @@ public static JobAlertData buildData(JobInfoDetail jobInfoDetail) { } else if (exceptions != null && ExceptionRule.isException(id, exceptions)) { // The error message is too long to send an alarm, // and only the first line of abnormal information is used - builder.isException(true).errorMsg(exceptions.getRootException().split("\n")[0]); + String err = Optional.ofNullable(exceptions.getRootException()) + .orElse("dinky didn't get any ERROR!") + .split("\n")[0]; + if (err.length() > 100) { + err = err.substring(0, 100) + "..."; + } + builder.isException(true).errorMsg(err); } if (checkpoints != null) { diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java index c37d9063b7..f055d125f5 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java @@ -28,7 +28,9 @@ import org.dinky.daemon.pool.FlinkJobThreadPool; import org.dinky.data.dto.AlertRuleDTO; import org.dinky.data.dto.TaskDTO; +import org.dinky.data.enums.JobLifeCycle; import org.dinky.data.enums.Status; +import org.dinky.data.exception.DinkyException; import org.dinky.data.model.alert.AlertGroup; import org.dinky.data.model.alert.AlertHistory; import org.dinky.data.model.alert.AlertInstance; @@ -45,6 +47,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import org.jeasy.rules.api.Facts; @@ -117,7 +120,13 @@ public JobAlertHandler() { public void check(JobInfoDetail jobInfoDetail) { Facts ruleFacts = new Facts(); JobAlertData jobAlertData = JobAlertData.buildData(jobInfoDetail); - JsonUtils.toMap(jobAlertData).forEach(ruleFacts::put); + JsonUtils.toMap(jobAlertData).forEach((k, v) -> { + if (v == null) { + throw new DinkyException(StrFormatter.format( + "When deal alert job data, the key [{}] value is null, its maybe dinky bug,please report", k)); + } + ruleFacts.put(k, v); + }); rulesEngine.fire(rules, ruleFacts); } @@ -171,6 +180,10 @@ private Rule buildRule(AlertRuleDTO alertRuleDTO) { */ private void executeAlertAction(Facts facts, AlertRuleDTO alertRuleDTO) { TaskDTO task = taskService.getTaskInfoById(facts.get(JobAlertRuleOptions.FIELD_TASK_ID)); + if (!Objects.equals(task.getStep(), JobLifeCycle.PUBLISH.getValue())) { + // Only publish job can be alerted + return; + } Map dataModel = new HashMap<>(facts.asMap()); dataModel.put(JobAlertRuleOptions.OPTIONS_JOB_ALERT_RULE, alertRuleDTO); String alertContent; diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java index ba75779d0b..d753b7faba 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobMetricsHandler.java @@ -39,10 +39,12 @@ import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; +import lombok.extern.slf4j.Slf4j; /** * The Job Metrics Handler class is used to process operations related to job metrics。 */ +@Slf4j public class JobMetricsHandler { /** @@ -63,13 +65,17 @@ public static void writeFlinkMetrics(JobInfoDetail jobInfoDetail) { () -> fetchFlinkMetrics(e.getKey(), e.getValue(), jobManagerUrls, jobId))) .toArray(CompletableFuture[]::new); // Wait for all Completable Future executions to finish - AsyncUtil.waitAll(array); - MetricsVO metricsVO = new MetricsVO(); - metricsVO.setContent(customMetricsList); - metricsVO.setHeartTime(LocalDateTime.now()); - metricsVO.setModel(jobId); - metricsVO.setDate(TimeUtil.nowStr("yyyy-MM-dd")); - MetricsContextHolder.getInstances().sendAsync(metricsVO.getModel(), metricsVO); + try { + AsyncUtil.waitAll(array); + MetricsVO metricsVO = new MetricsVO(); + metricsVO.setContent(customMetricsList); + metricsVO.setHeartTime(LocalDateTime.now()); + metricsVO.setModel(jobId); + metricsVO.setDate(TimeUtil.nowStr("yyyy-MM-dd")); + MetricsContextHolder.getInstances().sendAsync(metricsVO.getModel(), metricsVO); + } catch (Exception e) { + log.error("Get and save Flink metrics error", e); + } } /**