diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java index a0510de5440a6..95b8b3b85ef61 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java @@ -43,7 +43,6 @@ import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment; import javax.sql.DataSource; -import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; @@ -74,15 +73,17 @@ public final class PipelineDDLGenerator { * @param sourceTableName source table name * @param targetTableName target table name * @param parserEngine parser engine + * @param targetDatabaseName target database name * @return DDL SQL * @throws SQLException SQL exception */ public List generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource, - final String schemaName, final String sourceTableName, final String targetTableName, final SQLParserEngine parserEngine) throws SQLException { + final String schemaName, final String sourceTableName, final String targetTableName, + final SQLParserEngine parserEngine, final String targetDatabaseName) throws SQLException { long startTimeMillis = System.currentTimeMillis(); List result = new ArrayList<>(); for (String each : DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType).buildCreateTableSQLs(sourceDataSource, schemaName, sourceTableName)) { - Optional queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each); + Optional queryContext = decorate(databaseType, 
targetDatabaseName, schemaName, targetTableName, parserEngine, each); queryContext.ifPresent(sql -> { String trimmedSql = sql.trim(); if (!Strings.isNullOrEmpty(trimmedSql)) { @@ -95,19 +96,15 @@ public List generateLogicDDL(final DatabaseType databaseType, final Data return result; } - private Optional decorate(final DatabaseType databaseType, final DataSource dataSource, final String schemaName, final String targetTableName, - final SQLParserEngine parserEngine, final String sql) throws SQLException { + private Optional decorate(final DatabaseType databaseType, final String targetDatabaseName, final String schemaName, final String targetTableName, + final SQLParserEngine parserEngine, final String sql) { if (Strings.isNullOrEmpty(sql)) { return Optional.empty(); } - String databaseName; - try (Connection connection = dataSource.getConnection()) { - databaseName = connection.getCatalog(); - } - String result = decorateActualSQL(databaseName, targetTableName, parserEngine, sql.trim()); + String result = decorateActualSQL(targetDatabaseName, targetTableName, parserEngine, sql.trim()); // TODO remove it after set search_path is supported. 
if ("openGauss".equals(databaseType.getType())) { - return decorateOpenGauss(databaseName, schemaName, result, parserEngine); + return decorateOpenGauss(targetDatabaseName, schemaName, result, parserEngine); } return Optional.of(result); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java index 2901c6164427e..c256c37ae2a14 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java @@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder; +import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; @@ -40,8 +41,10 @@ import java.sql.Statement; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.regex.Pattern; @@ -62,14 +65,16 @@ public final class PipelineJobDataSourcePreparer { * Prepare target schemas. 
* * @param param prepare target schemas parameter + * @return target schemas * @throws SQLException if prepare target schema fail */ - public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException { + public Map prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException { DatabaseType targetDatabaseType = param.getTargetDatabaseType(); DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData(); if (!dialectDatabaseMetaData.isSchemaAvailable()) { - return; + return Collections.emptyMap(); } + Map result = new HashMap<>(param.getCreateTableConfigurations().size(), 1F); String defaultSchema = dialectDatabaseMetaData.getDefaultSchema().orElse(null); PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(targetDatabaseType); Collection createdSchemaNames = new HashSet<>(param.getCreateTableConfigurations().size(), 1F); @@ -80,18 +85,21 @@ public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) thro } Optional sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName); if (sql.isPresent()) { - executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get()); + executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get()).ifPresent(metaData -> result.put(targetSchemaName, metaData)); createdSchemaNames.add(targetSchemaName); } } + return result; } - private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException { + private Optional executeCreateSchema(final PipelineDataSourceManager dataSourceManager, + final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException { log.info("Prepare target schemas SQL: {}", sql); try ( Connection connection = 
dataSourceManager.getDataSource(targetDataSourceConfig).getConnection(); Statement statement = connection.createStatement()) { statement.execute(sql); + return Optional.of(((ShardingSphereConnection) connection).getContextManager().getMetaDataContexts().getMetaData()); } catch (final SQLException ex) { if (DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType) .map(DialectPipelineJobDataSourcePrepareOption::isSupportIfNotExistsOnCreateSchema).orElse(true)) { @@ -99,6 +107,7 @@ private void executeCreateSchema(final PipelineDataSourceManager dataSourceManag } log.warn("Create schema failed", ex); } + return Optional.empty(); } /** @@ -111,8 +120,12 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws final long startTimeMillis = System.currentTimeMillis(); PipelineDataSourceManager dataSourceManager = param.getDataSourceManager(); for (CreateTableConfiguration each : param.getCreateTableConfigurations()) { - List createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), param.getMetaData()); try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) { + ShardingSphereMetaData metaData = param.getTargetSchemaMetaData().get(each.getTargetName().getSchemaName()); + if (null == metaData) { + metaData = ((ShardingSphereConnection) targetConnection).getContextManager().getMetaDataContexts().getMetaData(); + } + List createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), metaData, param.getTargetDatabaseName()); for (String sql : createTargetTableSQL) { executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(sql)); } @@ -122,13 +135,13 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws } private List getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig, final PipelineDataSourceManager 
dataSourceManager, - final SQLParserEngine sqlParserEngine, final ShardingSphereMetaData metaData) throws SQLException { + final SQLParserEngine sqlParserEngine, final ShardingSphereMetaData metaData, final String targetDatabaseName) throws SQLException { DatabaseType databaseType = createTableConfig.getSourceDataSourceConfig().getDatabaseType(); DataSource sourceDataSource = dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig()); String schemaName = createTableConfig.getSourceName().getSchemaName(); String sourceTableName = createTableConfig.getSourceName().getTableName(); String targetTableName = createTableConfig.getTargetName().getTableName(); - return new PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine); + return new PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine, targetDatabaseName); } private void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java index d9be8f69bdb2c..f14bb707ff0d7 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java @@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.parser.SQLParserEngine; import java.util.Collection; +import java.util.Map; /** * Prepare target tables parameter. 
@@ -38,5 +39,7 @@ public final class PrepareTargetTablesParameter {
 
     private final SQLParserEngine sqlParserEngine;
 
-    private final ShardingSphereMetaData metaData;
+    private final Map<String, ShardingSphereMetaData> targetSchemaMetaData;
+
+    private final String targetDatabaseName;
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
index 06d26a8cf5e54..73013470e88e8 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparerTest.java
@@ -17,19 +17,24 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
+import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 
+import java.sql.SQLException;
 import java.util.Collections;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static 
org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -45,11 +50,17 @@ void assertPrepareTargetSchemasWithSchemaNotAvailable() { } @Test + @SneakyThrows(SQLException.class) void assertPrepareTargetTables() { CreateTableConfiguration createTableConfig = mock(CreateTableConfiguration.class, RETURNS_DEEP_STUBS); when(createTableConfig.getSourceDataSourceConfig().getDatabaseType()).thenReturn(databaseType); + PipelineDataSourceManager pipelineDataSourceManager = mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS); + ShardingSphereConnection connection = mock(ShardingSphereConnection.class, RETURNS_DEEP_STUBS); + when(pipelineDataSourceManager.getDataSource(any()).getConnection()).thenReturn(connection); + when(connection.getContextManager().getMetaDataContexts().getMetaData()).thenReturn(mock(ShardingSphereMetaData.class)); PrepareTargetTablesParameter parameter = new PrepareTargetTablesParameter( - Collections.singleton(createTableConfig), mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS), mock(SQLParserEngine.class), mock(ShardingSphereMetaData.class)); + Collections.singleton(createTableConfig), pipelineDataSourceManager, + mock(SQLParserEngine.class), mock(Map.class), "foo_db"); assertDoesNotThrow(() -> new PipelineJobDataSourcePreparer(databaseType).prepareTargetTables(parameter)); } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java index 110ff5ac616a8..ecb2df8815d92 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java +++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java @@ -24,8 +24,8 @@ import org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource; +import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException; import org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine; @@ -77,6 +77,7 @@ import java.sql.SQLException; import java.util.Collection; import java.util.Collections; +import java.util.Map; /** * Migration job preparer. 
@@ -157,11 +158,11 @@ private void prepareTarget(final MigrationJobItemContext jobItemContext, final D
         Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
         PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
         PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(targetDatabaseType);
-        preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
+        Map<String, ShardingSphereMetaData> targetSchemaMetaData = preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
         ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
         SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
                 .getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
-        preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, metaData));
+        preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, targetSchemaMetaData, jobConfig.getTargetDatabaseName()));
     }
 
     private void prepareIncremental(final MigrationJobItemContext jobItemContext) {