From 18d4216080b3a80371fed1a5536549e4b653e5aa Mon Sep 17 00:00:00 2001 From: ikiler Date: Sun, 22 Oct 2023 00:18:52 +0800 Subject: [PATCH 1/6] Optimize the process --- dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java | 7 +++++++ .../main/java/org/dinky/service/impl/TaskServiceImpl.java | 1 + 2 files changed, 8 insertions(+) diff --git a/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java b/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java index 3e144b6b5f..9dcc3e73d5 100644 --- a/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java +++ b/dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java @@ -19,6 +19,7 @@ package org.dinky.aop; +import org.apache.http.util.TextUtils; import org.dinky.context.ConsoleContextHolder; import org.dinky.process.annotations.ExecuteProcess; import org.dinky.process.annotations.ProcessId; @@ -82,6 +83,12 @@ public Object processAround(ProceedingJoinPoint joinPoint, ExecuteProcess execut @Around(value = "@annotation(processStep)") public Object processStepAround(ProceedingJoinPoint joinPoint, ProcessStep processStep) throws Throwable { + String processName = MDC.get(PROCESS_NAME); + if (TextUtils.isEmpty(processName)){ + log.warn("Process {} does not exist, This registration step {} was abandoned", processName,processStep.type()); + return joinPoint.proceed(); + } + Object result; // Record the current step and restore it after the execution is completed String parentStep = MDC.get(PROCESS_STEP); diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java index 9cd8792572..71ad2a7a23 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskServiceImpl.java @@ -207,6 +207,7 @@ public JobResult executeJob(TaskDTO task) throws Exception { return jobResult; } + @ProcessStep(type = ProcessStepType.SUBMIT_BUILD_CONFIG) public JobConfig buildJobConfig(TaskDTO task) { task.setStatement(buildEnvSql(task) + task.getStatement()); JobConfig config = task.getJobConfig(); From de0cbc859f6281814988a0719dd925ad2dbb5065 Mon Sep 17 00:00:00 2001 From: ikiler Date: Sun, 19 Nov 2023 17:30:54 +0800 Subject: [PATCH 2/6] Optimize Daemon Schedule --- .../{schedule => }/ScheduledConfig.java | 2 +- .../org/dinky/data/model/job/History.java | 5 + .../org/dinky/data/model/job/JobInstance.java | 5 + .../main/java/org/dinky/init/SystemInit.java | 56 ++++++++-- .../org/dinky/job/BuildConfiguration.java | 1 + .../org/dinky/job/ClearJobHistoryTask.java | 79 +++++++++++++ .../job/DynamicResizeFlinkJobPoolTask.java | 64 +++++++++++ .../main/java/org/dinky/job/FlinkJobTask.java | 3 +- .../SystemMetricsTask.java} | 86 ++++++--------- .../job/handler/ClearJobHistoryHandler.java | 104 ++++++++++++++++++ .../job/{ => handler}/Job2MysqlHandler.java | 13 ++- .../dinky/job/handler/JobAlertHandler.java | 4 +- .../job/handler/SystemMetricsHandler.java | 54 +++++++++ .../service/impl/JobInstanceServiceImpl.java | 14 +-- .../services/org.dinky.daemon.task.DaemonTask | 5 +- .../services/org.dinky.job.JobHandler | 2 +- .../daemon/constant/FlinkTaskConstant.java | 2 +- ...hreadPool.java => FlinkJobThreadPool.java} | 8 +- .../dinky/daemon/pool/ScheduleThreadPool.java | 23 ++-- .../org/dinky/daemon/task/DaemonFactory.java | 93 ---------------- .../dinky/daemon/task/DaemonTaskConfig.java | 6 +- 21 files changed, 439 insertions(+), 190 deletions(-) rename dinky-admin/src/main/java/org/dinky/configure/{schedule => }/ScheduledConfig.java (97%) create mode 100644 dinky-admin/src/main/java/org/dinky/job/ClearJobHistoryTask.java create mode 100644 dinky-admin/src/main/java/org/dinky/job/DynamicResizeFlinkJobPoolTask.java rename dinky-admin/src/main/java/org/dinky/{configure/schedule/metrics/GatherSysIndicator.java => job/SystemMetricsTask.java} (53%) create mode 100644 dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java rename dinky-admin/src/main/java/org/dinky/job/{ => handler}/Job2MysqlHandler.java (95%) create mode 100644 dinky-admin/src/main/java/org/dinky/job/handler/SystemMetricsHandler.java rename dinky-daemon/src/main/java/org/dinky/daemon/pool/{DefaultThreadPool.java => FlinkJobThreadPool.java} (94%) rename dinky-admin/src/main/java/org/dinky/configure/schedule/BaseSchedule.java => dinky-daemon/src/main/java/org/dinky/daemon/pool/ScheduleThreadPool.java (76%) delete mode 100644 dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonFactory.java diff --git a/dinky-admin/src/main/java/org/dinky/configure/schedule/ScheduledConfig.java b/dinky-admin/src/main/java/org/dinky/configure/ScheduledConfig.java similarity index 97% rename from dinky-admin/src/main/java/org/dinky/configure/schedule/ScheduledConfig.java rename to dinky-admin/src/main/java/org/dinky/configure/ScheduledConfig.java index 5a7290e483..4c94b78670 100644 --- a/dinky-admin/src/main/java/org/dinky/configure/schedule/ScheduledConfig.java +++ b/dinky-admin/src/main/java/org/dinky/configure/ScheduledConfig.java @@ -17,7 +17,7 @@ * */ -package org.dinky.configure.schedule; +package org.dinky.configure; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/dinky-admin/src/main/java/org/dinky/data/model/job/History.java b/dinky-admin/src/main/java/org/dinky/data/model/job/History.java index 8194af5141..25120003bf 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/job/History.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/job/History.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.time.LocalDateTime; +import com.baomidou.mybatisplus.annotation.FieldStrategy; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -107,6 +108,10 @@ public class History implements Serializable { @ApiModelProperty(hidden = true) private String clusterName; + @TableField(value = "count(*)", insertStrategy = FieldStrategy.NEVER, updateStrategy = FieldStrategy.NEVER) + @ApiModelProperty(value = "Group by count", dataType = "Integer") + private Long count; + @ApiModelProperty(hidden = true) public JobInstance buildJobInstance() { JobInstance jobInstance = new JobInstance(); diff --git a/dinky-admin/src/main/java/org/dinky/data/model/job/JobInstance.java b/dinky-admin/src/main/java/org/dinky/data/model/job/JobInstance.java index 4c4e7c4a36..ad9fe5dfb1 100644 --- a/dinky-admin/src/main/java/org/dinky/data/model/job/JobInstance.java +++ b/dinky-admin/src/main/java/org/dinky/data/model/job/JobInstance.java @@ -23,6 +23,7 @@ import java.time.LocalDateTime; import com.baomidou.mybatisplus.annotation.FieldFill; +import com.baomidou.mybatisplus.annotation.FieldStrategy; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; @@ -146,4 +147,8 @@ public class JobInstance implements Serializable { @TableField(fill = FieldFill.INSERT_UPDATE) @ApiModelProperty(value = "Operator", required = true, dataType = "Integer", example = "Operator") private Integer operator; + + @TableField(value = "count(*)", insertStrategy = FieldStrategy.NEVER, updateStrategy = FieldStrategy.NEVER) + @ApiModelProperty(value = "Group by count", dataType = "Integer") + private Long count; } diff --git a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java index 36e3130dd9..1c707f4452 100644 --- a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java +++ b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java @@ -23,9 +23,13 @@ import org.dinky.assertion.Asserts; import org.dinky.context.TenantContextHolder; -import org.dinky.daemon.task.DaemonFactory; +import org.dinky.daemon.constant.FlinkTaskConstant; +import org.dinky.daemon.pool.FlinkJobThreadPool; +import org.dinky.daemon.pool.ScheduleThreadPool; +import org.dinky.daemon.task.DaemonTask; import org.dinky.daemon.task.DaemonTaskConfig; import org.dinky.data.exception.DinkyException; +import org.dinky.data.model.Configuration; import org.dinky.data.model.SystemConfiguration; import org.dinky.data.model.Task; import org.dinky.data.model.job.JobInstance; @@ -33,7 +37,10 @@ import org.dinky.data.properties.OssProperties; import org.dinky.function.constant.PathConstant; import org.dinky.function.pool.UdfCodePool; +import org.dinky.job.ClearJobHistoryTask; +import org.dinky.job.DynamicResizeFlinkJobPoolTask; import org.dinky.job.FlinkJobTask; +import org.dinky.job.SystemMetricsTask; import org.dinky.oss.OssTemplate; import org.dinky.scheduler.client.ProjectClient; import org.dinky.scheduler.exception.SchedulerException; @@ -50,17 +57,18 @@ import org.dinky.utils.UDFUtils; import org.apache.catalina.webresources.TomcatURLStreamHandlerFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.annotation.Profile; import org.springframework.core.annotation.Order; +import org.springframework.scheduling.support.PeriodicTrigger; import org.springframework.stereotype.Component; import com.baomidou.mybatisplus.extension.activerecord.Model; @@ -92,6 +100,8 @@ public class SystemInit implements ApplicationRunner { private final TaskService taskService; private final TenantService tenantService; private final GitProjectService gitProjectService; + private final ScheduleThreadPool schedule; + private static Project project; @Override @@ -104,7 +114,7 @@ public void run(ApplicationArguments args) { for (Tenant tenant : tenants) { taskService.initDefaultFlinkSQLEnv(tenant.getId()); } - initTaskMonitor(); + initDaemon(); initDolphinScheduler(); registerUDF(); updateGitBuildState(); @@ -149,7 +159,8 @@ private void initResources() { Singleton.get(OssResourceManager.class).setOssTemplate(new OssTemplate(ossProperties)); break; case HDFS: - final Configuration configuration = new Configuration(); + final org.apache.hadoop.conf.Configuration configuration = + new org.apache.hadoop.conf.Configuration(); configuration.set( "fs.defaultFS", systemConfiguration @@ -174,14 +185,39 @@ private void initResources() { /** * init task monitor */ - private void initTaskMonitor() { + private void initDaemon() { + SystemConfiguration sysConfig = SystemConfiguration.getInstances(); + + // Init system metrics task + DaemonTask sysMetricsTask = DaemonTask.build(new DaemonTaskConfig(SystemMetricsTask.TYPE)); + Configuration metricsSysEnable = sysConfig.getMetricsSysEnable(); + Configuration sysGatherTiming = sysConfig.getMetricsSysGatherTiming(); + Consumer> metricsListener = c -> { + c.addChangeEvent(x -> { + schedule.removeSchedule(sysMetricsTask); + PeriodicTrigger trigger = new PeriodicTrigger(sysGatherTiming.getValue()); + if (metricsSysEnable.getValue()) schedule.addSchedule(sysMetricsTask, trigger); + }); + }; + metricsListener.accept(metricsSysEnable); + metricsListener.accept(sysGatherTiming); + + // Init clear job history task + DaemonTask clearJobHistoryTask = DaemonTask.build(new DaemonTaskConfig(ClearJobHistoryTask.TYPE)); + schedule.addSchedule(clearJobHistoryTask, new PeriodicTrigger(1, TimeUnit.HOURS)); + + // Init flink job dynamic pool task + DaemonTask flinkJobPoolTask = DaemonTask.build(new DaemonTaskConfig(DynamicResizeFlinkJobPoolTask.TYPE)); + schedule.addSchedule(flinkJobPoolTask, new PeriodicTrigger(FlinkTaskConstant.POLLING_GAP)); + + // Add flink running job task to flink job thread pool List jobInstances = jobInstanceService.listJobInstanceActive(); - List configList = new ArrayList<>(); + FlinkJobThreadPool flinkJobThreadPool = FlinkJobThreadPool.getInstance(); for (JobInstance jobInstance : jobInstances) { - configList.add(new DaemonTaskConfig(FlinkJobTask.TYPE, jobInstance.getId())); + DaemonTaskConfig config = new DaemonTaskConfig(FlinkJobTask.TYPE, jobInstance.getId()); + DaemonTask daemonTask = DaemonTask.build(config); + flinkJobThreadPool.execute(daemonTask); } - log.info("Number of tasks started: " + configList.size()); - DaemonFactory.start(configList); } /** diff --git a/dinky-admin/src/main/java/org/dinky/job/BuildConfiguration.java b/dinky-admin/src/main/java/org/dinky/job/BuildConfiguration.java index 3e424582ac..3048c95382 100644 --- a/dinky-admin/src/main/java/org/dinky/job/BuildConfiguration.java +++ b/dinky-admin/src/main/java/org/dinky/job/BuildConfiguration.java @@ -34,6 +34,7 @@ import com.fasterxml.jackson.databind.JsonNode; +// TODO 这个类不应该存在这里,?????????? public class BuildConfiguration { public static void buildJobManagerConfiguration( diff --git a/dinky-admin/src/main/java/org/dinky/job/ClearJobHistoryTask.java b/dinky-admin/src/main/java/org/dinky/job/ClearJobHistoryTask.java new file mode 100644 index 0000000000..ff897a0bf5 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/job/ClearJobHistoryTask.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.job; + +import org.dinky.context.SpringContextUtils; +import org.dinky.daemon.task.DaemonTask; +import org.dinky.daemon.task.DaemonTaskConfig; +import org.dinky.job.handler.ClearJobHistoryHandler; +import org.dinky.service.HistoryService; +import org.dinky.service.JobHistoryService; +import org.dinky.service.JobInstanceService; + +import org.springframework.context.annotation.DependsOn; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@DependsOn("springContextUtils") +@Slf4j +@Data +public class ClearJobHistoryTask implements DaemonTask { + + public static final String TYPE = ClearJobHistoryTask.class.toString(); + + private static final JobInstanceService jobInstanceService; + private static final JobHistoryService jobHistoryService; + private static final HistoryService historyService; + private static final ClearJobHistoryHandler clearJobHistoryHandler; + + static { + jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class); + jobHistoryService = SpringContextUtils.getBean("jobHistoryServiceImpl", JobHistoryService.class); + historyService = SpringContextUtils.getBean("historyServiceImpl", HistoryService.class); + clearJobHistoryHandler = ClearJobHistoryHandler.builder() + .historyService(historyService) + .jobInstanceService(jobInstanceService) + .jobHistoryService(jobHistoryService) + .build(); + } + + @Override + public boolean dealTask() { + clearJobHistoryHandler.clearDinkyHistory(30, 20); + clearJobHistoryHandler.clearJobHistory(30, 20); + return false; + } + + @Override + public DaemonTask setConfig(DaemonTaskConfig config) { + return this; + } + + @Override + public DaemonTaskConfig getConfig() { + return null; + } + + @Override + public String getType() { + return TYPE; + } +} diff --git a/dinky-admin/src/main/java/org/dinky/job/DynamicResizeFlinkJobPoolTask.java b/dinky-admin/src/main/java/org/dinky/job/DynamicResizeFlinkJobPoolTask.java new file mode 100644 index 0000000000..262d6a425a --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/job/DynamicResizeFlinkJobPoolTask.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.job; + +import org.dinky.daemon.pool.FlinkJobThreadPool; +import org.dinky.daemon.task.DaemonTask; +import org.dinky.daemon.task.DaemonTaskConfig; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DynamicResizeFlinkJobPoolTask implements DaemonTask { + + private DaemonTaskConfig config; + public static final String TYPE = DynamicResizeFlinkJobPoolTask.class.toString(); + private final FlinkJobThreadPool defaultThreadPool = FlinkJobThreadPool.getInstance(); + + @Override + public boolean dealTask() { + int taskSize = defaultThreadPool.getTaskSize(); + // Calculate the desired number of worker threads, adding one worker for every 100 tasks + int num = taskSize / 100 + 1; + // Dynamically adjust the number of worker threads in the thread pool + if (defaultThreadPool.getWorkCount() < num) { + defaultThreadPool.addWorkers(num - defaultThreadPool.getWorkCount()); + } else if (defaultThreadPool.getWorkCount() > num) { + defaultThreadPool.removeWorker(defaultThreadPool.getWorkCount() - num); + } + return false; + } + + @Override + public DaemonTask setConfig(DaemonTaskConfig config) { + this.config = config; + return this; + } + + @Override + public DaemonTaskConfig getConfig() { + return config; + } + + @Override + public String getType() { + return TYPE; + } +} diff --git a/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java b/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java index 49097993ce..91f03f9344 100644 --- a/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java +++ b/dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java @@ -42,7 +42,8 @@ public class FlinkJobTask implements DaemonTask { private DaemonTaskConfig config; - public static final String TYPE = "jobInstance"; + public static final String TYPE = FlinkJobTask.class.toString(); + private static final JobInstanceService jobInstanceService; private long preDealTime; private long refreshCount = 0; diff --git a/dinky-admin/src/main/java/org/dinky/configure/schedule/metrics/GatherSysIndicator.java b/dinky-admin/src/main/java/org/dinky/job/SystemMetricsTask.java similarity index 53% rename from dinky-admin/src/main/java/org/dinky/configure/schedule/metrics/GatherSysIndicator.java rename to dinky-admin/src/main/java/org/dinky/job/SystemMetricsTask.java index f10a965dea..fb049d93ae 100644 --- a/dinky-admin/src/main/java/org/dinky/configure/schedule/metrics/GatherSysIndicator.java +++ b/dinky-admin/src/main/java/org/dinky/job/SystemMetricsTask.java @@ -17,85 +17,53 @@ * */ -package org.dinky.configure.schedule.metrics; +package org.dinky.job; -import org.dinky.configure.schedule.BaseSchedule; -import org.dinky.context.MetricsContextHolder; +import org.dinky.daemon.task.DaemonTask; +import org.dinky.daemon.task.DaemonTaskConfig; import org.dinky.data.annotations.GaugeM; -import org.dinky.data.enums.MetricsType; import org.dinky.data.metrics.BaseMetrics; -import org.dinky.data.metrics.Cpu; -import org.dinky.data.metrics.Jvm; -import org.dinky.data.metrics.Mem; import org.dinky.data.metrics.MetricsTotal; -import org.dinky.data.model.SystemConfiguration; -import org.dinky.data.vo.MetricsVO; +import org.dinky.job.handler.SystemMetricsHandler; import java.lang.reflect.Field; -import java.time.LocalDateTime; import java.util.Arrays; -import javax.annotation.PostConstruct; - -import org.springframework.scheduling.support.PeriodicTrigger; -import org.springframework.stereotype.Component; +import org.springframework.context.annotation.DependsOn; import cn.hutool.core.annotation.AnnotationUtil; import cn.hutool.core.lang.Opt; import cn.hutool.core.util.ReflectUtil; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; -import lombok.RequiredArgsConstructor; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import lombok.Data; import lombok.extern.slf4j.Slf4j; -@Component -@RequiredArgsConstructor +@DependsOn("springContextUtils") @Slf4j -public class GatherSysIndicator extends BaseSchedule { - private final MeterRegistry registry; +@Data +public class SystemMetricsTask implements DaemonTask { + private final MeterRegistry registry = new CompositeMeterRegistry(); + + private DaemonTaskConfig config; + public static final String TYPE = SystemMetricsTask.class.toString(); - @PostConstruct - public void init() { + @Override + public DaemonTask setConfig(DaemonTaskConfig config) { + this.config = config; MetricsTotal metricsTotal = MetricsTotal.instance; registerMetrics(metricsTotal.getJvm()); registerMetrics(metricsTotal.getCpu()); registerMetrics(metricsTotal.getMem()); - org.dinky.data.model.Configuration metricsSysEnable = - SystemConfiguration.getInstances().getMetricsSysEnable(); - String key = metricsSysEnable.getKey(); - - metricsSysEnable.addChangeEvent(x -> { - if (x) { - addSchedule( - key, - this::updateState, - new PeriodicTrigger(SystemConfiguration.getInstances() - .getMetricsSysGatherTiming() - .getValue())); - } else { - removeSchedule(key); - log.info("Information collection for jvm is turned off(已关闭对jvm的信息收集)"); - } - }); + return this; } - public void updateState() { - log.debug("Collecting jvm related information."); - MetricsTotal metricsTotal = MetricsTotal.instance; - LocalDateTime now = LocalDateTime.now(); - - metricsTotal.setJvm(Jvm.of()); - metricsTotal.setCpu(Cpu.of()); - metricsTotal.setMem(Mem.of()); - - MetricsVO metrics = new MetricsVO(); - metrics.setContent(metricsTotal); - metrics.setHeartTime(now); - metrics.setModel(MetricsType.LOCAL.getType()); - MetricsContextHolder.getInstances().sendAsync(metrics.getModel(), metrics); - - log.debug("Collecting jvm information ends."); + @Override + public boolean dealTask() { + SystemMetricsHandler.refresh(); + return false; } private void registerMetrics(BaseMetrics baseMetrics) { @@ -118,4 +86,14 @@ private void registerMetrics(BaseMetrics baseMetrics) { .register(registry)); } } + + @Override + public DaemonTaskConfig getConfig() { + return config; + } + + @Override + public String getType() { + return TYPE; + } } diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java new file mode 100644 index 0000000000..8bfa08f6b6 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.job.handler; + +import org.dinky.data.model.job.History; +import org.dinky.data.model.job.JobInstance; +import org.dinky.service.HistoryService; +import org.dinky.service.JobHistoryService; +import org.dinky.service.JobInstanceService; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.stream.Collectors; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; + +import lombok.Builder; + +@Builder +public class ClearJobHistoryHandler { + private JobInstanceService jobInstanceService; + private JobHistoryService jobHistoryService; + private HistoryService historyService; + + /** + * Clears job history records based on the specified criteria. + * @param maxRetainDays The maximum number of days to retain job history. + * @param maxRetainCount The maximum count to retain job history. + */ + public void clearJobHistory(Integer maxRetainDays, Integer maxRetainCount) { + // Query job instance records, grouped by task ID + List jobInstanceList = jobInstanceService + .lambdaQuery() + .select(JobInstance::getTaskId, JobInstance::getCount) + .groupBy(JobInstance::getTaskId) + .list(); + + // Iterate over job instance records + for (JobInstance jobInstance : jobInstanceList) { + // Check if the count exceeds the maximum retain count + if (jobInstance.getCount() > maxRetainCount) { + // Create a query wrapper to delete job instances older than the maximum retain days + QueryWrapper deleteWrapper = new QueryWrapper<>(); + deleteWrapper + .lambda() + .eq(JobInstance::getTaskId, jobInstance.getTaskId()) + .lt(JobInstance::getCreateTime, LocalDateTime.now().minusDays(maxRetainDays)); + // Retrieve the list of job instances to be deleted + List deleteList = jobInstanceService.list(deleteWrapper); + List historyDeleteIds = deleteList.stream() + .map(JobInstance::getHistoryId) + .collect(Collectors.toList()); + jobHistoryService.removeBatchByIds(historyDeleteIds); + jobInstanceService.remove(deleteWrapper); + } + } + } + + /** + * Clears dinky history records based on the specified criteria. + * @param maxRetainDays The maximum number of days to retain dinky history. + * @param maxRetainCount The maximum count to retain dinky history. + */ + public void clearDinkyHistory(Integer maxRetainDays, Integer maxRetainCount) { + // Query history records, grouped by task ID + List historyList = historyService + .lambdaQuery() + .select(History::getTaskId, History::getCount) + .groupBy(History::getTaskId) + .list(); + + // Iterate over history records + for (History history : historyList) { + // Check if the count exceeds the maximum retain count + if (history.getCount() > maxRetainCount) { + // Create a query wrapper to delete history records older than the maximum retain days + QueryWrapper deleteWrapper = new QueryWrapper<>(); + deleteWrapper + .lambda() + .eq(History::getTaskId, history.getTaskId()) + .lt(History::getStartTime, LocalDateTime.now().minusDays(maxRetainDays)); + historyService.remove(deleteWrapper); + } + } + } + +} diff --git a/dinky-admin/src/main/java/org/dinky/job/Job2MysqlHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java similarity index 95% rename from dinky-admin/src/main/java/org/dinky/job/Job2MysqlHandler.java rename to dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java index cb4f41ba3f..f29c704df1 100644 --- a/dinky-admin/src/main/java/org/dinky/job/Job2MysqlHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/Job2MysqlHandler.java @@ -17,11 +17,12 @@ * */ -package org.dinky.job; +package org.dinky.job.handler; import org.dinky.assertion.Asserts; import org.dinky.context.SpringContextUtils; -import org.dinky.daemon.task.DaemonFactory; +import org.dinky.daemon.pool.FlinkJobThreadPool; +import org.dinky.daemon.task.DaemonTask; import org.dinky.daemon.task.DaemonTaskConfig; import org.dinky.data.dto.ClusterInstanceDTO; import org.dinky.data.enums.JobStatus; @@ -33,6 +34,10 @@ import org.dinky.data.model.mapping.ClusterConfigurationMapping; import org.dinky.data.model.mapping.ClusterInstanceMapping; import org.dinky.gateway.enums.GatewayType; +import org.dinky.job.FlinkJobTask; +import org.dinky.job.Job; +import org.dinky.job.JobContextHolder; +import org.dinky.job.JobHandler; import org.dinky.service.ClusterConfigurationService; import org.dinky.service.ClusterInstanceService; import org.dinky.service.HistoryService; @@ -195,8 +200,8 @@ public boolean success() { : null) .build(); jobHistoryService.save(jobHistory); - - DaemonFactory.refeshOraddTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId())); + DaemonTaskConfig taskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId()); + FlinkJobThreadPool.getInstance().execute(DaemonTask.build(taskConfig)); return true; } diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java index 847eaf1c93..68063bb861 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobAlertHandler.java @@ -27,7 +27,7 @@ import org.dinky.assertion.Asserts; import org.dinky.context.FreeMarkerHolder; import org.dinky.context.SpringContextUtils; -import org.dinky.daemon.pool.DefaultThreadPool; +import org.dinky.daemon.pool.FlinkJobThreadPool; import org.dinky.data.dto.AlertRuleDTO; import org.dinky.data.dto.TaskDTO; import org.dinky.data.enums.Status; @@ -102,7 +102,7 @@ public class JobAlertHandler { public static JobAlertHandler getInstance() { if (defaultJobAlertHandler == null) { - synchronized (DefaultThreadPool.class) { + synchronized (FlinkJobThreadPool.class) { if (defaultJobAlertHandler == null) { defaultJobAlertHandler = new JobAlertHandler(); } diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/SystemMetricsHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/SystemMetricsHandler.java new file mode 100644 index 0000000000..0ad409e962 --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/job/handler/SystemMetricsHandler.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.job.handler; + +import org.dinky.context.MetricsContextHolder; +import org.dinky.data.enums.MetricsType; +import org.dinky.data.metrics.Cpu; +import org.dinky.data.metrics.Jvm; +import org.dinky.data.metrics.Mem; +import org.dinky.data.metrics.MetricsTotal; +import org.dinky.data.vo.MetricsVO; + +import java.time.LocalDateTime; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SystemMetricsHandler { + + public static void refresh() { + log.debug("Collecting jvm related information."); + MetricsTotal metricsTotal = MetricsTotal.instance; + LocalDateTime now = LocalDateTime.now(); + + metricsTotal.setJvm(Jvm.of()); + metricsTotal.setCpu(Cpu.of()); + metricsTotal.setMem(Mem.of()); + + MetricsVO metrics = new MetricsVO(); + metrics.setContent(metricsTotal); + metrics.setHeartTime(now); + metrics.setModel(MetricsType.LOCAL.getType()); + MetricsContextHolder.getInstances().sendAsync(metrics.getModel(), metrics); + + log.debug("Collecting jvm information ends."); + } +} diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java index b3c606f68d..19c17416b7 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java @@ -21,7 +21,7 @@ import org.dinky.assertion.Asserts; import org.dinky.context.TenantContextHolder; -import org.dinky.daemon.pool.DefaultThreadPool; +import org.dinky.daemon.pool.FlinkJobThreadPool; import org.dinky.daemon.task.DaemonTask; import org.dinky.daemon.task.DaemonTaskConfig; import org.dinky.data.dto.ClusterConfigurationDTO; @@ -188,7 +188,7 @@ public JobInfoDetail getJobInfoDetailInfo(JobInstance jobInstance) { @Override public JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, boolean isForce) { DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstanceId); - DaemonTask daemonTask = DefaultThreadPool.getInstance().getByTaskConfig(daemonTaskConfig); + DaemonTask daemonTask = FlinkJobThreadPool.getInstance().getByTaskConfig(daemonTaskConfig); if (daemonTask != null) { daemonTask.dealTask(); @@ -198,7 +198,7 @@ public JobInfoDetail refreshJobInfoDetail(Integer jobInstanceId, boolean isForce daemonTask.dealTask(); JobInfoDetail jobInfoDetail = ((FlinkJobTask) daemonTask).getJobInfoDetail(); if (!JobStatus.isDone(jobInfoDetail.getInstance().getStatus())) { - DefaultThreadPool.getInstance().execute(daemonTask); + FlinkJobThreadPool.getInstance().execute(daemonTask); } return jobInfoDetail; } else { @@ -218,13 +218,13 @@ public boolean hookJobDone(String jobId, Integer taskId) { } DaemonTaskConfig config = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId()); - DaemonTask daemonTask = DefaultThreadPool.getInstance().removeByTaskConfig(config); + DaemonTask daemonTask = FlinkJobThreadPool.getInstance().removeByTaskConfig(config); daemonTask = Optional.ofNullable(daemonTask).orElse(DaemonTask.build(config)); boolean isDone = daemonTask.dealTask(); // If the task is not completed, it is re-queued if (!isDone) { - DefaultThreadPool.getInstance().execute(daemonTask); + FlinkJobThreadPool.getInstance().execute(daemonTask); } return isDone; } @@ -234,8 +234,8 @@ public void refreshJobByTaskIds(Integer... taskIds) { for (Integer taskId : taskIds) { JobInstance instance = getJobInstanceByTaskId(taskId); DaemonTaskConfig daemonTaskConfig = DaemonTaskConfig.build(FlinkJobTask.TYPE, instance.getId()); - DefaultThreadPool.getInstance().removeByTaskConfig(daemonTaskConfig); - DefaultThreadPool.getInstance().execute(DaemonTask.build(daemonTaskConfig)); + FlinkJobThreadPool.getInstance().removeByTaskConfig(daemonTaskConfig); + FlinkJobThreadPool.getInstance().execute(DaemonTask.build(daemonTaskConfig)); refreshJobInfoDetail(instance.getId(), false); } } diff --git a/dinky-admin/src/main/resources/META-INF/services/org.dinky.daemon.task.DaemonTask b/dinky-admin/src/main/resources/META-INF/services/org.dinky.daemon.task.DaemonTask index 0007f40d79..9a08fd8d74 100644 --- a/dinky-admin/src/main/resources/META-INF/services/org.dinky.daemon.task.DaemonTask +++ b/dinky-admin/src/main/resources/META-INF/services/org.dinky.daemon.task.DaemonTask @@ -1 +1,4 @@ -org.dinky.job.FlinkJobTask \ No newline at end of file +org.dinky.job.FlinkJobTask +org.dinky.job.DynamicResizeFlinkJobPoolTask +org.dinky.job.SystemMetricsTask +org.dinky.job.ClearJobHistoryTask \ No newline at end of file diff --git a/dinky-admin/src/main/resources/META-INF/services/org.dinky.job.JobHandler b/dinky-admin/src/main/resources/META-INF/services/org.dinky.job.JobHandler index cc774e9984..1eba58d2d9 100644 --- a/dinky-admin/src/main/resources/META-INF/services/org.dinky.job.JobHandler +++ b/dinky-admin/src/main/resources/META-INF/services/org.dinky.job.JobHandler @@ -1 +1 @@ -org.dinky.job.Job2MysqlHandler \ No newline at end of file +org.dinky.job.handler.Job2MysqlHandler \ No newline at end of file diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/constant/FlinkTaskConstant.java b/dinky-daemon/src/main/java/org/dinky/daemon/constant/FlinkTaskConstant.java index 67fbbc4b0c..b9000e02ef 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/constant/FlinkTaskConstant.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/constant/FlinkTaskConstant.java @@ -25,7 +25,7 @@ public interface FlinkTaskConstant { int TIME_SLEEP = 1000 * 5; /** 启动线程轮询日志时间,用于设置work等信息 */ - int MAX_POLLING_GAP = 1000; + int POLLING_GAP = 5000; /** 最小 */ int MIN_POLLING_GAP = 50; } diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/pool/DefaultThreadPool.java b/dinky-daemon/src/main/java/org/dinky/daemon/pool/FlinkJobThreadPool.java similarity index 94% rename from dinky-daemon/src/main/java/org/dinky/daemon/pool/DefaultThreadPool.java rename to dinky-daemon/src/main/java/org/dinky/daemon/pool/FlinkJobThreadPool.java index b5fc17cbd5..4a159db162 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/pool/DefaultThreadPool.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/pool/FlinkJobThreadPool.java @@ -33,7 +33,7 @@ * @operate * @return */ -public class DefaultThreadPool implements ThreadPool { +public class FlinkJobThreadPool implements ThreadPool { private static final int MAX_WORKER_NUM = 10; private static final int DEFAULT_WORKER_NUM = 5; @@ -47,15 +47,15 @@ public class DefaultThreadPool implements ThreadPool { private final TaskQueue queue = new TaskQueue<>(); - private DefaultThreadPool() { + private FlinkJobThreadPool() { addWorkers(DEFAULT_WORKER_NUM); } private static final class DefaultThreadPoolHolder { - private static final DefaultThreadPool defaultThreadPool = new DefaultThreadPool(); + private static final FlinkJobThreadPool defaultThreadPool = new FlinkJobThreadPool(); } - public static DefaultThreadPool getInstance() { + public static FlinkJobThreadPool getInstance() { return DefaultThreadPoolHolder.defaultThreadPool; } diff --git a/dinky-admin/src/main/java/org/dinky/configure/schedule/BaseSchedule.java b/dinky-daemon/src/main/java/org/dinky/daemon/pool/ScheduleThreadPool.java similarity index 76% rename from dinky-admin/src/main/java/org/dinky/configure/schedule/BaseSchedule.java rename to dinky-daemon/src/main/java/org/dinky/daemon/pool/ScheduleThreadPool.java index 784ff6064a..dfc37143cb 100644 --- a/dinky-admin/src/main/java/org/dinky/configure/schedule/BaseSchedule.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/pool/ScheduleThreadPool.java @@ -17,29 +17,32 @@ * */ -package org.dinky.configure.schedule; +package org.dinky.daemon.pool; + +import org.dinky.daemon.task.DaemonTask; import java.util.HashMap; import java.util.concurrent.ScheduledFuture; -import javax.annotation.Resource; - +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.stereotype.Component; -public abstract class BaseSchedule { +@Component +public class ScheduleThreadPool { private static final HashMap> SCHEDULE_MAP = new HashMap<>(); - @Resource + @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; - protected void addSchedule(String key, Runnable runnable, Trigger trigger) { - ScheduledFuture schedule = threadPoolTaskScheduler.schedule(runnable, trigger); - getScheduleMap().put(key, schedule); + public void addSchedule(DaemonTask task, Trigger trigger) { + ScheduledFuture schedule = threadPoolTaskScheduler.schedule(task::dealTask, trigger); + getScheduleMap().put(task.getType(), schedule); } - protected void removeSchedule(String key) { - ScheduledFuture scheduledFuture = getScheduleMap().get(key); + public void removeSchedule(DaemonTask task) { + ScheduledFuture scheduledFuture = getScheduleMap().get(task.getType()); if (scheduledFuture != null) { scheduledFuture.cancel(true); } diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonFactory.java b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonFactory.java deleted file mode 100644 index 7cbc295ee4..0000000000 --- a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonFactory.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.dinky.daemon.task; - -import org.dinky.daemon.constant.FlinkTaskConstant; -import org.dinky.daemon.pool.DefaultThreadPool; - -import java.util.List; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class DaemonFactory { - - /** - * - *

Start the daemon thread.

- *

This method accepts a List<{@link org.dinky.daemon.task.DaemonTaskConfig}> parameter to configure daemon tasks. - * Inside the method, it creates a thread and performs the following operations:

- * - *
    - *
  1. Iterate through each configuration item in configList and construct the corresponding DaemonTask.
  2. - *
  3. Submit each DaemonTask to the thread pool for execution.
  4. - *
  5. Enter an infinite loop where the following actions are performed: - *
      - *
    • Calculate the waiting time based on the task count, ensuring that the polling interval between tasks - * stays within the specified minimum and maximum intervals.
    • - *
    • Calculate the desired number of working threads, num, increasing one working thread for - * every 100 tasks.
    • - *
    • Dynamically adjust the number of working threads in the thread pool based on a comparison between - * the actual number of working threads and the desired quantity.
    • - *
    - *
  6. - *
- */ - public static void start(List configList) { - Thread thread = new Thread(() -> { - DefaultThreadPool defaultThreadPool = DefaultThreadPool.getInstance(); - for (DaemonTaskConfig config : configList) { - // Build a daemon task based on the config and Submit the task to the thread pool for execution - DaemonTask daemonTask = DaemonTask.build(config); - defaultThreadPool.execute(daemonTask); - } - - while (true) { - int taskSize = defaultThreadPool.getTaskSize(); - try { - // where (taskSize + 1) is to avoid dividing by 0 when taskSize is 0. - Thread.sleep(Math.max( - FlinkTaskConstant.MAX_POLLING_GAP / (taskSize + 1), FlinkTaskConstant.MIN_POLLING_GAP)); - } catch (InterruptedException e) { - log.error(e.getMessage(), e); - } - - // Calculate the desired number of worker threads, adding one worker for every 100 tasks - int num = taskSize / 100 + 1; - - // Dynamically adjust the number of worker threads in the thread pool - if (defaultThreadPool.getWorkCount() < num) { - defaultThreadPool.addWorkers(num - defaultThreadPool.getWorkCount()); - } else if (defaultThreadPool.getWorkCount() > num) { - defaultThreadPool.removeWorker(defaultThreadPool.getWorkCount() - num); - } - } - }); - thread.start(); - } - - /** - * @param config - * add task - * */ - public static void refeshOraddTask(DaemonTaskConfig config) { - DefaultThreadPool.getInstance().execute(DaemonTask.build(config)); - } -} diff --git a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java index 811e9a2993..cfe1087e7e 100644 --- a/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java +++ b/dinky-daemon/src/main/java/org/dinky/daemon/task/DaemonTaskConfig.java @@ -27,13 +27,17 @@ public class DaemonTaskConfig { private final String type; - private final Integer id; + private Integer id; public DaemonTaskConfig(String type, Integer id) { this.type = type; this.id = id; } + public DaemonTaskConfig(String type) { + this.type = type; + } + public static DaemonTaskConfig build(String type, Integer id) { return new DaemonTaskConfig(type, id); } From a78fae08af43c907e126b1cab15b1acb308e0845 Mon Sep 17 00:00:00 2001 From: ikiler Date: Sun, 19 Nov 2023 17:34:36 +0800 Subject: [PATCH 3/6] Optimize Daemon Schedule --- .../java/org/dinky/job/handler/ClearJobHistoryHandler.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java index 8bfa08f6b6..36a567879c 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/ClearJobHistoryHandler.java @@ -64,9 +64,8 @@ public void clearJobHistory(Integer maxRetainDays, Integer maxRetainCount) { .lt(JobInstance::getCreateTime, LocalDateTime.now().minusDays(maxRetainDays)); // Retrieve the list of job instances to be deleted List deleteList = jobInstanceService.list(deleteWrapper); - List historyDeleteIds = deleteList.stream() - .map(JobInstance::getHistoryId) - .collect(Collectors.toList()); + List historyDeleteIds = + deleteList.stream().map(JobInstance::getHistoryId).collect(Collectors.toList()); jobHistoryService.removeBatchByIds(historyDeleteIds); jobInstanceService.remove(deleteWrapper); } @@ -100,5 +99,4 @@ public void clearDinkyHistory(Integer maxRetainDays, Integer maxRetainCount) { } } } - } From e44c78bc2bd260e44b0f2938f7a4bdaf0305fcef Mon Sep 17 00:00:00 2001 From: ikiler Date: Sun, 19 Nov 2023 23:18:23 +0800 Subject: [PATCH 4/6] Move the Build Config package location, and then delete it --- .../controller/JobInstanceController.java | 22 +------------------ .../{job => utils}/BuildConfiguration.java | 4 ++-- 2 files changed, 3 insertions(+), 23 deletions(-) rename dinky-admin/src/main/java/org/dinky/{job => utils}/BuildConfiguration.java (98%) diff --git a/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java b/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java index 0670e2970d..8eb82d3157 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java @@ -24,14 +24,13 @@ import org.dinky.data.annotations.Log; import org.dinky.data.enums.BusinessType; import org.dinky.data.model.ID; -import org.dinky.data.model.devops.JobManagerConfiguration; import org.dinky.data.model.devops.TaskManagerConfiguration; import org.dinky.data.model.ext.JobInfoDetail; import org.dinky.data.model.job.JobInstance; import org.dinky.data.result.ProTableResult; import org.dinky.data.result.Result; import org.dinky.explainer.lineage.LineageResult; -import org.dinky.job.BuildConfiguration; +import org.dinky.utils.BuildConfiguration; import org.dinky.service.JobInstanceService; import java.util.HashSet; @@ -154,25 +153,6 @@ public Result getLineage(@RequestParam Integer id) { return Result.succeed(jobInstanceService.getLineage(id)); } - /** - * 获取 JobManager 的信息 - */ - @GetMapping("/getJobManagerInfo") - @ApiOperation("Get job manager info") - @ApiImplicitParam( - name = "address", - value = "JobManager address", - dataType = "String", - paramType = "query", - required = true) - public Result getJobManagerInfo(@RequestParam String address) { - JobManagerConfiguration jobManagerConfiguration = new JobManagerConfiguration(); - if (Asserts.isNotNullString(address)) { - BuildConfiguration.buildJobManagerConfiguration(jobManagerConfiguration, FlinkAPI.build(address)); - } - return Result.succeed(jobManagerConfiguration); - } - @GetMapping("/getJobManagerLog") @ApiOperation("Get job manager log") @ApiImplicitParam( diff --git a/dinky-admin/src/main/java/org/dinky/job/BuildConfiguration.java b/dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java similarity index 98% rename from dinky-admin/src/main/java/org/dinky/job/BuildConfiguration.java rename to dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java index 3048c95382..52d308c218 100644 --- a/dinky-admin/src/main/java/org/dinky/job/BuildConfiguration.java +++ b/dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java @@ -17,7 +17,7 @@ * */ -package org.dinky.job; +package org.dinky.utils; import org.dinky.api.FlinkAPI; import org.dinky.assertion.Asserts; @@ -34,7 +34,7 @@ import com.fasterxml.jackson.databind.JsonNode; -// TODO 这个类不应该存在这里,?????????? +// TODO 后面优化掉 public class BuildConfiguration { public static void buildJobManagerConfiguration( From 24772dbf427cd89b161601c1f4f0726fb0d48812 Mon Sep 17 00:00:00 2001 From: ikiler Date: Sun, 19 Nov 2023 23:34:01 +0800 Subject: [PATCH 5/6] formate code --- .../controller/JobInstanceController.java | 2 +- .../org/dinky/utils/BuildConfiguration.java | 37 ------------------- 2 files changed, 1 insertion(+), 38 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java b/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java index 8eb82d3157..e749d36086 100644 --- a/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java +++ b/dinky-admin/src/main/java/org/dinky/controller/JobInstanceController.java @@ -30,8 +30,8 @@ import org.dinky.data.result.ProTableResult; import org.dinky.data.result.Result; import org.dinky.explainer.lineage.LineageResult; -import org.dinky.utils.BuildConfiguration; import org.dinky.service.JobInstanceService; +import org.dinky.utils.BuildConfiguration; import java.util.HashSet; import java.util.Set; diff --git a/dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java b/dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java index 52d308c218..ceb71b4951 100644 --- a/dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java +++ b/dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java @@ -24,7 +24,6 @@ import org.dinky.data.model.devops.JobManagerConfiguration; import org.dinky.data.model.devops.TaskContainerConfigInfo; import org.dinky.data.model.devops.TaskManagerConfiguration; -import org.dinky.utils.JsonUtils; import java.util.HashMap; import java.util.LinkedHashMap; @@ -37,42 +36,6 @@ // TODO 后面优化掉 public class BuildConfiguration { - public static void buildJobManagerConfiguration( - JobManagerConfiguration jobManagerConfiguration, FlinkAPI flinkAPI) { - - // 获取jobManager metrics - Map jobManagerMetricsMap = new HashMap<>(8); - List jobManagerMetricsItemsList = - JsonUtils.toList(JsonUtils.toJsonString(flinkAPI.getJobManagerMetrics()), LinkedHashMap.class); - jobManagerMetricsItemsList.forEach(mapItems -> { - String configKey = (String) mapItems.get("id"); - String configValue = (String) mapItems.get("value"); - if (Asserts.isNotNullString(configKey) && Asserts.isNotNullString(configValue)) { - jobManagerMetricsMap.put(configKey, configValue); - } - }); - // 获取jobManager配置信息 - Map jobManagerConfigMap = new HashMap<>(8); - List jobManagerConfigMapItemsList = - JsonUtils.toList(JsonUtils.toJsonString(flinkAPI.getJobManagerConfig()), LinkedHashMap.class); - jobManagerConfigMapItemsList.forEach(mapItems -> { - String configKey = (String) mapItems.get("key"); - String configValue = (String) mapItems.get("value"); - if (Asserts.isNotNullString(configKey) && Asserts.isNotNullString(configValue)) { - jobManagerConfigMap.put(configKey, configValue); - } - }); - // 获取jobManager日志 - String jobMangerLog = flinkAPI.getJobManagerLog(); - // 获取jobManager标准输出日志 - String jobManagerStdOut = flinkAPI.getJobManagerStdOut(); - - jobManagerConfiguration.setMetrics(jobManagerMetricsMap); - jobManagerConfiguration.setJobManagerConfig(jobManagerConfigMap); - jobManagerConfiguration.setJobManagerLog(jobMangerLog); - jobManagerConfiguration.setJobManagerStdout(jobManagerStdOut); - } - public static void buildTaskManagerConfiguration( Set taskManagerConfigurationList, FlinkAPI flinkAPI, From 3c53605e51c8098a043b0bd736bd7f78e7c89dad Mon Sep 17 00:00:00 2001 From: ikiler Date: Sun, 19 Nov 2023 23:49:26 +0800 Subject: [PATCH 6/6] formate code --- .../src/main/java/org/dinky/utils/BuildConfiguration.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java b/dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java index ceb71b4951..f65aa695c2 100644 --- a/dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java +++ b/dinky-admin/src/main/java/org/dinky/utils/BuildConfiguration.java @@ -21,14 +21,9 @@ import org.dinky.api.FlinkAPI; import org.dinky.assertion.Asserts; -import org.dinky.data.model.devops.JobManagerConfiguration; import org.dinky.data.model.devops.TaskContainerConfigInfo; import org.dinky.data.model.devops.TaskManagerConfiguration; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; import java.util.Set; import com.fasterxml.jackson.databind.JsonNode;