Skip to content

Commit

Permalink
[cdc-composer] Pass pipeline configuration when constructing DataSource/DataSink
Browse files Browse the repository at this point in the history
  • Loading branch information
leonardBang committed Dec 3, 2023
1 parent 5828475 commit 942a8dc
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.factories.FactoryHelper;
import com.ververica.cdc.common.pipeline.PipelineConfig;
import com.ververica.cdc.common.pipeline.PipelineOptions;
import com.ververica.cdc.common.sink.DataSink;
import com.ververica.cdc.composer.PipelineComposer;
Expand Down Expand Up @@ -80,14 +82,15 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
// Source
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), env, parallelism);
sourceTranslator.translate(
pipelineDef.getSource(), env, pipelineDef.getPipelineConfig());

// Route
RouteTranslator routeTranslator = new RouteTranslator();
stream = routeTranslator.translate(stream, pipelineDef.getRoute());

// Create sink in advance as schema operator requires MetadataApplier
DataSink dataSink = createDataSink(pipelineDef.getSink());
DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getPipelineConfig());

// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
Expand Down Expand Up @@ -120,7 +123,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
isBlocking);
}

private DataSink createDataSink(SinkDef sinkDef) {
private DataSink createDataSink(SinkDef sinkDef, PipelineConfig pipelineConfig) {
// Search the data sink factory
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
Expand All @@ -131,10 +134,13 @@ private DataSink createDataSink(SinkDef sinkDef) {
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));

// Create data sink
final Configuration contextConfig = new Configuration();
contextConfig.addAll(sinkDef.getConfig());
contextConfig.addAll(pipelineConfig.getConfiguration());
return sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
sinkDef.getConfig().toMap(),
sinkDef.getConfig(),
contextConfig.toMap(),
contextConfig,
Thread.currentThread().getContextClassLoader()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.factories.DataSourceFactory;
import com.ververica.cdc.common.factories.FactoryHelper;
import com.ververica.cdc.common.pipeline.PipelineConfig;
import com.ververica.cdc.common.pipeline.PipelineOptions;
import com.ververica.cdc.common.source.DataSource;
import com.ververica.cdc.common.source.EventSourceProvider;
import com.ververica.cdc.common.source.FlinkSourceFunctionProvider;
Expand All @@ -39,14 +42,18 @@
*/
@Internal
public class DataSourceTranslator {

public DataStreamSource<Event> translate(
SourceDef sourceDef, StreamExecutionEnvironment env, int sourceParallelism) {
SourceDef sourceDef, StreamExecutionEnvironment env, PipelineConfig pipelineConfig) {
// Search the data source factory
DataSourceFactory sourceFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
sourceDef.getType(), DataSourceFactory.class);

// Create data source
final Configuration contextConfig = new Configuration();
contextConfig.addAll(sourceDef.getConfig());
contextConfig.addAll(pipelineConfig.getConfiguration());
DataSource dataSource =
sourceFactory.createDataSource(
new FactoryHelper.DefaultContext(
Expand All @@ -59,6 +66,7 @@ public DataStreamSource<Event> translate(
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));

// Get source provider
final int sourceParallelism = pipelineConfig.get(PipelineOptions.GLOBAL_PARALLELISM);
EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
if (eventSourceProvider instanceof FlinkSourceProvider) {
// Source
Expand Down

0 comments on commit 942a8dc

Please sign in to comment.