diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java index 1566479ee3365..31c4622c1d5ec 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java @@ -43,10 +43,6 @@ @Slf4j public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob { - protected AbstractInseparablePipelineJob(final String jobId) { - super(jobId); - } - @Override public final void execute(final ShardingContext shardingContext) { String jobId = shardingContext.getJobName(); @@ -130,7 +126,6 @@ private void updateJobItemStatus(final T jobItemContext, final PipelineJobType j } private void executeIncrementalTasks(final PipelineJobType jobType, final Collection jobItemContexts) { - log.info("Execute incremental tasks, jobId={}", getJobId()); Collection> futures = new LinkedList<>(); for (T each : jobItemContexts) { if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java index 75b09d70ad4ec..cc07cae08aeb4 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java @@ -17,12 +17,9 @@ package org.apache.shardingsphere.data.pipeline.core.job; -import lombok.AccessLevel; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService; -import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener; import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; @@ -30,7 +27,6 @@ import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener; import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader; import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap; -import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.util.close.QuietlyCloser; import java.util.ArrayList; @@ -49,23 +45,12 @@ public abstract class AbstractPipelineJob implements PipelineJob { private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L; - @Getter - private final String jobId; - - @Getter(AccessLevel.PROTECTED) - private final PipelineJobType jobType; - private final AtomicBoolean stopping = new AtomicBoolean(false); private final AtomicReference jobBootstrap = new AtomicReference<>(); private final Map tasksRunners = new ConcurrentHashMap<>(); - protected AbstractPipelineJob(final String jobId) { - this.jobId = jobId; - jobType = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()); - } - /** * Is stopping. * @@ -107,16 +92,17 @@ protected final boolean addTasksRunner(final int shardingItem, final PipelineTas @Override public final void stop() { + Optional jobId = tasksRunners.values().stream().findFirst().map(each -> each.getJobItemContext().getJobId()); try { stopping.set(true); log.info("Stop tasks runner, jobId={}", jobId); tasksRunners.values().forEach(PipelineTasksRunner::stop); - awaitJobStopped(jobId); + jobId.ifPresent(this::awaitJobStopped); if (null != jobBootstrap.get()) { jobBootstrap.get().shutdown(); } } finally { - PipelineJobProgressPersistService.remove(jobId); + jobId.ifPresent(PipelineJobProgressPersistService::remove); tasksRunners.values().stream().map(each -> each.getJobItemContext().getJobProcessContext()).forEach(QuietlyCloser::close); clean(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java index 2c83e033062fe..46044d11a2718 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java @@ -37,10 +37,6 @@ @Slf4j public abstract class AbstractSeparablePipelineJob extends AbstractPipelineJob { - protected AbstractSeparablePipelineJob(final String jobId) { - super(jobId); - } - @Override public final void execute(final ShardingContext shardingContext) { String jobId = shardingContext.getJobName(); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java index 133ac1669928b..2e7821704599d 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java @@ -87,8 +87,7 @@ public final class CDCJob extends AbstractInseparablePipelineJob(new CDCJobType().getYamlJobItemProgressSwapper()); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java index eec99b71fbcd8..4d1409798c1a2 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java @@ -212,7 +212,7 @@ private static TransmissionJobItemProgress getTransmissionJobItemProgress(final * @param sink sink */ public void start(final String jobId, final PipelineSink sink) { - CDCJob job = new CDCJob(jobId, sink); + CDCJob job = new CDCJob(sink); PipelineJobRegistry.add(jobId, job); enable(jobId); JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java index df48a9815ccaa..5eedb8770e4b8 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java @@ -35,10 +35,6 @@ */ public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob { - public ConsistencyCheckJob(final String jobId) { - super(jobId); - } - @Override public ConsistencyCheckJobItemContext buildJobItemContext(final ShardingContext shardingContext) { ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter()); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java index b41748418a6fe..954e63fe64049 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java @@ -37,7 +37,7 @@ protected void onDeleted(final JobConfiguration jobConfig) { @Override protected AbstractPipelineJob buildPipelineJob(final String jobId) { - return new ConsistencyCheckJob(jobId); + return new ConsistencyCheckJob(); } @Override diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java index 9ed45a30c2896..56a4463de5a45 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java @@ -74,8 +74,7 @@ public final class MigrationJob extends AbstractSeparablePipelineJob(new MigrationJobType().getYamlJobItemProgressSwapper()); processConfigPersistService = new PipelineProcessConfigurationPersistService(); dataSourceManager = new DefaultPipelineDataSourceManager(); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java index e8c2c0b024062..eb7266b676e17 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java @@ -40,7 +40,7 @@ protected void onDeleted(final JobConfiguration jobConfig) { @Override protected AbstractPipelineJob buildPipelineJob(final String jobId) { - return new MigrationJob(jobId); + return new MigrationJob(); } @Override diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java index 3b91f6ffc0e57..a1aa5dd694b15 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java @@ -54,7 +54,7 @@ void assertBuildPipelineJobItemContext() { Map expectTableCheckPosition = Collections.singletonMap("t_order", 100); PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId, 0, YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition))); - ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob(checkJobId); + ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob(); ConsistencyCheckJobItemContext actual = consistencyCheckJob.buildJobItemContext( new ShardingContext(checkJobId, "", 1, YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0, "")); assertThat(actual.getProgressContext().getSourceTableCheckPositions(), is(expectTableCheckPosition));