From cd151d431e09e3e9fad3e3ad3395b31cbd9ec99e Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 4 Nov 2023 18:48:38 +0800 Subject: [PATCH] Add more constructor on TableNameSchemaNameMapping --- .../api/context/TableNameSchemaNameMapping.java | 7 +++++++ .../context/TableNameSchemaNameMappingTest.java | 9 ++++++++- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 15 ++------------- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java index ea56984a4b2aa..2c6793db5e41b 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java @@ -20,10 +20,12 @@ import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.stream.Collectors; /** * Table name and schema name mapping. @@ -37,6 +39,11 @@ public TableNameSchemaNameMapping(final Map tableSchemaMap) { mapping = null == tableSchemaMap ? Collections.emptyMap() : getLogicTableNameMap(tableSchemaMap); } + public TableNameSchemaNameMapping(final Collection tableNames) { + Map tableNameSchemaMap = tableNames.stream().map(each -> each.split("\\.")).filter(split -> split.length > 1).collect(Collectors.toMap(split -> split[1], split -> split[0])); + mapping = getLogicTableNameMap(tableNameSchemaMap); + } + private Map getLogicTableNameMap(final Map tableSchemaMap) { Map result = new HashMap<>(tableSchemaMap.size(), 1F); for (Entry entry : tableSchemaMap.entrySet()) { diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java index a640ee35df031..7809a3046147f 100644 --- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java +++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java @@ -19,7 +19,9 @@ import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; +import java.util.Map; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -30,7 +32,7 @@ class TableNameSchemaNameMappingTest { @Test void assertConstructFromNull() { - assertDoesNotThrow(() -> new TableNameSchemaNameMapping(null)); + assertDoesNotThrow(() -> new TableNameSchemaNameMapping((Map) null)); } @Test @@ -42,4 +44,9 @@ void assertConstructFromValueNullMap() { void assertConstructFromMap() { assertThat(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public")); } + + @Test + void assertConstructFromCollection() { + assertThat(new TableNameSchemaNameMapping(Arrays.asList("public.t_order", "t_order_item")).getSchemaName("t_order"), is("public")); + } } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 23035491e14f7..a8692217743c1 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -177,7 +177,7 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) { if (getJobItemProgress(jobId, i).isPresent()) { continue; } - IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames())); + IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableNameSchemaNameMapping(jobConfig.getSchemaTableNames())); InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext); PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress( jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); @@ -267,7 +267,7 @@ protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) { @Override public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig; - TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()); + TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(jobConfig.getSchemaTableNames()); IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableNameSchemaNameMapping); ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableNameSchemaNameMapping); CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig); @@ -275,17 +275,6 @@ public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguratio return result; } - private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final Collection tableNames) { - Map tableNameSchemaMap = new LinkedHashMap<>(); - for (String each : tableNames) { - String[] split = each.split("\\."); - if (split.length > 1) { - tableNameSchemaMap.put(split[1], split[0]); - } - } - return new TableNameSchemaNameMapping(tableNameSchemaMap); - } - private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableNameSchemaNameMapping tableNameSchemaNameMapping) { JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem); Map tableNameMap = new LinkedHashMap<>();