From 8e63ce6726863acfde0de296c759662ab9ca3a1a Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 5 Nov 2023 22:16:39 +0800 Subject: [PATCH] Refactor IncrementalDumperContext --- .../ingest/dumper/context/IncrementalDumperContext.java | 6 ++---- .../pipeline/mysql/ingest/MySQLIncrementalDumperTest.java | 2 +- .../postgresql/ingest/PostgreSQLWALDumperTest.java | 4 +--- .../postgresql/ingest/wal/WALEventConverterTest.java | 2 +- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 8 +++----- .../ingest/MigrationIncrementalDumperContextCreator.java | 5 ++--- 6 files changed, 10 insertions(+), 17 deletions(-) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java index 4b7697c7dfaf6..4bcb2ce0d4613 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java @@ -19,7 +19,6 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.Setter; import lombok.ToString; /** @@ -27,13 +26,12 @@ */ @RequiredArgsConstructor @Getter -@Setter @ToString public final class IncrementalDumperContext { private final DumperCommonContext commonContext; - private String jobId; + private final String jobId; - private boolean decodeWithTX; + private final boolean decodeWithTX; } diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index 589c7513578ce..bd8ee875b1327 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -101,7 +101,7 @@ private IncrementalDumperContext createDumperContext() { new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"), new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))), new TableAndSchemaNameMapper(Collections.emptyMap())); - return new IncrementalDumperContext(commonContext); + return new IncrementalDumperContext(commonContext, null, false); } private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException { diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index efa87a96409d1..a6a90a7981774 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -111,9 +111,7 @@ private IncrementalDumperContext createDumperContext(final String jdbcUrl, final new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password), new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))), new TableAndSchemaNameMapper(Collections.emptyMap())); - IncrementalDumperContext result = new IncrementalDumperContext(commonContext); - result.setJobId("0101123456"); - return result; + return new IncrementalDumperContext(commonContext, "0101123456", false); } @AfterEach diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index cdda6d2d11da7..9306159d72b5e 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -90,7 +90,7 @@ private IncrementalDumperContext mockDumperContext() { new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"), new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))), new TableAndSchemaNameMapper(Collections.emptyMap())); - return new IncrementalDumperContext(commonContext); + return new IncrementalDumperContext(commonContext, null, false); } private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 4232b335bf32c..3b96545fde320 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -283,11 +283,9 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jo StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName); Map tableNameMap = new LinkedHashMap<>(); dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); - IncrementalDumperContext result = new IncrementalDumperContext( - new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper)); - result.setJobId(jobConfig.getJobId()); - result.setDecodeWithTX(jobConfig.isDecodeWithTX()); - return result; + return new IncrementalDumperContext( + new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper), + jobConfig.getJobId(), jobConfig.isDecodeWithTX()); } private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection schemaTableNames, diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java index f3819d82ff5c3..a5ee57453347c 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java @@ -40,8 +40,7 @@ public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDat String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName(); ActualAndLogicTableNameMapper tableNameMapper = new ActualAndLogicTableNameMapper(JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine)); TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); - IncrementalDumperContext result = new IncrementalDumperContext(new DumperCommonContext(dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMapper, tableAndSchemaNameMapper)); - result.setJobId(jobConfig.getJobId()); - return result; + return new IncrementalDumperContext( + new DumperCommonContext(dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMapper, tableAndSchemaNameMapper), jobConfig.getJobId(), false); } }