
Commit

Refactor IncrementalDumperContext
terrymanu committed Nov 5, 2023
1 parent e479923 commit 8e63ce6
Showing 6 changed files with 10 additions and 17 deletions.
@@ -19,21 +19,19 @@
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Setter;
 import lombok.ToString;
 
 /**
  * Incremental dumper context.
  */
 @RequiredArgsConstructor
 @Getter
-@Setter
 @ToString
 public final class IncrementalDumperContext {
 
     private final DumperCommonContext commonContext;
 
-    private String jobId;
+    private final String jobId;
 
-    private boolean decodeWithTX;
+    private final boolean decodeWithTX;
 }
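
With @Setter removed and every field final, callers now supply all three values up front through the Lombok-generated constructor. A minimal sketch of the new construction pattern, assuming the same imports as the tests below; the data source name, job id, and decodeWithTX values are illustrative placeholders, not values from this commit:

// Sketch only: "ds_0" and "example_job_id" are hypothetical placeholder values.
DumperCommonContext commonContext = new DumperCommonContext("ds_0",
        new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"),
        new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))),
        new TableAndSchemaNameMapper(Collections.emptyMap()));
// All state is fixed at construction time; there are no setters to call afterwards.
IncrementalDumperContext dumperContext = new IncrementalDumperContext(commonContext, "example_job_id", false);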
@@ -101,7 +101,7 @@ private IncrementalDumperContext createDumperContext() {
                 new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"),
                 new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))),
                 new TableAndSchemaNameMapper(Collections.emptyMap()));
-        return new IncrementalDumperContext(commonContext);
+        return new IncrementalDumperContext(commonContext, null, false);
     }
 
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
@@ -111,9 +111,7 @@ private IncrementalDumperContext createDumperContext(final String jdbcUrl, final
                 new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password),
                 new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))),
                 new TableAndSchemaNameMapper(Collections.emptyMap()));
-        IncrementalDumperContext result = new IncrementalDumperContext(commonContext);
-        result.setJobId("0101123456");
-        return result;
+        return new IncrementalDumperContext(commonContext, "0101123456", false);
     }
 
     @AfterEach
@@ -90,7 +90,7 @@ private IncrementalDumperContext mockDumperContext() {
                 new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"),
                 new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))),
                 new TableAndSchemaNameMapper(Collections.emptyMap()));
-        return new IncrementalDumperContext(commonContext);
+        return new IncrementalDumperContext(commonContext, null, false);
     }
 
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
@@ -283,11 +283,9 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jo
         StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
         Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
         dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
-        IncrementalDumperContext result = new IncrementalDumperContext(
-                new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper));
-        result.setJobId(jobConfig.getJobId());
-        result.setDecodeWithTX(jobConfig.isDecodeWithTX());
-        return result;
+        return new IncrementalDumperContext(
+                new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper),
+                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
     }
 
     private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
@@ -40,8 +40,7 @@ public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDat
         String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
         ActualAndLogicTableNameMapper tableNameMapper = new ActualAndLogicTableNameMapper(JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine));
         TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
-        IncrementalDumperContext result = new IncrementalDumperContext(new DumperCommonContext(dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMapper, tableAndSchemaNameMapper));
-        result.setJobId(jobConfig.getJobId());
-        return result;
+        return new IncrementalDumperContext(
+                new DumperCommonContext(dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMapper, tableAndSchemaNameMapper), jobConfig.getJobId(), false);
     }
 }
