From bf41095f8d9e3bca55f6432dc751ab57d371d0e5 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sun, 26 Nov 2023 01:00:47 +0800 Subject: [PATCH] Refactor PipelineJobManager (#29207) * Refactor PipelineJobConfigurationManager * Refactor MigrationJobAPI * Remove SPI from PipelineJobOption * Remove SPI from PipelineJobOption * Refactor PipelineJobManager * Refactor PipelineJobManager --- .../core/job/service/PipelineJobManager.java | 6 +- .../api/impl/MigrationJobAPITest.java | 80 +++++++++---------- 2 files changed, 40 insertions(+), 46 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index 27540a94a9dee..adc980be60bd2 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -58,19 +58,17 @@ public final class PipelineJobManager { * Start job. * * @param jobConfig job configuration - * @return job id */ - public Optional start(final PipelineJobConfiguration jobConfig) { + public void start(final PipelineJobConfiguration jobConfig) { String jobId = jobConfig.getJobId(); ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId)); PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)); if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) { log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId); - return Optional.of(jobId); + return; } governanceFacade.getJobFacade().getJob().create(jobId, jobOption.getJobClass()); governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig)); - return Optional.of(jobId); } /** diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 55e7c4be872e5..06b98abb2f2be 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -19,6 +19,7 @@ import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory; @@ -32,11 +33,11 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption; @@ -129,9 +130,9 @@ static void afterClass() { @Test void assertStartAndList() { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); - JobConfigurationPOJO jobConfigPOJO = getJobConfigurationPOJO(jobId.get()); + PipelineJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); + JobConfigurationPOJO jobConfigPOJO = getJobConfigurationPOJO(jobConfig.getJobId()); assertFalse(jobConfigPOJO.isDisabled()); assertThat(jobConfigPOJO.getShardingTotalCount(), is(1)); } @@ -142,46 +143,43 @@ private JobConfigurationPOJO getJobConfigurationPOJO(final String jobId) { @Test void assertStartOrStopById() { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); - assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled()); + PipelineJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); + assertFalse(getJobConfigurationPOJO(jobConfig.getJobId()).isDisabled()); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); - jobManager.stop(jobId.get()); - assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled()); - jobManager.startDisabledJob(jobId.get()); - assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled()); + jobManager.stop(jobConfig.getJobId()); + assertTrue(getJobConfigurationPOJO(jobConfig.getJobId()).isDisabled()); + jobManager.startDisabledJob(jobConfig.getJobId()); + assertFalse(getJobConfigurationPOJO(jobConfig.getJobId()).isDisabled()); } @Test void assertRollback() throws SQLException { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId.get()); - initTableData(jobConfig); + PipelineJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); + initTableData(jobConfigManager.getJobConfiguration(jobConfig.getJobId())); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); - jobAPI.rollback(jobId.get()); - assertNull(getJobConfigurationPOJO(jobId.get())); + jobAPI.rollback(jobConfig.getJobId()); + assertNull(getJobConfigurationPOJO(jobConfig.getJobId())); } @Test void assertCommit() { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId.get()); - initTableData(jobConfig); + PipelineJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); + initTableData(jobConfigManager.getJobConfiguration(jobConfig.getJobId())); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); - jobAPI.commit(jobId.get()); - assertNull(getJobConfigurationPOJO(jobId.get())); + jobAPI.commit(jobConfig.getJobId()); + assertNull(getJobConfigurationPOJO(jobConfig.getJobId())); } @Test void assertGetProgress() { MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); - Optional jobId = jobManager.start(jobConfig); - assertTrue(jobId.isPresent()); + jobManager.start(jobConfig); Map jobProgressMap = transmissionJobManager.getJobProgress(jobConfig); assertThat(jobProgressMap.size(), is(1)); } @@ -190,10 +188,9 @@ void assertGetProgress() { void assertDataConsistencyCheck() { MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); initTableData(jobConfig); - Optional jobId = jobManager.start(jobConfig); - assertTrue(jobId.isPresent()); + jobManager.start(jobConfig); Map checkResultMap = jobOption.buildDataConsistencyChecker( - jobConfig, jobOption.buildProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null); + jobConfig, jobOption.buildProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, "H2")).check("FIXTURE", null); assertThat(checkResultMap.size(), is(1)); String checkKey = "t_order"; assertTrue(checkResultMap.get(checkKey).isMatched()); @@ -202,12 +199,11 @@ void assertDataConsistencyCheck() { @Test void assertSwitchClusterConfigurationSucceed() { - final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); - Optional jobId = jobManager.start(jobConfig); - assertTrue(jobId.isPresent()); + MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig); jobItemManager.persistProgress(jobItemContext); - jobItemManager.updateStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK); + jobItemManager.updateStatus(jobConfig.getJobId(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK); Map progress = transmissionJobManager.getJobProgress(jobConfig); for (Entry entry : progress.entrySet()) { assertThat(entry.getValue().getStatus(), is(JobStatus.EXECUTE_INVENTORY_TASK)); @@ -237,7 +233,7 @@ private void initTableData(final DataSource pipelineDataSource) throws SQLExcept @Test void assertRenewJobStatus() { - final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig); jobItemManager.persistProgress(jobItemContext); jobItemManager.updateStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED); @@ -306,13 +302,13 @@ void assertShowMigrationSourceResources() { @Test void assertGetJobItemInfosAtBegin() { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); + MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); YamlTransmissionJobItemProgress yamlJobItemProgress = new YamlTransmissionJobItemProgress(); yamlJobItemProgress.setStatus(JobStatus.RUNNING.name()); yamlJobItemProgress.setSourceDatabaseType("MySQL"); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); - List jobItemInfos = transmissionJobManager.getJobItemInfos(jobId.get()); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, YamlEngine.marshal(yamlJobItemProgress)); + List jobItemInfos = transmissionJobManager.getJobItemInfos(jobConfig.getJobId()); assertThat(jobItemInfos.size(), is(1)); TransmissionJobItemInfo jobItemInfo = jobItemInfos.get(0); assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.RUNNING)); @@ -321,15 +317,15 @@ void assertGetJobItemInfosAtBegin() { @Test void assertGetJobItemInfosAtIncrementTask() { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); + MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); YamlTransmissionJobItemProgress yamlJobItemProgress = new YamlTransmissionJobItemProgress(); yamlJobItemProgress.setSourceDatabaseType("MySQL"); yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name()); yamlJobItemProgress.setProcessedRecordsCount(100); yamlJobItemProgress.setInventoryRecordsCount(50); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); - List jobItemInfos = transmissionJobManager.getJobItemInfos(jobId.get()); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, YamlEngine.marshal(yamlJobItemProgress)); + List jobItemInfos = transmissionJobManager.getJobItemInfos(jobConfig.getJobId()); TransmissionJobItemInfo jobItemInfo = jobItemInfos.get(0); assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK)); assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(100));