
Add more constructors to TableNameSchemaNameMapping
terrymanu committed Nov 4, 2023
1 parent 6887a49 commit cd151d4
Showing 3 changed files with 17 additions and 14 deletions.

File 1 of 3: TableNameSchemaNameMapping

@@ -20,10 +20,12 @@
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;

+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;

 /**
  * Table name and schema name mapping.
@@ -37,6 +39,11 @@ public TableNameSchemaNameMapping(final Map<String, String> tableSchemaMap) {
         mapping = null == tableSchemaMap ? Collections.emptyMap() : getLogicTableNameMap(tableSchemaMap);
     }

+    public TableNameSchemaNameMapping(final Collection<String> tableNames) {
+        Map<String, String> tableNameSchemaMap = tableNames.stream().map(each -> each.split("\\.")).filter(split -> split.length > 1).collect(Collectors.toMap(split -> split[1], split -> split[0]));
+        mapping = getLogicTableNameMap(tableNameSchemaMap);
+    }
+
     private Map<LogicTableName, String> getLogicTableNameMap(final Map<String, String> tableSchemaMap) {
         Map<LogicTableName, String> result = new HashMap<>(tableSchemaMap.size(), 1F);
         for (Entry<String, String> entry : tableSchemaMap.entrySet()) {
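For reference, the constructor added above turns qualified names such as "public.t_order" into a table-to-schema map, replacing the private helper that the CDC code in file 3 no longer needs. The stand-alone sketch below only mirrors that parsing step for illustration; the class name QualifiedTableNameParsingSketch and the method toTableSchemaMap are invented here and are not part of this commit.

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch (not part of the commit): mirrors the parsing done by the new
// TableNameSchemaNameMapping(Collection<String>) constructor shown above.
public final class QualifiedTableNameParsingSketch {
    
    // Splits each entry on "." and keeps only qualified names (more than one part),
    // mapping table name -> schema name, the same stream pipeline as the new constructor.
    static Map<String, String> toTableSchemaMap(final Collection<String> qualifiedTableNames) {
        return qualifiedTableNames.stream()
                .map(each -> each.split("\\."))
                .filter(split -> split.length > 1)
                .collect(Collectors.toMap(split -> split[1], split -> split[0]));
    }
    
    public static void main(final String[] args) {
        // "public.t_order" maps table "t_order" to schema "public";
        // "t_order_item" has no schema qualifier and is skipped.
        System.out.println(toTableSchemaMap(Arrays.asList("public.t_order", "t_order_item"))); // {t_order=public}
    }
}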

File 2 of 3: TableNameSchemaNameMappingTest

@@ -19,7 +19,9 @@

 import org.junit.jupiter.api.Test;

+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;

 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,7 +32,7 @@ class TableNameSchemaNameMappingTest {

     @Test
     void assertConstructFromNull() {
-        assertDoesNotThrow(() -> new TableNameSchemaNameMapping(null));
+        assertDoesNotThrow(() -> new TableNameSchemaNameMapping((Map<String, String>) null));
     }

     @Test
@@ -42,4 +44,9 @@ void assertConstructFromValueNullMap() {
     void assertConstructFromMap() {
         assertThat(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
     }
+
+    @Test
+    void assertConstructFromCollection() {
+        assertThat(new TableNameSchemaNameMapping(Arrays.asList("public.t_order", "t_order_item")).getSchemaName("t_order"), is("public"));
+    }
 }

File 3 of 3

@@ -177,7 +177,7 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
             if (getJobItemProgress(jobId, i).isPresent()) {
                 continue;
             }
-            IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()));
+            IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableNameSchemaNameMapping(jobConfig.getSchemaTableNames()));
             InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
             PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
                     jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
@@ -267,25 +267,14 @@ protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
     @Override
     public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
+        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
         IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableNameSchemaNameMapping);
         ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableNameSchemaNameMapping);
         CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
         log.debug("buildTaskConfiguration, result={}", result);
         return result;
     }

-    private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final Collection<String> tableNames) {
-        Map<String, String> tableNameSchemaMap = new LinkedHashMap<>();
-        for (String each : tableNames) {
-            String[] split = each.split("\\.");
-            if (split.length > 1) {
-                tableNameSchemaMap.put(split[1], split[0]);
-            }
-        }
-        return new TableNameSchemaNameMapping(tableNameSchemaMap);
-    }
-
     private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
         Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
