diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java deleted file mode 100644 index c08946e05c75e..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.core.datasource; - -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import lombok.SneakyThrows; -import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator; -import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; - -import javax.sql.DataSource; -import java.sql.SQLException; - -/** - * Pipeline data source factory. - */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class PipelineDataSourceFactory { - - /** - * New instance data source wrapper. - * - * @param pipelineDataSourceConfig pipeline data source configuration - * @return new data source wrapper - */ - @SneakyThrows(SQLException.class) - public static PipelineDataSourceWrapper newInstance(final PipelineDataSourceConfiguration pipelineDataSourceConfig) { - DataSource dataSource = TypedSPILoader.getService( - PipelineDataSourceCreator.class, pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration()); - return new PipelineDataSourceWrapper(dataSource, pipelineDataSourceConfig.getDatabaseType()); - } -} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java index 6ef42f14e6b5b..ad731b33cbc48 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java @@ -51,7 +51,7 @@ public PipelineDataSourceWrapper getDataSource(final PipelineDataSourceConfigura } log.warn("{} is already closed, create again.", result); } - result = PipelineDataSourceFactory.newInstance(dataSourceConfig); + result = new PipelineDataSourceWrapper(dataSourceConfig); cachedDataSources.put(dataSourceConfig, result); return result; } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java index 253e9a9a16a36..162ba91a4ef25 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceWrapper.java @@ -19,9 +19,13 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer; +import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import javax.sql.DataSource; import java.io.PrintWriter; @@ -45,6 +49,12 @@ public final class PipelineDataSourceWrapper implements DataSource, AutoCloseabl private final AtomicBoolean closed = new AtomicBoolean(false); + @SneakyThrows(SQLException.class) + public PipelineDataSourceWrapper(final PipelineDataSourceConfiguration pipelineDataSourceConfig) { + this(TypedSPILoader.getService(PipelineDataSourceCreator.class, pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration()), + pipelineDataSourceConfig.getDatabaseType()); + } + /** * Whether underlying data source is closed or not. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java index 25dbf13313459..f1e78d620f806 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtils.java @@ -22,7 +22,6 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; import java.sql.Connection; @@ -43,7 +42,7 @@ public final class PipelineSchemaUtils { */ @SneakyThrows(SQLException.class) public static String getDefaultSchema(final PipelineDataSourceConfiguration dataSourceConfig) { - try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(dataSourceConfig)) { + try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(dataSourceConfig)) { try (Connection connection = dataSource.getConnection()) { String result = connection.getSchema(); log.info("get default schema {}", result); diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactoryTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactoryTest.java deleted file mode 100644 index 829aa34d92204..0000000000000 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactoryTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.core.datasource; - -import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -class PipelineDataSourceFactoryTest { - - @Test - void assertNewInstance() { - Map yamlDataSourceConfig = new HashMap<>(3, 1F); - yamlDataSourceConfig.put("url", "jdbc:mysql://localhost:3306/database"); - yamlDataSourceConfig.put("username", "username"); - yamlDataSourceConfig.put("password", "password"); - PipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(yamlDataSourceConfig); - assertThat(PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig).getDatabaseType(), is(pipelineDataSourceConfig.getDatabaseType())); - } -} diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java index df1dc31201e34..3e16d0e239fed 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java @@ -27,23 +27,24 @@ import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry; import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType; -import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils; -import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder; import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException; import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException; import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; +import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtils; +import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder; +import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.MigrateTableStatement; +import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; @@ -67,8 +68,6 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration; import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine; -import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.MigrateTableStatement; -import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry; import org.apache.shardingsphere.mode.manager.ContextManager; import java.sql.Connection; @@ -322,7 +321,7 @@ private void cleanTempTableOnRollback(final String jobId) throws SQLException { PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(jobConfig.getTargetDatabaseType()); TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); try ( - PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget()); + PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(jobConfig.getTarget()); Connection connection = dataSource.getConnection()) { for (String each : jobConfig.getTargetTableNames()) { String targetSchemaName = mapping.getSchemaName(each); diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java index f0cf4df32cd93..d3ba2ec2a0044 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java @@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory; +import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.job.JobStatus; import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType; import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender; @@ -580,7 +580,7 @@ public DataSource generateShardingSphereDataSourceFromProxy() { YamlSingleRuleConfiguration singleRuleConfig = new YamlSingleRuleConfiguration(); singleRuleConfig.setTables(Collections.singletonList("*.*")); rootConfig.getRules().add(singleRuleConfig); - return PipelineDataSourceFactory.newInstance(new ShardingSpherePipelineDataSourceConfiguration(rootConfig)); + return new PipelineDataSourceWrapper(new ShardingSpherePipelineDataSourceConfiguration(rootConfig)); } private YamlRootConfiguration getYamlRootConfig() { diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index a3e2df5d89e48..8720812aad863 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -27,16 +27,15 @@ import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter; import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; @@ -175,7 +174,7 @@ private void initSchemaAndTable(final PipelineContainerComposer containerCompose } private DataSource createStandardDataSource(final PipelineContainerComposer containerComposer, final String storageUnitName) { - return PipelineDataSourceFactory.newInstance(new StandardPipelineDataSourceConfiguration(containerComposer.getActualJdbcUrlTemplate(storageUnitName, false), + return new PipelineDataSourceWrapper(new StandardPipelineDataSourceConfiguration(containerComposer.getActualJdbcUrlTemplate(storageUnitName, false), containerComposer.getUsername(), containerComposer.getPassword())); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java index 04af7e5121ef1..6e88c183f8bb4 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java @@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.config.YamlPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils; @@ -83,7 +82,7 @@ public static YamlMigrationJobConfiguration createYamlMigrationJobConfiguration( PipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration( ConfigurationFileUtils.readFile("migration_standard_jdbc_source.yaml").replace("${databaseNameSuffix}", databaseNameSuffix)); try ( - PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig); + PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(sourceDataSourceConfig); Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { statement.execute(PipelineContextUtils.getCreateOrderTableSchema());