From 787d1e94eb7cd6b6bedfce8a159f0a54f5ee252a Mon Sep 17 00:00:00 2001 From: Hongsheng Zhong Date: Wed, 20 Sep 2023 17:11:52 +0800 Subject: [PATCH] Add run_count in job props --- .../job/service/impl/AbstractPipelineJobAPIImpl.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java index bb32906c9d039..4978dbd619241 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java @@ -107,6 +107,7 @@ protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfigur String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER); result.getProps().setProperty("create_time", createTimeFormat); result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); + result.getProps().setProperty("run_count", "1"); result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName())); return result; } @@ -125,6 +126,7 @@ public void startDisabledJob(final String jobId) { jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); jobConfigPOJO.getProps().remove("stop_time"); jobConfigPOJO.getProps().remove("stop_time_millis"); + jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1)); String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId); pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount()); PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO); @@ -154,7 +156,13 @@ protected void dropJob(final String jobId) { PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId); } - protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) { + /** + * Get ElasticJob configuration POJO. + * + * @param jobId job id + * @return ElasticJob configuration POJO + */ + public final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) { JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId); ShardingSpherePreconditions.checkNotNull(result, () -> new PipelineJobNotFoundException(jobId)); return result;