Skip to content

Commit

Permalink
Fix pipeline e2e for MySQL
Browse files Browse the repository at this point in the history
  • Loading branch information
menghaoranss committed Dec 20, 2024
1 parent c506346 commit 693f1d0
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.TableNameSegment;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -74,15 +73,17 @@ public final class PipelineDDLGenerator {
* @param sourceTableName source table name
* @param targetTableName target table name
* @param parserEngine parser engine
* @param targetDatabaseName target database name
* @return DDL SQL
* @throws SQLException SQL exception
*/
public List<String> generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource,
final String schemaName, final String sourceTableName, final String targetTableName, final SQLParserEngine parserEngine) throws SQLException {
final String schemaName, final String sourceTableName, final String targetTableName,
final SQLParserEngine parserEngine, final String targetDatabaseName) throws SQLException {
long startTimeMillis = System.currentTimeMillis();
List<String> result = new ArrayList<>();
for (String each : DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType).buildCreateTableSQLs(sourceDataSource, schemaName, sourceTableName)) {
Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each);
Optional<String> queryContext = decorate(databaseType, targetDatabaseName, schemaName, targetTableName, parserEngine, each);
queryContext.ifPresent(sql -> {
String trimmedSql = sql.trim();
if (!Strings.isNullOrEmpty(trimmedSql)) {
Expand All @@ -95,19 +96,15 @@ public List<String> generateLogicDDL(final DatabaseType databaseType, final Data
return result;
}

private Optional<String> decorate(final DatabaseType databaseType, final DataSource dataSource, final String schemaName, final String targetTableName,
final SQLParserEngine parserEngine, final String sql) throws SQLException {
private Optional<String> decorate(final DatabaseType databaseType, final String targetDatabaseName, final String schemaName, final String targetTableName,
final SQLParserEngine parserEngine, final String sql) {
if (Strings.isNullOrEmpty(sql)) {
return Optional.empty();
}
String databaseName;
try (Connection connection = dataSource.getConnection()) {
databaseName = connection.getCatalog();
}
String result = decorateActualSQL(databaseName, targetTableName, parserEngine, sql.trim());
String result = decorateActualSQL(targetDatabaseName, targetTableName, parserEngine, sql.trim());
// TODO remove it after set search_path is supported.
if ("openGauss".equals(databaseType.getType())) {
return decorateOpenGauss(databaseName, schemaName, result, parserEngine);
return decorateOpenGauss(targetDatabaseName, schemaName, result, parserEngine);
}
return Optional.of(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
Expand All @@ -40,8 +41,10 @@
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

Expand All @@ -62,14 +65,16 @@ public final class PipelineJobDataSourcePreparer {
* Prepare target schemas.
*
* @param param prepare target schemas parameter
* @return target schemas
* @throws SQLException if prepare target schema fail
*/
public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException {
public Map<String, ShardingSphereMetaData> prepareTargetSchemas(final PrepareTargetSchemasParameter param) throws SQLException {
DatabaseType targetDatabaseType = param.getTargetDatabaseType();
DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(targetDatabaseType).getDialectDatabaseMetaData();
if (!dialectDatabaseMetaData.isSchemaAvailable()) {
return;
return Collections.emptyMap();
}
Map<String, ShardingSphereMetaData> result = new HashMap<>(param.getCreateTableConfigurations().size(), 1F);
String defaultSchema = dialectDatabaseMetaData.getDefaultSchema().orElse(null);
PipelinePrepareSQLBuilder pipelineSQLBuilder = new PipelinePrepareSQLBuilder(targetDatabaseType);
Collection<String> createdSchemaNames = new HashSet<>(param.getCreateTableConfigurations().size(), 1F);
Expand All @@ -80,25 +85,29 @@ public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) thro
}
Optional<String> sql = pipelineSQLBuilder.buildCreateSchemaSQL(targetSchemaName);
if (sql.isPresent()) {
executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get());
executeCreateSchema(param.getDataSourceManager(), each.getTargetDataSourceConfig(), sql.get()).ifPresent(metaData -> result.put(targetSchemaName, metaData));
createdSchemaNames.add(targetSchemaName);
}
}
return result;
}

private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
private Optional<ShardingSphereMetaData> executeCreateSchema(final PipelineDataSourceManager dataSourceManager,
final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException {
log.info("Prepare target schemas SQL: {}", sql);
try (
Connection connection = dataSourceManager.getDataSource(targetDataSourceConfig).getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
return Optional.of(((ShardingSphereConnection) connection).getContextManager().getMetaDataContexts().getMetaData());
} catch (final SQLException ex) {
if (DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class, databaseType)
.map(DialectPipelineJobDataSourcePrepareOption::isSupportIfNotExistsOnCreateSchema).orElse(true)) {
throw ex;
}
log.warn("Create schema failed", ex);
}
return Optional.empty();
}

/**
Expand All @@ -111,8 +120,12 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws
final long startTimeMillis = System.currentTimeMillis();
PipelineDataSourceManager dataSourceManager = param.getDataSourceManager();
for (CreateTableConfiguration each : param.getCreateTableConfigurations()) {
List<String> createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), param.getMetaData());
try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) {
ShardingSphereMetaData metaData = param.getTargetSchemaMetaData().get(each.getTargetName().getSchemaName());
if (null == metaData) {
metaData = ((ShardingSphereConnection) targetConnection).getContextManager().getMetaDataContexts().getMetaData();
}
List<String> createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine(), metaData, param.getTargetDatabaseName());
for (String sql : createTargetTableSQL) {
executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(sql));
}
Expand All @@ -122,13 +135,13 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws
}

private List<String> getCreateTargetTableSQL(final CreateTableConfiguration createTableConfig, final PipelineDataSourceManager dataSourceManager,
final SQLParserEngine sqlParserEngine, final ShardingSphereMetaData metaData) throws SQLException {
final SQLParserEngine sqlParserEngine, final ShardingSphereMetaData metaData, final String targetDatabaseName) throws SQLException {
DatabaseType databaseType = createTableConfig.getSourceDataSourceConfig().getDatabaseType();
DataSource sourceDataSource = dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
String schemaName = createTableConfig.getSourceName().getSchemaName();
String sourceTableName = createTableConfig.getSourceName().getTableName();
String targetTableName = createTableConfig.getTargetName().getTableName();
return new PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine);
return new PipelineDDLGenerator(metaData).generateLogicDDL(databaseType, sourceDataSource, schemaName, sourceTableName, targetTableName, sqlParserEngine, targetDatabaseName);
}

private void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.shardingsphere.infra.parser.SQLParserEngine;

import java.util.Collection;
import java.util.Map;

/**
* Prepare target tables parameter.
Expand All @@ -38,5 +39,7 @@ public final class PrepareTargetTablesParameter {

private final SQLParserEngine sqlParserEngine;

private final ShardingSphereMetaData metaData;
private final Map<String, ShardingSphereMetaData> targetSchemaMetaData;

private final String targetDatabaseName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,24 @@

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;

import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;

import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -45,11 +50,17 @@ void assertPrepareTargetSchemasWithSchemaNotAvailable() {
}

@Test
@SneakyThrows(SQLException.class)
void assertPrepareTargetTables() {
CreateTableConfiguration createTableConfig = mock(CreateTableConfiguration.class, RETURNS_DEEP_STUBS);
when(createTableConfig.getSourceDataSourceConfig().getDatabaseType()).thenReturn(databaseType);
PipelineDataSourceManager pipelineDataSourceManager = mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS);
ShardingSphereConnection connection = mock(ShardingSphereConnection.class, RETURNS_DEEP_STUBS);
when(pipelineDataSourceManager.getDataSource(any()).getConnection()).thenReturn(connection);
when(connection.getContextManager().getMetaDataContexts().getMetaData()).thenReturn(mock(ShardingSphereMetaData.class));
PrepareTargetTablesParameter parameter = new PrepareTargetTablesParameter(
Collections.singleton(createTableConfig), mock(PipelineDataSourceManager.class, RETURNS_DEEP_STUBS), mock(SQLParserEngine.class), mock(ShardingSphereMetaData.class));
Collections.singleton(createTableConfig), pipelineDataSourceManager,
mock(SQLParserEngine.class), mock(Map.class), "foo_db");
assertDoesNotThrow(() -> new PipelineJobDataSourcePreparer(databaseType).prepareTargetTables(parameter));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.shardingsphere.data.pipeline.core.checker.PipelineDataSourceCheckEngine;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineExecuteEngine;
Expand Down Expand Up @@ -77,6 +77,7 @@
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

/**
* Migration job preparer.
Expand Down Expand Up @@ -157,11 +158,11 @@ private void prepareTarget(final MigrationJobItemContext jobItemContext, final D
Collection<CreateTableConfiguration> createTableConfigs = jobItemContext.getTaskConfig().getCreateTableConfigurations();
PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
PipelineJobDataSourcePreparer preparer = new PipelineJobDataSourcePreparer(targetDatabaseType);
preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
Map<String, ShardingSphereMetaData> targetSchemaMetaData = preparer.prepareTargetSchemas(new PrepareTargetSchemasParameter(targetDatabaseType, createTableConfigs, dataSourceManager));
ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
SQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class)
.getSQLParserEngine(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType());
preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, metaData));
preparer.prepareTargetTables(new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, sqlParserEngine, targetSchemaMetaData, jobConfig.getTargetDatabaseName()));
}

private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
Expand Down

0 comments on commit 693f1d0

Please sign in to comment.