Skip to content

Commit

Permalink
Optimize Daemon Schedule, and add delete jobhistory Task (#2548)
Browse files Browse the repository at this point in the history
* Optimize the process

* Optimize Daemon Schedule

* Optimize Daemon Schedule

* Move the Build Config package location, and then delete it

* format code

* format code
  • Loading branch information
gaoyan1998 authored Nov 20, 2023
1 parent 2ab7026 commit 484f898
Show file tree
Hide file tree
Showing 22 changed files with 439 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.dinky.configure.schedule;
package org.dinky.configure;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@
import org.dinky.data.annotations.Log;
import org.dinky.data.enums.BusinessType;
import org.dinky.data.model.ID;
import org.dinky.data.model.devops.JobManagerConfiguration;
import org.dinky.data.model.devops.TaskManagerConfiguration;
import org.dinky.data.model.ext.JobInfoDetail;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.explainer.lineage.LineageResult;
import org.dinky.job.BuildConfiguration;
import org.dinky.service.JobInstanceService;
import org.dinky.utils.BuildConfiguration;

import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -154,25 +153,6 @@ public Result<LineageResult> getLineage(@RequestParam Integer id) {
return Result.succeed(jobInstanceService.getLineage(id));
}

/**
* 获取 JobManager 的信息
*/
@GetMapping("/getJobManagerInfo")
@ApiOperation("Get job manager info")
@ApiImplicitParam(
name = "address",
value = "JobManager address",
dataType = "String",
paramType = "query",
required = true)
public Result<JobManagerConfiguration> getJobManagerInfo(@RequestParam String address) {
JobManagerConfiguration jobManagerConfiguration = new JobManagerConfiguration();
if (Asserts.isNotNullString(address)) {
BuildConfiguration.buildJobManagerConfiguration(jobManagerConfiguration, FlinkAPI.build(address));
}
return Result.succeed(jobManagerConfiguration);
}

@GetMapping("/getJobManagerLog")
@ApiOperation("Get job manager log")
@ApiImplicitParam(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.Serializable;
import java.time.LocalDateTime;

import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -107,6 +108,10 @@ public class History implements Serializable {
@ApiModelProperty(hidden = true)
private String clusterName;

@TableField(value = "count(*)", insertStrategy = FieldStrategy.NEVER, updateStrategy = FieldStrategy.NEVER)
@ApiModelProperty(value = "Group by count", dataType = "Integer")
private Long count;

@ApiModelProperty(hidden = true)
public JobInstance buildJobInstance() {
JobInstance jobInstance = new JobInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.LocalDateTime;

import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
Expand Down Expand Up @@ -146,4 +147,8 @@ public class JobInstance implements Serializable {
@TableField(fill = FieldFill.INSERT_UPDATE)
@ApiModelProperty(value = "Operator", required = true, dataType = "Integer", example = "Operator")
private Integer operator;

@TableField(value = "count(*)", insertStrategy = FieldStrategy.NEVER, updateStrategy = FieldStrategy.NEVER)
@ApiModelProperty(value = "Group by count", dataType = "Integer")
private Long count;
}
56 changes: 46 additions & 10 deletions dinky-admin/src/main/java/org/dinky/init/SystemInit.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,24 @@

import org.dinky.assertion.Asserts;
import org.dinky.context.TenantContextHolder;
import org.dinky.daemon.task.DaemonFactory;
import org.dinky.daemon.constant.FlinkTaskConstant;
import org.dinky.daemon.pool.FlinkJobThreadPool;
import org.dinky.daemon.pool.ScheduleThreadPool;
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;
import org.dinky.data.exception.DinkyException;
import org.dinky.data.model.Configuration;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.data.model.Task;
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.model.rbac.Tenant;
import org.dinky.data.properties.OssProperties;
import org.dinky.function.constant.PathConstant;
import org.dinky.function.pool.UdfCodePool;
import org.dinky.job.ClearJobHistoryTask;
import org.dinky.job.DynamicResizeFlinkJobPoolTask;
import org.dinky.job.FlinkJobTask;
import org.dinky.job.SystemMetricsTask;
import org.dinky.oss.OssTemplate;
import org.dinky.scheduler.client.ProjectClient;
import org.dinky.scheduler.exception.SchedulerException;
Expand All @@ -50,17 +57,18 @@
import org.dinky.utils.UDFUtils;

import org.apache.catalina.webresources.TomcatURLStreamHandlerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Profile;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.stereotype.Component;

import com.baomidou.mybatisplus.extension.activerecord.Model;
Expand Down Expand Up @@ -92,6 +100,8 @@ public class SystemInit implements ApplicationRunner {
private final TaskService taskService;
private final TenantService tenantService;
private final GitProjectService gitProjectService;
private final ScheduleThreadPool schedule;

private static Project project;

@Override
Expand All @@ -104,7 +114,7 @@ public void run(ApplicationArguments args) {
for (Tenant tenant : tenants) {
taskService.initDefaultFlinkSQLEnv(tenant.getId());
}
initTaskMonitor();
initDaemon();
initDolphinScheduler();
registerUDF();
updateGitBuildState();
Expand Down Expand Up @@ -149,7 +159,8 @@ private void initResources() {
Singleton.get(OssResourceManager.class).setOssTemplate(new OssTemplate(ossProperties));
break;
case HDFS:
final Configuration configuration = new Configuration();
final org.apache.hadoop.conf.Configuration configuration =
new org.apache.hadoop.conf.Configuration();
configuration.set(
"fs.defaultFS",
systemConfiguration
Expand All @@ -174,14 +185,39 @@ private void initResources() {
/**
* init task monitor
*/
private void initTaskMonitor() {
private void initDaemon() {
SystemConfiguration sysConfig = SystemConfiguration.getInstances();

// Init system metrics task
DaemonTask sysMetricsTask = DaemonTask.build(new DaemonTaskConfig(SystemMetricsTask.TYPE));
Configuration<Boolean> metricsSysEnable = sysConfig.getMetricsSysEnable();
Configuration<Integer> sysGatherTiming = sysConfig.getMetricsSysGatherTiming();
Consumer<Configuration<?>> metricsListener = c -> {
c.addChangeEvent(x -> {
schedule.removeSchedule(sysMetricsTask);
PeriodicTrigger trigger = new PeriodicTrigger(sysGatherTiming.getValue());
if (metricsSysEnable.getValue()) schedule.addSchedule(sysMetricsTask, trigger);
});
};
metricsListener.accept(metricsSysEnable);
metricsListener.accept(sysGatherTiming);

// Init clear job history task
DaemonTask clearJobHistoryTask = DaemonTask.build(new DaemonTaskConfig(ClearJobHistoryTask.TYPE));
schedule.addSchedule(clearJobHistoryTask, new PeriodicTrigger(1, TimeUnit.HOURS));

// Init flink job dynamic pool task
DaemonTask flinkJobPoolTask = DaemonTask.build(new DaemonTaskConfig(DynamicResizeFlinkJobPoolTask.TYPE));
schedule.addSchedule(flinkJobPoolTask, new PeriodicTrigger(FlinkTaskConstant.POLLING_GAP));

// Add flink running job task to flink job thread pool
List<JobInstance> jobInstances = jobInstanceService.listJobInstanceActive();
List<DaemonTaskConfig> configList = new ArrayList<>();
FlinkJobThreadPool flinkJobThreadPool = FlinkJobThreadPool.getInstance();
for (JobInstance jobInstance : jobInstances) {
configList.add(new DaemonTaskConfig(FlinkJobTask.TYPE, jobInstance.getId()));
DaemonTaskConfig config = new DaemonTaskConfig(FlinkJobTask.TYPE, jobInstance.getId());
DaemonTask daemonTask = DaemonTask.build(config);
flinkJobThreadPool.execute(daemonTask);
}
log.info("Number of tasks started: " + configList.size());
DaemonFactory.start(configList);
}

/**
Expand Down
79 changes: 79 additions & 0 deletions dinky-admin/src/main/java/org/dinky/job/ClearJobHistoryTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.job;

import org.dinky.context.SpringContextUtils;
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;
import org.dinky.job.handler.ClearJobHistoryHandler;
import org.dinky.service.HistoryService;
import org.dinky.service.JobHistoryService;
import org.dinky.service.JobInstanceService;

import org.springframework.context.annotation.DependsOn;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@DependsOn("springContextUtils")
@Slf4j
@Data
public class ClearJobHistoryTask implements DaemonTask {

    /** Task type key. Note: {@code Class.toString()} yields {@code "class org.dinky.job.ClearJobHistoryTask"}. */
    public static final String TYPE = ClearJobHistoryTask.class.toString();

    // Retention thresholds handed to the clear handler.
    // NOTE(review): presumably "days of history to keep" and "minimum rows to retain" —
    // confirm against ClearJobHistoryHandler before changing either value.
    private static final int RETENTION_DAYS = 30;
    private static final int RETENTION_COUNT = 20;

    private static final JobInstanceService jobInstanceService;
    private static final JobHistoryService jobHistoryService;
    private static final HistoryService historyService;
    private static final ClearJobHistoryHandler clearJobHistoryHandler;

    static {
        // Beans are resolved eagerly at class-load time; this requires the Spring
        // context to already be available — hence @DependsOn("springContextUtils").
        jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class);
        jobHistoryService = SpringContextUtils.getBean("jobHistoryServiceImpl", JobHistoryService.class);
        historyService = SpringContextUtils.getBean("historyServiceImpl", HistoryService.class);
        clearJobHistoryHandler = ClearJobHistoryHandler.builder()
                .historyService(historyService)
                .jobInstanceService(jobInstanceService)
                .jobHistoryService(jobHistoryService)
                .build();
    }

    /**
     * Purges aged Dinky execution history and Flink job history using the shared
     * retention thresholds above.
     *
     * @return always {@code false}, signalling the daemon scheduler that no
     *         immediate re-run is required
     */
    @Override
    public boolean dealTask() {
        clearJobHistoryHandler.clearDinkyHistory(RETENTION_DAYS, RETENTION_COUNT);
        clearJobHistoryHandler.clearJobHistory(RETENTION_DAYS, RETENTION_COUNT);
        return false;
    }

    /** This task is stateless; the supplied config is ignored. */
    @Override
    public DaemonTask setConfig(DaemonTaskConfig config) {
        return this;
    }

    /** No config is retained, so this always returns {@code null}. */
    @Override
    public DaemonTaskConfig getConfig() {
        return null;
    }

    @Override
    public String getType() {
        return TYPE;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.job;

import org.dinky.daemon.pool.FlinkJobThreadPool;
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DynamicResizeFlinkJobPoolTask implements DaemonTask {

    /** Task type key. Note: {@code Class.toString()} yields {@code "class org.dinky.job.DynamicResizeFlinkJobPoolTask"}. */
    public static final String TYPE = DynamicResizeFlinkJobPoolTask.class.toString();

    /** Target ratio: one worker thread for every 100 queued tasks. */
    private static final int TASKS_PER_WORKER = 100;

    private DaemonTaskConfig config;
    private final FlinkJobThreadPool defaultThreadPool = FlinkJobThreadPool.getInstance();

    /**
     * Resizes the Flink job thread pool so it runs one worker per
     * {@value #TASKS_PER_WORKER} pending tasks (always at least one worker).
     *
     * <p>The current worker count is read once so the add/remove decision is
     * based on a single consistent snapshot rather than repeated reads of a
     * pool that other threads may be resizing concurrently.
     *
     * @return always {@code false}, signalling the daemon scheduler that no
     *         immediate re-run is required
     */
    @Override
    public boolean dealTask() {
        int taskSize = defaultThreadPool.getTaskSize();
        int desiredWorkers = taskSize / TASKS_PER_WORKER + 1;
        int delta = desiredWorkers - defaultThreadPool.getWorkCount();
        if (delta > 0) {
            defaultThreadPool.addWorkers(delta);
        } else if (delta < 0) {
            defaultThreadPool.removeWorker(-delta);
        }
        return false;
    }

    /** Stores the supplied config for later retrieval via {@link #getConfig()}. */
    @Override
    public DaemonTask setConfig(DaemonTaskConfig config) {
        this.config = config;
        return this;
    }

    @Override
    public DaemonTaskConfig getConfig() {
        return config;
    }

    @Override
    public String getType() {
        return TYPE;
    }
}
3 changes: 2 additions & 1 deletion dinky-admin/src/main/java/org/dinky/job/FlinkJobTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
public class FlinkJobTask implements DaemonTask {

private DaemonTaskConfig config;
public static final String TYPE = "jobInstance";
public static final String TYPE = FlinkJobTask.class.toString();

private static final JobInstanceService jobInstanceService;
private long preDealTime;
private long refreshCount = 0;
Expand Down
Loading

0 comments on commit 484f898

Please sign in to comment.