From 05e81f0c0c31c1312d32d80a0d5397d03ce3795b Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 4 Nov 2023 19:10:44 +0800 Subject: [PATCH 1/7] Rename TableAndSchemaNameMapperTest --- ...maNameMappingTest.java => TableAndSchemaNameMapperTest.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/{TableNameSchemaNameMappingTest.java => TableAndSchemaNameMapperTest.java} (98%) diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapperTest.java similarity index 98% rename from kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java rename to kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapperTest.java index 9eb21b6148c1c..fc9521a4ed02b 100644 --- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java +++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapperTest.java @@ -28,7 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertNull; -class TableNameSchemaNameMappingTest { +class TableAndSchemaNameMapperTest { @Test void assertConstructFromNull() { From 0900361906c583ea90238acb4d42d5d3b8da6e4e Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 4 Nov 2023 20:25:58 +0800 Subject: [PATCH 2/7] Move TableAndSchemaNameMapper --- .../pipeline/api/ingest/dumper/context/DumperCommonContext.java | 2 +- .../dumper/context/mapper}/TableAndSchemaNameMapper.java | 2 +- .../dumper/context/mapper}/TableAndSchemaNameMapperTest.java | 2 +- .../data/pipeline/common/config/ImporterConfiguration.java | 2 +- .../core/preparer/datasource/DataSourceCheckEngine.java | 2 +- .../core/preparer/datasource/DataSourceCheckEngineTest.java | 2 +- .../data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java | 2 +- .../pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java | 2 +- .../pipeline/postgresql/ingest/wal/WALEventConverterTest.java | 2 +- .../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 2 +- .../pipeline/scenario/migration/api/impl/MigrationJobAPI.java | 2 +- .../config/ingest/MigrationIncrementalDumperContextCreator.java | 2 +- .../data/pipeline/core/importer/PipelineDataSourceSinkTest.java | 2 +- 13 files changed, 13 insertions(+), 13 deletions(-) rename kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/{context => ingest/dumper/context/mapper}/TableAndSchemaNameMapper.java (97%) rename kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/{context => ingest/dumper/context/mapper}/TableAndSchemaNameMapperTest.java (95%) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java index 4a640fdb7bf8b..1e57e6d2df9ad 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java @@ -20,7 +20,7 @@ import lombok.Getter; import lombok.Setter; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java similarity index 97% rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java index 3999329ae631c..329a24d3a1ef1 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.context; +package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper; import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapperTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java similarity index 95% rename from kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapperTest.java rename to kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java index fc9521a4ed02b..465ee71c7eac8 100644 --- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapperTest.java +++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.context; +package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper; import org.junit.jupiter.api.Test; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java index 0116904291a85..1438d9420986b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java @@ -20,7 +20,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java index 7b76c512c8504..d801148321c24 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException; diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java index d74115d295de6..0518f42811596 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException; import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index 25bbaa33ff13e..5a84c8de293ce 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index e59c6a0f3539e..dd06d1ee041b6 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 93a4e72107496..0adcb2c1586e6 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index d6015da0a5ed0..48981568166d2 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -21,7 +21,7 @@ import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 218d70427d9b2..0be975696eda6 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java index 06ee39a408b0c..39718dead3623 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java index 487df1e7d48c1..3fc0ace7fe6e0 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.importer; -import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; From 40543d6239b4f6f6bd68be715a8d58a6e8bb6db3 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 4 Nov 2023 21:03:38 +0800 Subject: [PATCH 3/7] Add ActualAndLogicTableNameMapper --- .../dumper/context/DumperCommonContext.java | 51 +++-------------- .../mapper/ActualAndLogicTableNameMapper.java | 57 +++++++++++++++++++ .../pipeline/core/dumper/InventoryDumper.java | 11 ++-- .../InventoryRecordsCountCalculator.java | 3 +- .../core/preparer/InventoryTaskSplitter.java | 11 ++-- .../mysql/ingest/MySQLIncrementalDumper.java | 9 ++- .../ingest/wal/WALEventConverter.java | 7 +-- 7 files changed, 82 insertions(+), 67 deletions(-) create mode 100644 kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java index 1e57e6d2df9ad..3fc7175ad8929 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java @@ -20,16 +20,13 @@ import lombok.Getter; import lombok.Setter; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; -import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; -import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; - -import java.util.Map; /** - * Base dumper context. + * Dumper common context. */ @Getter @Setter @@ -40,53 +37,19 @@ public abstract class DumperCommonContext { private PipelineDataSourceConfiguration dataSourceConfig; - private Map tableNameMap; + private ActualAndLogicTableNameMapper tableNameMapper; private TableAndSchemaNameMapper tableAndSchemaNameMapper; private IngestPosition position; - /** - * Get logic table name. - * - * @param actualTableName actual table name - * @return logic table name - */ - public LogicTableName getLogicTableName(final String actualTableName) { - return tableNameMap.get(new ActualTableName(actualTableName)); - } - - private LogicTableName getLogicTableName(final ActualTableName actualTableName) { - return tableNameMap.get(actualTableName); - } - - /** - * Whether contains table. - * - * @param actualTableName actual table name - * @return contains or not - */ - public boolean containsTable(final String actualTableName) { - return tableNameMap.containsKey(new ActualTableName(actualTableName)); - } - - /** - * Get schema name. - * - * @param logicTableName logic table name - * @return schema name. nullable - */ - public String getSchemaName(final LogicTableName logicTableName) { - return tableAndSchemaNameMapper.getSchemaName(logicTableName); - } - /** * Get schema name. * * @param actualTableName actual table name - * @return schema name, can be nullable + * @return schema name, can be nullable */ - public String getSchemaName(final ActualTableName actualTableName) { - return tableAndSchemaNameMapper.getSchemaName(getLogicTableName(actualTableName)); + public String getSchemaName(final String actualTableName) { + return tableAndSchemaNameMapper.getSchemaName(tableNameMapper.getLogicTableName(actualTableName)); } } diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java new file mode 100644 index 0000000000000..475c1742d5f0a --- /dev/null +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; +import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; + +import java.util.Map; + +/** + * Actual table name and logic table name mapper. + */ +@RequiredArgsConstructor +@Getter +@ToString +public final class ActualAndLogicTableNameMapper { + + private final Map tableNameMap; + + /** + * Get logic table name. + * + * @param actualTableName actual table name + * @return logic table name + */ + public LogicTableName getLogicTableName(final String actualTableName) { + return tableNameMap.get(new ActualTableName(actualTableName)); + } + + /** + * Whether contains table. + * + * @param actualTableName actual table name + * @return contains or not + */ + public boolean containsTable(final String actualTableName) { + return tableNameMap.containsKey(new ActualTableName(actualTableName)); + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java index a8fb1156a7c58..b022f32de7227 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java @@ -21,17 +21,16 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType; -import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData; @@ -46,8 +45,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.IngestException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm; -import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import javax.sql.DataSource; @@ -102,7 +101,8 @@ protected void runBlocking() { log.info("Ignored because of already finished."); return; } - PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName()); + PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData( + dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName()); try (Connection connection = dataSource.getConnection()) { dump(tableMetaData, connection); } catch (final SQLException ex) { @@ -156,8 +156,7 @@ private String buildInventoryDumpSQL() { if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) { return dumperContext.getQuerySQL(); } - LogicTableName logicTableName = new LogicTableName(dumperContext.getLogicTableName()); - String schemaName = dumperContext.getSchemaName(logicTableName); + String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); if (!dumperContext.hasUniqueKey()) { return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName()); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java index 6656cdcf9a22e..f1e46de7beec7 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java @@ -21,7 +21,6 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; -import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException; @@ -52,7 +51,7 @@ public final class InventoryRecordsCountCalculator { * @throws SplitPipelineJobByUniqueKeyException if there's exception from database */ public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) { - String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())); + String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); String actualTableName = dumperContext.getActualTableName(); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType()); Optional sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java index 2f1c6954860ba..971593929183d 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java @@ -20,19 +20,18 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Range; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; -import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; -import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition; +import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.StringPrimaryKeyPosition; import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition; import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; @@ -111,7 +110,7 @@ public Collection splitInventoryDumperContext(final Inve private Collection splitByTable(final InventoryDumperContext dumperContext) { Collection result = new LinkedList<>(); - dumperContext.getTableNameMap().forEach((key, value) -> { + dumperContext.getTableNameMapper().getTableNameMap().forEach((key, value) -> { InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext); // use original table name, for metadata loader, since some database table name case-sensitive inventoryDumperContext.setActualTableName(key.getOriginal()); @@ -127,7 +126,7 @@ private Collection splitByTable(final InventoryDumperCon private Collection splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) { if (null == dumperContext.getUniqueKeyColumns()) { - String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())); + String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); String actualTableName = dumperContext.getActualTableName(); List uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader()); dumperContext.setUniqueKeyColumns(uniqueKeyColumns); @@ -205,7 +204,7 @@ private Range getUniqueKeyValuesRange(final InventoryIncrementalJobItemCon String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName(); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType()); String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL( - dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey); + dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey); try ( Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement(); diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index 84cf56db027a3..79dab1ac4bcdd 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -19,18 +19,17 @@ import com.google.common.base.Preconditions; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData; @@ -132,7 +131,7 @@ private List handleEvent(final AbstractBinlogEvent event) { return Collections.singletonList(createPlaceholderRecord(event)); } AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event; - if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.containsTable(rowsEvent.getTableName())) { + if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getTableNameMapper().containsTable(rowsEvent.getTableName())) { return Collections.singletonList(createPlaceholderRecord(event)); } PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName()); @@ -155,7 +154,7 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent even } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(actualTableName), actualTableName); } private List handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) { @@ -216,7 +215,7 @@ private Serializable handleValue(final PipelineColumnMetaData columnMetaData, fi } private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) { - String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal(); IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()); DataRecord result = new DataRecord(type, tableName, position, columnCount); result.setActualTableName(rowsEvent.getTableName()); diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java index adfdf05966693..bb2b32b938d4c 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java @@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData; @@ -80,7 +79,7 @@ public Record convert(final AbstractWALEvent event) { private boolean filter(final AbstractWALEvent event) { if (event instanceof AbstractRowEvent) { AbstractRowEvent rowEvent = (AbstractRowEvent) event; - return !dumperContext.containsTable(rowEvent.getTableName()); + return !dumperContext.getTableNameMapper().containsTable(rowEvent.getTableName()); } return false; } @@ -90,7 +89,7 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(actualTableName), actualTableName); } private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) { @@ -117,7 +116,7 @@ private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final Pipeli } private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) { - String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal(); DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount); result.setActualTableName(rowsEvent.getTableName()); result.setCsn(rowsEvent.getCsn()); From 13c3222155c110ea1dbd3ce27d89003c32b3cf82 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 4 Nov 2023 21:04:54 +0800 Subject: [PATCH 4/7] Add ActualAndLogicTableNameMapper --- .../api/ingest/dumper/context/InventoryDumperContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java index d72f07c7eafa6..e96375d8c6ed0 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java @@ -54,7 +54,7 @@ public final class InventoryDumperContext extends DumperCommonContext { public InventoryDumperContext(final DumperCommonContext dumperContext) { setDataSourceName(dumperContext.getDataSourceName()); setDataSourceConfig(dumperContext.getDataSourceConfig()); - setTableNameMap(dumperContext.getTableNameMap()); + setTableNameMapper(dumperContext.getTableNameMapper()); setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper()); } From f4d413f8cd29dd13585a226bb91e0c40bdef28e9 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 4 Nov 2023 21:16:03 +0800 Subject: [PATCH 5/7] Add ActualAndLogicTableNameMapper --- .../ingest/MigrationIncrementalDumperContextCreator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java index 39718dead3623..659b15995eb2e 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest; import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; @@ -52,7 +53,7 @@ private IncrementalDumperContext buildDumperContext(final String jobId, final St result.setJobId(jobId); result.setDataSourceName(dataSourceName); result.setDataSourceConfig(sourceDataSource); - result.setTableNameMap(tableNameMap); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); return result; } From 25215d3f8974827ab71fd9a5bb2e90c7b9d87b88 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 4 Nov 2023 21:22:43 +0800 Subject: [PATCH 6/7] Add ActualAndLogicTableNameMapper --- .../pipeline/mysql/ingest/MySQLIncrementalDumperTest.java | 7 ++++--- .../postgresql/ingest/PostgreSQLWALDumperTest.java | 7 ++++--- .../postgresql/ingest/wal/WALEventConverterTest.java | 7 ++++--- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 7 ++++--- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index 5a84c8de293ce..7d6ee11ad2faa 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -17,9 +17,10 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; @@ -91,7 +92,7 @@ void setUp() throws SQLException { private IncrementalDumperContext createDumperContext() { IncrementalDumperContext result = new IncrementalDumperContext(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root")); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); return result; } diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index dd06d1ee041b6..6cbb3233bac64 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -17,9 +17,10 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; @@ -107,7 +108,7 @@ private IncrementalDumperContext createDumperContext(final String jdbcUrl, final IncrementalDumperContext result = new IncrementalDumperContext(); result.setJobId("0101123456"); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")))); result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); return result; } diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 0adcb2c1586e6..15895cd370b3b 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -17,9 +17,10 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; @@ -86,7 +87,7 @@ void setUp() throws SQLException { private IncrementalDumperContext mockDumperContext() { IncrementalDumperContext result = new IncrementalDumperContext(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); return result; } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 48981568166d2..85f2aea420630 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -21,13 +21,14 @@ import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; @@ -285,7 +286,7 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jo result.setJobId(jobConfig.getJobId()); result.setDataSourceName(dataSourceName); result.setDataSourceConfig(actualDataSourceConfig); - result.setTableNameMap(tableNameMap); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); result.setDecodeWithTX(jobConfig.isDecodeWithTX()); return result; From c67ffd7264c4e011f2f555100ad8d36c67e0a986 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 4 Nov 2023 22:15:46 +0800 Subject: [PATCH 7/7] Refactor DumperCommonContext --- .../api/ingest/dumper/context/DumperCommonContext.java | 10 ---------- .../pipeline/mysql/ingest/MySQLIncrementalDumper.java | 4 +++- .../postgresql/ingest/wal/WALEventConverter.java | 4 +++- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java index 3fc7175ad8929..9166f609df901 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java @@ -42,14 +42,4 @@ public abstract class DumperCommonContext { private TableAndSchemaNameMapper tableAndSchemaNameMapper; private IngestPosition position; - - /** - * Get schema name. - * - * @param actualTableName actual table name - * @return schema name, can be nullable - */ - public String getSchemaName(final String actualTableName) { - return tableAndSchemaNameMapper.getSchemaName(tableNameMapper.getLogicTableName(actualTableName)); - } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index 79dab1ac4bcdd..6d50d4950a03f 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; +import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData; @@ -154,7 +155,8 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent even } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(actualTableName), actualTableName); + LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName); } private List handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) { diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java index bb2b32b938d4c..cc2dca00d8678 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java @@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; +import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData; @@ -89,7 +90,8 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(actualTableName), actualTableName); + LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName); } private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {