Remove TransmissionProcessContext.pipelineChannelCreator (#29549)
terrymanu authored Dec 26, 2023
1 parent 000380d commit dd1c568
Showing 9 changed files with 23 additions and 27 deletions.
File 1 of 9

@@ -29,5 +29,5 @@ public interface PipelineProcessContext extends AutoCloseable {
      *
      * @return pipeline process config
      */
-    PipelineProcessConfiguration getPipelineProcessConfig();
+    PipelineProcessConfiguration getProcessConfig();
 }
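
The rename drops the redundant "Pipeline" prefix from the accessor; the return type already carries it. Call sites change mechanically, as the remaining files in this commit show. An illustrative before/after (the processContext variable is hypothetical):

    // Before: the accessor name repeated the type's prefix.
    PipelineProcessConfiguration config = processContext.getPipelineProcessConfig();
    // After: same behavior, shorter name.
    PipelineProcessConfiguration config = processContext.getProcessConfig();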
File 2 of 9

@@ -20,12 +20,11 @@
 import lombok.Getter;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineLazyInitializer;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
@@ -37,34 +36,28 @@
 public final class TransmissionProcessContext implements PipelineProcessContext {
 
     @Getter
-    private final PipelineProcessConfiguration pipelineProcessConfig;
+    private final PipelineProcessConfiguration processConfig;
 
     @Getter
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
 
     @Getter
     private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
 
-    @Getter
-    private final PipelineChannelCreator pipelineChannelCreator;
-
     private final PipelineLazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer;
 
     private final PipelineLazyInitializer<ExecuteEngine> inventoryImporterExecuteEngineLazyInitializer;
 
     private final PipelineLazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;
 
     public TransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
-        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
-        this.pipelineProcessConfig = processConfig;
+        this.processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
         PipelineReadConfiguration readConfig = processConfig.getRead();
         AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
         readRateLimitAlgorithm = null == readRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, readRateLimiter.getType(), readRateLimiter.getProps());
         PipelineWriteConfiguration writeConfig = processConfig.getWrite();
         AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
         writeRateLimitAlgorithm = null == writeRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, writeRateLimiter.getType(), writeRateLimiter.getProps());
-        AlgorithmConfiguration streamChannel = processConfig.getStreamChannel();
-        pipelineChannelCreator = TypedSPILoader.getService(PipelineChannelCreator.class, streamChannel.getType(), streamChannel.getProps());
         inventoryDumperExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
 
             @Override
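
Since TransmissionProcessContext no longer builds or exposes a PipelineChannelCreator, code that used getPipelineChannelCreator() now resolves the creator on demand from the stream channel algorithm configuration. A minimal sketch of that lookup, using only types and calls visible in this diff (processContext is assumed to be a TransmissionProcessContext):

    // Resolve the channel creator from the process configuration's stream channel settings.
    AlgorithmConfiguration streamChannel = processContext.getProcessConfig().getStreamChannel();
    PipelineChannelCreator creator = TypedSPILoader.getService(
            PipelineChannelCreator.class, streamChannel.getType(), streamChannel.getProps());

This appears to trade one creator instance cached per job for an SPI lookup per channel creation; PipelineTaskUtils (below) absorbs that lookup, so most callers simply pass the configuration through.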
File 3 of 9

@@ -83,7 +83,7 @@ public List<InventoryTask> splitInventoryData(final TransmissionJobItemContext j
         TransmissionProcessContext processContext = jobItemContext.getJobProcessContext();
         for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) {
             AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
-            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
+            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getProcessConfig().getStreamChannel(), importerConfig.getBatchSize(), position);
             Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
             Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext);
             result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
@@ -132,7 +132,7 @@ private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDump
         }
         Collection<InventoryDumperContext> result = new LinkedList<>();
         TransmissionProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
-        PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead();
+        PipelineReadConfiguration readConfig = jobProcessContext.getProcessConfig().getRead();
         int batchSize = readConfig.getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
         Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
@@ -188,7 +188,7 @@ private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final Inve
         }
         Collection<IngestPosition> result = new LinkedList<>();
         Range<Long> uniqueKeyValuesRange = getUniqueKeyValuesRange(jobItemContext, dataSource, dumperContext);
-        int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
+        int shardingSize = jobItemContext.getJobProcessContext().getProcessConfig().getRead().getShardingSize();
         long splitCount = tableRecordsCount / shardingSize + (tableRecordsCount % shardingSize > 0 ? 1 : 0);
         long interval = (uniqueKeyValuesRange.getMaximum() - uniqueKeyValuesRange.getMinimum()) / splitCount;
         IntervalToRangeIterator rangeIterator = new IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), uniqueKeyValuesRange.getMaximum(), interval);
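
The unchanged split arithmetic above is easy to sanity-check with concrete numbers; the values below are purely illustrative, not from the commit:

    // Hypothetical values for the split computation shown above.
    long tableRecordsCount = 25_000_000L;
    int shardingSize = 10_000_000;              // read.shardingSize from the process config
    long splitCount = tableRecordsCount / shardingSize
            + (tableRecordsCount % shardingSize > 0 ? 1 : 0);    // 2 full shards + 1 partial = 3
    long min = 1L;
    long max = 90_000_000L;                     // unique key value range bounds
    long interval = (max - min) / splitCount;   // 29_999_999 ids per generated range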
File 4 of 9

@@ -26,6 +26,8 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
@@ -66,24 +68,25 @@ public static IncrementalTaskProgress createIncrementalTaskProgress(final Ingest
     /**
      * Create pipeline channel for inventory task.
      *
-     * @param channelCreator pipeline channel creator
+     * @param channelConfig pipeline channel configuration
      * @param importerBatchSize importer batch size
      * @param position ingest position
      * @return created pipeline channel
      */
-    public static PipelineChannel createInventoryChannel(final PipelineChannelCreator channelCreator, final int importerBatchSize, final AtomicReference<IngestPosition> position) {
-        return channelCreator.newInstance(importerBatchSize, new InventoryTaskAckCallback(position));
+    public static PipelineChannel createInventoryChannel(final AlgorithmConfiguration channelConfig, final int importerBatchSize, final AtomicReference<IngestPosition> position) {
+        return TypedSPILoader.getService(PipelineChannelCreator.class, channelConfig.getType(), channelConfig.getProps()).newInstance(importerBatchSize, new InventoryTaskAckCallback(position));
     }
 
     /**
      * Create pipeline channel for incremental task.
      *
      * @param concurrency output concurrency
-     * @param channelCreator pipeline channel creator
+     * @param channelConfig pipeline channel configuration
      * @param progress incremental task progress
      * @return created pipeline channel
      */
-    public static PipelineChannel createIncrementalChannel(final int concurrency, final PipelineChannelCreator channelCreator, final IncrementalTaskProgress progress) {
+    public static PipelineChannel createIncrementalChannel(final int concurrency, final AlgorithmConfiguration channelConfig, final IncrementalTaskProgress progress) {
+        PipelineChannelCreator channelCreator = TypedSPILoader.getService(PipelineChannelCreator.class, channelConfig.getType(), channelConfig.getProps());
         return 1 == concurrency
                 ? channelCreator.newInstance(5, new IncrementalTaskAckCallback(progress))
                 : new MultiplexPipelineChannel(concurrency, channelCreator, 5, new IncrementalTaskAckCallback(progress));
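
Both helpers now take the AlgorithmConfiguration directly and resolve the PipelineChannelCreator per call, so callers no longer hold a creator instance. A caller-side sketch (the batch size, concurrency, and surrounding variables are illustrative, not from the commit):

    AlgorithmConfiguration channelConfig = processContext.getProcessConfig().getStreamChannel();
    AtomicReference<IngestPosition> position = new AtomicReference<>(startPosition);
    PipelineChannel inventoryChannel = PipelineTaskUtils.createInventoryChannel(channelConfig, 1000, position);
    // concurrency == 1 yields a single channel; otherwise the creator's channels
    // are wrapped in a MultiplexPipelineChannel, as the method body above shows.
    PipelineChannel incrementalChannel = PipelineTaskUtils.createIncrementalChannel(4, channelConfig, taskProgress);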
File 5 of 9

@@ -94,7 +94,7 @@ protected CDCJobItemContext buildJobItemContext(final PipelineJobConfiguration j
         PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
                 processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING"));
         TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
-        CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+        CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getProcessConfig());
         return new CDCJobItemContext((CDCJobConfiguration) jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
     }
 
File 6 of 9

@@ -118,7 +118,7 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At
         for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()), importerConfig)
                 .splitInventoryDumperContext(jobItemContext)) {
             AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
-            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
+            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getProcessConfig().getStreamChannel(), importerConfig.getBatchSize(), position);
             if (!(position.get() instanceof IngestFinishedPosition)) {
                 channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
             }
@@ -139,7 +139,8 @@ private void initIncrementalTask(final CDCJobItemContext jobItemContext, final A
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
-        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getPipelineChannelCreator(), taskProgress);
+        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(
+                importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(), taskProgress);
         channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
         Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
                 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
File 7 of 9

@@ -45,7 +45,7 @@ protected ExecuteEngine doInitialize() {
     }
 
     @Override
-    public PipelineProcessConfiguration getPipelineProcessConfig() {
+    public PipelineProcessConfiguration getProcessConfig() {
         return PipelineProcessConfigurationUtils.convertWithDefaultValue(null);
     }
 
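
This override, apparently a fixture satisfying the renamed contract, returns PipelineProcessConfigurationUtils.convertWithDefaultValue(null); by the same convention the TransmissionProcessContext constructor uses, that presumably yields a process configuration populated entirely with default values.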
File 8 of 9

@@ -58,7 +58,7 @@ public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJo
     @Override
     protected MigrationJobItemContext buildJobItemContext(final MigrationJobConfiguration jobConfig,
                                                           final int shardingItem, final TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
-        MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+        MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager());
     }
 
File 9 of 9

@@ -21,7 +21,6 @@
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -199,13 +198,13 @@ private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
 
     private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
-        PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
         PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();
         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
         IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress());
-        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), pipelineChannelCreator, taskProgress);
+        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(
+                importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getProcessConfig().getStreamChannel(), taskProgress);
         Dumper dumper = DatabaseTypedSPILoader.getService(DialectIncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
                 .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader);
         Collection<Importer> importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext);
