From bfb02e523d4711b83fed9782e2c8d21c012063c8 Mon Sep 17 00:00:00 2001
From: zhangliang
Date: Fri, 3 Nov 2023 11:05:41 +0800
Subject: [PATCH] Rename DumperConfiguration to DumperContext

---
 .../TableNameSchemaNameMapping.java           |  2 +-
 .../ingest/BaseDumperContext.java}            |  8 +-
 .../ingest/IncrementalDumperContext.java}     |  6 +-
 .../ingest/InventoryDumperContext.java}       | 18 ++--
 .../dumper/IncrementalDumperCreator.java      |  6 +-
 .../TableNameSchemaNameMappingTest.java       | 15 +--
 .../common/config/ImporterConfiguration.java  |  2 +-
 ...a => IncrementalDumperContextCreator.java} | 12 +--
 .../pipeline/core/dumper/InventoryDumper.java | 64 ++++++------
 .../InventoryRecordsCountCalculator.java      | 14 +--
 .../core/preparer/InventoryTaskSplitter.java  | 97 ++++++++++---------
 .../preparer/PipelineJobPreparerUtils.java    | 12 +--
 .../datasource/DataSourceCheckEngine.java     |  2 +-
 .../pipeline/core/task/PipelineTaskUtils.java | 10 +-
 .../datasource/DataSourceCheckEngineTest.java |  2 +-
 .../dumper/H2IncrementalDumperCreator.java    |  4 +-
 .../mysql/ingest/MySQLIncrementalDumper.java  | 26 ++---
 .../dumper/MySQLIncrementalDumperCreator.java |  6 +-
 .../ingest/MySQLIncrementalDumperTest.java    | 28 +++---
 .../opengauss/ingest/OpenGaussWALDumper.java  | 18 ++--
 .../OpenGaussIncrementalDumperCreator.java    |  6 +-
 .../ingest/PostgreSQLWALDumper.java           | 18 ++--
 .../PostgreSQLIncrementalDumperCreator.java   |  6 +-
 .../ingest/wal/WALEventConverter.java         | 16 +--
 .../ingest/PostgreSQLWALDumperTest.java       | 16 +--
 .../ingest/wal/WALEventConverterTest.java     | 22 ++---
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 22 ++---
 .../cdc/config/task/CDCTaskConfiguration.java |  4 +-
 .../cdc/context/CDCJobItemContext.java        |  4 +-
 .../cdc/core/prepare/CDCJobPreparer.java      | 22 ++---
 .../migration/api/impl/MigrationJobAPI.java   | 16 +--
 .../config/MigrationTaskConfiguration.java    |  4 +-
 ...ationIncrementalDumperContextCreator.java} | 18 ++--
 .../context/MigrationJobItemContext.java      |  2 +-
 .../prepare/MigrationJobPreparer.java         | 22 ++---
 .../FixtureIncrementalDumperCreator.java      |  4 +-
 .../importer/PipelineDataSourceSinkTest.java  |  2 +-
 .../GovernanceRepositoryAPIImplTest.java      | 16 +--
 .../prepare/InventoryTaskSplitterTest.java    | 54 +++++------
 .../core/task/IncrementalTaskTest.java        |  2 +-
 .../pipeline/core/task/InventoryTaskTest.java | 22 ++---
 .../MigrationDataConsistencyCheckerTest.java  |  2 +-
 42 files changed, 323 insertions(+), 329 deletions(-)
 rename kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/{config => context}/TableNameSchemaNameMapping.java (97%)
 rename kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/{config/ingest/BaseDumperConfiguration.java => context/ingest/BaseDumperContext.java} (94%)
 rename kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/{config/ingest/IncrementalDumperConfiguration.java => context/ingest/IncrementalDumperContext.java} (84%)
 rename kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/{config/ingest/InventoryDumperConfiguration.java => context/ingest/InventoryDumperContext.java} (74%)
 rename kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/{config => context}/TableNameSchemaNameMappingTest.java (70%)
 rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/{IncrementalDumperConfigurationCreator.java => IncrementalDumperContextCreator.java} (74%)
 rename kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/{MigrationIncrementalDumperConfigurationCreator.java => MigrationIncrementalDumperContextCreator.java} (69%)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java
similarity index 97%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java
index 5b37485ce61bc..ea56984a4b2aa 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMapping.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java
@@ -15,7 +15,7 @@
 * limitations under the License.
 */

-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.api.context;

import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/BaseDumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/BaseDumperContext.java
similarity index 94%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/BaseDumperConfiguration.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/BaseDumperContext.java
index 59b9ebd41fa21..afff4f01023b1 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/BaseDumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/BaseDumperContext.java
@@ -15,12 +15,12 @@
 * limitations under the License.
 */

-package org.apache.shardingsphere.data.pipeline.api.config.ingest;
+package org.apache.shardingsphere.data.pipeline.api.context.ingest;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -34,12 +34,12 @@
import java.util.stream.Collectors;

/**
- * Base dumper configuration.
+ * Base dumper context.
 */
@Getter
@Setter
@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
-public abstract class BaseDumperConfiguration {
+public abstract class BaseDumperContext {

    private String dataSourceName;

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/IncrementalDumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/IncrementalDumperContext.java
similarity index 84%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/IncrementalDumperConfiguration.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/IncrementalDumperContext.java
index 326e25c9262b6..127ea801c7994 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/IncrementalDumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/IncrementalDumperContext.java
@@ -15,19 +15,19 @@
 * limitations under the License.
 */

-package org.apache.shardingsphere.data.pipeline.api.config.ingest;
+package org.apache.shardingsphere.data.pipeline.api.context.ingest;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
- * Incremental dumper configuration.
+ * Incremental dumper context.
 */
@Getter
@Setter
@ToString(callSuper = true)
-public class IncrementalDumperConfiguration extends BaseDumperConfiguration {
+public class IncrementalDumperContext extends BaseDumperContext {

    private String jobId;

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java
similarity index 74%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java
index 792f60c8f3700..b1afd6f052933 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java
@@ -15,7 +15,7 @@
 * limitations under the License.
 */

-package org.apache.shardingsphere.data.pipeline.api.config.ingest;
+package org.apache.shardingsphere.data.pipeline.api.context.ingest;

import lombok.Getter;
import lombok.Setter;
@@ -26,12 +26,12 @@
import java.util.List;

/**
- * Inventory dumper configuration.
+ * Inventory dumper context.
 */
@Getter
@Setter
@ToString(callSuper = true)
-public final class InventoryDumperConfiguration extends BaseDumperConfiguration {
+public final class InventoryDumperContext extends BaseDumperContext {

    private String actualTableName;

@@ -51,12 +51,12 @@ public final class InventoryDumperConfiguration extends BaseDumperConfiguration
    private JobRateLimitAlgorithm rateLimitAlgorithm;

-    public InventoryDumperConfiguration(final BaseDumperConfiguration dumperConfig) {
-        setDataSourceName(dumperConfig.getDataSourceName());
-        setDataSourceConfig(dumperConfig.getDataSourceConfig());
-        setTableNameMap(dumperConfig.getTableNameMap());
-        setTableNameSchemaNameMapping(dumperConfig.getTableNameSchemaNameMapping());
-        setTargetTableColumnsMap(dumperConfig.getTargetTableColumnsMap());
+    public InventoryDumperContext(final BaseDumperContext dumperContext) {
+        setDataSourceName(dumperContext.getDataSourceName());
+        setDataSourceConfig(dumperContext.getDataSourceConfig());
+        setTableNameMap(dumperContext.getTableNameMap());
+        setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
+        setTargetTableColumnsMap(dumperContext.getTargetTableColumnsMap());
    }

    /**
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
index 30705ecf7cbe1..d43317a33e535 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
@@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;

-import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -34,13 +34,13 @@ public interface IncrementalDumperCreator extends DatabaseTypedSPI {
    /**
     * Create incremental dumper.
     *
-     * @param config incremental dumper configuration
+     * @param context incremental dumper context
     * @param position position
     * @param channel channel
     * @param metaDataLoader meta data loader
     * @return incremental dumper
     */
-    IncrementalDumper createIncrementalDumper(IncrementalDumperConfiguration config, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);
+    IncrementalDumper createIncrementalDumper(IncrementalDumperContext context, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);

    /**
     * Whether support incremental dump.
diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMappingTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
similarity index 70%
rename from kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMappingTest.java
rename to kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
index 5e5e41ca34297..a640ee35df031 100644
--- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/TableNameSchemaNameMappingTest.java
+++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
@@ -15,12 +15,11 @@
 * limitations under the License.
 */

-package org.apache.shardingsphere.data.pipeline.api.config;
+package org.apache.shardingsphere.data.pipeline.api.context;

import org.junit.jupiter.api.Test;

-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collections;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -36,17 +35,11 @@ void assertConstructFromNull() {

    @Test
    void assertConstructFromValueNullMap() {
-        Map<String, String> map = new HashMap<>();
-        map.put("t_order", null);
-        TableNameSchemaNameMapping mapping = new TableNameSchemaNameMapping(map);
-        assertNull(mapping.getSchemaName("t_order"));
+        assertNull(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", null)).getSchemaName("t_order"));
    }

    @Test
    void assertConstructFromMap() {
-        Map<String, String> map = new HashMap<>();
-        map.put("t_order", "public");
-        TableNameSchemaNameMapping mapping = new TableNameSchemaNameMapping(map);
-        assertThat(mapping.getSchemaName("t_order"), is("public"));
+        assertThat(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
    }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
index b3b305afb629a..a92bce3830482 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
@@ -20,7 +20,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperContextCreator.java
similarity index 74%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperContextCreator.java
index f95815578dcf2..baecb330f57a5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperConfigurationCreator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ingest/IncrementalDumperContextCreator.java
@@ -17,19 +17,19 @@

package org.apache.shardingsphere.data.pipeline.common.config.ingest;

-import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;

/**
- * Incremental dumper configuration creator.
+ * Incremental dumper context creator.
 */
-public interface IncrementalDumperConfigurationCreator {
+public interface IncrementalDumperContextCreator {

    /**
-     * Create dumper configuration.
+     * Create incremental dumper context.
     *
     * @param jobDataNodeLine job data node line
-     * @return dumper configuration
+     * @return created incremental dumper context
     */
-    IncrementalDumperConfiguration createDumperConfiguration(JobDataNodeLine jobDataNodeLine);
+    IncrementalDumperContext createDumperContext(JobDataNodeLine jobDataNodeLine);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 23462cdf1db01..d1ca7a44e2e35 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -21,7 +21,7 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
@@ -71,7 +71,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements Dumper {

    @Getter(AccessLevel.PROTECTED)
-    private final InventoryDumperConfiguration dumperConfig;
+    private final InventoryDumperContext dumperContext;

    private final PipelineChannel channel;

@@ -85,11 +85,11 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements

    private final AtomicReference<Statement> dumpStatement = new AtomicReference<>();

-    public InventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
-        this.dumperConfig = dumperConfig;
+    public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+        this.dumperContext = dumperContext;
        this.channel = channel;
        this.dataSource = dataSource;
-        DatabaseType databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
+        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
        inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
        columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
        this.metaDataLoader = metaDataLoader;
@@ -97,26 +97,26 @@ public InventoryDumper(final InventoryDumperConfiguration dumperConfig, final Pi
    @Override
    protected void runBlocking() {
-        IngestPosition position = dumperConfig.getPosition();
+        IngestPosition position = dumperContext.getPosition();
        if (position instanceof FinishedPosition) {
            log.info("Ignored because of already finished.");
            return;
        }
-        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName());
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName());
        try (Connection connection = dataSource.getConnection()) {
            dump(tableMetaData, connection);
        } catch (final SQLException ex) {
            log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
-            throw new IngestException("Inventory dump failed on " + dumperConfig.getActualTableName(), ex);
+            throw new IngestException("Inventory dump failed on " + dumperContext.getActualTableName(), ex);
        }
    }

    @SuppressWarnings("MagicConstant")
    private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
-        int batchSize = dumperConfig.getBatchSize();
-        DatabaseType databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
-        if (null != dumperConfig.getTransactionIsolation()) {
-            connection.setTransactionIsolation(dumperConfig.getTransactionIsolation());
+        int batchSize = dumperContext.getBatchSize();
+        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
+        if (null != dumperContext.getTransactionIsolation()) {
+            connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
        }
        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildInventoryDumpSQL())) {
            dumpStatement.set(preparedStatement);
@@ -126,7 +126,7 @@ private void dump(final PipelineTableMetaData tableMetaData, final Connection co
            setParameters(preparedStatement);
            try (ResultSet resultSet = preparedStatement.executeQuery()) {
                int rowCount = 0;
-                JobRateLimitAlgorithm rateLimitAlgorithm = dumperConfig.getRateLimitAlgorithm();
+                JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
                List<Record> dataRecords = new LinkedList<>();
                while (resultSet.next()) {
@@ -153,34 +153,34 @@ private void dump(final PipelineTableMetaData tableMetaData, final Connection co
    }

    private String buildInventoryDumpSQL() {
-        if (!Strings.isNullOrEmpty(dumperConfig.getQuerySQL())) {
-            return dumperConfig.getQuerySQL();
+        if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
+            return dumperContext.getQuerySQL();
        }
-        LogicTableName logicTableName = new LogicTableName(dumperConfig.getLogicTableName());
-        String schemaName = dumperConfig.getSchemaName(logicTableName);
-        if (!dumperConfig.hasUniqueKey()) {
-            return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperConfig.getActualTableName());
+        LogicTableName logicTableName = new LogicTableName(dumperContext.getLogicTableName());
+        String schemaName = dumperContext.getSchemaName(logicTableName);
+        if (!dumperContext.hasUniqueKey()) {
+            return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName());
        }
-        PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) dumperConfig.getPosition();
-        PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
-        Collection<String> columnNames = dumperConfig.getColumnNames(logicTableName);
+        PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) dumperContext.getPosition();
+        PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
+        Collection<String> columnNames = dumperContext.getColumnNames(logicTableName);
        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
            if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
-                return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+                return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
            }
            if (null != primaryKeyPosition.getBeginValue() && null == primaryKeyPosition.getEndValue()) {
-                return inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+                return inventoryDumpSQLBuilder.buildUnlimitedDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
            }
        }
-        return inventoryDumpSQLBuilder.buildIndivisibleSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
+        return inventoryDumpSQLBuilder.buildIndivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
    }

    private void setParameters(final PreparedStatement preparedStatement) throws SQLException {
-        if (!dumperConfig.hasUniqueKey()) {
+        if (!dumperContext.hasUniqueKey()) {
            return;
        }
-        PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
-        PrimaryKeyPosition position = (PrimaryKeyPosition) dumperConfig.getPosition();
+        PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
+        PrimaryKeyPosition position = (PrimaryKeyPosition) dumperContext.getPosition();
        if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
            preparedStatement.setObject(1, position.getBeginValue());
            preparedStatement.setObject(2, position.getEndValue());
@@ -198,8 +198,8 @@ private void setParameters(final PreparedStatement preparedStatement) throws SQL
    private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
        int columnCount = resultSetMetaData.getColumnCount();
-        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, dumperConfig.getLogicTableName(), newPosition(resultSet), columnCount);
-        List<String> insertColumnNames = Optional.ofNullable(dumperConfig.getInsertColumnNames()).orElse(Collections.emptyList());
+        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, dumperContext.getLogicTableName(), newPosition(resultSet), columnCount);
+        List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
        ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
                () -> new PipelineInvalidParameterException("Insert colum names count not equals ResultSet column count"));
        for (int i = 1; i <= columnCount; i++) {
@@ -212,8 +212,8 @@ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMeta
    }

    private IngestPosition newPosition(final ResultSet resultSet) throws SQLException {
-        return dumperConfig.hasUniqueKey()
-                ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperConfig.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition) dumperConfig.getPosition()).getEndValue())
+        return dumperContext.hasUniqueKey()
+                ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition) dumperContext.getPosition()).getEndValue())
                : new PlaceholderPosition();
    }

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index 6296deb8fd3a6..10d164bc483eb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -20,7 +20,7 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
@@ -46,14 +46,14 @@ public final class InventoryRecordsCountCalculator {
    /**
     * Get table records count.
     *
-     * @param dumperConfig dump configuration
+     * @param dumperContext inventory dumper context
     * @param dataSource data source
     * @return table records count
     * @throws SplitPipelineJobByUniqueKeyException if there's exception from database
     */
-    public static long getTableRecordsCount(final InventoryDumperConfiguration dumperConfig, final PipelineDataSourceWrapper dataSource) {
-        String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
-        String actualTableName = dumperConfig.getActualTableName();
+    public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) {
+        String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
+        String actualTableName = dumperContext.getActualTableName();
        PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType());
        Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
        try {
@@ -64,8 +64,8 @@ public static long getTableRecordsCount(final InventoryDumperConfiguration dumpe
            }
            return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
        } catch (final SQLException ex) {
-            String uniqueKey = dumperConfig.hasUniqueKey() ? dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
-            throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), uniqueKey, ex);
+            String uniqueKey = dumperContext.hasUniqueKey() ? dumperContext.getUniqueKeyColumns().get(0).getName() : "";
+            throw new SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), uniqueKey, ex);
        }
    }

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index 8b35669d5a7fb..87af351be19e2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -20,7 +20,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Range;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -69,7 +69,7 @@ public final class InventoryTaskSplitter {

    private final PipelineDataSourceWrapper sourceDataSource;

-    private final InventoryDumperConfiguration dumperConfig;
+    private final InventoryDumperContext dumperContext;

    private final ImporterConfiguration importerConfig;

@@ -83,7 +83,7 @@ public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemC
        List<InventoryTask> result = new LinkedList<>();
        long startTimeMillis = System.currentTimeMillis();
        InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext();
-        for (InventoryDumperConfiguration each : splitInventoryDumperConfig(jobItemContext)) {
+        for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) {
            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
            Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
@@ -96,84 +96,84 @@
    }

    /**
-     * Split inventory dumper configuration.
+     * Split inventory dumper context.
     *
     * @param jobItemContext job item context
-     * @return inventory dumper configurations
+     * @return inventory dumper contexts
     */
-    public Collection<InventoryDumperConfiguration> splitInventoryDumperConfig(final InventoryIncrementalJobItemContext jobItemContext) {
-        Collection<InventoryDumperConfiguration> result = new LinkedList<>();
-        for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
+    public Collection<InventoryDumperContext> splitInventoryDumperContext(final InventoryIncrementalJobItemContext jobItemContext) {
+        Collection<InventoryDumperContext> result = new LinkedList<>();
+        for (InventoryDumperContext each : splitByTable(dumperContext)) {
            result.addAll(splitByPrimaryKey(each, jobItemContext, sourceDataSource));
        }
        return result;
    }

-    private Collection<InventoryDumperConfiguration> splitByTable(final InventoryDumperConfiguration dumperConfig) {
-        Collection<InventoryDumperConfiguration> result = new LinkedList<>();
-        dumperConfig.getTableNameMap().forEach((key, value) -> {
-            InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
+    private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
+        Collection<InventoryDumperContext> result = new LinkedList<>();
+        dumperContext.getTableNameMap().forEach((key, value) -> {
+            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext);
            // use original table name, for metadata loader, since some database table name case-sensitive
-            inventoryDumperConfig.setActualTableName(key.getOriginal());
-            inventoryDumperConfig.setLogicTableName(value.getOriginal());
-            inventoryDumperConfig.setPosition(new PlaceholderPosition());
-            inventoryDumperConfig.setInsertColumnNames(dumperConfig.getInsertColumnNames());
-            inventoryDumperConfig.setUniqueKeyColumns(dumperConfig.getUniqueKeyColumns());
-            result.add(inventoryDumperConfig);
+            inventoryDumperContext.setActualTableName(key.getOriginal());
+            inventoryDumperContext.setLogicTableName(value.getOriginal());
+            inventoryDumperContext.setPosition(new PlaceholderPosition());
+            inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
+            inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
+            result.add(inventoryDumperContext);
        });
        return result;
    }

-    private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
-                                                                       final PipelineDataSourceWrapper dataSource) {
-        if (null == dumperConfig.getUniqueKeyColumns()) {
-            String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
-            String actualTableName = dumperConfig.getActualTableName();
+    private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext,
+                                                                 final PipelineDataSourceWrapper dataSource) {
+        if (null == dumperContext.getUniqueKeyColumns()) {
+            String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
+            String actualTableName = dumperContext.getActualTableName();
            List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
-            dumperConfig.setUniqueKeyColumns(uniqueKeyColumns);
+            dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
        }
-        Collection<InventoryDumperConfiguration> result = new LinkedList<>();
+        Collection<InventoryDumperContext> result = new LinkedList<>();
        InventoryIncrementalProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
        PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead();
        int batchSize = readConfig.getBatchSize();
        JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
-        Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperConfig, jobItemContext, dataSource);
+        Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
        int i = 0;
        for (IngestPosition each : inventoryPositions) {
-            InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfig);
-            splitDumperConfig.setPosition(each);
-            splitDumperConfig.setShardingItem(i++);
-            splitDumperConfig.setActualTableName(dumperConfig.getActualTableName());
-            splitDumperConfig.setLogicTableName(dumperConfig.getLogicTableName());
-            splitDumperConfig.setUniqueKeyColumns(dumperConfig.getUniqueKeyColumns());
-            splitDumperConfig.setInsertColumnNames(dumperConfig.getInsertColumnNames());
-            splitDumperConfig.setBatchSize(batchSize);
-            splitDumperConfig.setRateLimitAlgorithm(rateLimitAlgorithm);
-            result.add(splitDumperConfig);
+            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext);
+            splitDumperContext.setPosition(each);
+            splitDumperContext.setShardingItem(i++);
+            splitDumperContext.setActualTableName(dumperContext.getActualTableName());
+            splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
+            splitDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
+            splitDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
+            splitDumperContext.setBatchSize(batchSize);
+            splitDumperContext.setRateLimitAlgorithm(rateLimitAlgorithm);
+            result.add(splitDumperContext);
        }
        return result;
    }

-    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
+    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext,
                                                             final PipelineDataSourceWrapper dataSource) {
        InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress();
        if (null != initProgress) {
            // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
-            Collection<IngestPosition> result = initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
+            Collection<IngestPosition> result = initProgress.getInventory().getInventoryPosition(dumperContext.getActualTableName()).values();
            if (!result.isEmpty()) {
                return result;
            }
        }
-        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperConfig, dataSource);
+        long tableRecordsCount = InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
        jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
-        if (!dumperConfig.hasUniqueKey()) {
+        if (!dumperContext.hasUniqueKey()) {
            return Collections.singleton(new UnsupportedKeyPosition());
        }
-        List<PipelineColumnMetaData> uniqueKeyColumns = dumperConfig.getUniqueKeyColumns();
+        List<PipelineColumnMetaData> uniqueKeyColumns = dumperContext.getUniqueKeyColumns();
        if (1 == uniqueKeyColumns.size()) {
            int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
            if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
-                return getPositionByIntegerUniqueKeyRange(dumperConfig, tableRecordsCount, jobItemContext, dataSource);
+                return getPositionByIntegerUniqueKeyRange(dumperContext, tableRecordsCount, jobItemContext, dataSource);
            }
            if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
                // TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases.
@@ -183,13 +183,13 @@ private Collection<IngestPosition> getInventoryPositions(final InventoryDumperCo
        return Collections.singleton(new UnsupportedKeyPosition());
    }

-    private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperConfiguration dumperConfig, final long tableRecordsCount,
+    private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperContext dumperContext, final long tableRecordsCount,
                                                                          final InventoryIncrementalJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) {
        if (0 == tableRecordsCount) {
            return Collections.singletonList(new IntegerPrimaryKeyPosition(0, 0));
        }
        Collection<IngestPosition> result = new LinkedList<>();
-        Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperConfig);
+        Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperContext);
        int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
        long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0);
        long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount;
@@ -201,10 +201,11 @@ private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final Inve
        return result;
    }

-    private Range<Long> getUniqueKeyValuesRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
-        String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
+    private Range<Long> getUniqueKeyValuesRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperContext dumperContext) {
+        String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
        PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
-        String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
+        String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
+                dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey);
        try (
                Connection connection = dataSource.getConnection();
                Statement statement = connection.createStatement();
@@ -212,7 +213,7 @@ private Range<Long> getUniqueKeyValuesRange(final InventoryIncrementalJobItemCon
            resultSet.next();
            return Range.between(resultSet.getLong(1), resultSet.getLong(2));
        } catch (final SQLException ex) {
-            throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), uniqueKey, ex);
+            throw new SplitPipelineJobByUniqueKeyException(dumperContext.getActualTableName(), uniqueKey, ex);
        }
    }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index 0cd8d13e3af11..d00ef8e250622 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -20,7 +20,7 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -118,12 +118,12 @@ public static void prepareTargetTables(final DatabaseType databaseType, final Pr
     * Get incremental position.
     *
     * @param initIncremental init incremental
-     * @param dumperConfig dumper config
+     * @param dumperContext dumper context
     * @param dataSourceManager data source manager
     * @return ingest position
     * @throws SQLException sql exception
     */
-    public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperConfiguration dumperConfig,
+    public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperContext dumperContext,
                                                        final PipelineDataSourceManager dataSourceManager) throws SQLException {
        if (null != initIncremental) {
            Optional<IngestPosition> position = initIncremental.getIncrementalPosition();
@@ -131,9 +131,9 @@ public static IngestPosition getIncrementalPosition(final JobItemIncrementalTask
                return position.get();
            }
        }
-        DatabaseType databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
-        DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-        return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperConfig.getJobId());
+        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
+        DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+        return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
    }

    /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
index 379cb61005cfe..f58f89bb8f4c3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
@@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;

-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index c271c1f86d650..572728f123522 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -19,7 +19,7 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.common.ingest.channel.AckCallbacks;
@@ -39,12 +39,12 @@ public final class PipelineTaskUtils {
    /**
     * Generate inventory task id.
     *
-     * @param inventoryDumperConfig inventory dumper configuration
+     * @param inventoryDumperContext inventory dumper context
     * @return inventory task id
     */
-    public static String generateInventoryTaskId(final InventoryDumperConfiguration inventoryDumperConfig) {
-        String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getActualTableName());
-        return result + "#" + inventoryDumperConfig.getShardingItem();
+    public static String generateInventoryTaskId(final InventoryDumperContext inventoryDumperContext) {
+        String result = String.format("%s.%s", inventoryDumperContext.getDataSourceName(), inventoryDumperContext.getActualTableName());
+        return result + "#" + inventoryDumperContext.getShardingItem();
    }

    /**
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index 546843905da5a..6922b6642ae75 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;

-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
diff --git a/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java b/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java
index f0fe498d04be7..9ffca80f14e7f 100644
--- a/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/h2/src/main/java/org/apache/shardingsphere/data/pipeline/h2/ingest/dumper/H2IncrementalDumperCreator.java
@@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.h2.ingest.dumper;

-import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -30,7 +30,7 @@ public final class H2IncrementalDumperCreator implements IncrementalDumperCreator {

    @Override
-    public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position,
+    public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context, final IngestPosition position,
                                                     final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
        throw new UnsupportedOperationException("H2 database can not support incremental dump.");
    }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 8b65fa2252950..d1fb5ca8ea489 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -19,7 +19,7 @@

import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
@@ -68,7 +68,7 @@
@Slf4j
public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {

-    private final IncrementalDumperConfiguration dumperConfig;
+    private final IncrementalDumperContext dumperContext;

    private final BinlogPosition binlogPosition;

@@ -80,19 +80,19 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl

    private final String catalog;

-    public MySQLIncrementalDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition binlogPosition,
+    public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, final IngestPosition binlogPosition,
                                  final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
-        this.dumperConfig = dumperConfig;
+        Preconditions.checkArgument(dumperContext.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
+        this.dumperContext = dumperContext;
        this.binlogPosition = (BinlogPosition) binlogPosition;
        this.channel = channel;
        this.metaDataLoader = metaDataLoader;
-        YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getJdbcConfig();
+        YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).getJdbcConfig();
        ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL"));
        ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null);
        ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
        log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, port={}", jdbcConfig.getUrl(), connectInfo.getServerId(), connectInfo.getHost(), connectInfo.getPort());
-        client = new MySQLClient(connectInfo, dumperConfig.isDecodeWithTX());
+        client = new MySQLClient(connectInfo, dumperContext.isDecodeWithTX());
        catalog = connectionProps.getCatalog();
    }

@@ -134,7 +134,7 @@ private List<Record> handleEvent(final AbstractBinlogEvent event) {
            return Collections.singletonList(createPlaceholderRecord(event));
        }
        AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
-        if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperConfig.containsTable(rowsEvent.getTableName())) {
+        if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.containsTable(rowsEvent.getTableName())) {
            return Collections.singletonList(createPlaceholderRecord(event));
        }
        PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName());
@@ -157,11 +157,11 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent even
    }

    private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
-        return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
+        return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
    }

    private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperConfig.getColumnNames(event.getTableName());
+        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
        List<DataRecord> result = new LinkedList<>();
        for (Serializable[] each : event.getAfterRows()) {
            DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);
@@ -182,7 +182,7 @@ private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final
    }

    private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperConfig.getColumnNames(event.getTableName());
+        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
        List<DataRecord> result = new LinkedList<>();
        for (int i = 0; i < event.getBeforeRows().size(); i++) {
            Serializable[] beforeValues = event.getBeforeRows().get(i);
@@ -206,7 +206,7 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, fina
    }

    private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperConfig.getColumnNames(event.getTableName());
+        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
        List<DataRecord> result = new LinkedList<>();
        for (Serializable[] each : event.getBeforeRows()) {
each.length); @@ -234,7 +234,7 @@ private Serializable handleValue(final PipelineColumnMetaData columnMetaData, fi } private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) { - String tableName = dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal(); IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()); DataRecord result = new DataRecord(type, tableName, position, columnCount); result.setActualTableName(rowsEvent.getTableName()); diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java index e359cd5b8baef..5b1734bbdb90e 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/dumper/MySQLIncrementalDumperCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -31,9 +31,9 @@ public final class MySQLIncrementalDumperCreator implements IncrementalDumperCreator { @Override - public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position, + public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - return new MySQLIncrementalDumper(config, position, channel, metaDataLoader); + return new MySQLIncrementalDumper(context, position, channel, metaDataLoader); } @Override diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index 5cc6e395f374e..a2df83744e196 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -18,8 +18,8 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest; import lombok.SneakyThrows; -import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; @@ -75,7 +75,7 @@ @SuppressWarnings("unchecked") class MySQLIncrementalDumperTest { - private IncrementalDumperConfiguration dumperConfig; + private IncrementalDumperContext dumperContext; private MySQLIncrementalDumper incrementalDumper; @@ -83,18 +83,18 @@ class MySQLIncrementalDumperTest { @BeforeEach void setUp() { - dumperConfig = mockDumperConfiguration(); - initTableData(dumperConfig); - dumperConfig.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root")); + dumperContext = mockDumperContext(); + initTableData(dumperContext); + dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root")); PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class); SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10000, new EmptyAckCallback()); - incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader); + incrementalDumper = new MySQLIncrementalDumper(dumperContext, new BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader); pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList()); when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData); } - private IncrementalDumperConfiguration mockDumperConfiguration() { - IncrementalDumperConfiguration result = new IncrementalDumperConfiguration(); + private IncrementalDumperContext mockDumperContext() { + IncrementalDumperContext result = new IncrementalDumperContext(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root")); result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap())); @@ -103,10 +103,10 @@ private IncrementalDumperConfiguration mockDumperConfiguration() { } @SneakyThrows(SQLException.class) - private void initTableData(final IncrementalDumperConfiguration dumperConfig) { + private void initTableData(final IncrementalDumperContext dumperContext) { try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); - PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { statement.execute("DROP TABLE IF EXISTS t_order"); @@ -138,7 +138,7 @@ void assertWriteRowsEventWithCustomColumns() throws ReflectiveOperationException } private void assertWriteRowsEvent0(final Map> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException { - dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap); + dumperContext.setTargetTableColumnsMap(targetTableColumnsMap); WriteRowsEvent rowsEvent = new WriteRowsEvent(); rowsEvent.setDatabaseName(""); rowsEvent.setTableName("t_order"); @@ -166,7 +166,7 
@@ void assertUpdateRowsEventWithCustomColumns() throws ReflectiveOperationException } private void assertUpdateRowsEvent0(final Map> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException { - dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap); + dumperContext.setTargetTableColumnsMap(targetTableColumnsMap); UpdateRowsEvent rowsEvent = new UpdateRowsEvent(); rowsEvent.setDatabaseName("test"); rowsEvent.setTableName("t_order"); @@ -191,7 +191,7 @@ void assertDeleteRowsEventWithCustomColumns() throws ReflectiveOperationException } private void assertDeleteRowsEvent0(final Map> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException { - dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap); + dumperContext.setTargetTableColumnsMap(targetTableColumnsMap); DeleteRowsEvent rowsEvent = new DeleteRowsEvent(); rowsEvent.setDatabaseName(""); rowsEvent.setTableName("t_order"); diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java index c4b4defb39059..15626615e503d 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java @@ -19,7 +19,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; @@ -58,7 +58,7 @@ @Slf4j public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper { - private final IncrementalDumperConfiguration dumperConfig; + private final IncrementalDumperContext dumperContext; private final AtomicReference walPosition; @@ -72,16 +72,16 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper { private List rowEvents = new LinkedList<>(); - public OpenGaussWALDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition position, + public OpenGaussWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()), + ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("OpenGaussWALDumper only supports StandardPipelineDataSourceConfiguration")); - this.dumperConfig = dumperConfig; + this.dumperContext = dumperContext; walPosition = new AtomicReference<>((WALPosition) position); this.channel = channel; - walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader); + walEventConverter = new WALEventConverter(dumperContext,
metaDataLoader); logicalReplication = new OpenGaussLogicalReplication(); - this.decodeWithTX = dumperConfig.isDecodeWithTX(); + this.decodeWithTX = dumperContext.isDecodeWithTX(); } @SneakyThrows(InterruptedException.class) @@ -110,7 +110,7 @@ private void dump() throws SQLException { PGReplicationStream stream = null; try (PgConnection connection = getReplicationConnectionUnwrap()) { stream = logicalReplication.createReplicationStream(connection, walPosition.get().getLogSequenceNumber(), - OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId())); + OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperContext.getJobId())); DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX); while (isRunning()) { ByteBuffer message = stream.readPending(); @@ -137,7 +137,7 @@ private void dump() throws SQLException { } private PgConnection getReplicationConnectionUnwrap() throws SQLException { - return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).unwrap(PgConnection.class); + return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).unwrap(PgConnection.class); } private void processEventWithTX(final AbstractWALEvent event) { diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java index b4c9bde4d41bd..7ea934cc98ed6 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/dumper/OpenGaussIncrementalDumperCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest.dumper; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -31,9 +31,9 @@ public final class OpenGaussIncrementalDumperCreator implements IncrementalDumperCreator { @Override - public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position, + public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - return new OpenGaussWALDumper(config, position, channel, metaDataLoader); + return new OpenGaussWALDumper(context, position, channel, metaDataLoader); } @Override diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java index f377684cd2e87..f6901c32906a3 100644 --- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java @@ -19,7 +19,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; @@ -60,7 +60,7 @@ @Slf4j public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper { - private final IncrementalDumperConfiguration dumperConfig; + private final IncrementalDumperContext dumperContext; private final AtomicReference walPosition; @@ -74,16 +74,16 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper { private List rowEvents = new LinkedList<>(); - public PostgreSQLWALDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition position, + public PostgreSQLWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()), + ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only supports StandardPipelineDataSourceConfiguration")); - this.dumperConfig = dumperConfig; + this.dumperContext = dumperContext; walPosition = new AtomicReference<>((WALPosition) position); this.channel = channel; - walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader); + walEventConverter = new WALEventConverter(dumperContext, metaDataLoader); logicalReplication = new PostgreSQLLogicalReplication(); - this.decodeWithTX = dumperConfig.isDecodeWithTX(); + this.decodeWithTX = dumperContext.isDecodeWithTX(); } @SneakyThrows(InterruptedException.class) @@ -111,8 +111,8 @@ protected void runBlocking() { private void dump() throws SQLException { // TODO use unified PgConnection try ( - Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()); - PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()), + Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()); + PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection, dumperContext.getJobId()), walPosition.get().getLogSequenceNumber())) { PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils()); DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils); diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java index 50852c68caa3e..b842ccaccd1ea 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/dumper/PostgreSQLIncrementalDumperCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.dumper; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -31,9 +31,9 @@ public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator { @Override - public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position, + public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - return new PostgreSQLWALDumper(config, position, channel, metaDataLoader); + return new PostgreSQLWALDumper(context, position, channel, metaDataLoader); } @Override diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java index 388c664780a13..aa96452a9b6b8 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; @@ -43,12 +43,12 @@ */ public final class WALEventConverter { - private final IncrementalDumperConfiguration dumperConfig; + private final IncrementalDumperContext dumperContext; private final PipelineTableMetaDataLoader metaDataLoader; - public WALEventConverter(final IncrementalDumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) { - this.dumperConfig = dumperConfig; + public WALEventConverter(final IncrementalDumperContext dumperContext, final PipelineTableMetaDataLoader metaDataLoader) { + this.dumperContext = dumperContext; 
this.metaDataLoader = metaDataLoader; } @@ -82,7 +82,7 @@ public Record convert(final AbstractWALEvent event) { private boolean filter(final AbstractWALEvent event) { if (event instanceof AbstractRowEvent) { AbstractRowEvent rowEvent = (AbstractRowEvent) event; - return !dumperConfig.containsTable(rowEvent.getTableName()); + return !dumperContext.containsTable(rowEvent.getTableName()); } return false; } @@ -92,7 +92,7 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName); } private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) { @@ -120,7 +120,7 @@ private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final Pipeli } private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) { - String tableName = dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal(); DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount); result.setActualTableName(rowsEvent.getTableName()); result.setCsn(rowsEvent.getCsn()); @@ -128,7 +128,7 @@ private DataRecord createDataRecord(final String type, final AbstractRowEvent ro } private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final String actualTableName, final List values) { - Collection columnNames = dumperConfig.getColumnNames(actualTableName); + Collection columnNames = dumperContext.getColumnNames(actualTableName); for (int i = 0, count = values.size(); i < count; i++) { PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1); if (isColumnUnneeded(columnNames, columnMetaData.getName())) { diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index 5167148ddc26f..62052be5ea927 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -17,8 +17,8 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; -import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import 
org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; @@ -72,7 +72,7 @@ class PostgreSQLWALDumperTest { private WALPosition position; - private IncrementalDumperConfiguration dumperConfig; + private IncrementalDumperContext dumperContext; private PostgreSQLWALDumper walDumper; @@ -88,8 +88,8 @@ void setUp() { String username = "root"; String password = "root"; createTable(jdbcUrl, username, password); - dumperConfig = createDumperConfiguration(jdbcUrl, username, password); - walDumper = new PostgreSQLWALDumper(dumperConfig, position, channel, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()))); + dumperContext = createDumperContext(jdbcUrl, username, password); + walDumper = new PostgreSQLWALDumper(dumperContext, position, channel, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()))); } private void createTable(final String jdbcUrl, final String username, final String password) { @@ -103,8 +103,8 @@ private void createTable(final String jdbcUrl, final String username, final Stri } } - private IncrementalDumperConfiguration createDumperConfiguration(final String jdbcUrl, final String username, final String password) { - IncrementalDumperConfiguration result = new IncrementalDumperConfiguration(); + private IncrementalDumperContext createDumperContext(final String jdbcUrl, final String username, final String password) { + IncrementalDumperContext result = new IncrementalDumperContext(); result.setJobId("0101123456"); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))); @@ -119,7 +119,7 @@ void tearDown() { @Test void assertStart() throws SQLException, ReflectiveOperationException { - StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig(); + StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig(); try { Plugins.getMemberAccessor().set(PostgreSQLWALDumper.class.getDeclaredField("logicalReplication"), walDumper, logicalReplication); when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection); diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 29479237999a4..389b3f724bdca 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -18,8 +18,8 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; import lombok.SneakyThrows; -import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; @@ -71,7 +71,7 @@ class WALEventConverterTest { - private IncrementalDumperConfiguration dumperConfig; + private IncrementalDumperContext dumperContext; private WALEventConverter walEventConverter; @@ -81,15 +81,15 @@ class WALEventConverterTest { @BeforeEach void setUp() { - dumperConfig = mockDumperConfiguration(); + dumperContext = mockDumperContext(); PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); - walEventConverter = new WALEventConverter(dumperConfig, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()))); - initTableData(dumperConfig); + walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()))); + initTableData(dumperContext); pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList()); } - private IncrementalDumperConfiguration mockDumperConfiguration() { - IncrementalDumperConfiguration result = new IncrementalDumperConfiguration(); + private IncrementalDumperContext mockDumperContext() { + IncrementalDumperContext result = new IncrementalDumperContext(); result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap())); @@ -97,10 +97,10 @@ private IncrementalDumperConfiguration mockDumperConfiguration() { } @SneakyThrows(SQLException.class) - private void initTableData(final IncrementalDumperConfiguration dumperConfig) { + private void initTableData(final IncrementalDumperContext dumperContext) { try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); - PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { statement.execute("DROP TABLE IF EXISTS t_order"); @@ -132,7 +132,7 @@ void assertWriteRowEventWithCustomColumns() throws ReflectiveOperationException } private void assertWriteRowEvent0(final Map> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException { - dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap); + dumperContext.setTargetTableColumnsMap(targetTableColumnsMap); WriteRowEvent rowsEvent = new WriteRowEvent(); rowsEvent.setSchemaName(""); rowsEvent.setTableName("t_order"); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index f88832aea96f1..b7771bc7d8da3 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -21,8 +21,8 @@ import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; @@ -177,8 +177,8 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) { if (getJobItemProgress(jobId, i).isPresent()) { continue; } - IncrementalDumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, i, getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames())); - InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperConfig); + IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames())); + InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext); PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress( jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); } @@ -189,11 +189,11 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) { private static InventoryIncrementalJobItemProgress getInventoryIncrementalJobItemProgress(final CDCJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager, - final IncrementalDumperConfiguration incrementalDumperConfig) throws SQLException { + final IncrementalDumperContext incrementalDumperContext) throws SQLException { InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress(); result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); - result.setDataSourceName(incrementalDumperConfig.getDataSourceName()); - IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, incrementalDumperConfig, dataSourceManager)); + result.setDataSourceName(incrementalDumperContext.getDataSourceName()); + IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, incrementalDumperContext, dataSourceManager)); result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); return result; } @@ -268,9 +268,9 @@ protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) { public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig; TableNameSchemaNameMapping tableNameSchemaNameMapping = 
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()); - IncrementalDumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, jobShardingItem, tableNameSchemaNameMapping); + IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableNameSchemaNameMapping); ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableNameSchemaNameMapping); - CDCTaskConfiguration result = new CDCTaskConfiguration(dumperConfig, importerConfig); + CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig); log.debug("buildTaskConfiguration, result={}", result); return result; } @@ -286,13 +286,13 @@ private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final Collectio return new TableNameSchemaNameMapping(tableNameSchemaMap); } - private IncrementalDumperConfiguration buildDumperConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableNameSchemaNameMapping tableNameSchemaNameMapping) { + private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableNameSchemaNameMapping tableNameSchemaNameMapping) { JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem); Map tableNameMap = new LinkedHashMap<>(); dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName(); StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName); - IncrementalDumperConfiguration result = new IncrementalDumperConfiguration(); + IncrementalDumperContext result = new IncrementalDumperContext(); result.setJobId(jobConfig.getJobId()); result.setDataSourceName(dataSourceName); result.setDataSourceConfig(actualDataSourceConfig); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java index af862bc1a4b8c..b66847da8e985 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/task/CDCTaskConfiguration.java @@ -19,7 +19,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.PipelineTaskConfiguration; @@ -30,7 +30,7 @@ @Getter public final class CDCTaskConfiguration implements PipelineTaskConfiguration { - private final IncrementalDumperConfiguration dumperConfig; + private final IncrementalDumperContext dumperContext; private final ImporterConfiguration importerConfig; } diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java index 160399ce2450a..23bc51e1f51e9 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java @@ -80,7 +80,7 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte @Override protected PipelineDataSourceWrapper initialize() { - return dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()); + return dataSourceManager.getDataSource(taskConfig.getDumperContext().getDataSourceConfig()); } }; @@ -99,7 +99,7 @@ public String getJobId() { @Override public String getDataSourceName() { - return taskConfig.getDumperConfig().getDataSourceName(); + return taskConfig.getDumperContext().getDataSourceName(); } @Override diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index 099c71139cc0e..ff30ed40a93f2 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -18,8 +18,8 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.prepare; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -104,7 +104,7 @@ private void initIncrementalPosition(final CDCJobItemContext jobItemContext) { CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? 
null : jobItemContext.getInitProgress().getIncremental(); try { - taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), jobItemContext.getDataSourceManager())); + taskConfig.getDumperContext().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex); } @@ -115,15 +115,15 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); CDCProcessContext processContext = jobItemContext.getJobProcessContext(); - for (InventoryDumperConfiguration each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperConfiguration(taskConfig.getDumperConfig()), importerConfig) - .splitInventoryDumperConfig(jobItemContext)) { + for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext()), importerConfig) + .splitInventoryDumperContext(jobItemContext)) { AtomicReference position = new AtomicReference<>(each.getPosition()); PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position); channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext)); Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader()); Importer importer = importerUsed.get() ? 
null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), - needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperConfig().getDataSourceConfig().getDatabaseType())), + needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getDataSourceConfig().getDatabaseType())), importerConfig.getRateLimitAlgorithm()); jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(), processContext.getInventoryImporterExecuteEngine(), dumper, importer, position)); @@ -144,18 +144,18 @@ private boolean hasGlobalCSN(final DatabaseType databaseType) { private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List channelProgressPairs) { CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); - IncrementalDumperConfiguration dumperConfig = taskConfig.getDumperConfig(); + IncrementalDumperContext dumperContext = taskConfig.getDumperContext(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); - IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), jobItemContext.getInitProgress()); + IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getPosition(), jobItemContext.getInitProgress()); PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getPipelineChannelCreator(), taskProgress); channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext)); - Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperConfig.getDataSourceConfig().getDatabaseType()) - .createIncrementalDumper(dumperConfig, dumperConfig.getPosition(), channel, jobItemContext.getSourceMetaDataLoader()); + Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getDataSourceConfig().getDatabaseType()) + .createIncrementalDumper(dumperContext, dumperContext.getPosition(), channel, jobItemContext.getSourceMetaDataLoader()); boolean needSorting = needSorting(ImporterType.INCREMENTAL, hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType())); Importer importer = importerUsed.get() ? 
null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm()); - PipelineTask incrementalTask = new CDCIncrementalTask(dumperConfig.getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress); + PipelineTask incrementalTask = new CDCIncrementalTask(dumperContext.getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress); jobItemContext.getIncrementalTasks().add(incrementalTask); importerUsed.set(true); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index b7a11dae97623..aa1a8e4b141c7 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -19,8 +19,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; @@ -67,7 +67,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; -import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperConfigurationCreator; +import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext; import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration; @@ -262,14 +262,14 @@ protected MigrationJobConfiguration getJobConfiguration(final JobConfigurationPO @Override public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig; - IncrementalDumperConfiguration incrementalDumperConfig = new MigrationIncrementalDumperConfigurationCreator( - 
jobConfig).createDumperConfiguration(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); - CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperConfig.getTableNameSchemaNameMapping()); + IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator( + jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); + CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getTableNameSchemaNameMapping()); Set targetTableNames = jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet()); Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); - ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperConfig.getTableNameSchemaNameMapping()); - MigrationTaskConfiguration result = new MigrationTaskConfiguration(incrementalDumperConfig.getDataSourceName(), createTableConfig, incrementalDumperConfig, importerConfig); + ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getTableNameSchemaNameMapping()); + MigrationTaskConfiguration result = new MigrationTaskConfiguration(incrementalDumperContext.getDataSourceName(), createTableConfig, incrementalDumperContext, importerConfig); log.info("buildTaskConfiguration, result={}", result); return result; } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java index f250fafe4a27d..d9e34ae3b3c8c 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java @@ -20,7 +20,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.PipelineTaskConfiguration; @@ -37,7 +37,7 @@ public final class MigrationTaskConfiguration implements PipelineTaskConfigurati private final CreateTableConfiguration createTableConfig; - private final IncrementalDumperConfiguration dumperConfig; + private final IncrementalDumperContext dumperContext; private final ImporterConfiguration importerConfig; } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java similarity index 69% rename from kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java rename to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java index 07cb0dfd60dbf..bf4acb9166ccf 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperConfigurationCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java @@ -18,12 +18,12 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; -import org.apache.shardingsphere.data.pipeline.common.config.ingest.IncrementalDumperConfigurationCreator; +import org.apache.shardingsphere.data.pipeline.common.config.ingest.IncrementalDumperContextCreator; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; @@ -34,21 +34,21 @@ * Migration incremental dumper configuration creator. 
*/ @RequiredArgsConstructor -public final class MigrationIncrementalDumperConfigurationCreator implements IncrementalDumperConfigurationCreator { +public final class MigrationIncrementalDumperContextCreator implements IncrementalDumperContextCreator { private final MigrationJobConfiguration jobConfig; @Override - public IncrementalDumperConfiguration createDumperConfiguration(final JobDataNodeLine jobDataNodeLine) { + public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDataNodeLine) { Map tableNameMap = JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine); TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap()); String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName(); - return buildDumperConfiguration(jobConfig.getJobId(), dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMap, tableNameSchemaNameMapping); + return buildDumperContext(jobConfig.getJobId(), dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMap, tableNameSchemaNameMapping); } - private IncrementalDumperConfiguration buildDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource, - final Map tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) { - IncrementalDumperConfiguration result = new IncrementalDumperConfiguration(); + private IncrementalDumperContext buildDumperContext(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource, + final Map tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) { + IncrementalDumperContext result = new IncrementalDumperContext(); result.setJobId(jobId); result.setDataSourceName(dataSourceName); result.setDataSourceConfig(sourceDataSource); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java index fabd841a7e0f9..1fe9a79b3d9f0 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java @@ -80,7 +80,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte @Override protected PipelineDataSourceWrapper initialize() { - return dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()); + return dataSourceManager.getDataSource(taskConfig.getDumperContext().getDataSourceConfig()); } }; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index ca5ab63968aee..694424af48585 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java @@ -18,8 +18,8 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.prepare; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; @@ -88,7 +88,7 @@ public final class MigrationJobPreparer { * @throws SQLException SQL exception */ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException { - ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig().getClass()), + ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperContext().getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration")); PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource())); if (jobItemContext.isStopping()) { @@ -169,15 +169,15 @@ private void prepareIncremental(final MigrationJobItemContext jobItemContext) { MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? 
null : jobItemContext.getInitProgress().getIncremental(); try { - taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), jobItemContext.getDataSourceManager())); + taskConfig.getDumperContext().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex); } } private void initInventoryTasks(final MigrationJobItemContext jobItemContext) { - InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig()); - InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperConfig, jobItemContext.getTaskConfig().getImporterConfig()); + InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext()); + InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig()); jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext)); } @@ -185,15 +185,15 @@ private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator(); PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader(); - IncrementalDumperConfiguration dumperConfig = taskConfig.getDumperConfig(); + IncrementalDumperContext dumperContext = taskConfig.getDumperContext(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(); - IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), jobItemContext.getInitProgress()); + IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getPosition(), jobItemContext.getInitProgress()); PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), pipelineChannelCreator, taskProgress); - Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperConfig.getDataSourceConfig().getDatabaseType()) - .createIncrementalDumper(dumperConfig, dumperConfig.getPosition(), channel, sourceMetaDataLoader); + Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getDataSourceConfig().getDatabaseType()) + .createIncrementalDumper(dumperContext, dumperContext.getPosition(), channel, sourceMetaDataLoader); Collection<Importer> importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext); - PipelineTask incrementalTask = new IncrementalTask(dumperConfig.getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress); + PipelineTask incrementalTask = new IncrementalTask(dumperContext.getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress); jobItemContext.getIncrementalTasks().add(incrementalTask); } diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java index 297b49a4881a1..e0655cbf9c527 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.fixture; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -30,7 +30,7 @@ public final class FixtureIncrementalDumperCreator implements IncrementalDumperCreator { @Override - public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position, + public IncrementalDumper createIncrementalDumper(final IncrementalDumperContext context, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { return new FixtureIncrementalDumper(); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java index a55049c79a269..cbeec358b0c7a 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.importer; -import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java index 5227ff06efb58..a592fe3249c22 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.job.service; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; +import 
org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; import org.apache.shardingsphere.data.pipeline.common.constant.DataPipelineConstants; import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition; @@ -177,13 +177,13 @@ private MigrationJobItemContext mockJobItemContext() { } private InventoryTask mockInventoryTask(final MigrationTaskConfiguration taskConfig) { - InventoryDumperConfiguration dumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig()); - dumperConfig.setPosition(new PlaceholderPosition()); - dumperConfig.setActualTableName("t_order"); - dumperConfig.setLogicTableName("t_order"); - dumperConfig.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData())); - dumperConfig.setShardingItem(0); - return new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(dumperConfig), PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), + InventoryDumperContext dumperContext = new InventoryDumperContext(taskConfig.getDumperContext()); + dumperContext.setPosition(new PlaceholderPosition()); + dumperContext.setActualTableName("t_order"); + dumperContext.setLogicTableName("t_order"); + dumperContext.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData())); + dumperContext.setShardingItem(0); + return new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(dumperContext), PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), mock(Importer.class), new AtomicReference<>(new PlaceholderPosition())); } } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java index d029ac73f49d5..dee50d57eef9a 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java @@ -17,8 +17,8 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.prepare; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.BaseDumperConfiguration; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.BaseDumperContext; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; @@ -52,7 +52,7 @@ class InventoryTaskSplitterTest { private MigrationJobItemContext jobItemContext; - private InventoryDumperConfiguration dumperConfig; + private InventoryDumperContext dumperContext; private PipelineDataSourceManager dataSourceManager; @@ -66,10 +66,10 @@ static void beforeClass() { @BeforeEach void setUp() { initJobItemContext(); - dumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig()); + dumperContext = new 
InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext()); PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "int", false, true, true); - dumperConfig.setUniqueKeyColumns(Collections.singletonList(columnMetaData)); - inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), dumperConfig, jobItemContext.getTaskConfig().getImporterConfig()); + dumperContext.setUniqueKeyColumns(Collections.singletonList(columnMetaData)); + inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), dumperContext, jobItemContext.getTaskConfig().getImporterConfig()); } private void initJobItemContext() { @@ -85,7 +85,7 @@ void tearDown() { @Test void assertSplitInventoryDataWithEmptyTable() throws SQLException { - initEmptyTablePrimaryEnvironment(dumperConfig); + initEmptyTablePrimaryEnvironment(dumperContext); List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext); assertThat(actual.size(), is(1)); InventoryTask task = actual.get(0); @@ -95,7 +95,7 @@ void assertSplitInventoryDataWithEmptyTable() throws SQLException { @Test void assertSplitInventoryDataWithIntPrimary() throws SQLException { - initIntPrimaryEnvironment(dumperConfig); + initIntPrimaryEnvironment(dumperContext); List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext); assertThat(actual.size(), is(10)); InventoryTask task = actual.get(9); @@ -105,7 +105,7 @@ void assertSplitInventoryDataWithIntPrimary() throws SQLException { @Test void assertSplitInventoryDataWithCharPrimary() throws SQLException { - initCharPrimaryEnvironment(dumperConfig); + initCharPrimaryEnvironment(dumperContext); List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext); assertThat(actual.size(), is(1)); assertThat(actual.get(0).getTaskId(), is("ds_0.t_order#0")); @@ -116,17 +116,17 @@ @Test void assertSplitInventoryDataWithoutPrimaryButWithUniqueIndex() throws SQLException { - initUniqueIndexOnNotNullColumnEnvironment(dumperConfig); + initUniqueIndexOnNotNullColumnEnvironment(dumperContext); List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext); assertThat(actual.size(), is(1)); } @Test void assertSplitInventoryDataWithMultipleColumnsKey() throws SQLException { - initUnionPrimaryEnvironment(dumperConfig); - try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())) { + initUnionPrimaryEnvironment(dumperContext); + try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())) { List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource)); - dumperConfig.setUniqueKeyColumns(uniqueKeyColumns); + dumperContext.setUniqueKeyColumns(uniqueKeyColumns); List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext); assertThat(actual.size(), is(1)); } @@ -134,8 +134,8 @@ void assertSplitInventoryDataWithMultipleColumnsKey() throws SQLException { @Test void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException { - initNoPrimaryEnvironment(dumperConfig); - try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())) { + initNoPrimaryEnvironment(dumperContext); + try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())) { 
List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource)); assertTrue(uniqueKeyColumns.isEmpty()); List<InventoryTask> inventoryTasks = inventoryTaskSplitter.splitInventoryData(jobItemContext); @@ -143,8 +143,8 @@ void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException } } - private void initEmptyTablePrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { - DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + private void initEmptyTablePrimaryEnvironment(final BaseDumperContext dumperContext) throws SQLException { + DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { @@ -153,8 +153,8 @@ private void initEmptyTablePrimaryEnvironment(final BaseDumperConfiguration dump } } - private void initIntPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { - DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + private void initIntPrimaryEnvironment(final BaseDumperContext dumperContext) throws SQLException { + DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { @@ -166,8 +166,8 @@ private void initIntPrimaryEnvironment(final BaseDumperConfi } } - private void initCharPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { - DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + private void initCharPrimaryEnvironment(final BaseDumperContext dumperContext) throws SQLException { + DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { @@ -177,8 +177,8 @@ private void initCharPrimaryEnvironment(final BaseDumperConf } } - private void initUnionPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { - DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + private void initUnionPrimaryEnvironment(final BaseDumperContext dumperContext) throws SQLException { + DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { @@ -188,8 +188,8 @@ private void initUnionPrimaryEnvironment(final BaseDumperCon } } - private void initNoPrimaryEnvironment(final BaseDumperConfiguration dumperConfig) throws SQLException { - DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + private void initNoPrimaryEnvironment(final BaseDumperContext dumperContext) throws SQLException { + DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { @@ -199,8 +199,8 @@ private void initNoPrimaryEnvironment(final BaseDumperConfig } } - private void initUniqueIndexOnNotNullColumnEnvironment(final 
BaseDumperConfiguration dumperConfig) throws SQLException { - DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + private void initUniqueIndexOnNotNullColumnEnvironment(final BaseDumperContext dumperContext) throws SQLException { + DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); try ( Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java index 9a4eb6e485fc8..0e70960365af5 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java @@ -54,7 +54,7 @@ static void beforeClass() { @BeforeEach void setUp() { MigrationTaskConfiguration taskConfig = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig(); - taskConfig.getDumperConfig().setPosition(new PlaceholderPosition()); + taskConfig.getDumperContext().setPosition(new PlaceholderPosition()); incrementalTask = new IncrementalTask("ds_0", PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), Collections.singletonList(mock(Importer.class)), new IncrementalTaskProgress(new PlaceholderPosition())); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java index 82f09ad5cf8f1..8dcd785c33af4 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java @@ -17,8 +17,8 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.task; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration; -import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; @@ -73,21 +73,21 @@ void setUp() { @Test void assertGetProgress() throws SQLException, ExecutionException, InterruptedException, TimeoutException { - initTableData(taskConfig.getDumperConfig()); + initTableData(taskConfig.getDumperContext()); // TODO use t_order_0, and also others - InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_order", "t_order"); - AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperConfig.getPosition()); - InventoryTask inventoryTask = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperConfig), + InventoryDumperContext inventoryDumperContext = createInventoryDumperContext("t_order", "t_order"); + AtomicReference<IngestPosition> 
position = new AtomicReference<>(inventoryDumperContext.getPosition()); + InventoryTask inventoryTask = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperContext), PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), mock(Importer.class), position); CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS); assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class)); inventoryTask.close(); } - private void initTableData(final IncrementalDumperConfiguration dumperConfig) throws SQLException { + private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException { PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); try ( - PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()); + PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { statement.execute("DROP TABLE IF EXISTS t_order"); @@ -97,12 +97,12 @@ private void initTableData(final IncrementalDumperConfiguration dumperConfig) th dataSourceManager.close(); } - private InventoryDumperConfiguration createInventoryDumperConfiguration(final String logicTableName, final String actualTableName) { - InventoryDumperConfiguration result = new InventoryDumperConfiguration(taskConfig.getDumperConfig()); + private InventoryDumperContext createInventoryDumperContext(final String logicTableName, final String actualTableName) { + InventoryDumperContext result = new InventoryDumperContext(taskConfig.getDumperContext()); result.setLogicTableName(logicTableName); result.setActualTableName(actualTableName); result.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData())); - result.setPosition(null == taskConfig.getDumperConfig().getPosition() ? new IntegerPrimaryKeyPosition(0, 1000) : taskConfig.getDumperConfig().getPosition()); + result.setPosition(null == taskConfig.getDumperContext().getPosition() ? 
new IntegerPrimaryKeyPosition(0, 1000) : taskConfig.getDumperContext().getPosition()); return result; } } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java index b36513d3771bf..c500b0e370e22 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java @@ -74,7 +74,7 @@ private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProg private MigrationJobConfiguration createJobConfiguration() throws SQLException { MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()); - initTableData(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig()); + initTableData(jobItemContext.getTaskConfig().getDumperContext().getDataSourceConfig()); initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig()); return jobItemContext.getJobConfig(); }
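
A minimal sketch of what caller code looks like after this rename. It only uses types, constructors, and setters that appear in the hunks above (IncrementalDumperContext, InventoryDumperContext, and the setters inherited from BaseDumperContext); the job id, JDBC URL, and credentials are hypothetical placeholders, and the three-argument StandardPipelineDataSourceConfiguration constructor is an assumption rather than something this patch touches.

import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.context.ingest.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;

public final class DumperContextUsageSketch {
    
    public static void main(final String[] args) {
        // Incremental side: same setters as the former IncrementalDumperConfiguration, only the type names changed.
        IncrementalDumperContext dumperContext = new IncrementalDumperContext();
        dumperContext.setJobId("j0102example");
        dumperContext.setDataSourceName("ds_0");
        dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "root", "root"));
        // Inventory side: derived from the incremental context, mirroring MigrationJobPreparer.initInventoryTasks above.
        InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext);
        inventoryDumperContext.setLogicTableName("t_order");
        inventoryDumperContext.setActualTableName("t_order");
        inventoryDumperContext.setShardingItem(0);
    }
}

Since the rename is mechanical, out-of-tree code compiled against the old org.apache.shardingsphere.data.pipeline.api.config.ingest package needs only the same import and identifier substitutions shown here.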