diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java index 1284c64309a50..b63c29eab644b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java @@ -29,5 +29,5 @@ public interface PipelineProcessContext extends AutoCloseable { * * @return pipeline process config */ - PipelineProcessConfiguration getPipelineProcessConfig(); + PipelineProcessConfiguration getProcessConfig(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java index 3c129f4dd7ccf..cb312d7d8b2b0 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/TransmissionProcessContext.java @@ -20,12 +20,11 @@ import lombok.Getter; import lombok.SneakyThrows; import org.apache.commons.lang3.concurrent.ConcurrentException; +import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine; import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils; import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration; import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration; -import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine; -import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator; import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.core.util.PipelineLazyInitializer; import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration; @@ -37,7 +36,7 @@ public final class TransmissionProcessContext implements PipelineProcessContext { @Getter - private final PipelineProcessConfiguration pipelineProcessConfig; + private final PipelineProcessConfiguration processConfig; @Getter private final JobRateLimitAlgorithm readRateLimitAlgorithm; @@ -45,9 +44,6 @@ public final class TransmissionProcessContext implements PipelineProcessContext @Getter private final JobRateLimitAlgorithm writeRateLimitAlgorithm; - @Getter - private final PipelineChannelCreator pipelineChannelCreator; - private final PipelineLazyInitializer inventoryDumperExecuteEngineLazyInitializer; private final PipelineLazyInitializer inventoryImporterExecuteEngineLazyInitializer; @@ -55,16 +51,13 @@ public final class TransmissionProcessContext implements PipelineProcessContext private final PipelineLazyInitializer incrementalExecuteEngineLazyInitializer; public TransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) { - PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig); - this.pipelineProcessConfig = processConfig; + this.processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig); PipelineReadConfiguration readConfig = processConfig.getRead(); AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter(); readRateLimitAlgorithm = null == readRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, readRateLimiter.getType(), readRateLimiter.getProps()); PipelineWriteConfiguration writeConfig = processConfig.getWrite(); AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter(); writeRateLimitAlgorithm = null == writeRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, writeRateLimiter.getType(), writeRateLimiter.getProps()); - AlgorithmConfiguration streamChannel = processConfig.getStreamChannel(); - pipelineChannelCreator = TypedSPILoader.getService(PipelineChannelCreator.class, streamChannel.getType(), streamChannel.getProps()); inventoryDumperExecuteEngineLazyInitializer = new PipelineLazyInitializer() { @Override diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java index 464b0ec048ff0..f53be4e4c8514 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java @@ -83,7 +83,7 @@ public List splitInventoryData(final TransmissionJobItemContext j TransmissionProcessContext processContext = jobItemContext.getJobProcessContext(); for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) { AtomicReference position = new AtomicReference<>(each.getCommonContext().getPosition()); - PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position); + PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getProcessConfig().getStreamChannel(), importerConfig.getBatchSize(), position); Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader()); Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext); result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(), @@ -132,7 +132,7 @@ private Collection splitByPrimaryKey(final InventoryDump } Collection result = new LinkedList<>(); TransmissionProcessContext jobProcessContext = jobItemContext.getJobProcessContext(); - PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead(); + PipelineReadConfiguration readConfig = jobProcessContext.getProcessConfig().getRead(); int batchSize = readConfig.getBatchSize(); JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm(); Collection inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource); @@ -188,7 +188,7 @@ private Collection getPositionByIntegerUniqueKeyRange(final Inve } Collection result = new LinkedList<>(); Range uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperContext); - int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize(); + int shardingSize = jobItemContext.getJobProcessContext().getProcessConfig().getRead().getShardingSize(); long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0); long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount; IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java index 74ba13eafc481..2f749f1c34922 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java @@ -26,6 +26,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress; +import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration; +import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; @@ -66,24 +68,25 @@ public static IncrementalTaskProgress createIncrementalTaskProgress(final Ingest /** * Create pipeline channel for inventory task. * - * @param channelCreator pipeline channel creator + * @param channelConfig pipeline channel configuration * @param importerBatchSize importer batch size * @param position ingest position * @return created pipeline channel */ - public static PipelineChannel createInventoryChannel(final PipelineChannelCreator channelCreator, final int importerBatchSize, final AtomicReference position) { - return channelCreator.newInstance(importerBatchSize, new InventoryTaskAckCallback(position)); + public static PipelineChannel createInventoryChannel(final AlgorithmConfiguration channelConfig, final int importerBatchSize, final AtomicReference position) { + return TypedSPILoader.getService(PipelineChannelCreator.class, channelConfig.getType(), channelConfig.getProps()).newInstance(importerBatchSize, new InventoryTaskAckCallback(position)); } /** * Create pipeline channel for incremental task. * * @param concurrency output concurrency - * @param channelCreator pipeline channel creator + * @param channelConfig pipeline channel configuration * @param progress incremental task progress * @return created pipeline channel */ - public static PipelineChannel createIncrementalChannel(final int concurrency, final PipelineChannelCreator channelCreator, final IncrementalTaskProgress progress) { + public static PipelineChannel createIncrementalChannel(final int concurrency, final AlgorithmConfiguration channelConfig, final IncrementalTaskProgress progress) { + PipelineChannelCreator channelCreator = TypedSPILoader.getService(PipelineChannelCreator.class, channelConfig.getType(), channelConfig.getProps()); return 1 == concurrency ? channelCreator.newInstance(5, new IncrementalTaskAckCallback(progress)) : new MultiplexPipelineChannel(concurrency, channelCreator, 5, new IncrementalTaskAckCallback(progress)); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java index f39a246d5f10e..d234606fe9514 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java @@ -94,7 +94,7 @@ protected CDCJobItemContext buildJobItemContext(final PipelineJobConfiguration j PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING")); TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); - CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); + CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getProcessConfig()); return new CDCJobItemContext((CDCJobConfiguration) jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index 4b5fd0e8301c9..a9a55734b89c4 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -118,7 +118,7 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()), importerConfig) .splitInventoryDumperContext(jobItemContext)) { AtomicReference position = new AtomicReference<>(each.getCommonContext().getPosition()); - PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position); + PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getProcessConfig().getStreamChannel(), importerConfig.getBatchSize(), position); if (!(position.get() instanceof IngestFinishedPosition)) { channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext)); } @@ -139,7 +139,8 @@ private void initIncrementalTask(final CDCJobItemContext jobItemContext, final A IncrementalDumperContext dumperContext = taskConfig.getDumperContext(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress()); - PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getPipelineChannelCreator(), taskProgress); + PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel( + importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(), taskProgress); channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext)); Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()) .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader()); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java index 3201b01d1657d..2d279047607ae 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java @@ -45,7 +45,7 @@ protected ExecuteEngine doInitialize() { } @Override - public PipelineProcessConfiguration getPipelineProcessConfig() { + public PipelineProcessConfiguration getProcessConfig() { return PipelineProcessConfigurationUtils.convertWithDefaultValue(null); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java index 8b4c7dceb6fc0..91563b587b255 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java @@ -58,7 +58,7 @@ public final class MigrationJob extends AbstractSeparablePipelineJob importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext);