Refactor PipelineDataSourceFactory (#29503)
terrymanu authored Dec 22, 2023
1 parent 835b055 commit de114d5
Showing 9 changed files with 27 additions and 110 deletions.
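The change deletes the single-method PipelineDataSourceFactory and moves data source creation into a new PipelineDataSourceWrapper constructor, so every call site switches from a static factory call to a direct constructor invocation. A minimal before/after sketch of the call-site change, using names taken from the hunks below (the variable name is illustrative):

// Before: creation went through the now-deleted static factory.
PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(dataSourceConfig);

// After: the wrapper creates the underlying DataSource itself.
PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(dataSourceConfig);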

This file was deleted.

@@ -51,7 +51,7 @@ public PipelineDataSourceWrapper getDataSource(final PipelineDataSourceConfiguration dataSourceConfig) {
             }
             log.warn("{} is already closed, create again.", result);
         }
-        result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
+        result = new PipelineDataSourceWrapper(dataSourceConfig);
         cachedDataSources.put(dataSourceConfig, result);
         return result;
     }
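For context, the surrounding method caches one wrapper per configuration and recreates it when the cached entry has been closed. A condensed sketch of that pattern (the map field, the isClosed() check, and the exact control flow are assumptions reconstructed from this hunk, not the verbatim file):

// Assumed field; the real map type is not visible in this hunk.
private final Map<PipelineDataSourceConfiguration, PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();

public PipelineDataSourceWrapper getDataSource(final PipelineDataSourceConfiguration dataSourceConfig) {
    PipelineDataSourceWrapper result = cachedDataSources.get(dataSourceConfig);
    if (null != result && !result.isClosed()) {
        return result;                                        // reuse the live cached wrapper
    }
    if (null != result) {
        log.warn("{} is already closed, create again.", result);
    }
    result = new PipelineDataSourceWrapper(dataSourceConfig); // recreate after close
    cachedDataSources.put(dataSourceConfig, result);
    return result;
}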
@@ -19,9 +19,13 @@

 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

 import javax.sql.DataSource;
 import java.io.PrintWriter;
@@ -45,6 +49,12 @@ public final class PipelineDataSourceWrapper implements DataSource, AutoCloseable {

     private final AtomicBoolean closed = new AtomicBoolean(false);

+    @SneakyThrows(SQLException.class)
+    public PipelineDataSourceWrapper(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
+        this(TypedSPILoader.getService(PipelineDataSourceCreator.class, pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration()),
+                pipelineDataSourceConfig.getDatabaseType());
+    }
+
     /**
      * Whether underlying data source is closed or not.
      *
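The new constructor resolves a PipelineDataSourceCreator via TypedSPILoader from the configuration's type, builds the underlying DataSource with it, and delegates to the existing (DataSource, DatabaseType) constructor; @SneakyThrows rethrows the checked SQLException without declaring it. A minimal usage sketch, assuming the three-argument StandardPipelineDataSourceConfiguration that appears later in this diff; the JDBC URL and credentials are illustrative:

// Illustrative values; runs inside a method that declares SQLException.
PipelineDataSourceConfiguration config = new StandardPipelineDataSourceConfiguration(
        "jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root");
try (
        PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(config);
        Connection connection = dataSource.getConnection()) {
    // The wrapper is AutoCloseable, so try-with-resources releases the pool on exit.
    System.out.println(connection.getSchema());
}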
@@ -22,7 +22,6 @@
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;

 import java.sql.Connection;
@@ -43,7 +42,7 @@ public final class PipelineSchemaUtils {
      */
     @SneakyThrows(SQLException.class)
     public static String getDefaultSchema(final PipelineDataSourceConfiguration dataSourceConfig) {
-        try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(dataSourceConfig)) {
+        try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(dataSourceConfig)) {
            try (Connection connection = dataSource.getConnection()) {
                String result = connection.getSchema();
                log.info("get default schema {}", result);

This file was deleted.

@@ -27,23 +27,24 @@
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
 import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
 import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.MigrateTableStatement;
+import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -67,8 +68,6 @@
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.MigrateTableStatement;
-import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
 import org.apache.shardingsphere.mode.manager.ContextManager;

 import java.sql.Connection;
@@ -322,7 +321,7 @@ private void cleanTempTableOnRollback(final String jobId) throws SQLException {
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
-                PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
+                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
             for (String each : jobConfig.getTargetTableNames()) {
                 String targetSchemaName = mapping.getSchemaName(each);
@@ -24,7 +24,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender;
@@ -580,7 +580,7 @@ public DataSource generateShardingSphereDataSourceFromProxy() {
         YamlSingleRuleConfiguration singleRuleConfig = new YamlSingleRuleConfiguration();
         singleRuleConfig.setTables(Collections.singletonList("*.*"));
         rootConfig.getRules().add(singleRuleConfig);
-        return PipelineDataSourceFactory.newInstance(new ShardingSpherePipelineDataSourceConfiguration(rootConfig));
+        return new PipelineDataSourceWrapper(new ShardingSpherePipelineDataSourceConfiguration(rootConfig));
     }

     private YamlRootConfiguration getYamlRootConfig() {
@@ -27,16 +27,15 @@
 import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -175,7 +174,7 @@ private void initSchemaAndTable(final PipelineContainerComposer containerComposer
     }

     private DataSource createStandardDataSource(final PipelineContainerComposer containerComposer, final String storageUnitName) {
-        return PipelineDataSourceFactory.newInstance(new StandardPipelineDataSourceConfiguration(containerComposer.getActualJdbcUrlTemplate(storageUnitName, false),
+        return new PipelineDataSourceWrapper(new StandardPipelineDataSourceConfiguration(containerComposer.getActualJdbcUrlTemplate(storageUnitName, false),
                containerComposer.getUsername(), containerComposer.getPassword()));
    }

@@ -24,7 +24,7 @@
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
@@ -83,7 +82,7 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration(
         PipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(
                ConfigurationFileUtils.readFile("migration_standard_jdbc_source.yaml").replace("${databaseNameSuffix}", databaseNameSuffix));
         try (
-                PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
+                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(sourceDataSourceConfig);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute(PipelineContextUtils.getCreateOrderTableSchema());
