From 1928e739ba2328f16356c74c23e1196002675bae Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Thu, 16 Nov 2023 23:44:30 +0800 Subject: [PATCH] Refactor PipelineJobAPI.getJobConfiguration() (#29059) --- .../data/pipeline/core/job/service/PipelineJobAPI.java | 8 -------- .../distsql/handler/update/CheckMigrationJobUpdater.java | 3 ++- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 5 ----- .../data/pipeline/cdc/handler/CDCBackendHandler.java | 5 +++-- .../api/impl/ConsistencyCheckJobAPI.java | 7 +------ .../task/ConsistencyCheckTasksRunner.java | 2 +- .../scenario/migration/api/impl/MigrationJobAPI.java | 9 ++------- .../api/impl/ConsistencyCheckJobAPITest.java | 2 +- .../scenario/migration/api/impl/MigrationJobAPITest.java | 6 +++--- 9 files changed, 13 insertions(+), 34 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index b823bba76bf0b..4ac59eefcf944 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -34,14 +34,6 @@ @SingletonSPI public interface PipelineJobAPI extends TypedSPI { - /** - * Get job configuration. - * - * @param jobId job id - * @return job configuration - */ - PipelineJobConfiguration getJobConfiguration(String jobId); - /** * Get job configuration. * diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java index a875439f4da27..547d6255f4b9f 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.migration.distsql.handler.update; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter; @@ -46,7 +47,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName(); Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps(); String jobId = sqlStatement.getJobId(); - MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); verifyInventoryFinished(jobConfig); checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType())); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 1993cde6e7b89..4d6d31a3cfbee 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -278,11 +278,6 @@ public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfigurat return new CDCProcessContext(pipelineJobConfig.getJobId(), showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()))); } - @Override - public CDCJobConfiguration getJobConfiguration(final String jobId) { - return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); - } - @Override public CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java index ad7f7b7741001..2ef78db661ef0 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java @@ -47,6 +47,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType; @@ -78,7 +79,7 @@ public final class CDCBackendHandler { * @return database */ public String getDatabaseNameByJobId(final String jobId) { - return jobAPI.getJobConfiguration(jobId).getDatabaseName(); + return jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName(); } /** @@ -126,7 +127,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod * @param connectionContext connection context */ public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) { - CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId); + CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId)); if (PipelineJobCenter.isJobExisting(jobId)) { PipelineJobCenter.stop(jobId); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index ac99a1917c8b8..88e39ee70d309 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -312,7 +312,7 @@ private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result } private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) { - ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(checkJobId); + ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); result.setAlgorithmType(jobConfig.getAlgorithmTypeName()); if (null != jobConfig.getAlgorithmProps()) { result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(","))); @@ -329,11 +329,6 @@ private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo } } - @Override - public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) { - return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); - } - @Override public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 3ae18d5812900..9c8b1f0f9491c 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -98,7 +98,7 @@ protected void runBlocking() { checkJobAPI.persistJobItemProgress(jobItemContext); JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId); InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType()); - PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(parentJobId); + PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId)); try { PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker( parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext()); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 1fe00789a8286..52527c619011f 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -226,11 +226,6 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina } } - @Override - public MigrationJobConfiguration getJobConfiguration(final String jobId) { - return getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); - } - @Override public MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { return new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); @@ -328,7 +323,7 @@ private void dropCheckJobs(final String jobId) { } private void cleanTempTableOnRollback(final String jobId) throws SQLException { - MigrationJobConfiguration jobConfig = getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType()); TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); try ( @@ -352,7 +347,7 @@ public void commit(final String jobId) { PipelineJobManager jobManager = new PipelineJobManager(this); jobManager.stop(jobId); dropCheckJobs(jobId); - MigrationJobConfiguration jobConfig = getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName()); jobManager.drop(jobId); log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java index 6e90b0b222498..169f76ab6d52d 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java @@ -64,7 +64,7 @@ void assertCreateJobConfig() { String parentJobId = parentJobConfig.getJobId(); String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); - ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(checkJobId); + ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE; String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal(); assertThat(checkJobConfig.getJobId(), is(expectCheckJobId)); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index e206c20287567..bbd98ed6fbd4d 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -142,7 +142,7 @@ void assertStartOrStopById() { void assertRollback() throws SQLException { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get()); + MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); initTableData(jobConfig); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); @@ -154,7 +154,7 @@ void assertRollback() throws SQLException { void assertCommit() { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get()); + MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); initTableData(jobConfig); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); @@ -277,7 +277,7 @@ void assertCreateJobConfig() throws SQLException { initIntPrimaryEnvironment(); SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order"); String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db")); - MigrationJobConfiguration actual = jobAPI.getJobConfiguration(jobId); + MigrationJobConfiguration actual = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); assertThat(actual.getTargetDatabaseName(), is("logic_db")); List dataNodeLines = actual.getJobShardingDataNodes(); assertThat(dataNodeLines.size(), is(1));