diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java index b193b5324f5cf..ca3a5165184ad 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java @@ -43,13 +43,11 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import java.util.stream.IntStream; /** * Pipeline data source sink. @@ -274,10 +272,7 @@ private void executeBatchDelete(final Connection connection, final List 1 != value)) { - log.warn("batchDelete failed, counts={}, sql={}, dataRecords={}", Arrays.toString(counts), deleteSQL, dataRecords); - } + preparedStatement.executeBatch(); } finally { batchDeleteStatement.set(null); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java index bb32906c9d039..4978dbd619241 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java @@ -107,6 +107,7 @@ protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfigur String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER); result.getProps().setProperty("create_time", createTimeFormat); result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); + result.getProps().setProperty("run_count", "1"); result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName())); return result; } @@ -125,6 +126,7 @@ public void startDisabledJob(final String jobId) { jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); jobConfigPOJO.getProps().remove("stop_time"); jobConfigPOJO.getProps().remove("stop_time_millis"); + jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1)); String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId); pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount()); PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO); @@ -154,7 +156,13 @@ protected void dropJob(final String jobId) { PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId); } - protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) { + /** + * Get ElasticJob configuration POJO. + * + * @param jobId job id + * @return ElasticJob configuration POJO + */ + public final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) { JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId); ShardingSpherePreconditions.checkNotNull(result, () -> new PipelineJobNotFoundException(jobId)); return result; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java index 755a049cf125a..08415b7ecba8c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java @@ -149,6 +149,10 @@ private final class InventoryTaskExecuteCallback implements ExecuteCallback { @Override public void onSuccess() { + if (jobItemContext.isStopping()) { + log.info("Inventory task onSuccess, stopping true, ignore"); + return; + } inventorySuccessCallback(); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java index 697fc3b978720..5f4edfc4bda48 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java @@ -190,6 +190,10 @@ private final class CDCExecuteCallback implements ExecuteCallback { @Override public void onSuccess() { + if (jobItemContext.isStopping()) { + log.info("onSuccess, stopping true, ignore"); + return; + } log.info("onSuccess, all {} tasks finished.", identifier); } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 4a5a3fe622c8c..a55671d4a18a8 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -101,8 +101,11 @@ protected void runBlocking() { parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext()); consistencyChecker.set(checker); Map checkResultMap = checker.check(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps()); - log.info("job {} with check algorithm '{}' data consistency checker result: {}", parentJobId, checkJobConfig.getAlgorithmTypeName(), checkResultMap); - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId, checkJobId, checkResultMap); + log.info("job {} with check algorithm '{}' data consistency checker result: {}, stopping: {}", + parentJobId, checkJobConfig.getAlgorithmTypeName(), checkResultMap, jobItemContext.isStopping()); + if (!jobItemContext.isStopping()) { + PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId, checkJobId, checkResultMap); + } } finally { jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis()); } @@ -121,6 +124,10 @@ private final class CheckExecuteCallback implements ExecuteCallback { @Override public void onSuccess() { + if (jobItemContext.isStopping()) { + log.info("onSuccess, stopping true, ignore"); + return; + } log.info("onSuccess, check job id: {}, parent job id: {}", checkJobId, parentJobId); jobItemContext.setStatus(JobStatus.FINISHED); checkJobAPI.persistJobItemProgress(jobItemContext); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java index 3862d8c061bd5..ccafd074e0fe4 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java @@ -64,15 +64,15 @@ void assertCountAndDataCheck() throws SQLException { governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO)); governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, ""); Map actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null), - createConsistencyCheckJobItemProgressContext()).check("FIXTURE", null); + createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null); String checkKey = "t_order"; assertTrue(actual.get(checkKey).getCountCheckResult().isMatched()); assertThat(actual.get(checkKey).getCountCheckResult().getSourceRecordsCount(), is(actual.get(checkKey).getCountCheckResult().getTargetRecordsCount())); assertTrue(actual.get(checkKey).getContentCheckResult().isMatched()); } - private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext() { - return new ConsistencyCheckJobItemProgressContext("", 0, "H2"); + private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext(final String jobId) { + return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2"); } private MigrationJobConfiguration createJobConfiguration() throws SQLException {