From dbf81f2fd738060246e75036ed524b6ac172112e Mon Sep 17 00:00:00 2001
From: Liang Zhang
Date: Sat, 4 Nov 2023 19:00:57 +0800
Subject: [PATCH] Rename TableNameSchemaNameMapping to TableAndSchemaNameMapper
 (#28939)

* Refactor ImporterConfiguration.findSchemaName()

* Add more constructor on TableNameSchemaNameMapping

* Rename TableNameSchemaNameMapping to TableAndSchemaNameMapper
---
 ...ing.java => TableAndSchemaNameMapper.java} | 13 +++++++--
 .../dumper/context/DumperCommonContext.java   | 10 +++----
 .../context/InventoryDumperContext.java       |  2 +-
 .../TableNameSchemaNameMappingTest.java       | 13 +++++++--
 .../common/config/ImporterConfiguration.java  | 15 +++++-----
 .../importer/sink/PipelineDataSourceSink.java | 11 ++-----
 .../preparer/PipelineJobPreparerUtils.java    |  2 +-
 .../datasource/DataSourceCheckEngine.java     |  8 ++---
 .../datasource/DataSourceCheckEngineTest.java |  6 ++--
 .../ingest/MySQLIncrementalDumperTest.java    |  4 +--
 .../ingest/PostgreSQLWALDumperTest.java       |  4 +--
 .../ingest/wal/WALEventConverterTest.java     |  4 +--
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 29 ++++++-------------
 .../migration/api/impl/MigrationJobAPI.java   | 16 +++++-----
 ...rationIncrementalDumperContextCreator.java | 10 +++----
 .../importer/PipelineDataSourceSinkTest.java  |  4 +--
 16 files changed, 75 insertions(+), 76 deletions(-)
 rename kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/{TableNameSchemaNameMapping.java => TableAndSchemaNameMapper.java} (80%)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
similarity index 80%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
index ea56984a4b2aa..3999329ae631c 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
@@ -20,23 +20,30 @@
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 /**
- * Table name and schema name mapping.
+ * Table and schema name mapper.
  */
 @ToString
-public final class TableNameSchemaNameMapping {
+public final class TableAndSchemaNameMapper {
 
     private final Map<LogicTableName, String> mapping;
 
-    public TableNameSchemaNameMapping(final Map<String, String> tableSchemaMap) {
+    public TableAndSchemaNameMapper(final Map<String, String> tableSchemaMap) {
         mapping = null == tableSchemaMap ? Collections.emptyMap() : getLogicTableNameMap(tableSchemaMap);
     }
 
+    public TableAndSchemaNameMapper(final Collection<String> tableNames) {
+        Map<String, String> tableNameSchemaMap = tableNames.stream().map(each -> each.split("\\.")).filter(split -> split.length > 1).collect(Collectors.toMap(split -> split[1], split -> split[0]));
+        mapping = getLogicTableNameMap(tableNameSchemaMap);
+    }
+
     private Map<LogicTableName, String> getLogicTableNameMap(final Map<String, String> tableSchemaMap) {
         Map<LogicTableName, String> result = new HashMap<>(tableSchemaMap.size(), 1F);
         for (Entry<String, String> entry : tableSchemaMap.entrySet()) {
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
index 69491ddcc9a8a..4a640fdb7bf8b 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
@@ -20,7 +20,7 @@
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -33,7 +33,7 @@
  */
 @Getter
 @Setter
-@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
+@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
 public abstract class DumperCommonContext {
 
     private String dataSourceName;
@@ -42,7 +42,7 @@ public abstract class DumperCommonContext {
 
     private Map<ActualTableName, LogicTableName> tableNameMap;
 
-    private TableNameSchemaNameMapping tableNameSchemaNameMapping;
+    private TableAndSchemaNameMapper tableAndSchemaNameMapper;
 
     private IngestPosition position;
 
@@ -77,7 +77,7 @@ public boolean containsTable(final String actualTableName) {
      * @return schema name. nullable
      */
     public String getSchemaName(final LogicTableName logicTableName) {
-        return tableNameSchemaNameMapping.getSchemaName(logicTableName);
+        return tableAndSchemaNameMapper.getSchemaName(logicTableName);
     }
 
     /**
@@ -87,6 +87,6 @@ public String getSchemaName(final LogicTableName logicTableName) {
      * @return schema name, can be nullable
      */
     public String getSchemaName(final ActualTableName actualTableName) {
-        return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
+        return tableAndSchemaNameMapper.getSchemaName(getLogicTableName(actualTableName));
     }
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
index 45eaf8e1ebeba..d72f07c7eafa6 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
@@ -55,7 +55,7 @@ public InventoryDumperContext(final DumperCommonContext dumperContext) {
         setDataSourceName(dumperContext.getDataSourceName());
         setDataSourceConfig(dumperContext.getDataSourceConfig());
         setTableNameMap(dumperContext.getTableNameMap());
-        setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
+        setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper());
     }
 
     /**
diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
index a640ee35df031..9eb21b6148c1c 100644
--- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
+++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
@@ -19,7 +19,9 @@
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,16 +32,21 @@ class TableNameSchemaNameMappingTest {
 
     @Test
     void assertConstructFromNull() {
-        assertDoesNotThrow(() -> new TableNameSchemaNameMapping(null));
+        assertDoesNotThrow(() -> new TableAndSchemaNameMapper((Map<String, String>) null));
     }
 
     @Test
     void assertConstructFromValueNullMap() {
-        assertNull(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", null)).getSchemaName("t_order"));
+        assertNull(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", null)).getSchemaName("t_order"));
     }
 
     @Test
     void assertConstructFromMap() {
-        assertThat(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
+        assertThat(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
+    }
+
+    @Test
+    void assertConstructFromCollection() {
+        assertThat(new TableAndSchemaNameMapper(Arrays.asList("public.t_order", "t_order_item")).getSchemaName("t_order"), is("public"));
     }
 }
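
For readers following the rename, the snippet below exercises the renamed class the same way the updated unit tests do. It is a usage sketch, not part of the patch: the demo class name and the standalone main method are mine; the constructors, the String overload of getSchemaName, and the expected outputs follow the tests above (requires the data-pipeline-api module on the classpath).

import java.util.Arrays;
import java.util.Map;

import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;

public final class TableAndSchemaNameMapperDemo {
    
    public static void main(final String[] args) {
        // Map-based constructor: a null map is tolerated and yields an empty mapping.
        TableAndSchemaNameMapper fromMap = new TableAndSchemaNameMapper((Map<String, String>) null);
        System.out.println(fromMap.getSchemaName("t_order")); // null
        // Collection-based constructor: qualified names are split on '.' into schema and table;
        // unqualified names such as "t_order_item" are filtered out and stay unmapped.
        TableAndSchemaNameMapper fromCollection = new TableAndSchemaNameMapper(Arrays.asList("public.t_order", "t_order_item"));
        System.out.println(fromCollection.getSchemaName("t_order")); // public
        System.out.println(fromCollection.getSchemaName("t_order_item")); // null
    }
}
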
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
index a92bce3830482..0116904291a85 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
@@ -20,7 +20,7 @@
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -30,6 +30,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -38,7 +39,7 @@
  */
 @RequiredArgsConstructor
 @Getter
-@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
+@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
 public final class ImporterConfiguration {
 
     private final PipelineDataSourceConfiguration dataSourceConfig;
@@ -46,7 +47,7 @@ public final class ImporterConfiguration {
     // TODO columnName case-insensitive?
     private final Map<LogicTableName, Set<String>> shardingColumnsMap;
 
-    private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+    private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
 
     private final int batchSize;
 
@@ -76,13 +77,13 @@ public Set<String> getShardingColumns(final String logicTableName) {
     }
 
     /**
-     * Get schema name.
+     * Find schema name.
      *
      * @param logicTableName logic table name
-     * @return schema name. nullable
+     * @return schema name
      */
-    public String getSchemaName(final LogicTableName logicTableName) {
+    public Optional<String> findSchemaName(final String logicTableName) {
         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
-        return dialectDatabaseMetaData.isSchemaAvailable() ? tableNameSchemaNameMapping.getSchemaName(logicTableName) : null;
+        return dialectDatabaseMetaData.isSchemaAvailable() ? Optional.of(tableAndSchemaNameMapper.getSchemaName(logicTableName)) : Optional.empty();
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 0995292e0dbb9..fe3134354bdb1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -26,7 +26,6 @@
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
@@ -169,7 +168,7 @@ private void doFlush(final DataSource dataSource, final List<Record> buffer) {
 
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String insertSql = importSQLBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
+        String insertSql = importSQLBuilder.buildInsertSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
         try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
             batchInsertStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
@@ -185,10 +184,6 @@ private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         }
     }
 
-    private String getSchemaName(final String tableName) {
-        return getImporterConfig().getSchemaName(new LogicTableName(tableName));
-    }
-
     private void executeBatchUpdate(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         for (DataRecord each : dataRecords) {
             executeUpdate(connection, each);
@@ -199,7 +194,7 @@ private void executeUpdate(final Connection connection, final DataRecord dataRec
         Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
         List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
         List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
-        String updateSql = importSQLBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
+        String updateSql = importSQLBuilder.buildUpdateSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
         try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
             updateStatement.set(preparedStatement);
             for (int i = 0; i < setColumns.size(); i++) {
@@ -226,7 +221,7 @@ private void executeUpdate(final Connection connection, final DataRecord dataRec
 
     private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String deleteSQL = importSQLBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord,
+        String deleteSQL = importSQLBuilder.buildDeleteSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                 RecordUtils.extractConditionColumns(dataRecord, importerConfig.getShardingColumns(dataRecord.getTableName())));
         try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
             batchDeleteStatement.set(preparedStatement);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index e649878a0c772..2f38ea4c72c95 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -166,7 +166,7 @@ public static void checkTargetDataSource(final DatabaseType databaseType, final
         }
         DataSourceCheckEngine dataSourceCheckEngine = new DataSourceCheckEngine(databaseType);
         dataSourceCheckEngine.checkConnection(targetDataSources);
-        dataSourceCheckEngine.checkTargetTable(targetDataSources, importerConfig.getTableNameSchemaNameMapping(), importerConfig.getLogicTableNames());
+        dataSourceCheckEngine.checkTargetTable(targetDataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
     }
 
     /**
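
The ImporterConfiguration hunks above replace a nullable getSchemaName() with an Optional-returning findSchemaName(), and the sink unwraps it with orElse(null) before building SQL. Below is a minimal standalone sketch of that call pattern; the class, field, and schemaAvailable flag are stand-ins rather than pipeline types. Note the sketch uses Optional.ofNullable so an unmapped table yields an empty Optional, whereas the patched method wraps with Optional.of and therefore assumes a mapping exists whenever the dialect supports schemas.

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

final class FindSchemaNameDemo {
    
    private static final Map<String, String> TABLE_SCHEMA_MAP = Collections.singletonMap("t_order", "public");
    
    // Mirrors the shape of ImporterConfiguration.findSchemaName(): empty for schema-less dialects
    // (e.g. MySQL), otherwise the schema mapped to the logic table.
    static Optional<String> findSchemaName(final String logicTableName, final boolean schemaAvailable) {
        return schemaAvailable ? Optional.ofNullable(TABLE_SCHEMA_MAP.get(logicTableName)) : Optional.empty();
    }
    
    public static void main(final String[] args) {
        // Callers such as PipelineDataSourceSink unwrap with orElse(null) before building SQL.
        System.out.println(findSchemaName("t_order", true).orElse(null)); // public
        System.out.println(findSchemaName("t_order", false).orElse(null)); // null
    }
}
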
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
index f58f89bb8f4c3..7b76c512c8504 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
@@ -66,17 +66,17 @@ public void checkConnection(final Collection<DataSource> dataSources) {
      * Check table is empty.
      *
      * @param dataSources data sources
-     * @param tableNameSchemaNameMapping mapping
+     * @param tableAndSchemaNameMapper mapping
      * @param logicTableNames logic table names
      * @throws PrepareJobWithInvalidConnectionException prepare job with invalid connection exception
      */
     // TODO rename to common usage name
     // TODO Merge schemaName and tableNames
-    public void checkTargetTable(final Collection<DataSource> dataSources, final TableNameSchemaNameMapping tableNameSchemaNameMapping, final Collection<String> logicTableNames) {
+    public void checkTargetTable(final Collection<DataSource> dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final Collection<String> logicTableNames) {
         try {
             for (DataSource each : dataSources) {
                 for (String tableName : logicTableNames) {
-                    if (!checkEmpty(each, tableNameSchemaNameMapping.getSchemaName(tableName), tableName)) {
+                    if (!checkEmpty(each, tableAndSchemaNameMapper.getSchemaName(tableName), tableName)) {
                         throw new PrepareJobWithTargetTableNotEmptyException(tableName);
                     }
                 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index 6922b6642ae75..d74115d295de6 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -85,7 +85,7 @@ void assertCheckTargetTable() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         dataSourceCheckEngine.checkTargetTable(dataSources, new TableAndSchemaNameMapper(Collections.emptyMap()), Collections.singletonList("t_order"));
     }
 
     @Test
@@ -95,6 +95,6 @@ void assertCheckTargetTableFailed() throws SQLException {
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         when(resultSet.next()).thenReturn(true);
         assertThrows(PrepareJobWithTargetTableNotEmptyException.class,
-                () -> dataSourceCheckEngine.checkTargetTable(dataSources, new TableNameSchemaNameMapping(Collections.emptyMap()), Collections.singletonList("t_order")));
+                () -> dataSourceCheckEngine.checkTargetTable(dataSources, new TableAndSchemaNameMapper(Collections.emptyMap()), Collections.singletonList("t_order")));
     }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 876dd1beacfa8..25bbaa33ff13e 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -92,7 +92,7 @@ private IncrementalDumperContext createDumperContext() {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
-        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
+        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index c177dbb5963a0..e59c6a0f3539e 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -108,7 +108,7 @@ private IncrementalDumperContext createDumperContext(final String jdbcUrl, final
         result.setJobId("0101123456");
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")));
-        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
+        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 4d5f9d35ffd81..93a4e72107496 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -87,7 +87,7 @@ private IncrementalDumperContext mockDumperContext() {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
-        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
+        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
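
The check-engine hunks above combine the mapper's schema lookup with a per-table checkEmpty probe, and the unit test mocks exactly "SELECT * FROM t_order LIMIT 1". Below is a self-contained sketch of that emptiness probe under those assumptions; the class and buildCheckEmptySQL are hypothetical stand-ins, and LIMIT syntax is dialect-specific (the real engine delegates SQL building to PipelineCommonSQLBuilder).

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.sql.DataSource;

final class TargetTableEmptyCheckDemo {
    
    // Builds "SELECT * FROM [schema.]table LIMIT 1"; a null schema leaves the table unqualified.
    private static String buildCheckEmptySQL(final String schemaName, final String tableName) {
        String qualifiedTableName = null == schemaName ? tableName : schemaName + "." + tableName;
        return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
    }
    
    // Returns true when the target table has no rows; a non-empty table would make the
    // engine throw PrepareJobWithTargetTableNotEmptyException, as the tests assert.
    static boolean checkEmpty(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
        try (
                Connection connection = dataSource.getConnection();
                PreparedStatement preparedStatement = connection.prepareStatement(buildCheckEmptySQL(schemaName, tableName));
                ResultSet resultSet = preparedStatement.executeQuery()) {
            return !resultSet.next();
        }
    }
}
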
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 23035491e14f7..d6015da0a5ed0 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -21,7 +21,7 @@
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -177,7 +177,7 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
             if (getJobItemProgress(jobId, i).isPresent()) {
                 continue;
             }
-            IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()));
+            IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
             InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
             PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
                     jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
@@ -267,26 +267,15 @@ protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
     @Override
     public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
-        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableNameSchemaNameMapping);
-        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableNameSchemaNameMapping);
+        TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
+        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
+        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
         CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
         log.debug("buildTaskConfiguration, result={}", result);
         return result;
     }
 
-    private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final Collection<String> tableNames) {
-        Map<String, String> tableNameSchemaMap = new LinkedHashMap<>();
-        for (String each : tableNames) {
-            String[] split = each.split("\\.");
-            if (split.length > 1) {
-                tableNameSchemaMap.put(split[1], split[0]);
-            }
-        }
-        return new TableNameSchemaNameMapping(tableNameSchemaMap);
-    }
-
-    private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+    private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
         Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
         dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
@@ -297,13 +286,13 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jo
         result.setDataSourceName(dataSourceName);
         result.setDataSourceConfig(actualDataSourceConfig);
         result.setTableNameMap(tableNameMap);
-        result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+        result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
         result.setDecodeWithTX(jobConfig.isDecodeWithTX());
         return result;
     }
 
     private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
-                                                             final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+                                                             final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
         CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
@@ -311,7 +300,7 @@ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfigurati
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         Map<LogicTableName, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
                 .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableNameSchemaNameMapping, batchSize, writeRateLimitAlgorithm, 0, 1);
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
     }
 
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 2c74e4a0d1ae8..218d70427d9b2 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -19,7 +19,7 @@
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
@@ -264,21 +264,21 @@ public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfig
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
         IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(
                 jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
-        CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getTableNameSchemaNameMapping());
+        CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getTableAndSchemaNameMapper());
         Set<LogicTableName> targetTableNames = jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
         Map<LogicTableName, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
                 ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
-        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getTableNameSchemaNameMapping());
+        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getTableAndSchemaNameMapper());
         MigrationTaskConfiguration result = new MigrationTaskConfiguration(incrementalDumperContext.getDataSourceName(), createTableConfig, incrementalDumperContext, importerConfig);
         log.info("buildTaskConfiguration, result={}", result);
         return result;
     }
 
     private CreateTableConfiguration buildCreateTableConfiguration(final MigrationJobConfiguration jobConfig,
-                                                                   final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+                                                                   final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         Collection<CreateTableEntry> createTableEntries = new LinkedList<>();
         for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
-            String sourceSchemaName = tableNameSchemaNameMapping.getSchemaName(each.getLogicTableName());
+            String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
             DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
             String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
             DataNode dataNode = each.getDataNodes().get(0);
@@ -294,13 +294,13 @@ sourceDataSourceConfig, new SchemaTableName(sourceSchemaName, dataNode.getTableN
     }
 
     private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
-                                                             final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+                                                             final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
         JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         int retryTimes = jobConfig.getRetryTimes();
         int concurrency = jobConfig.getConcurrency();
-        return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableNameSchemaNameMapping, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
+        return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
     }
 
     @Override
@@ -372,7 +372,7 @@ private void dropCheckJobs(final String jobId) {
     private void cleanTempTableOnRollback(final String jobId) throws SQLException {
         MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
         PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
-        TableNameSchemaNameMapping mapping = new TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
+        TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
                 PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index c63c5243c52e4..06ee39a408b0c 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -41,19 +41,19 @@ public final class MigrationIncrementalDumperContextCreator implements Increment
 
     @Override
     public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDataNodeLine) {
         Map<ActualTableName, LogicTableName> tableNameMap = JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine);
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
+        TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
-        return buildDumperContext(jobConfig.getJobId(), dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMap, tableNameSchemaNameMapping);
+        return buildDumperContext(jobConfig.getJobId(), dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMap, tableAndSchemaNameMapper);
     }
 
     private IncrementalDumperContext buildDumperContext(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource,
-                                                        final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+                                                        final Map<ActualTableName, LogicTableName> tableNameMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setJobId(jobId);
         result.setDataSourceName(dataSourceName);
         result.setDataSourceConfig(sourceDataSource);
         result.setTableNameMap(tableNameMap);
-        result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+        result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
         return result;
     }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index cbeec358b0c7a..487df1e7d48c1 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.importer;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
@@ -193,6 +193,6 @@ private DataRecord getDataRecord(final String recordType) {
 
     private ImporterConfiguration mockImporterConfiguration() {
         Map<LogicTableName, Set<String>> shardingColumnsMap = Collections.singletonMap(new LogicTableName("test_table"), Collections.singleton("user"));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, null, 3, 3);
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3);
     }
 }
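
The CDCJobAPI hunk folds the private getTableNameSchemaNameMapping() helper into the new Collection-based constructor. The sketch below restates the removed loop next to the stream that replaced it, to show they build the same table-to-schema map for qualified names; the class and method names are mine, the bodies follow the diff. One behavioral nuance worth noting: the old loop used a LinkedHashMap, which preserves insertion order and silently overwrites on duplicate table names, while Collectors.toMap throws IllegalStateException on duplicate keys, so inputs such as ["a.t_order", "b.t_order"] behave differently after the refactor.

import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class MapperEquivalenceDemo {
    
    // Logic of the removed private CDCJobAPI.getTableNameSchemaNameMapping() loop.
    static Map<String, String> legacyLoop(final Collection<String> tableNames) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String each : tableNames) {
            String[] split = each.split("\\.");
            if (split.length > 1) {
                result.put(split[1], split[0]);
            }
        }
        return result;
    }
    
    // Logic now encapsulated by new TableAndSchemaNameMapper(Collection<String>).
    static Map<String, String> streamVersion(final Collection<String> tableNames) {
        return tableNames.stream().map(each -> each.split("\\.")).filter(split -> split.length > 1).collect(Collectors.toMap(split -> split[1], split -> split[0]));
    }
    
    public static void main(final String[] args) {
        Collection<String> schemaTableNames = Arrays.asList("public.t_order", "test.t_order_item");
        System.out.println(legacyLoop(schemaTableNames)); // {t_order=public, t_order_item=test}
        System.out.println(streamVersion(schemaTableNames)); // {t_order=public, t_order_item=test}
    }
}
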