From 3ba67fb8d01003d99a39144ae4be032cae2c6142 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 5 Nov 2023 11:22:45 +0800 Subject: [PATCH] Refactor DumperCommonContext --- .../dumper/context/DumperCommonContext.java | 2 +- .../context/IncrementalDumperContext.java | 8 ++++++-- .../context/InventoryDumperContext.java | 17 +++++++++------- .../pipeline/core/dumper/InventoryDumper.java | 17 ++++++++-------- .../InventoryRecordsCountCalculator.java | 2 +- .../core/preparer/InventoryTaskSplitter.java | 16 +++++++-------- .../preparer/PipelineJobPreparerUtils.java | 4 ++-- .../pipeline/core/task/PipelineTaskUtils.java | 2 +- .../mysql/ingest/MySQLIncrementalDumper.java | 13 ++++++------ .../ingest/MySQLIncrementalDumperTest.java | 15 +++++++------- .../opengauss/ingest/OpenGaussWALDumper.java | 4 ++-- .../ingest/PostgreSQLWALDumper.java | 4 ++-- .../ingest/wal/WALEventConverter.java | 8 ++++---- .../ingest/PostgreSQLWALDumperTest.java | 15 ++++++++------ .../ingest/wal/WALEventConverterTest.java | 15 +++++++------- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 14 +++++++------ .../cdc/context/CDCJobItemContext.java | 4 ++-- .../cdc/core/prepare/CDCJobPreparer.java | 20 ++++++++++--------- .../migration/api/impl/MigrationJobAPI.java | 8 +++++--- ...rationIncrementalDumperContextCreator.java | 12 ++++++----- .../context/MigrationJobItemContext.java | 2 +- .../prepare/MigrationJobPreparer.java | 16 ++++++++------- .../GovernanceRepositoryAPIImplTest.java | 4 ++-- .../prepare/InventoryTaskSplitterTest.java | 18 ++++++++--------- .../core/task/IncrementalTaskTest.java | 2 +- .../pipeline/core/task/InventoryTaskTest.java | 9 +++++---- .../MigrationDataConsistencyCheckerTest.java | 2 +- 27 files changed, 139 insertions(+), 114 deletions(-) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java index 9166f609df901..cdcbf6b0f3f89 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java @@ -31,7 +31,7 @@ @Getter @Setter @ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"}) -public abstract class DumperCommonContext { +public final class DumperCommonContext { private String dataSourceName; diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java index 4ea3cb2c5da93..4b7697c7dfaf6 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java @@ -18,16 +18,20 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; /** * Incremental dumper context. 
*/ +@RequiredArgsConstructor @Getter @Setter -@ToString(callSuper = true) -public final class IncrementalDumperContext extends DumperCommonContext { +@ToString +public final class IncrementalDumperContext { + + private final DumperCommonContext commonContext; private String jobId; diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java index e96375d8c6ed0..47a7445df82b9 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java @@ -30,8 +30,10 @@ */ @Getter @Setter -@ToString(callSuper = true) -public final class InventoryDumperContext extends DumperCommonContext { +@ToString +public final class InventoryDumperContext { + + private final DumperCommonContext commonContext; private String actualTableName; @@ -51,11 +53,12 @@ public final class InventoryDumperContext extends DumperCommonContext { private JobRateLimitAlgorithm rateLimitAlgorithm; - public InventoryDumperContext(final DumperCommonContext dumperContext) { - setDataSourceName(dumperContext.getDataSourceName()); - setDataSourceConfig(dumperContext.getDataSourceConfig()); - setTableNameMapper(dumperContext.getTableNameMapper()); - setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper()); + public InventoryDumperContext(final DumperCommonContext commonContext) { + this.commonContext = new DumperCommonContext(); + this.commonContext.setDataSourceName(commonContext.getDataSourceName()); + this.commonContext.setDataSourceConfig(commonContext.getDataSourceConfig()); + this.commonContext.setTableNameMapper(commonContext.getTableNameMapper()); + this.commonContext.setTableAndSchemaNameMapper(commonContext.getTableAndSchemaNameMapper()); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java index 36943d542bd69..ebd0f8183eee7 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java @@ -88,7 +88,7 @@ public InventoryDumper(final InventoryDumperContext dumperContext, final Pipelin this.dumperContext = dumperContext; this.channel = channel; this.dataSource = dataSource; - DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType(); + DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType(); inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType); columnValueReaderEngine = new ColumnValueReaderEngine(databaseType); this.metaDataLoader = metaDataLoader; } @Override protected void runBlocking() { - IngestPosition position = dumperContext.getPosition(); + IngestPosition position = dumperContext.getCommonContext().getPosition(); if (position instanceof FinishedPosition) { log.info("Ignored because of already finished."); return; } PipelineTableMetaData
tableMetaData = metaDataLoader.getTableMetaData( - dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName()); + dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName()); try (Connection connection = dataSource.getConnection()) { dump(tableMetaData, connection); } catch (final SQLException ex) { @@ -114,7 +114,7 @@ protected void runBlocking() { @SuppressWarnings("MagicConstant") private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException { int batchSize = dumperContext.getBatchSize(); - DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType(); + DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType(); if (null != dumperContext.getTransactionIsolation()) { connection.setTransactionIsolation(dumperContext.getTransactionIsolation()); } @@ -156,11 +156,11 @@ private String buildInventoryDumpSQL() { if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) { return dumperContext.getQuerySQL(); } - String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); + String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); if (!dumperContext.hasUniqueKey()) { return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName()); } - PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) dumperContext.getPosition(); + PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) dumperContext.getCommonContext().getPosition(); PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0); Collection columnNames = Collections.singleton("*"); if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) { @@ -179,7 +179,7 @@ private void setParameters(final PreparedStatement preparedStatement) throws SQL return; } PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0); - PrimaryKeyPosition position = (PrimaryKeyPosition) dumperContext.getPosition(); + PrimaryKeyPosition position = (PrimaryKeyPosition) dumperContext.getCommonContext().getPosition(); if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) { preparedStatement.setObject(1, position.getBeginValue()); preparedStatement.setObject(2, position.getEndValue()); @@ -212,7 +212,8 @@ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMeta private IngestPosition newPosition(final ResultSet resultSet) throws SQLException { return dumperContext.hasUniqueKey() - ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition) dumperContext.getPosition()).getEndValue()) + ? 
PrimaryKeyPositionFactory.newInstance( + resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition) dumperContext.getCommonContext().getPosition()).getEndValue()) : new PlaceholderPosition(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java index f1e46de7beec7..5610c5bf0d73c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java @@ -51,7 +51,7 @@ public final class InventoryRecordsCountCalculator { * @throws SplitPipelineJobByUniqueKeyException if there's exception from database */ public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) { - String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); + String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); String actualTableName = dumperContext.getActualTableName(); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType()); Optional sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java index 971593929183d..f497583c9f862 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java @@ -83,7 +83,7 @@ public List splitInventoryData(final InventoryIncrementalJobItemC long startTimeMillis = System.currentTimeMillis(); InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext(); for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) { - AtomicReference position = new AtomicReference<>(each.getPosition()); + AtomicReference position = new AtomicReference<>(each.getCommonContext().getPosition()); PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position); Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader()); Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), jobItemContext); @@ -110,12 +110,12 @@ public Collection splitInventoryDumperContext(final Inve private Collection splitByTable(final InventoryDumperContext dumperContext) { Collection result = new LinkedList<>(); - dumperContext.getTableNameMapper().getTableNameMap().forEach((key, value) -> { - InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext); + dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().forEach((key, value) -> { + 
InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext()); // use original table name, for metadata loader, since some database table name case-sensitive inventoryDumperContext.setActualTableName(key.getOriginal()); inventoryDumperContext.setLogicTableName(value.getOriginal()); - inventoryDumperContext.setPosition(new PlaceholderPosition()); + inventoryDumperContext.getCommonContext().setPosition(new PlaceholderPosition()); inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames()); inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns()); result.add(inventoryDumperContext); @@ -126,7 +126,7 @@ private Collection splitByTable(final InventoryDumperCon private Collection splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) { if (null == dumperContext.getUniqueKeyColumns()) { - String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); + String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); String actualTableName = dumperContext.getActualTableName(); List uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader()); dumperContext.setUniqueKeyColumns(uniqueKeyColumns); @@ -139,8 +139,8 @@ private Collection splitByPrimaryKey(final InventoryDump Collection inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource); int i = 0; for (IngestPosition each : inventoryPositions) { - InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext); - splitDumperContext.setPosition(each); + InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext()); + splitDumperContext.getCommonContext().setPosition(each); splitDumperContext.setShardingItem(i++); splitDumperContext.setActualTableName(dumperContext.getActualTableName()); splitDumperContext.setLogicTableName(dumperContext.getLogicTableName()); @@ -204,7 +204,7 @@ private Range getUniqueKeyValuesRange(final InventoryIncrementalJobItemCon String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName(); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType()); String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL( - dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey); + dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey); try ( Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java index 2f38ea4c72c95..e40fa297fe0a1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java +++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java @@ -131,8 +131,8 @@ public static IngestPosition getIncrementalPosition(final JobItemIncrementalTask return position.get(); } } - DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType(); - DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); + DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType(); + DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()); return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId()); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java index fedb6911f05be..3522cc82bbae2 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java @@ -43,7 +43,7 @@ public final class PipelineTaskUtils { * @return inventory task id */ public static String generateInventoryTaskId(final InventoryDumperContext inventoryDumperContext) { - String result = String.format("%s.%s", inventoryDumperContext.getDataSourceName(), inventoryDumperContext.getActualTableName()); + String result = String.format("%s.%s", inventoryDumperContext.getCommonContext().getDataSourceName(), inventoryDumperContext.getActualTableName()); return result + "#" + inventoryDumperContext.getShardingItem(); } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index ed64a865cf7df..705a02200a8db 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -80,12 +80,13 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, final IngestPosition binlogPosition, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - Preconditions.checkArgument(dumperContext.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration"); + Preconditions.checkArgument(dumperContext.getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, + "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration"); this.dumperContext = dumperContext; this.binlogPosition = (BinlogPosition) binlogPosition; this.channel = channel; this.metaDataLoader = metaDataLoader; - YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).getJdbcConfig(); + YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) 
dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig(); ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL")); ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null); ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()); @@ -132,7 +133,7 @@ private List handleEvent(final AbstractBinlogEvent event) { return Collections.singletonList(createPlaceholderRecord(event)); } AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event; - if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getTableNameMapper().containsTable(rowsEvent.getTableName())) { + if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getCommonContext().getTableNameMapper().containsTable(rowsEvent.getTableName())) { return Collections.singletonList(createPlaceholderRecord(event)); } PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName()); @@ -155,8 +156,8 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent even } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName); - return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName); + LogicTableName logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName); } private List handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) { @@ -217,7 +218,7 @@ private Serializable handleValue(final PipelineColumnMetaData columnMetaData, fi } private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) { - String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal(); IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()); DataRecord result = new DataRecord(type, tableName, position, columnCount); result.setActualTableName(rowsEvent.getTableName()); diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index 7d6ee11ad2faa..5ed2365d713ef 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; @@ -81,7 +82,7 @@ class MySQLIncrementalDumperTest { void setUp() throws SQLException { IncrementalDumperContext dumperContext = createDumperContext(); initTableData(dumperContext); - dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root")); + dumperContext.getCommonContext().setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root")); PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class); SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10000, new EmptyAckCallback()); incrementalDumper = new MySQLIncrementalDumper(dumperContext, new BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader); @@ -90,17 +91,17 @@ void setUp() throws SQLException { } private IncrementalDumperContext createDumperContext() { - IncrementalDumperContext result = new IncrementalDumperContext(); - result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root")); - result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); - result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); - return result; + DumperCommonContext commonContext = new DumperCommonContext(); + commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root")); + commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); + commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); + return new IncrementalDumperContext(commonContext); } private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException { try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); - PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); + PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()); Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { statement.execute("DROP TABLE IF EXISTS t_order"); diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java index dbc2a573ba66a..dfa912c4a0e0f 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java @@ -74,7 +74,7 
@@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen public OpenGaussWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getDataSourceConfig().getClass()), + ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration")); this.dumperContext = dumperContext; walPosition = new AtomicReference<>((WALPosition) position); @@ -137,7 +137,7 @@ private void dump() throws SQLException { } private PgConnection getReplicationConnectionUnwrap() throws SQLException { - return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).unwrap(PgConnection.class); + return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).unwrap(PgConnection.class); } private void processEventWithTX(final AbstractWALEvent event) { diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java index 23a0d808b089a..b23686cbe4e6b 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java @@ -76,7 +76,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme public PostgreSQLWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getDataSourceConfig().getClass()), + ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration")); this.dumperContext = dumperContext; walPosition = new AtomicReference<>((WALPosition) position); @@ -111,7 +111,7 @@ protected void runBlocking() { private void dump() throws SQLException { // TODO use unified PgConnection try ( - Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()); + Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()); PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection, dumperContext.getJobId()), walPosition.get().getLogSequenceNumber())) { PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils()); diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java index cc2dca00d8678..389fb0e459318 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java @@ -80,7 +80,7 @@ public Record convert(final AbstractWALEvent event) { private boolean filter(final AbstractWALEvent event) { if (event instanceof AbstractRowEvent) { AbstractRowEvent rowEvent = (AbstractRowEvent) event; - return !dumperContext.getTableNameMapper().containsTable(rowEvent.getTableName()); + return !dumperContext.getCommonContext().getTableNameMapper().containsTable(rowEvent.getTableName()); } return false; } @@ -90,8 +90,8 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName); - return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName); + LogicTableName logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName); } private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) { @@ -118,7 +118,7 @@ private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final Pipeli } private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) { - String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal(); DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount); result.setActualTableName(rowsEvent.getTableName()); result.setCsn(rowsEvent.getCsn()); diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index 6cbb3233bac64..4a033b76e6304 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext; import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; @@ -90,7 +91,8 @@ void setUp() { String password = "root"; createTable(jdbcUrl, username, password); dumperContext = createDumperContext(jdbcUrl, username, password); - walDumper = new PostgreSQLWALDumper(dumperContext, position, channel, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()))); + walDumper = new PostgreSQLWALDumper(dumperContext, position, channel, + new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()))); } private void createTable(final String jdbcUrl, final String username, final String password) { @@ -105,11 +107,12 @@ private void createTable(final String jdbcUrl, final String username, final Stri } private IncrementalDumperContext createDumperContext(final String jdbcUrl, final String username, final String password) { - IncrementalDumperContext result = new IncrementalDumperContext(); + DumperCommonContext commonContext = new DumperCommonContext(); + commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); + commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")))); + commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); + IncrementalDumperContext result = new IncrementalDumperContext(commonContext); result.setJobId("0101123456"); - result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); - result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")))); - result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); return result; } @@ -120,7 +123,7 @@ void tearDown() { @Test void assertStart() throws SQLException, ReflectiveOperationException { - StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig(); + StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig(); try { Plugins.getMemberAccessor().set(PostgreSQLWALDumper.class.getDeclaredField("logicalReplication"), walDumper, logicalReplication); when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection); diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 15895cd370b3b..29c625de0a7d4 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; @@ -79,23 +80,23 @@ class WALEventConverterTest { void setUp() throws SQLException { IncrementalDumperContext dumperContext = mockDumperContext(); PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); - walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()))); + walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()))); initTableData(dumperContext); pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList()); } private IncrementalDumperContext mockDumperContext() { - IncrementalDumperContext result = new IncrementalDumperContext(); - result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); - result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); - result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); - return result; + DumperCommonContext commonContext = new DumperCommonContext(); + commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); + commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); + commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); + return new IncrementalDumperContext(commonContext); } private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException { try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); - PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); + PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()); Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { statement.execute("DROP TABLE IF EXISTS t_order"); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 85f2aea420630..d25feef3a3b88 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; @@ -193,7 +194,7 @@ private static InventoryIncrementalJobItemProgress getInventoryIncrementalJobIte final IncrementalDumperContext incrementalDumperContext) throws SQLException { InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress(); result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); - result.setDataSourceName(incrementalDumperContext.getDataSourceName()); + result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName()); IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, incrementalDumperContext, dataSourceManager)); result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); return result; @@ -282,12 +283,13 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jo dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName(); StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName); - IncrementalDumperContext result = new IncrementalDumperContext(); + DumperCommonContext commonContext = new DumperCommonContext(); + commonContext.setDataSourceName(dataSourceName); + commonContext.setDataSourceConfig(actualDataSourceConfig); + commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); + commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); + IncrementalDumperContext result = new IncrementalDumperContext(commonContext); result.setJobId(jobConfig.getJobId()); - result.setDataSourceName(dataSourceName); - result.setDataSourceConfig(actualDataSourceConfig); - result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); - result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); result.setDecodeWithTX(jobConfig.isDecodeWithTX()); return result; } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java index 23bc51e1f51e9..47e6acba40cbb 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java @@ -80,7 +80,7 @@ public final class 
CDCJobItemContext implements InventoryIncrementalJobItemConte @Override protected PipelineDataSourceWrapper initialize() { - return dataSourceManager.getDataSource(taskConfig.getDumperContext().getDataSourceConfig()); + return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig()); } }; @@ -99,7 +99,7 @@ public String getJobId() { @Override public String getDataSourceName() { - return taskConfig.getDumperContext().getDataSourceName(); + return taskConfig.getDumperContext().getCommonContext().getDataSourceName(); } @Override diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index 0cf3ea7d135b0..3d0f982de5981 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -104,7 +104,8 @@ private void initIncrementalPosition(final CDCJobItemContext jobItemContext) { CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental(); try { - taskConfig.getDumperContext().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); + taskConfig.getDumperContext().getCommonContext().setPosition( + PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex); } @@ -115,19 +116,19 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); CDCProcessContext processContext = jobItemContext.getJobProcessContext(); - for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext()), importerConfig) + for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()), importerConfig) .splitInventoryDumperContext(jobItemContext)) { - AtomicReference position = new AtomicReference<>(each.getPosition()); + AtomicReference position = new AtomicReference<>(each.getCommonContext().getPosition()); PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position); channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext)); Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader()); Importer importer = importerUsed.get() ? 
null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), - needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getDataSourceConfig().getDatabaseType())), + needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())), importerConfig.getRateLimitAlgorithm()); jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(), processContext.getInventoryImporterExecuteEngine(), dumper, importer, position)); - if (!(each.getPosition() instanceof FinishedPosition)) { + if (!(each.getCommonContext().getPosition() instanceof FinishedPosition)) { importerUsed.set(true); } } @@ -146,16 +147,17 @@ private void initIncrementalTask(final CDCJobItemContext jobItemContext, final A CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); IncrementalDumperContext dumperContext = taskConfig.getDumperContext(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); - IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getPosition(), jobItemContext.getInitProgress()); + IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress()); PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getPipelineChannelCreator(), taskProgress); channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext)); - Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getDataSourceConfig().getDatabaseType()) - .createIncrementalDumper(dumperContext, dumperContext.getPosition(), channel, jobItemContext.getSourceMetaDataLoader()); + Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()) + .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader()); boolean needSorting = needSorting(ImporterType.INCREMENTAL, hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType())); Importer importer = importerUsed.get() ? 
null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm()); - PipelineTask incrementalTask = new CDCIncrementalTask(dumperContext.getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress); + PipelineTask incrementalTask = new CDCIncrementalTask( + dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress); jobItemContext.getIncrementalTasks().add(incrementalTask); importerUsed.set(true); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 0be975696eda6..3a6447fa4fa1d 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -264,12 +264,14 @@ public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfig MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig; IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator( jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); - CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getTableAndSchemaNameMapper()); + CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); Set<LogicTableName> targetTableNames = jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet()); Map<LogicTableName, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); - ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getTableAndSchemaNameMapper()); - MigrationTaskConfiguration result = new MigrationTaskConfiguration(incrementalDumperContext.getDataSourceName(), createTableConfig, incrementalDumperContext, importerConfig); + ImporterConfiguration importerConfig = buildImporterConfiguration( + jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); + MigrationTaskConfiguration result = new MigrationTaskConfiguration( + incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfig, incrementalDumperContext, importerConfig); log.info("buildTaskConfiguration, result={}", result); return result; } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java index 659b15995eb2e..764b22dec926d 100644 ---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest; import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; @@ -49,12 +50,13 @@ public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDat private IncrementalDumperContext buildDumperContext(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource, final Map tableNameMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { - IncrementalDumperContext result = new IncrementalDumperContext(); + DumperCommonContext commonContext = new DumperCommonContext(); + commonContext.setDataSourceName(dataSourceName); + commonContext.setDataSourceConfig(sourceDataSource); + commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); + commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); + IncrementalDumperContext result = new IncrementalDumperContext(commonContext); result.setJobId(jobId); - result.setDataSourceName(dataSourceName); - result.setDataSourceConfig(sourceDataSource); - result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); - result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); return result; } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java index 1fe9a79b3d9f0..a4eaa8330c792 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java @@ -80,7 +80,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte @Override protected PipelineDataSourceWrapper initialize() { - return dataSourceManager.getDataSource(taskConfig.getDumperContext().getDataSourceConfig()); + return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig()); } }; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index c882dd924a184..a017a3559d26b 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ 
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index c882dd924a184..a017a3559d26b 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -88,7 +88,8 @@ public final class MigrationJobPreparer {
      * @throws SQLException SQL exception
      */
     public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
-        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperContext().getDataSourceConfig().getClass()),
+        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
+                jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
         PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
@@ -169,14 +170,15 @@ private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
         try {
-            taskConfig.getDumperContext().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+            taskConfig.getDumperContext().getCommonContext().setPosition(
+                    PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
         } catch (final SQLException ex) {
             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
         }
     }
 
     private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
-        InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext());
+        InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
         InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig());
         jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
     }
@@ -188,12 +190,12 @@ private void initIncrementalTasks(final MigrationJobItemContext jobItemContext)
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
-        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getPosition(), jobItemContext.getInitProgress());
+        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
         PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), pipelineChannelCreator, taskProgress);
-        Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getDataSourceConfig().getDatabaseType())
-                .createIncrementalDumper(dumperContext, dumperContext.getPosition(), channel, sourceMetaDataLoader);
+        Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
+                .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
         Collection<Importer> importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext);
-        PipelineTask incrementalTask = new IncrementalTask(dumperContext.getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
+        PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
     }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
index 74ddfef8e14b4..7229363eb7e28 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java
@@ -177,8 +177,8 @@ private MigrationJobItemContext mockJobItemContext() {
     }
 
     private InventoryTask mockInventoryTask(final MigrationTaskConfiguration taskConfig) {
-        InventoryDumperContext dumperContext = new InventoryDumperContext(taskConfig.getDumperContext());
-        dumperContext.setPosition(new PlaceholderPosition());
+        InventoryDumperContext dumperContext = new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext());
+        dumperContext.getCommonContext().setPosition(new PlaceholderPosition());
         dumperContext.setActualTableName("t_order");
         dumperContext.setLogicTableName("t_order");
         dumperContext.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
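As the hunks above show, the change is mechanical at call sites: any accessor formerly inherited from DumperCommonContext is now reached through getCommonContext(). A brief sketch of the rule, assuming a dumperContext variable of type IncrementalDumperContext as in the code above:

    // Before the refactor (inherited accessors):
    //     dumperContext.getDataSourceName();
    //     dumperContext.getPosition();
    // After the refactor (composed accessors):
    String dataSourceName = dumperContext.getCommonContext().getDataSourceName();
    IngestPosition position = dumperContext.getCommonContext().getPosition();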
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 6a0105b71f3a4..9c1f051695a51 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -66,7 +66,7 @@ static void beforeClass() {
     @BeforeEach
     void setUp() {
         initJobItemContext();
-        dumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext());
+        dumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext());
         PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "int", false, true, true);
         dumperContext.setUniqueKeyColumns(Collections.singletonList(columnMetaData));
         inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), dumperContext, jobItemContext.getTaskConfig().getImporterConfig());
@@ -85,7 +85,7 @@ void tearDown() {
 
     @Test
     void assertSplitInventoryDataWithEmptyTable() throws SQLException {
-        initEmptyTablePrimaryEnvironment(dumperContext);
+        initEmptyTablePrimaryEnvironment(dumperContext.getCommonContext());
         List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
         assertThat(actual.size(), is(1));
         InventoryTask task = actual.get(0);
@@ -95,7 +95,7 @@ void assertSplitInventoryDataWithEmptyTable() throws SQLException {
 
     @Test
     void assertSplitInventoryDataWithIntPrimary() throws SQLException {
-        initIntPrimaryEnvironment(dumperContext);
+        initIntPrimaryEnvironment(dumperContext.getCommonContext());
         List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
         assertThat(actual.size(), is(10));
         InventoryTask task = actual.get(9);
@@ -105,7 +105,7 @@ void assertSplitInventoryDataWithIntPrimary() throws SQLException {
 
     @Test
     void assertSplitInventoryDataWithCharPrimary() throws SQLException {
-        initCharPrimaryEnvironment(dumperContext);
+        initCharPrimaryEnvironment(dumperContext.getCommonContext());
         List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0).getTaskId(), is("ds_0.t_order#0"));
@@ -116,15 +116,15 @@ void assertSplitInventoryDataWithCharPrimary() throws SQLException {
 
     @Test
     void assertSplitInventoryDataWithoutPrimaryButWithUniqueIndex() throws SQLException {
-        initUniqueIndexOnNotNullColumnEnvironment(dumperContext);
+        initUniqueIndexOnNotNullColumnEnvironment(dumperContext.getCommonContext());
         List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
         assertThat(actual.size(), is(1));
     }
 
     @Test
     void assertSplitInventoryDataWithMultipleColumnsKey() throws SQLException {
-        initUnionPrimaryEnvironment(dumperContext);
-        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())) {
+        initUnionPrimaryEnvironment(dumperContext.getCommonContext());
+        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())) {
             List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource));
             dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
             List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
@@ -134,8 +134,8 @@ void assertSplitInventoryDataWithMultipleColumnsKey() throws SQLException {
 
     @Test
     void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException {
-        initNoPrimaryEnvironment(dumperContext);
-        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())) {
+        initNoPrimaryEnvironment(dumperContext.getCommonContext());
+        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())) {
             List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource));
             assertTrue(uniqueKeyColumns.isEmpty());
             List<InventoryTask> inventoryTasks = inventoryTaskSplitter.splitInventoryData(jobItemContext);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
index 0e70960365af5..eb42b7d5b8e44 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
@@ -54,7 +54,7 @@ static void beforeClass() {
     @BeforeEach
     void setUp() {
         MigrationTaskConfiguration taskConfig = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
-        taskConfig.getDumperContext().setPosition(new PlaceholderPosition());
+        taskConfig.getDumperContext().getCommonContext().setPosition(new PlaceholderPosition());
         incrementalTask = new IncrementalTask("ds_0", PipelineContextUtils.getExecuteEngine(), mock(Dumper.class),
                 Collections.singletonList(mock(Importer.class)), new IncrementalTaskProgress(new PlaceholderPosition()));
     }
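The surrounding test hunks all derive an inventory dumper context from an existing incremental one. A condensed sketch of that pattern, assuming a taskConfig as in these tests (the table name and the placeholder position are illustrative):

    // The copy constructor takes the shared DumperCommonContext, not the incremental context itself.
    InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext());
    inventoryDumperContext.setLogicTableName("t_order");
    inventoryDumperContext.setActualTableName("t_order");
    inventoryDumperContext.getCommonContext().setPosition(new PlaceholderPosition());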
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index e075cd57c086a..7291ad65752be 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -76,7 +76,7 @@ void assertGetProgress() throws SQLException, ExecutionException, InterruptedExc
         initTableData(taskConfig.getDumperContext());
         // TODO use t_order_0, and also others
         InventoryDumperContext inventoryDumperContext = createInventoryDumperContext("t_order", "t_order");
-        AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperContext.getPosition());
+        AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperContext.getCommonContext().getPosition());
         InventoryTask inventoryTask = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperContext),
                 PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), mock(Importer.class), position);
         CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
@@ -87,7 +87,7 @@ void assertGetProgress() throws SQLException, ExecutionException, InterruptedExc
     private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
         PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
         try (
-                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
@@ -98,11 +98,12 @@ private void initTableData(final IncrementalDumperContext dumperContext) throws
     }
 
     private InventoryDumperContext createInventoryDumperContext(final String logicTableName, final String actualTableName) {
-        InventoryDumperContext result = new InventoryDumperContext(taskConfig.getDumperContext());
+        InventoryDumperContext result = new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext());
         result.setLogicTableName(logicTableName);
         result.setActualTableName(actualTableName);
         result.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
-        result.setPosition(null == taskConfig.getDumperContext().getPosition() ? new IntegerPrimaryKeyPosition(0, 1000) : taskConfig.getDumperContext().getPosition());
+        result.getCommonContext().setPosition(
+                null == taskConfig.getDumperContext().getCommonContext().getPosition() ? new IntegerPrimaryKeyPosition(0, 1000) : taskConfig.getDumperContext().getCommonContext().getPosition());
         return result;
     }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index c500b0e370e22..23f8210a46803 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -74,7 +74,7 @@ private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProg
 
     private MigrationJobConfiguration createJobConfiguration() throws SQLException {
         MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
-        initTableData(jobItemContext.getTaskConfig().getDumperContext().getDataSourceConfig());
+        initTableData(jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig());
         initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
         return jobItemContext.getJobConfig();
     }
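One readability note on the createInventoryDumperContext hunk above: the null-check ternary reads the source position through the full accessor chain twice. Extracting a local expresses the same fallback more directly; a sketch of that alternative (a suggestion only, not part of the patch):

    // Equivalent to the ternary in the hunk above, with the accessor chain read once.
    IngestPosition sourcePosition = taskConfig.getDumperContext().getCommonContext().getPosition();
    result.getCommonContext().setPosition(null == sourcePosition ? new IntegerPrimaryKeyPosition(0, 1000) : sourcePosition);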