diff --git a/README.md b/README.md index d3a1fd2..c163203 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,26 @@ -# spring-cloud-dataflow-local-schedule -spring cloud dataflow schedule implemented by quartz +# Spring Cloud DataFlow Quartz定时执行 + +官方版本定时任务执行只支持 Kubernetes ,使用cronJob实现。如果想本地测试,或者部署模式并不是大型集群,使用Kubernetes过于臃肿庞大。 + +使用Quartz实现定时任务执行,同时可开启Quartz分布式模式实现小型集群的定时任务管理需求 + + + +## 快速开始 + +1.创建数据表 + +执行resources/schema 文件夹下的创表SQL。使用mysql数据库作为存储 + +2.启动 Spring cloud dataflow + +java -jar spring-cloud-dataflow-server-local.jar + +3.创建定时任务开始使用 + + + +## 备注 + +基于Spring cloud DataFlow 1.7.4.RELEASE进行修改 + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..8011ef4 --- /dev/null +++ b/pom.xml @@ -0,0 +1,68 @@ + + + + 4.0.0 + spring-cloud-dataflow-server-local-autoconfig + 1.7.4.RELEASE + jar + spring-cloud-dataflow-server-local-autoconfig + Data Flow Local Server Autoconfig + + + org.springframework.cloud + spring-cloud-dataflow-parent + 1.7.4.RELEASE + + + + 2.2.3 + + + + + org.springframework.boot + spring-boot-autoconfigure + + + org.springframework.cloud + spring-cloud-dataflow-server-core + 1.7.4.RELEASE + + + org.springframework.cloud + spring-cloud-deployer-resource-maven + ${spring-cloud-deployer.version} + + + org.springframework.cloud + spring-cloud-deployer-resource-docker + ${spring-cloud-deployer.version} + + + org.springframework.cloud + spring-cloud-deployer-resource-support + ${spring-cloud-deployer.version} + + + org.springframework.cloud + spring-cloud-deployer-local + ${spring-cloud-deployer.version} + + + org.quartz-scheduler + quartz + ${quartz.version} + + + c3p0 + c3p0 + + + + + org.springframework.boot + spring-boot-starter-test + test + + + diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/configuration/LocalDataFlowServerSchedulerConfiguration.java b/src/main/java/com/jingxin/cloud/dataflow/support/configuration/LocalDataFlowServerSchedulerConfiguration.java new file mode 100644 index 0000000..9a6d4b6 --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/configuration/LocalDataFlowServerSchedulerConfiguration.java @@ -0,0 +1,29 @@ +package com.jingxin.cloud.dataflow.support.configuration; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.scheduler.spi.core.Scheduler; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import com.jingxin.cloud.dataflow.support.quartz.configuration.QuartzConfiguration; +import com.jingxin.cloud.dataflow.support.scheduler.LocalScheduler; +import com.jingxin.cloud.dataflow.support.scheduler.repository.ScheduleInfoRepository; + +@Configuration +@Import(QuartzConfiguration.class) +public class LocalDataFlowServerSchedulerConfiguration { + + @Value("${spring.cloud.dataflow.server.uri:}") + private String dataflowServerUri; + + @Bean + public Scheduler localScheduler() { + return new LocalScheduler(); + } + + @Bean + public ScheduleInfoRepository scheduleInfoRepository() { + return new ScheduleInfoRepository(); + } +} diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/quartz/common/ClusterQuartzSpringJobBean.java b/src/main/java/com/jingxin/cloud/dataflow/support/quartz/common/ClusterQuartzSpringJobBean.java new file mode 100644 index 0000000..e090672 --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/quartz/common/ClusterQuartzSpringJobBean.java @@ -0,0 +1,60 @@ +package com.jingxin.cloud.dataflow.support.quartz.common; + +import org.quartz.DisallowConcurrentExecution; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.PersistJobDataAfterExecution; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.support.ArgumentConvertingMethodInvoker; +import org.springframework.scheduling.quartz.QuartzJobBean; + +import com.jingxin.cloud.dataflow.support.quartz.common.util.SpringUtil; + +public class ClusterQuartzSpringJobBean extends QuartzJobBean { + + private final static Logger logger = LoggerFactory.getLogger(ClusterQuartzSpringJobBean.class); + + private String targetObject; + private String targetMethod; + + + @Override + protected void executeInternal(JobExecutionContext context) + throws JobExecutionException { + try { + logger.debug("execute [" + targetObject + "] at once>>>>>>"); + Object otargetObject = SpringUtil.getBean(targetObject); + + if(otargetObject != null){ + ArgumentConvertingMethodInvoker methodInvoker = new ArgumentConvertingMethodInvoker(); + methodInvoker.setTargetObject(otargetObject); + methodInvoker.setTargetMethod(targetMethod); + methodInvoker.setArguments(context); + methodInvoker.prepare(); + methodInvoker.invoke(); + }else{ + logger.error("{} can not be found in Spring Context", targetObject); + throw new JobExecutionException(targetObject+" can not be found in Spring Context"); + } + } catch (Exception e) { + throw new JobExecutionException(e); + } + } + + public void setTargetObject(String targetObject) { + this.targetObject = targetObject; + } + + public void setTargetMethod(String targetMethod) { + this.targetMethod = targetMethod; + } + + + @PersistJobDataAfterExecution + @DisallowConcurrentExecution + public static class ClusterStatefulQuartzSpringJobBean extends ClusterQuartzSpringJobBean{ + + } + +} \ No newline at end of file diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/quartz/common/ClusterQuartzSpringJobFactoryBean.java b/src/main/java/com/jingxin/cloud/dataflow/support/quartz/common/ClusterQuartzSpringJobFactoryBean.java new file mode 100644 index 0000000..976114a --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/quartz/common/ClusterQuartzSpringJobFactoryBean.java @@ -0,0 +1,23 @@ +package com.jingxin.cloud.dataflow.support.quartz.common; + +import org.quartz.impl.JobDetailImpl; +import org.springframework.scheduling.quartz.JobDetailFactoryBean; + +import com.jingxin.cloud.dataflow.support.quartz.common.ClusterQuartzSpringJobBean.ClusterStatefulQuartzSpringJobBean; + +public class ClusterQuartzSpringJobFactoryBean extends JobDetailFactoryBean{ + + private boolean concurrent = false; + + public void afterPropertiesSet() { + this.setJobClass(ClusterStatefulQuartzSpringJobBean.class); + super.afterPropertiesSet(); + JobDetailImpl jdi = (JobDetailImpl) this.getObject(); + if (concurrent) + jdi.setJobClass(ClusterQuartzSpringJobBean.class); + } + + public void setConcurrent(boolean concurrent) { + this.concurrent = concurrent; + } +} diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/quartz/common/util/SpringUtil.java b/src/main/java/com/jingxin/cloud/dataflow/support/quartz/common/util/SpringUtil.java new file mode 100644 index 0000000..ad42631 --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/quartz/common/util/SpringUtil.java @@ -0,0 +1,41 @@ +package com.jingxin.cloud.dataflow.support.quartz.common.util; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +/** + * spring 工具类,用于取出在spring容器中的对象 + * @author cyh + * + */ + +@Component +public class SpringUtil implements ApplicationContextAware{ + + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext)throws BeansException { + SpringUtil.applicationContext = applicationContext; + } + + public static ApplicationContext getApplicationContext() { + return applicationContext; + } + + /** + * 在Spring容器取对象 + * @param beanId + * @return + * @throws BeansException + */ + public static Object getBean(String beanId)throws BeansException{ + return applicationContext.getBean(beanId); + } + + public static T getBean(Class cls)throws BeansException{ + return applicationContext.getBean(cls); + } +} diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/quartz/configuration/QuartzConfiguration.java b/src/main/java/com/jingxin/cloud/dataflow/support/quartz/configuration/QuartzConfiguration.java new file mode 100644 index 0000000..79f9eff --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/quartz/configuration/QuartzConfiguration.java @@ -0,0 +1,82 @@ +package com.jingxin.cloud.dataflow.support.quartz.configuration; + +import java.text.ParseException; +import java.util.Properties; + +import javax.sql.DataSource; + +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; +import org.springframework.core.task.support.ExecutorServiceAdapter; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.transaction.PlatformTransactionManager; + +import com.jingxin.cloud.dataflow.support.quartz.common.util.SpringUtil; +import com.jingxin.cloud.dataflow.support.scheduler.QuartzExecutionJob; + +@Configuration +public class QuartzConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(QuartzConfiguration.class); + + @Bean(destroyMethod = "destroy") + @Lazy(false) + public SchedulerFactoryBean quartzScheduler(DataSource dataSource, + ThreadPoolTaskExecutor taskExecutor, PlatformTransactionManager transactionManager) + throws SchedulerException, ParseException{ + SchedulerFactoryBean factoryBean = new SchedulerFactoryBean(); + factoryBean.setSchedulerName("data-flow-quartz"); + factoryBean.setDataSource(dataSource); + factoryBean.setTaskExecutor(taskExecutor); + factoryBean.setAutoStartup(true); + factoryBean.setStartupDelay(60); + factoryBean.setWaitForJobsToCompleteOnShutdown(true); + factoryBean.setTransactionManager(transactionManager); + factoryBean.setQuartzProperties(quartzProperties()); + return factoryBean; + } + + public Properties quartzProperties() { + Properties properties = new Properties(); + properties.put("org.quartz.scheduler.instanceName", "system-quartz"); + properties.put("org.quartz.scheduler.instanceId", "AUTO"); + properties.put("org.quartz.jobStore.misfireThreshold", "60000"); + properties.put("org.quartz.jobStore.useProperties", "true"); + properties.put("org.quartz.jobStore.tablePrefix", "QRTZ_"); + properties.put("org.quartz.jobStore.isClustered", "true"); + properties.put("org.quartz.jobStore.clusterCheckinInterval", "20000"); + return properties; + } + + @Bean + public ThreadPoolTaskExecutor taskExecutor(){ + ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); + threadPool.setThreadNamePrefix("my-ThreadPool-"); + threadPool.setCorePoolSize(20); + threadPool.setMaxPoolSize(40); + threadPool.setKeepAliveSeconds(300); + threadPool.setQueueCapacity(5); + return threadPool; + } + + @Bean + public ExecutorServiceAdapter executorServiceAdapter(){ + return new ExecutorServiceAdapter(taskExecutor()); + } + + @Bean + public SpringUtil springUtil() { + return new SpringUtil(); + } + + @Bean + public QuartzExecutionJob quartzExecutionJob() { + return new QuartzExecutionJob(); + } + +} diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/LocalScheduler.java b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/LocalScheduler.java new file mode 100644 index 0000000..de14821 --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/LocalScheduler.java @@ -0,0 +1,72 @@ +package com.jingxin.cloud.dataflow.support.scheduler; + +import java.text.ParseException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.quartz.JobKey; +import org.quartz.SchedulerException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.scheduler.spi.core.ScheduleInfo; +import org.springframework.cloud.scheduler.spi.core.ScheduleRequest; +import org.springframework.cloud.scheduler.spi.core.Scheduler; +import org.springframework.cloud.scheduler.spi.core.SchedulerPropertyKeys; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; + +import com.jingxin.cloud.dataflow.support.scheduler.repository.ScheduleInfoRepository; +import com.jingxin.cloud.dataflow.support.scheduler.utils.DeploymentRequestUtil; +import com.jingxin.cloud.dataflow.support.scheduler.utils.QuartzJobGenerateUtil; +import com.jingxin.cloud.dataflow.support.scheduler.utils.QuartzJobGenerateUtil.QuartzJobInfo; + +public class LocalScheduler implements Scheduler{ + + private String target = "quartzExecutionJob"; + private String method = "execute"; + + @Override + public void schedule(ScheduleRequest scheduleRequest) { + String scheduleName = scheduleRequest.getScheduleName(); + String workJson = DeploymentRequestUtil.toJson(scheduleRequest); + String cron = scheduleRequest.getSchedulerProperties().get(SchedulerPropertyKeys.CRON_EXPRESSION); + if (StringUtils.isBlank(cron)) { + throw new IllegalArgumentException("cron can not be null"); + } + Map params = new HashMap<>(); + params.put("properties", workJson); + try { + QuartzJobInfo quartzJobInfo = + QuartzJobGenerateUtil.quartzCornJobFactoryBean(scheduleName, target, method, cron, params); + schedulerFactoryBean.getObject().scheduleJob(quartzJobInfo.getJobDetail(), quartzJobInfo.getCronTrigger()); + scheduleInfoRepository.save(scheduleName, scheduleRequest.getDefinition().getName(), cron, workJson); + } catch (ParseException | SchedulerException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public void unschedule(String scheduleName) { + try { + schedulerFactoryBean.getObject().deleteJob(new JobKey(scheduleName)); + scheduleInfoRepository.delete(scheduleName); + } catch (SchedulerException e) { + e.printStackTrace(); + } + } + + @Override + public List list(String taskDefinitionName) { + return scheduleInfoRepository.findOne(taskDefinitionName); + } + + @Override + public List list() { + return scheduleInfoRepository.findList(); + } + + @Autowired + private SchedulerFactoryBean schedulerFactoryBean; + @Autowired + private ScheduleInfoRepository scheduleInfoRepository; +} diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/QuartzExecutionJob.java b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/QuartzExecutionJob.java new file mode 100644 index 0000000..79e1a0e --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/QuartzExecutionJob.java @@ -0,0 +1,23 @@ +package com.jingxin.cloud.dataflow.support.scheduler; + +import org.quartz.JobExecutionContext; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.dataflow.server.service.TaskService; +import org.springframework.stereotype.Component; + +import com.jingxin.cloud.dataflow.support.scheduler.utils.DeploymentRequestUtil.DeploymentRequestJsonBean; +import com.jingxin.cloud.dataflow.support.scheduler.utils.JsonUtil; + +@Component +public class QuartzExecutionJob { + + public void execute(JobExecutionContext jobExecutionContext) { + String deploymentJson = (String) jobExecutionContext.getMergedJobDataMap().get("properties"); + DeploymentRequestJsonBean jsonBean = JsonUtil.parseJson(deploymentJson, DeploymentRequestJsonBean.class); + String taskName = jsonBean.getDefinition().getName(); + taskService.executeTask(taskName, jsonBean.getDeploymentProperties(), jsonBean.getCommandlineArguments()); + } + + @Autowired + private TaskService taskService; +} diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/repository/ScheduleInfoRepository.java b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/repository/ScheduleInfoRepository.java new file mode 100644 index 0000000..6126d82 --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/repository/ScheduleInfoRepository.java @@ -0,0 +1,72 @@ +package com.jingxin.cloud.dataflow.support.scheduler.repository; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.scheduler.spi.core.ScheduleInfo; +import org.springframework.cloud.scheduler.spi.core.SchedulerPropertyKeys; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Repository; + +@Repository +public class ScheduleInfoRepository { + + private String find_list_sql = "SELECT * FROM T_SCHEDULE_INFO"; + private String find_one_sql = "SELECT * FROM T_SCHEDULE_INFO WHERE F_TASK_NAME = ?"; + private String save_sql = "INSERT INTO T_SCHEDULE_INFO (F_SCHEDULE_NAME, F_TASK_NAME, F_TASK_CRON, F_SCHEDULE_PROPERTIES) VALUES (?,?,?,?)"; + private String delete_sql = "DELETE FROM T_SCHEDULE_INFO WHERE F_SCHEDULE_NAME = ?"; + + public List findList() { + return jdbcTemplate.query(find_list_sql, new RowMapper() { + + @Override + public ScheduleInfo mapRow(ResultSet rs, int rowNum) throws SQLException { + String scheduleName = rs.getString("f_schedule_name"); + String taskName = rs.getString("f_task_name"); + String cron = rs.getString("f_task_cron"); + ScheduleInfo scheduleInfo = new ScheduleInfo(); + scheduleInfo.setScheduleName(scheduleName); + scheduleInfo.setTaskDefinitionName(taskName); + Map properties = new HashMap<>(); + properties.put(SchedulerPropertyKeys.CRON_EXPRESSION, cron); + scheduleInfo.setScheduleProperties(properties); + return scheduleInfo; + } + + }); + } + + public List findOne(String taskDefinitionName) { + return jdbcTemplate.query(find_one_sql, new RowMapper() { + @Override + public ScheduleInfo mapRow(ResultSet rs, int rowNum) throws SQLException { + String scheduleName = rs.getString("f_schedule_name"); + String taskName = rs.getString("f_task_name"); + String cron = rs.getString("f_task_cron"); + ScheduleInfo scheduleInfo = new ScheduleInfo(); + scheduleInfo.setScheduleName(scheduleName); + scheduleInfo.setTaskDefinitionName(taskName); + Map properties = new HashMap<>(); + properties.put(SchedulerPropertyKeys.CRON_EXPRESSION, cron); + scheduleInfo.setScheduleProperties(properties); + return scheduleInfo; + } + }, taskDefinitionName); + } + + public void save(String scheduleName, String taskName, String cron, String properties) { + jdbcTemplate.update(save_sql, scheduleName, taskName, cron, properties); + } + + public void delete(String scheduleName) { + jdbcTemplate.update(delete_sql, scheduleName); + } + + @Autowired + private JdbcTemplate jdbcTemplate; +} diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/utils/DeploymentRequestUtil.java b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/utils/DeploymentRequestUtil.java new file mode 100644 index 0000000..78aa980 --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/utils/DeploymentRequestUtil.java @@ -0,0 +1,108 @@ +package com.jingxin.cloud.dataflow.support.scheduler.utils; + +import java.util.List; +import java.util.Map; + +import org.springframework.cloud.deployer.spi.core.AppDefinition; +import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest; + +public class DeploymentRequestUtil { + + public static String toJson(AppDeploymentRequest request) { + return JsonUtil.toJson( + new DeploymentRequestJsonBean( + request.getDefinition(), "", + request.getDeploymentProperties(), request.getCommandlineArguments())); + } + + public static class DeploymentRequestJsonBean{ + + private AppJsonDefinition definition; + + private String resource; + + private Map deploymentProperties; + + private List commandlineArguments; + + public DeploymentRequestJsonBean() { + } + + public DeploymentRequestJsonBean(AppDefinition definition, String resource, + Map deploymentProperties, List commandlineArguments) { + super(); + this.definition = new AppJsonDefinition(definition); + this.resource = resource; + this.deploymentProperties = deploymentProperties; + this.commandlineArguments = commandlineArguments; + } + + public AppJsonDefinition getDefinition() { + return definition; + } + + public String getResource() { + return resource; + } + + public Map getDeploymentProperties() { + return deploymentProperties; + } + + public List getCommandlineArguments() { + return commandlineArguments; + } + + public void setDefinition(AppJsonDefinition definition) { + this.definition = definition; + } + + public void setResource(String resource) { + this.resource = resource; + } + + public void setDeploymentProperties(Map deploymentProperties) { + this.deploymentProperties = deploymentProperties; + } + + public void setCommandlineArguments(List commandlineArguments) { + this.commandlineArguments = commandlineArguments; + } + + public static class AppJsonDefinition{ + /** + * Name of the app. + */ + private String name; + + /** + * Properties for this app. + */ + private Map properties; + + public AppJsonDefinition() { + } + + public AppJsonDefinition(AppDefinition appDefinition){ + this.name = appDefinition.getName(); + this.properties = appDefinition.getProperties(); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + } + } +} diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/utils/JsonUtil.java b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/utils/JsonUtil.java new file mode 100644 index 0000000..261dbad --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/utils/JsonUtil.java @@ -0,0 +1,86 @@ +package com.jingxin.cloud.dataflow.support.scheduler.utils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Json解析工具 + * @author cyh + * + */ +public class JsonUtil { + + public static ObjectMapper objectMapper = new ObjectMapper(); + + + /** + * 解析Json到指定的Bean中 + * @param jsonStr + * @param clz + * @return + */ + public static T parseJson(String jsonStr, Class clz){ + try { + T bean = objectMapper.readValue(jsonStr, clz); + return bean; + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + public static List parseListJson(String jsonStr, Class clz){ + try { + JavaType javaType = + objectMapper.getTypeFactory() + .constructParametrizedType(ArrayList.class, List.class, clz); + List bean = objectMapper.readValue(jsonStr, javaType); + return bean; + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + public static Map parse2Map(String jsonStr){ + try { + return objectMapper.readValue(jsonStr, Map.class); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + public static String toJson(Object object){ + try { + return objectMapper.writeValueAsString(object); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + public static Object parseJson(String json, Class wrap, Class... inner){ + JavaType javaType = objectMapper.getTypeFactory().constructParametrizedType(wrap, wrap, inner); + try { + return objectMapper.readValue(json, javaType); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + public static JsonNode parse2Tree(String jsonStr){ + try { + return objectMapper.readTree(jsonStr); + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } +} diff --git a/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/utils/QuartzJobGenerateUtil.java b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/utils/QuartzJobGenerateUtil.java new file mode 100644 index 0000000..445dba1 --- /dev/null +++ b/src/main/java/com/jingxin/cloud/dataflow/support/scheduler/utils/QuartzJobGenerateUtil.java @@ -0,0 +1,58 @@ +package com.jingxin.cloud.dataflow.support.scheduler.utils; + +import java.text.ParseException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.quartz.CronTrigger; +import org.quartz.JobDetail; +import org.springframework.scheduling.quartz.CronTriggerFactoryBean; + +import com.jingxin.cloud.dataflow.support.quartz.common.ClusterQuartzSpringJobFactoryBean; + +public class QuartzJobGenerateUtil { + + public static QuartzJobInfo quartzCornJobFactoryBean(String name, String target, String method, String cron, + Map params) throws ParseException { + ClusterQuartzSpringJobFactoryBean jobFactory = new ClusterQuartzSpringJobFactoryBean(); + jobFactory.setDurability(true); + jobFactory.setRequestsRecovery(true); + jobFactory.setConcurrent(false); + jobFactory.setName(name); + Map jobDataMap = new HashMap(); + jobDataMap.put("targetObject", target); + jobDataMap.put("targetMethod", method); + if (params != null) { + for (Entry param : params.entrySet()) { + jobDataMap.put(param.getKey(), param.getValue()); + } + } + jobFactory.setJobDataAsMap(jobDataMap); + jobFactory.afterPropertiesSet(); + CronTriggerFactoryBean cronTriggerFactory = new CronTriggerFactoryBean(); + cronTriggerFactory.setName(name.concat("_trigger")); + cronTriggerFactory.setCronExpression(cron); + cronTriggerFactory.afterPropertiesSet(); + + return new QuartzJobInfo(jobFactory.getObject(), cronTriggerFactory.getObject()); + } + + public static class QuartzJobInfo { + + private JobDetail jobDetail; + private CronTrigger cronTrigger; + + public QuartzJobInfo(JobDetail jobDetail, CronTrigger cronTrigger) { + super(); + this.jobDetail = jobDetail; + this.cronTrigger = cronTrigger; + } + public JobDetail getJobDetail() { + return jobDetail; + } + public CronTrigger getCronTrigger() { + return cronTrigger; + } + } +} diff --git a/src/main/java/org/springframework/cloud/dataflow/autoconfigure/local/LocalDataFlowServerAutoConfiguration.java b/src/main/java/org/springframework/cloud/dataflow/autoconfigure/local/LocalDataFlowServerAutoConfiguration.java new file mode 100644 index 0000000..c5b5c4b --- /dev/null +++ b/src/main/java/org/springframework/cloud/dataflow/autoconfigure/local/LocalDataFlowServerAutoConfiguration.java @@ -0,0 +1,79 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.dataflow.autoconfigure.local; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.cloud.dataflow.server.config.DataFlowControllerAutoConfiguration; +import org.springframework.cloud.deployer.resource.docker.DockerResourceLoader; +import org.springframework.cloud.deployer.resource.maven.MavenProperties; +import org.springframework.cloud.deployer.resource.maven.MavenResourceLoader; +import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.core.io.ResourceLoader; + +import com.jingxin.cloud.dataflow.support.configuration.LocalDataFlowServerSchedulerConfiguration; + +/** + * Auto-configuration for local dataflow server. + * + * @author Janne Valkealahti + */ +@Configuration +@Import(LocalDataFlowServerSchedulerConfiguration.class) +@AutoConfigureBefore(DataFlowControllerAutoConfiguration.class) +public class LocalDataFlowServerAutoConfiguration { + + @Bean + public DelegatingResourceLoader delegatingResourceLoader(MavenProperties mavenProperties) { + DockerResourceLoader dockerLoader = new DockerResourceLoader(); + MavenResourceLoader mavenResourceLoader = new MavenResourceLoader(mavenProperties); + Map loaders = new HashMap<>(); + loaders.put("docker", dockerLoader); + loaders.put("maven", mavenResourceLoader); + return new DelegatingResourceLoader(loaders); + } + +// @Bean +// public Scheduler localScheduler() { +// return new Scheduler() { +// @Override +// public void schedule(ScheduleRequest scheduleRequest) { +// throw new UnsupportedOperationException("Interface is not implemented for schedule method."); +// } +// +// @Override +// public void unschedule(String scheduleName) { +// throw new UnsupportedOperationException("Interface is not implemented for unschedule method."); +// } +// +// @Override +// public List list(String taskDefinitionName) { +// throw new UnsupportedOperationException("Interface is not implemented for list method."); +// } +// +// @Override +// public List list() { +// throw new UnsupportedOperationException("Interface is not implemented for list method."); +// } +// }; +// } +} diff --git a/src/main/java/org/springframework/cloud/dataflow/autoconfigure/local/package-info.java b/src/main/java/org/springframework/cloud/dataflow/autoconfigure/local/package-info.java new file mode 100644 index 0000000..0ff7652 --- /dev/null +++ b/src/main/java/org/springframework/cloud/dataflow/autoconfigure/local/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Root package for the local version of Spring Cloud Data Flow Server auto configuration. + */ +package org.springframework.cloud.dataflow.autoconfigure.local; diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..e445c6c --- /dev/null +++ b/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + org.springframework.cloud.dataflow.autoconfigure.local.LocalDataFlowServerAutoConfiguration diff --git a/src/main/resources/schema/t_schedule_info.sql b/src/main/resources/schema/t_schedule_info.sql new file mode 100644 index 0000000..81c56c1 --- /dev/null +++ b/src/main/resources/schema/t_schedule_info.sql @@ -0,0 +1,30 @@ +/* +Navicat MySQL Data Transfer + +Source Server : localhost_3306 +Source Server Version : 50721 +Source Host : localhost:3306 +Source Database : dataflow_test + +Target Server Type : MYSQL +Target Server Version : 50721 +File Encoding : 65001 + +Date: 2019-03-11 16:14:18 +*/ + +SET FOREIGN_KEY_CHECKS=0; + +-- ---------------------------- +-- Table structure for t_schedule_info +-- ---------------------------- +DROP TABLE IF EXISTS `t_schedule_info`; +CREATE TABLE `t_schedule_info` ( + `f_schedule_name` varchar(255) NOT NULL, + `f_task_name` varchar(255) NOT NULL, + `f_task_cron` varchar(255) NOT NULL, + `f_schedule_properties` text NOT NULL, + `f_create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `f_update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`f_schedule_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; diff --git a/src/main/resources/schema/tables_mysql_innodb.sql b/src/main/resources/schema/tables_mysql_innodb.sql new file mode 100644 index 0000000..c0696d9 --- /dev/null +++ b/src/main/resources/schema/tables_mysql_innodb.sql @@ -0,0 +1,179 @@ +# +# In your Quartz properties file, you'll need to set +# org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate +# +# +# By: Ron Cordell - roncordell +# I didn't see this anywhere, so I thought I'd post it here. This is the script from Quartz to create the tables in a MySQL database, modified to use INNODB instead of MYISAM. + +DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS; +DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE; +DROP TABLE IF EXISTS QRTZ_LOCKS; +DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_TRIGGERS; +DROP TABLE IF EXISTS QRTZ_JOB_DETAILS; +DROP TABLE IF EXISTS QRTZ_CALENDARS; + +CREATE TABLE QRTZ_JOB_DETAILS( +SCHED_NAME VARCHAR(120) NOT NULL, +JOB_NAME VARCHAR(200) NOT NULL, +JOB_GROUP VARCHAR(200) NOT NULL, +DESCRIPTION VARCHAR(250) NULL, +JOB_CLASS_NAME VARCHAR(250) NOT NULL, +IS_DURABLE VARCHAR(1) NOT NULL, +IS_NONCONCURRENT VARCHAR(1) NOT NULL, +IS_UPDATE_DATA VARCHAR(1) NOT NULL, +REQUESTS_RECOVERY VARCHAR(1) NOT NULL, +JOB_DATA BLOB NULL, +PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)) +ENGINE=InnoDB; + +CREATE TABLE QRTZ_TRIGGERS ( +SCHED_NAME VARCHAR(120) NOT NULL, +TRIGGER_NAME VARCHAR(200) NOT NULL, +TRIGGER_GROUP VARCHAR(200) NOT NULL, +JOB_NAME VARCHAR(200) NOT NULL, +JOB_GROUP VARCHAR(200) NOT NULL, +DESCRIPTION VARCHAR(250) NULL, +NEXT_FIRE_TIME BIGINT(13) NULL, +PREV_FIRE_TIME BIGINT(13) NULL, +PRIORITY INTEGER NULL, +TRIGGER_STATE VARCHAR(16) NOT NULL, +TRIGGER_TYPE VARCHAR(8) NOT NULL, +START_TIME BIGINT(13) NOT NULL, +END_TIME BIGINT(13) NULL, +CALENDAR_NAME VARCHAR(200) NULL, +MISFIRE_INSTR SMALLINT(2) NULL, +JOB_DATA BLOB NULL, +PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), +FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP) +REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)) +ENGINE=InnoDB; + +CREATE TABLE QRTZ_SIMPLE_TRIGGERS ( +SCHED_NAME VARCHAR(120) NOT NULL, +TRIGGER_NAME VARCHAR(200) NOT NULL, +TRIGGER_GROUP VARCHAR(200) NOT NULL, +REPEAT_COUNT BIGINT(7) NOT NULL, +REPEAT_INTERVAL BIGINT(12) NOT NULL, +TIMES_TRIGGERED BIGINT(10) NOT NULL, +PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), +FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) +REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) +ENGINE=InnoDB; + +CREATE TABLE QRTZ_CRON_TRIGGERS ( +SCHED_NAME VARCHAR(120) NOT NULL, +TRIGGER_NAME VARCHAR(200) NOT NULL, +TRIGGER_GROUP VARCHAR(200) NOT NULL, +CRON_EXPRESSION VARCHAR(120) NOT NULL, +TIME_ZONE_ID VARCHAR(80), +PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), +FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) +REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) +ENGINE=InnoDB; + +CREATE TABLE QRTZ_SIMPROP_TRIGGERS + ( + SCHED_NAME VARCHAR(120) NOT NULL, + TRIGGER_NAME VARCHAR(200) NOT NULL, + TRIGGER_GROUP VARCHAR(200) NOT NULL, + STR_PROP_1 VARCHAR(512) NULL, + STR_PROP_2 VARCHAR(512) NULL, + STR_PROP_3 VARCHAR(512) NULL, + INT_PROP_1 INT NULL, + INT_PROP_2 INT NULL, + LONG_PROP_1 BIGINT NULL, + LONG_PROP_2 BIGINT NULL, + DEC_PROP_1 NUMERIC(13,4) NULL, + DEC_PROP_2 NUMERIC(13,4) NULL, + BOOL_PROP_1 VARCHAR(1) NULL, + BOOL_PROP_2 VARCHAR(1) NULL, + PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), + FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) + REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) +ENGINE=InnoDB; + +CREATE TABLE QRTZ_BLOB_TRIGGERS ( +SCHED_NAME VARCHAR(120) NOT NULL, +TRIGGER_NAME VARCHAR(200) NOT NULL, +TRIGGER_GROUP VARCHAR(200) NOT NULL, +BLOB_DATA BLOB NULL, +PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), +INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP), +FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) +REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) +ENGINE=InnoDB; + +CREATE TABLE QRTZ_CALENDARS ( +SCHED_NAME VARCHAR(120) NOT NULL, +CALENDAR_NAME VARCHAR(200) NOT NULL, +CALENDAR BLOB NOT NULL, +PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)) +ENGINE=InnoDB; + +CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS ( +SCHED_NAME VARCHAR(120) NOT NULL, +TRIGGER_GROUP VARCHAR(200) NOT NULL, +PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)) +ENGINE=InnoDB; + +CREATE TABLE QRTZ_FIRED_TRIGGERS ( +SCHED_NAME VARCHAR(120) NOT NULL, +ENTRY_ID VARCHAR(95) NOT NULL, +TRIGGER_NAME VARCHAR(200) NOT NULL, +TRIGGER_GROUP VARCHAR(200) NOT NULL, +INSTANCE_NAME VARCHAR(200) NOT NULL, +FIRED_TIME BIGINT(13) NOT NULL, +SCHED_TIME BIGINT(13) NOT NULL, +PRIORITY INTEGER NOT NULL, +STATE VARCHAR(16) NOT NULL, +JOB_NAME VARCHAR(200) NULL, +JOB_GROUP VARCHAR(200) NULL, +IS_NONCONCURRENT VARCHAR(1) NULL, +REQUESTS_RECOVERY VARCHAR(1) NULL, +PRIMARY KEY (SCHED_NAME,ENTRY_ID)) +ENGINE=InnoDB; + +CREATE TABLE QRTZ_SCHEDULER_STATE ( +SCHED_NAME VARCHAR(120) NOT NULL, +INSTANCE_NAME VARCHAR(200) NOT NULL, +LAST_CHECKIN_TIME BIGINT(13) NOT NULL, +CHECKIN_INTERVAL BIGINT(13) NOT NULL, +PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)) +ENGINE=InnoDB; + +CREATE TABLE QRTZ_LOCKS ( +SCHED_NAME VARCHAR(120) NOT NULL, +LOCK_NAME VARCHAR(40) NOT NULL, +PRIMARY KEY (SCHED_NAME,LOCK_NAME)) +ENGINE=InnoDB; + +CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY); +CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP); + +CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP); +CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP); +CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME); +CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP); +CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME); +CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME); +CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME); +CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE); + +CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME); +CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY); +CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP); +CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP); +CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP); +CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP); + +commit; diff --git a/src/test/java/com/jingxin/cloud/dataflow/support/test/TmpTest.java b/src/test/java/com/jingxin/cloud/dataflow/support/test/TmpTest.java new file mode 100644 index 0000000..7527f6c --- /dev/null +++ b/src/test/java/com/jingxin/cloud/dataflow/support/test/TmpTest.java @@ -0,0 +1,13 @@ +package com.jingxin.cloud.dataflow.support.test; + +import com.jingxin.cloud.dataflow.support.scheduler.utils.DeploymentRequestUtil.DeploymentRequestJsonBean; +import com.jingxin.cloud.dataflow.support.scheduler.utils.JsonUtil; + +public class TmpTest { + + public static void main(String[] args) { + String json = "{\"definition\":{\"name\":\"demoTask\",\"properties\":{\"spring.datasource.username\":\"root\",\"spring.cloud.task.name\":\"demoTask\",\"spring.datasource.url\":\"jdbc:mysql://localhost:3306/dataflow_test\",\"spring.datasource.driverClassName\":\"org.mariadb.jdbc.Driver\",\"spring.datasource.password\":\"mysql123\"}},\"resource\":\"\",\"deploymentProperties\":{},\"commandlineArguments\":[]}"; + DeploymentRequestJsonBean bean = JsonUtil.parseJson(json, DeploymentRequestJsonBean.class); + System.out.println(bean.getDefinition().getName()); + } +}