diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java
index a7659561743..07c2c59592f 100644
--- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java
@@ -20,9 +20,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import com.ververica.cdc.common.annotation.Internal;
+import com.ververica.cdc.common.configuration.Configuration;
 import com.ververica.cdc.common.event.Event;
 import com.ververica.cdc.common.factories.DataSinkFactory;
 import com.ververica.cdc.common.factories.FactoryHelper;
+import com.ververica.cdc.common.pipeline.PipelineConfig;
 import com.ververica.cdc.common.pipeline.PipelineOptions;
 import com.ververica.cdc.common.sink.DataSink;
 import com.ververica.cdc.composer.PipelineComposer;
@@ -80,14 +82,15 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
         // Source
         DataSourceTranslator sourceTranslator = new DataSourceTranslator();
         DataStream<Event> stream =
-                sourceTranslator.translate(pipelineDef.getSource(), env, parallelism);
+                sourceTranslator.translate(
+                        pipelineDef.getSource(), env, pipelineDef.getPipelineConfig());
 
         // Route
         RouteTranslator routeTranslator = new RouteTranslator();
         stream = routeTranslator.translate(stream, pipelineDef.getRoute());
 
         // Create sink in advance as schema operator requires MetadataApplier
-        DataSink dataSink = createDataSink(pipelineDef.getSink());
+        DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getPipelineConfig());
 
         // Schema operator
         SchemaOperatorTranslator schemaOperatorTranslator =
@@ -120,7 +123,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                 isBlocking);
     }
 
-    private DataSink createDataSink(SinkDef sinkDef) {
+    private DataSink createDataSink(SinkDef sinkDef, PipelineConfig pipelineConfig) {
         // Search the data sink factory
         DataSinkFactory sinkFactory =
                 FactoryDiscoveryUtils.getFactoryByIdentifier(
@@ -131,10 +134,13 @@ private DataSink createDataSink(SinkDef sinkDef) {
                 .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
 
         // Create data sink
+        final Configuration contextConfig = new Configuration();
+        contextConfig.addAll(sinkDef.getConfig());
+        contextConfig.addAll(pipelineConfig.getConfiguration());
         return sinkFactory.createDataSink(
                 new FactoryHelper.DefaultContext(
-                        sinkDef.getConfig().toMap(),
-                        sinkDef.getConfig(),
+                        contextConfig.toMap(),
+                        contextConfig,
                         Thread.currentThread().getContextClassLoader()));
     }
 
diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslator.java
index dbde8755838..f0aba228a55 100644
--- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslator.java
+++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/translator/DataSourceTranslator.java
@@ -21,9 +21,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import com.ververica.cdc.common.annotation.Internal;
+import com.ververica.cdc.common.configuration.Configuration;
 import com.ververica.cdc.common.event.Event;
 import com.ververica.cdc.common.factories.DataSourceFactory;
 import com.ververica.cdc.common.factories.FactoryHelper;
+import com.ververica.cdc.common.pipeline.PipelineConfig;
+import com.ververica.cdc.common.pipeline.PipelineOptions;
 import com.ververica.cdc.common.source.DataSource;
 import com.ververica.cdc.common.source.EventSourceProvider;
 import com.ververica.cdc.common.source.FlinkSourceFunctionProvider;
@@ -39,14 +42,18 @@
  */
 @Internal
 public class DataSourceTranslator {
+
     public DataStreamSource<Event> translate(
-            SourceDef sourceDef, StreamExecutionEnvironment env, int sourceParallelism) {
+            SourceDef sourceDef, StreamExecutionEnvironment env, PipelineConfig pipelineConfig) {
         // Search the data source factory
         DataSourceFactory sourceFactory =
                 FactoryDiscoveryUtils.getFactoryByIdentifier(
                         sourceDef.getType(), DataSourceFactory.class);
 
         // Create data source
+        final Configuration contextConfig = new Configuration();
+        contextConfig.addAll(sourceDef.getConfig());
+        contextConfig.addAll(pipelineConfig.getConfiguration());
         DataSource dataSource =
                 sourceFactory.createDataSource(
                         new FactoryHelper.DefaultContext(
@@ -59,6 +66,7 @@ public DataStreamSource<Event> translate(
                 .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
 
         // Get source provider
+        final int sourceParallelism = pipelineConfig.get(PipelineOptions.GLOBAL_PARALLELISM);
        EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
         if (eventSourceProvider instanceof FlinkSourceProvider) {
             // Source
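
Note on the merge order introduced above (not part of the patch): both createDataSink and DataSourceTranslator.translate build the factory context by adding the connector's own options first and the pipeline-level options second. Assuming Configuration.addAll keeps the last value written for a duplicate key, as Flink's own Configuration does, pipeline-level entries would take precedence when the same key appears in both places. A minimal sketch of that precedence, using a hypothetical mergeForConnector helper (illustration only; Configuration and PipelineConfig are the com.ververica.cdc.common types imported in the patch):

    // Hypothetical helper, for illustration only. It mirrors the merge order used in
    // FlinkPipelineComposer.createDataSink and DataSourceTranslator.translate.
    // Assumption: Configuration.addAll overwrites existing keys (last write wins).
    private static Configuration mergeForConnector(
            Configuration connectorConfig, PipelineConfig pipelineConfig) {
        final Configuration merged = new Configuration();
        merged.addAll(connectorConfig);                   // connector ("source"/"sink") options first
        merged.addAll(pipelineConfig.getConfiguration()); // pipeline options last, win on key conflicts
        return merged;
    }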