From dfa35b2b100a64757739daf73c1a55bfd7c6777b Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 19 Nov 2023 12:38:22 +0800 Subject: [PATCH] Add generic type of PipelineJobManager.getJobConfiguration() --- .../data/pipeline/core/job/service/PipelineJobManager.java | 6 ++++-- .../distsql/handler/update/CheckMigrationJobUpdater.java | 2 +- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 4 ++-- .../data/pipeline/cdc/handler/CDCBackendHandler.java | 4 ++-- .../consistencycheck/api/impl/ConsistencyCheckJobAPI.java | 3 +-- .../scenario/migration/api/impl/MigrationJobAPI.java | 6 +++--- .../api/impl/ConsistencyCheckJobAPITest.java | 3 +-- .../scenario/migration/api/impl/MigrationJobAPITest.java | 6 +++--- 8 files changed, 17 insertions(+), 17 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index eccbdd80227b8..0e8d2a2d6826f 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -58,10 +58,12 @@ public final class PipelineJobManager { * Get job configuration. * * @param jobConfigPOJO job configuration POJO + * @param type of pipeline job configuration * @return pipeline job configuration */ - public PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { - return jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); + @SuppressWarnings("unchecked") + public T getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { + return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); } /** diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java index 86d9150871211..f59ed72f6a6f1 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java @@ -49,7 +49,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName(); Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps(); String jobId = sqlStatement.getJobId(); - MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration jobConfig = new PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); verifyInventoryFinished(jobConfig); checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType())); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 135d62f77104e..6f711ab199211 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -291,7 +291,7 @@ public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() { public TableBasedPipelineJobInfo getJobInfo(final String jobId) { JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO); - CDCJobConfiguration jobConfig = (CDCJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO); + CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO); return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames())); } @@ -306,7 +306,7 @@ public void commit(final String jobId) { */ public void dropStreaming(final String jobId) { JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); - CDCJobConfiguration jobConfig = (CDCJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO); + CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO); ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active")); new PipelineJobManager(this).drop(jobId); cleanup(jobConfig); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java index bc041d9db8133..1f1a9473cd42e 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java @@ -82,7 +82,7 @@ public final class CDCBackendHandler { * @return database */ public String getDatabaseNameByJobId(final String jobId) { - return ((CDCJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId))).getDatabaseName(); + return jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName(); } /** @@ -130,7 +130,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod * @param connectionContext connection context */ public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) { - CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + CDCJobConfiguration cdcJobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId)); if (PipelineJobCenter.isJobExisting(jobId)) { PipelineJobCenter.stop(jobId); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 93520db26f8c1..51f8e5edc113f 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -268,8 +268,7 @@ private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result } private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) { - ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) new PipelineJobManager(this) - .getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); + ConsistencyCheckJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); result.setAlgorithmType(jobConfig.getAlgorithmTypeName()); if (null != jobConfig.getAlgorithmProps()) { result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(","))); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 4f7a023d21be7..c5d551a55ce89 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -214,7 +214,7 @@ public TableBasedPipelineJobInfo getJobInfo(final String jobId) { JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO); List sourceTables = new LinkedList<>(); - ((MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO)).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes() + new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes() .forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); return new TableBasedPipelineJobInfo(jobMetaData, String.join(",", sourceTables)); } @@ -324,7 +324,7 @@ private void dropCheckJobs(final String jobId) { } private void cleanTempTableOnRollback(final String jobId) throws SQLException { - MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType()); TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); try ( @@ -348,7 +348,7 @@ public void commit(final String jobId) { PipelineJobManager jobManager = new PipelineJobManager(this); jobManager.stop(jobId); dropCheckJobs(jobId); - MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName()); jobManager.drop(jobId); log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java index 04c35a541bbf8..7762178b079e7 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java @@ -69,8 +69,7 @@ void assertCreateJobConfig() { String parentJobId = parentJobConfig.getJobId(); String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); - ConsistencyCheckJobConfiguration checkJobConfig = (ConsistencyCheckJobConfiguration) new PipelineJobManager(jobAPI) - .getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); + ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE; String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal(); assertThat(checkJobConfig.getJobId(), is(expectCheckJobId)); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index e70b05acc0180..3e67089680ae4 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -150,7 +150,7 @@ void assertStartOrStopById() { void assertRollback() throws SQLException { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); + MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); initTableData(jobConfig); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); @@ -162,7 +162,7 @@ void assertRollback() throws SQLException { void assertCommit() { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); + MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); initTableData(jobConfig); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); @@ -285,7 +285,7 @@ void assertCreateJobConfig() throws SQLException { initIntPrimaryEnvironment(); SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order"); String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db")); - MigrationJobConfiguration actual = (MigrationJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration actual = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); assertThat(actual.getTargetDatabaseName(), is("logic_db")); List dataNodeLines = actual.getJobShardingDataNodes(); assertThat(dataNodeLines.size(), is(1));