Remove TransmissionJobOption.extendYamlJobConfiguration() (#29242)
terrymanu authored Nov 30, 2023
1 parent d09b94c commit 28814e9
Showing 6 changed files with 12 additions and 51 deletions.
@@ -18,9 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.job.option;

import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -47,14 +45,6 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
*/
PipelineJobInfo getJobInfo(String jobId);

/**
* Extend YAML job configuration.
*
* @param contextKey context key
* @param yamlJobConfig YAML job configuration
*/
void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);

/**
* Build task configuration.
*
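Note: the interface above no longer exposes a post-construction hook, so each scenario now fills in fields such as the job ID while it builds the YAML configuration (see the CDC and migration hunks below). A minimal, self-contained Java sketch of the before/after pattern, using hypothetical stand-in classes (YamlJobConfig, JobConfigFactory) rather than the real ShardingSphere types:

// Hypothetical, simplified stand-ins for YamlPipelineJobConfiguration and its builders;
// this illustrates the shape of the refactor, not the real ShardingSphere API.
public final class JobConfigFactory {

    static final class YamlJobConfig {

        private String jobId;

        private String databaseName;

        String getJobId() { return jobId; }

        void setJobId(final String jobId) { this.jobId = jobId; }

        void setDatabaseName(final String databaseName) { this.databaseName = databaseName; }
    }

    // Old style: build the configuration, then let an SPI hook patch missing fields afterwards.
    static YamlJobConfig buildThenExtend(final String databaseName) {
        YamlJobConfig result = new YamlJobConfig();
        result.setDatabaseName(databaseName);
        extendYamlJobConfiguration(result);
        return result;
    }

    // The kind of hook removed by this commit: mutate the config after construction if a field is absent.
    private static void extendYamlJobConfiguration(final YamlJobConfig config) {
        if (null == config.getJobId()) {
            config.setJobId("generated-job-id");
        }
    }

    // New style: the factory method sets every field up front; no post-construction mutation.
    static YamlJobConfig build(final String databaseName) {
        YamlJobConfig result = new YamlJobConfig();
        result.setJobId("generated-job-id");
        result.setDatabaseName(databaseName);
        return result;
    }
}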
@@ -17,20 +17,16 @@

package org.apache.shardingsphere.data.pipeline.cdc;

import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
@@ -84,19 +80,6 @@ public PipelineJobInfo getJobInfo(final String jobId) {
return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
}

@Override
public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig;
if (null == yamlJobConfig.getJobId()) {
config.setJobId(new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull(), config.getSinkConfig().getSinkType()).marshal());
}
if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
config.getDataSourceConfiguration().getParameter());
config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
}
}

@Override
public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
@@ -20,19 +20,21 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
@@ -49,10 +51,10 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -113,7 +115,6 @@ public CDCJobAPI() {
public String create(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps) {
PipelineContextKey contextKey = new PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
YamlCDCJobConfiguration yamlJobConfig = getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
jobOption.extendYamlJobConfiguration(contextKey, yamlJobConfig);
CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
@@ -133,6 +134,7 @@ public String create(final StreamDataParameter param, final CDCSinkType sinkType

private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
result.setJobId(new CDCJobId(contextKey, param.getSchemaTableNames(), param.isFull(), sinkType.name()).marshal());
result.setDatabaseName(param.getDatabaseName());
result.setSchemaTableNames(param.getSchemaTableNames());
result.setFull(param.isFull());
@@ -148,6 +150,8 @@ private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParam
JobDataNodeLine tableFirstDataNodes = new JobDataNodeLine(param.getDataNodesMap().entrySet().stream()
.map(each -> new JobDataNodeEntry(each.getKey(), each.getValue().subList(0, 1))).collect(Collectors.toList()));
result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
result.setSourceDatabaseType(PipelineDataSourceConfigurationFactory.newInstance(
result.getDataSourceConfiguration().getType(), result.getDataSourceConfiguration().getParameter()).getDatabaseType().getType());
return result;
}

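Note: with the hook gone, getYamlCDCJobConfiguration (above) assigns the job ID and derives the source database type while the configuration is being assembled. A rough sketch of that construction-time type detection, with a hypothetical detectDatabaseType helper standing in for PipelineDataSourceConfigurationFactory.newInstance(...).getDatabaseType():

import java.util.Locale;

final class SourceDatabaseTypeResolver {

    // Hypothetical helper: infer the database type from a JDBC URL at configuration build time.
    static String detectDatabaseType(final String jdbcUrl) {
        String normalized = jdbcUrl.toLowerCase(Locale.ROOT);
        if (normalized.startsWith("jdbc:mysql:")) {
            return "MySQL";
        }
        if (normalized.startsWith("jdbc:postgresql:")) {
            return "PostgreSQL";
        }
        if (normalized.startsWith("jdbc:opengauss:")) {
            return "openGauss";
        }
        throw new IllegalArgumentException("Unsupported JDBC URL: " + jdbcUrl);
    }

    public static void main(final String[] args) {
        // Prints "MySQL"; the real code stores the detected type via setSourceDatabaseType(...).
        System.out.println(detectDatabaseType("jdbc:mysql://127.0.0.1:3306/sharding_db"));
    }
}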
@@ -23,9 +23,7 @@
import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
@@ -47,7 +45,6 @@
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
@@ -97,14 +94,6 @@ public PipelineJobInfo getJobInfo(final String jobId) {
return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
}

@Override
public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig;
if (null == yamlJobConfig.getJobId()) {
config.setJobId(new MigrationJobId(contextKey, config.getJobShardingDataNodes()).marshal());
}
}

@Override
public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
@@ -45,6 +45,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
@@ -92,16 +93,14 @@
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {

private final TransmissionJobOption jobOption;

private final PipelineJobManager jobManager;

private final PipelineJobConfigurationManager jobConfigManager;

private final PipelineDataSourcePersistService dataSourcePersistService;

public MigrationJobAPI() {
jobOption = new MigrationJobOption();
TransmissionJobOption jobOption = new MigrationJobOption();
jobManager = new PipelineJobManager(jobOption);
jobConfigManager = new PipelineJobConfigurationManager(jobOption);
dataSourcePersistService = new PipelineDataSourcePersistService();
@@ -164,7 +163,7 @@ private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineCo
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
jobOption.extendYamlJobConfiguration(contextKey, result);
result.setJobId(new MigrationJobId(contextKey, result.getJobShardingDataNodes()).marshal());
return result;
}

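Note: since MigrationJobAPI now sets the job ID directly in buildYamlJobConfiguration, the jobOption field is only needed to wire the managers in the constructor and is narrowed to a local variable. A small sketch of that narrowing, with hypothetical Option/Manager types in place of the real ones:

final class MigrationApiSketch {

    static final class Option { }

    static final class Manager {

        private final Option option;

        Manager(final Option option) { this.option = option; }
    }

    private final Manager jobManager;

    private final Manager jobConfigManager;

    MigrationApiSketch() {
        // After this commit the option is a constructor-local, not a field: nothing else reads it.
        Option jobOption = new Option();
        jobManager = new Manager(jobOption);
        jobConfigManager = new Manager(jobOption);
    }

    public static void main(final String[] args) {
        new MigrationApiSketch();
    }
}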
@@ -27,15 +27,12 @@
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.test.util.ConfigurationFileUtils;

import java.sql.Connection;
@@ -96,7 +93,6 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration(
result.setSources(sources);
result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration(
ConfigurationFileUtils.readFile("migration_sharding_sphere_jdbc_target.yaml").replace("${databaseNameSuffix}", databaseNameSuffix))));
((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption()).extendYamlJobConfiguration(contextKey, result);
return result;
}
