From 25215d3f8974827ab71fd9a5bb2e90c7b9d87b88 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 4 Nov 2023 21:22:43 +0800 Subject: [PATCH] Add ActualAndLogicTableNameMapper --- .../pipeline/mysql/ingest/MySQLIncrementalDumperTest.java | 7 ++++--- .../postgresql/ingest/PostgreSQLWALDumperTest.java | 7 ++++--- .../postgresql/ingest/wal/WALEventConverterTest.java | 7 ++++--- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 7 ++++--- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index 5a84c8de293ce..7d6ee11ad2faa 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -17,9 +17,10 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; @@ -91,7 +92,7 @@ void setUp() throws SQLException { private IncrementalDumperContext createDumperContext() { IncrementalDumperContext result = new IncrementalDumperContext(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root")); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); return result; } diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index dd06d1ee041b6..6cbb3233bac64 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -17,9 +17,10 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; @@ -107,7 +108,7 @@ private IncrementalDumperContext createDumperContext(final String jdbcUrl, final IncrementalDumperContext result = new IncrementalDumperContext(); result.setJobId("0101123456"); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")))); result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); return result; } diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 0adcb2c1586e6..15895cd370b3b 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -17,9 +17,10 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; @@ -86,7 +87,7 @@ void setUp() throws SQLException { private IncrementalDumperContext mockDumperContext() { IncrementalDumperContext result = new IncrementalDumperContext(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); return result; } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 48981568166d2..85f2aea420630 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -21,13 +21,14 @@ import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; @@ -285,7 +286,7 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jo result.setJobId(jobConfig.getJobId()); result.setDataSourceName(dataSourceName); result.setDataSourceConfig(actualDataSourceConfig); - result.setTableNameMap(tableNameMap); + result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); result.setDecodeWithTX(jobConfig.isDecodeWithTX()); return result;