diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java index f437ad6d5472d..2465838abc4ca 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.exception.job; +import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress; import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException; import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState; @@ -27,7 +28,7 @@ public final class UncompletedConsistencyCheckJobExistsException extends Pipelin private static final long serialVersionUID = 2854259384634892428L; - public UncompletedConsistencyCheckJobExistsException(final String jobId) { - super(XOpenSQLState.GENERAL_ERROR, 96, String.format("Uncompleted consistency check job `%s` exists.", jobId)); + public UncompletedConsistencyCheckJobExistsException(final String jobId, final ConsistencyCheckJobItemProgress progress) { + super(XOpenSQLState.GENERAL_ERROR, 96, String.format("Uncompleted consistency check job `%s` exists, progress `%s`", jobId, progress)); } } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java index c793a22bd500f..d6f2301893163 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl; import com.google.common.base.Strings; -import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress; @@ -26,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory; import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException; import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException; @@ -65,7 +65,6 @@ /** * Consistency check job option. */ -@Slf4j public final class ConsistencyCheckJobOption implements PipelineJobOption { /** @@ -73,42 +72,45 @@ public final class ConsistencyCheckJobOption implements PipelineJobOption { * * @param param create consistency check job parameter * @return job id - * @throws UncompletedConsistencyCheckJobExistsException uncompleted consistency check job exists exception */ public String createJobAndStart(final CreateConsistencyCheckJobParameter param) { String parentJobId = param.getParentJobId(); PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)); Optional latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); if (latestCheckJobId.isPresent()) { - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); - Optional progress = jobItemManager.getProgress(latestCheckJobId.get(), 0); - if (!progress.isPresent() || JobStatus.FINISHED != progress.get().getStatus()) { - log.info("check job already exists and status is not FINISHED, progress={}", progress); - throw new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get()); - } + Optional progress = new PipelineJobItemManager(getYamlJobItemProgressSwapper()).getProgress(latestCheckJobId.get(), 0); + ShardingSpherePreconditions.checkState(progress.isPresent() && JobStatus.FINISHED == progress.get().getStatus(), + () -> new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get(), progress.orElse(null))); } - verifyPipelineDatabaseType(param); + checkPipelineDatabaseType(param); PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId); - String result = latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal(); + String result = latestCheckJobId.map(optional -> new ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal(); governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, result); governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, result); new PipelineJobManager(this).drop(result); - YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration(); - yamlConfig.setJobId(result); - yamlConfig.setParentJobId(parentJobId); - yamlConfig.setAlgorithmTypeName(param.getAlgorithmTypeName()); - yamlConfig.setAlgorithmProps(param.getAlgorithmProps()); - yamlConfig.setSourceDatabaseType(param.getSourceDatabaseType().getType()); - new PipelineJobManager(this).start(new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig)); + new PipelineJobManager(this).start(new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(getYamlConfiguration(result, parentJobId, param))); return result; } - private void verifyPipelineDatabaseType(final CreateConsistencyCheckJobParameter param) { - Collection supportedDatabaseTypes = TableDataConsistencyCheckerFactory.newInstance(param.getAlgorithmTypeName(), param.getAlgorithmProps()).getSupportedDatabaseTypes(); + private void checkPipelineDatabaseType(final CreateConsistencyCheckJobParameter param) { + Collection supportedDatabaseTypes; + try (TableDataConsistencyChecker checker = TableDataConsistencyCheckerFactory.newInstance(param.getAlgorithmTypeName(), param.getAlgorithmProps())) { + supportedDatabaseTypes = checker.getSupportedDatabaseTypes(); + } ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getSourceDatabaseType()), () -> new UnsupportedPipelineDatabaseTypeException(param.getSourceDatabaseType())); ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getTargetDatabaseType()), () -> new UnsupportedPipelineDatabaseTypeException(param.getTargetDatabaseType())); } + private YamlConsistencyCheckJobConfiguration getYamlConfiguration(final String jobId, final String parentJobId, final CreateConsistencyCheckJobParameter param) { + YamlConsistencyCheckJobConfiguration result = new YamlConsistencyCheckJobConfiguration(); + result.setJobId(jobId); + result.setParentJobId(parentJobId); + result.setAlgorithmTypeName(param.getAlgorithmTypeName()); + result.setAlgorithmProps(param.getAlgorithmProps()); + result.setSourceDatabaseType(param.getSourceDatabaseType().getType()); + return result; + } + @Override public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() { return true; diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOptionTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOptionTest.java index 9e282b66d7e03..a4863dce9b6c1 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOptionTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOptionTest.java @@ -52,9 +52,9 @@ class ConsistencyCheckJobOptionTest { - private final ConsistencyCheckJobOption jobAPI = new ConsistencyCheckJobOption(); + private final ConsistencyCheckJobOption jobOption = new ConsistencyCheckJobOption(); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new YamlMigrationJobConfigurationSwapper(); @@ -67,9 +67,9 @@ public static void beforeClass() { void assertCreateJobConfig() { MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration()); String parentJobId = parentJobConfig.getJobId(); - String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, + String checkJobId = jobOption.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); - ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(checkJobId); + ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(checkJobId); int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE; String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal(); assertThat(checkJobConfig.getJobId(), is(expectCheckJobId)); @@ -85,7 +85,7 @@ void assertDropByParentJobId() { PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()); int expectedSequence = 1; for (int i = 0; i < 3; i++) { - String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, + String checkJobId = jobOption.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); ConsistencyCheckJobItemContext checkJobItemContext = new ConsistencyCheckJobItemContext( new ConsistencyCheckJobConfiguration(checkJobId, parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")), 0, JobStatus.FINISHED, null); @@ -98,12 +98,12 @@ void assertDropByParentJobId() { } expectedSequence = 2; for (int i = 0; i < 2; i++) { - jobAPI.dropByParentJobId(parentJobId); + jobOption.dropByParentJobId(parentJobId); Optional latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); assertTrue(latestCheckJobId.isPresent()); assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()), is(expectedSequence--)); } - jobAPI.dropByParentJobId(parentJobId); + jobOption.dropByParentJobId(parentJobId); Optional latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); assertFalse(latestCheckJobId.isPresent()); }