Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
c0ying committed Sep 4, 2019
1 parent fd34473 commit 9aca7e8
Show file tree
Hide file tree
Showing 19 changed files with 1,070 additions and 2 deletions.
28 changes: 26 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,26 @@
# spring-cloud-dataflow-local-schedule
spring cloud dataflow schedule implemented by quartz
# Spring Cloud DataFlow Quartz定时执行

官方版本定时任务执行只支持 Kubernetes ,使用cronJob实现。如果想本地测试,或者部署模式并不是大型集群,使用Kubernetes过于臃肿庞大。

使用Quartz实现定时任务执行,同时可开启Quartz分布式模式实现小型集群的定时任务管理需求



## 快速开始

1.创建数据表

执行resources/schema 文件夹下的创表SQL。使用mysql数据库作为存储

2.启动 Spring cloud dataflow

java -jar spring-cloud-dataflow-server-local.jar

3.创建定时任务开始使用



## 备注

基于Spring cloud DataFlow 1.7.4.RELEASE进行修改

68 changes: 68 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-dataflow-server-local-autoconfig</artifactId>
<version>1.7.4.RELEASE</version>
<packaging>jar</packaging>
<name>spring-cloud-dataflow-server-local-autoconfig</name>
<description>Data Flow Local Server Autoconfig</description>

<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-parent</artifactId>
<version>1.7.4.RELEASE</version>
</parent>

<properties>
<quartz.version>2.2.3</quartz.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-server-core</artifactId>
<version>1.7.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-deployer-resource-maven</artifactId>
<version>${spring-cloud-deployer.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-deployer-resource-docker</artifactId>
<version>${spring-cloud-deployer.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-deployer-resource-support</artifactId>
<version>${spring-cloud-deployer.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-deployer-local</artifactId>
<version>${spring-cloud-deployer.version}</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
<exclusions>
<exclusion>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.jingxin.cloud.dataflow.support.configuration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.scheduler.spi.core.Scheduler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import com.jingxin.cloud.dataflow.support.quartz.configuration.QuartzConfiguration;
import com.jingxin.cloud.dataflow.support.scheduler.LocalScheduler;
import com.jingxin.cloud.dataflow.support.scheduler.repository.ScheduleInfoRepository;

@Configuration
@Import(QuartzConfiguration.class)
public class LocalDataFlowServerSchedulerConfiguration {

@Value("${spring.cloud.dataflow.server.uri:}")
private String dataflowServerUri;

@Bean
public Scheduler localScheduler() {
return new LocalScheduler();
}

@Bean
public ScheduleInfoRepository scheduleInfoRepository() {
return new ScheduleInfoRepository();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.jingxin.cloud.dataflow.support.quartz.common;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.support.ArgumentConvertingMethodInvoker;
import org.springframework.scheduling.quartz.QuartzJobBean;

import com.jingxin.cloud.dataflow.support.quartz.common.util.SpringUtil;

public class ClusterQuartzSpringJobBean extends QuartzJobBean {

private final static Logger logger = LoggerFactory.getLogger(ClusterQuartzSpringJobBean.class);

private String targetObject;
private String targetMethod;


@Override
protected void executeInternal(JobExecutionContext context)
throws JobExecutionException {
try {
logger.debug("execute [" + targetObject + "] at once>>>>>>");
Object otargetObject = SpringUtil.getBean(targetObject);

if(otargetObject != null){
ArgumentConvertingMethodInvoker methodInvoker = new ArgumentConvertingMethodInvoker();
methodInvoker.setTargetObject(otargetObject);
methodInvoker.setTargetMethod(targetMethod);
methodInvoker.setArguments(context);
methodInvoker.prepare();
methodInvoker.invoke();
}else{
logger.error("{} can not be found in Spring Context", targetObject);
throw new JobExecutionException(targetObject+" can not be found in Spring Context");
}
} catch (Exception e) {
throw new JobExecutionException(e);
}
}

public void setTargetObject(String targetObject) {
this.targetObject = targetObject;
}

public void setTargetMethod(String targetMethod) {
this.targetMethod = targetMethod;
}


@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public static class ClusterStatefulQuartzSpringJobBean extends ClusterQuartzSpringJobBean{

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.jingxin.cloud.dataflow.support.quartz.common;

import org.quartz.impl.JobDetailImpl;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;

import com.jingxin.cloud.dataflow.support.quartz.common.ClusterQuartzSpringJobBean.ClusterStatefulQuartzSpringJobBean;

public class ClusterQuartzSpringJobFactoryBean extends JobDetailFactoryBean{

private boolean concurrent = false;

public void afterPropertiesSet() {
this.setJobClass(ClusterStatefulQuartzSpringJobBean.class);
super.afterPropertiesSet();
JobDetailImpl jdi = (JobDetailImpl) this.getObject();
if (concurrent)
jdi.setJobClass(ClusterQuartzSpringJobBean.class);
}

public void setConcurrent(boolean concurrent) {
this.concurrent = concurrent;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.jingxin.cloud.dataflow.support.quartz.common.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
* spring 工具类,用于取出在spring容器中的对象
* @author cyh
*
*/

@Component
public class SpringUtil implements ApplicationContextAware{

private static ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext)throws BeansException {
SpringUtil.applicationContext = applicationContext;
}

public static ApplicationContext getApplicationContext() {
return applicationContext;
}

/**
* 在Spring容器取对象
* @param beanId
* @return
* @throws BeansException
*/
public static Object getBean(String beanId)throws BeansException{
return applicationContext.getBean(beanId);
}

public static<T> T getBean(Class<T> cls)throws BeansException{
return applicationContext.getBean(cls);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.jingxin.cloud.dataflow.support.quartz.configuration;

import java.text.ParseException;
import java.util.Properties;

import javax.sql.DataSource;

import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;

import com.jingxin.cloud.dataflow.support.quartz.common.util.SpringUtil;
import com.jingxin.cloud.dataflow.support.scheduler.QuartzExecutionJob;

@Configuration
public class QuartzConfiguration {

private static final Logger LOG = LoggerFactory.getLogger(QuartzConfiguration.class);

@Bean(destroyMethod = "destroy")
@Lazy(false)
public SchedulerFactoryBean quartzScheduler(DataSource dataSource,
ThreadPoolTaskExecutor taskExecutor, PlatformTransactionManager transactionManager)
throws SchedulerException, ParseException{
SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
factoryBean.setSchedulerName("data-flow-quartz");
factoryBean.setDataSource(dataSource);
factoryBean.setTaskExecutor(taskExecutor);
factoryBean.setAutoStartup(true);
factoryBean.setStartupDelay(60);
factoryBean.setWaitForJobsToCompleteOnShutdown(true);
factoryBean.setTransactionManager(transactionManager);
factoryBean.setQuartzProperties(quartzProperties());
return factoryBean;
}

public Properties quartzProperties() {
Properties properties = new Properties();
properties.put("org.quartz.scheduler.instanceName", "system-quartz");
properties.put("org.quartz.scheduler.instanceId", "AUTO");
properties.put("org.quartz.jobStore.misfireThreshold", "60000");
properties.put("org.quartz.jobStore.useProperties", "true");
properties.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
properties.put("org.quartz.jobStore.isClustered", "true");
properties.put("org.quartz.jobStore.clusterCheckinInterval", "20000");
return properties;
}

@Bean
public ThreadPoolTaskExecutor taskExecutor(){
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setThreadNamePrefix("my-ThreadPool-");
threadPool.setCorePoolSize(20);
threadPool.setMaxPoolSize(40);
threadPool.setKeepAliveSeconds(300);
threadPool.setQueueCapacity(5);
return threadPool;
}

@Bean
public ExecutorServiceAdapter executorServiceAdapter(){
return new ExecutorServiceAdapter(taskExecutor());
}

@Bean
public SpringUtil springUtil() {
return new SpringUtil();
}

@Bean
public QuartzExecutionJob quartzExecutionJob() {
return new QuartzExecutionJob();
}

}
Loading

0 comments on commit 9aca7e8

Please sign in to comment.