Merge TransmissionProcessContext and AbstractTransmissionProcessContext (#29254)

terrymanu authored Dec 1, 2023
1 parent 6c68967 commit 373a607
Showing 14 changed files with 115 additions and 238 deletions.

This file was deleted.

@@ -17,47 +17,117 @@
 
 package org.apache.shardingsphere.data.pipeline.common.context;
 
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
+import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineWriteConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.data.pipeline.common.util.PipelineLazyInitializer;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 /**
  * Transmission process context.
  */
-public interface TransmissionProcessContext extends PipelineProcessContext {
+public final class TransmissionProcessContext implements PipelineProcessContext {
 
-    /**
-     * Get pipeline channel creator.
-     *
-     * @return pipeline channel creator
-     */
-    PipelineChannelCreator getPipelineChannelCreator();
+    @Getter
+    private final PipelineProcessConfiguration pipelineProcessConfig;
+
+    @Getter
+    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+
+    @Getter
+    private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
+
+    @Getter
+    private final PipelineChannelCreator pipelineChannelCreator;
+
+    private final PipelineLazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer;
+
+    private final PipelineLazyInitializer<ExecuteEngine> inventoryImporterExecuteEngineLazyInitializer;
+
+    private final PipelineLazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;
+
+    public TransmissionProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
+        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
+        this.pipelineProcessConfig = processConfig;
+        PipelineReadConfiguration readConfig = processConfig.getRead();
+        AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
+        readRateLimitAlgorithm = null == readRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, readRateLimiter.getType(), readRateLimiter.getProps());
+        PipelineWriteConfiguration writeConfig = processConfig.getWrite();
+        AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
+        writeRateLimitAlgorithm = null == writeRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, writeRateLimiter.getType(), writeRateLimiter.getProps());
+        AlgorithmConfiguration streamChannel = processConfig.getStreamChannel();
+        pipelineChannelCreator = TypedSPILoader.getService(PipelineChannelCreator.class, streamChannel.getType(), streamChannel.getProps());
+        inventoryDumperExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
+
+            @Override
+            protected ExecuteEngine doInitialize() {
+                return ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-" + jobId);
+            }
+        };
+        inventoryImporterExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
+
+            @Override
+            protected ExecuteEngine doInitialize() {
+                return ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-" + jobId);
+            }
+        };
+        incrementalExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
+
+            @Override
+            protected ExecuteEngine doInitialize() {
+                return ExecuteEngine.newCachedThreadInstance("Incremental-" + jobId);
+            }
+        };
+    }
 
     /**
      * Get inventory dumper execute engine.
      *
      * @return inventory dumper execute engine
      */
-    ExecuteEngine getInventoryDumperExecuteEngine();
+    @SneakyThrows(ConcurrentException.class)
+    public ExecuteEngine getInventoryDumperExecuteEngine() {
+        return inventoryDumperExecuteEngineLazyInitializer.get();
+    }
 
     /**
      * Get inventory importer execute engine.
      *
      * @return inventory importer execute engine
      */
-    ExecuteEngine getInventoryImporterExecuteEngine();
+    @SneakyThrows(ConcurrentException.class)
+    public ExecuteEngine getInventoryImporterExecuteEngine() {
+        return inventoryImporterExecuteEngineLazyInitializer.get();
+    }
 
     /**
-     * Get job read rate limit algorithm.
+     * Get incremental execute engine.
      *
-     * @return job read rate limit algorithm
+     * @return incremental execute engine
      */
-    JobRateLimitAlgorithm getReadRateLimitAlgorithm();
+    @SneakyThrows(ConcurrentException.class)
+    public ExecuteEngine getIncrementalExecuteEngine() {
+        return incrementalExecuteEngineLazyInitializer.get();
+    }
 
-    /**
-     * Get job write rate limit algorithm.
-     *
-     * @return job write rate limit algorithm
-     */
-    JobRateLimitAlgorithm getWriteRateLimitAlgorithm();
+    @Override
+    public void close() throws Exception {
+        shutdownExecuteEngine(inventoryDumperExecuteEngineLazyInitializer);
+        shutdownExecuteEngine(inventoryImporterExecuteEngineLazyInitializer);
+        shutdownExecuteEngine(incrementalExecuteEngineLazyInitializer);
+    }
+
+    private void shutdownExecuteEngine(final PipelineLazyInitializer<ExecuteEngine> lazyInitializer) throws ConcurrentException {
+        if (lazyInitializer.isInitialized()) {
+            lazyInitializer.get().shutdown();
+        }
+    }
 }
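Not part of the commit, but useful context: the merged class now creates each ExecuteEngine lazily on first access and, via close(), shuts down only the engines whose initializers actually ran. A minimal caller sketch, assuming PipelineProcessContext extends AutoCloseable (implied by the overridden close() above); the job ID literal and the supplied configuration are placeholders:

static void runWithContext(final PipelineProcessConfiguration processConfig) throws Exception {
    try (TransmissionProcessContext context = new TransmissionProcessContext("example-job-id", processConfig)) {
        // First access runs PipelineLazyInitializer.doInitialize() and builds the "Inventory-" thread pool.
        ExecuteEngine dumperEngine = context.getInventoryDumperExecuteEngine();
        // The rate limit getters return null when no rateLimiter algorithm is configured.
        if (null != context.getReadRateLimitAlgorithm()) {
            // apply read rate limiting before dumping
        }
    }
    // close() has run here, shutting down only the engines that were actually initialized.
}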
@@ -27,14 +27,14 @@
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
 import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
@@ -129,7 +129,7 @@ public void execute(final ShardingContext shardingContext) {

     private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
-        CDCProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
+        TransmissionProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
         CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
     }
@@ -156,7 +156,7 @@ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfigurati
                                                        final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
                 jobConfig.getDataSourceConfig().getParameter());
-        CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+        TransmissionProcessContext processContext = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
         JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
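One behavioral note on the hunk above: getWriteRateLimitAlgorithm() returns null when the process configuration declares no rateLimiter, so the importer path must null-check before throttling. A sketch of that consumption pattern; the intercept method and the PipelineSQLOperationType argument are assumptions about the surrounding rate-limit SPI, not shown in this commit:

JobRateLimitAlgorithm limiter = processContext.getWriteRateLimitAlgorithm();
if (null != limiter) {
    // Assumed API: block until writing batchSize more rows is permitted.
    limiter.intercept(PipelineSQLOperationType.INSERT, batchSize);
}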
@@ -20,7 +20,6 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -62,9 +61,9 @@ public PipelineJobInfo getJobInfo(final String jobId) {
     }
 
     @Override
-    public CDCProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
+    public TransmissionProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
         TransmissionJobManager jobManager = new TransmissionJobManager(this);
-        return new CDCProcessContext(jobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
+        return new TransmissionProcessContext(jobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
     }
 
     @Override
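With TransmissionProcessContext now a concrete final class, buildProcessContext returns the shared type instead of a CDC-specific covariant subtype. The deleted CDCProcessContext is collapsed in this view; judging from the commit title and the one-for-one call-site swaps, it was a thin pass-through, roughly as below (a reconstruction, not the original source):

public final class CDCProcessContext extends AbstractTransmissionProcessContext {
    
    public CDCProcessContext(final String jobId, final PipelineProcessConfiguration processConfig) {
        super(jobId, processConfig);
    }
}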
@@ -25,6 +25,7 @@
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
@@ -58,7 +59,7 @@ public final class CDCJobItemContext implements TransmissionJobItemContext {

     private final TransmissionJobItemProgress initProgress;
 
-    private final CDCProcessContext jobProcessContext;
+    private final TransmissionProcessContext jobProcessContext;
 
     private final CDCTaskConfiguration taskConfig;

@@ -90,7 +91,7 @@ protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
         }
     };
 
-    public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress initProgress, final CDCProcessContext jobProcessContext,
+    public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress initProgress, final TransmissionProcessContext jobProcessContext,
                              final CDCTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager, final PipelineSink sink) {
         this.jobConfig = jobConfig;
         this.shardingItem = shardingItem;
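The initialize() override in the hunk above and the ConcurrentException plumbing throughout this commit point at the lazy-initializer idiom from Apache Commons Lang 3: get() invokes a protected initializer at most once, thread-safely. A self-contained sketch of the pattern; ExpensiveResource is a placeholder type:

import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;

public final class LazyInitializerExample {
    
    /** Placeholder for anything costly to build, such as a thread pool or a metadata loader. */
    private static final class ExpensiveResource {
    }
    
    private final LazyInitializer<ExpensiveResource> lazyResource = new LazyInitializer<ExpensiveResource>() {
        
        @Override
        protected ExpensiveResource initialize() throws ConcurrentException {
            return new ExpensiveResource(); // executed at most once, on the first get()
        }
    };
    
    public ExpensiveResource getResource() throws ConcurrentException {
        return lazyResource.get();
    }
}

This is also why close() in the merged TransmissionProcessContext can consult isInitialized() and skip engines that were never requested.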

This file was deleted.
