Skip to content

Commit

Permalink
Refactor TransmissionJobAPI (#29197)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 24, 2023
1 parent 23d60d5 commit 2576aaa
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,20 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
/**
* Build task configuration.
*
* @param pipelineJobConfig pipeline job configuration
* @param jobConfig pipeline job configuration
* @param jobShardingItem job sharding item
* @param pipelineProcessConfig pipeline process configuration
* @param processConfig pipeline process configuration
* @return task configuration
*/
PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration processConfig);

/**
* Build pipeline process context.
* Build transmission process context.
*
* @param pipelineJobConfig pipeline job configuration
* @return pipeline process context
* @param jobConfig pipeline job configuration
* @return transmission process context
*/
TransmissionProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig);

/**
* Extend YAML job configuration.
Expand All @@ -78,13 +78,12 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
/**
* Build pipeline data consistency checker.
*
* @param pipelineJobConfig job configuration
* @param jobConfig job configuration
* @param processContext process context
* @param progressContext consistency check job item progress context
* @return all logic tables check result
*/
PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, TransmissionProcessContext processContext,
ConsistencyCheckJobItemProgressContext progressContext);
PipelineDataConsistencyChecker buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, TransmissionProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext);

/**
* Commit pipeline job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,11 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina
}

@Override
public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
log.debug("buildTaskConfiguration, result={}", result);
return result;
Expand Down Expand Up @@ -277,9 +277,9 @@ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfigurati
}

@Override
public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
public CDCProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
TransmissionJobManager jobManager = new TransmissionJobManager(this);
return new CDCProcessContext(pipelineJobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
return new CDCProcessContext(jobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
}

@Override
Expand Down Expand Up @@ -325,8 +325,8 @@ public void rollback(final String jobId) throws SQLException {
}

@Override
public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void execute(final ShardingContext shardingContext) {

private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
CDCProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
CDCProcessContext jobProcessContext = jobAPI.buildProcessContext(jobConfig);
CDCTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ protected void runBlocking() {
TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(parentJobId);
try {
PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker(
parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext());
PipelineDataConsistencyChecker checker = jobAPI.buildDataConsistencyChecker(
parentJobConfig, jobAPI.buildProcessContext(parentJobConfig), jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Map<String, TableDataConsistencyCheckResult> checkResultMap = checker.check(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
log.info("job {} with check algorithm '{}' data consistency checker result: {}, stopping: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
MigrationProcessContext jobProcessContext = jobAPI.buildProcessContext(jobConfig);
MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() {
}

@Override
public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(
jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
Expand All @@ -240,7 +240,7 @@ public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfig
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
ImporterConfiguration importerConfig = buildImporterConfiguration(
jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
MigrationTaskConfiguration result = new MigrationTaskConfiguration(
incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
log.info("buildTaskConfiguration, result={}", result);
Expand Down Expand Up @@ -275,15 +275,15 @@ private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfi
}

@Override
public MigrationProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()));
return new MigrationProcessContext(pipelineJobConfig.getJobId(), processConfig);
public MigrationProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
return new MigrationProcessContext(jobConfig.getJobId(), processConfig);
}

@Override
public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext, progressContext);
public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ void assertDataConsistencyCheck() {
initTableData(jobConfig);
Optional<String> jobId = jobManager.start(jobConfig);
assertTrue(jobId.isPresent());
Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.buildPipelineDataConsistencyChecker(
jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null);
Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.buildDataConsistencyChecker(
jobConfig, jobAPI.buildProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null);
assertThat(checkResultMap.size(), is(1));
String checkKey = "t_order";
assertTrue(checkResultMap.get(checkKey).isMatched());
Expand Down

0 comments on commit 2576aaa

Please sign in to comment.