[FLINK-36931][cdc] FlinkCDC YAML supports batch mode #3812

Open · wants to merge 5 commits into master (changes shown from all commits)

Summary: adds a pipeline-level runtime-mode option (STREAMING by default, or BATCH) and threads an isBatchMode flag through the composer and its sink and partitioning translators. Batch mode switches the Flink environment to bounded execution and swaps in batch-specific operators (DataBatchSinkFunctionOperator, RegularPrePartitionBatchOperator); a new CreateTableCompletedEvent lets the source signal that all CreateTableEvents have been sent.
CreateTableCompletedEvent.java (new file)
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.event;

/**
 * An {@link Event} from {@code SourceOperator} to notify {@code SchemaBatchOperator} that it has
 * finished sending all {@link CreateTableEvent}s in batch mode.
 */
public class CreateTableCompletedEvent implements Event {

public CreateTableCompletedEvent() {}

@Override
public String toString() {
return "CreateTableCompletedEvent{}";
}
}
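For orientation, a minimal sketch of how a batch-mode source operator could use this marker. Everything below except CreateTableCompletedEvent itself is hypothetical and not part of this PR:

import org.apache.flink.cdc.common.event.CreateTableCompletedEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.List;

class SchemaPhaseSketch {
    // Forward every CreateTableEvent first, then emit the completion marker so a
    // downstream schema operator knows that no further schemas will arrive.
    static void finishSchemaPhase(Output<StreamRecord<Event>> out, List<CreateTableEvent> tables) {
        tables.forEach(e -> out.collect(new StreamRecord<>(e)));
        out.collect(new StreamRecord<>(new CreateTableCompletedEvent()));
    }
}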
CreateTableEvent.java

@@ -79,7 +79,7 @@ public SchemaChangeEventType getType() {
}

@Override
-public SchemaChangeEvent copy(TableId newTableId) {
+public CreateTableEvent copy(TableId newTableId) {
return new CreateTableEvent(newTableId, schema);
}
}
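The override narrows the declared return type from SchemaChangeEvent to CreateTableEvent (a covariant return), so call sites copying a CreateTableEvent no longer need a downcast:

CreateTableEvent copied = createTableEvent.copy(newTableId);
// previously: (CreateTableEvent) createTableEvent.copy(newTableId)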
FactoryHelper.java

@@ -20,9 +20,11 @@
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.table.api.ValidationException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -151,13 +153,26 @@ public static class DefaultContext implements Factory.Context {
private final Configuration factoryConfiguration;
private final ClassLoader classLoader;
private final Configuration pipelineConfiguration;
private final List<RouteRule> routeRules;

public DefaultContext(
Configuration factoryConfiguration,
Configuration pipelineConfiguration,
ClassLoader classLoader) {
this.factoryConfiguration = factoryConfiguration;
this.pipelineConfiguration = pipelineConfiguration;
this.routeRules = new ArrayList<>();
this.classLoader = classLoader;
}

public DefaultContext(
Configuration factoryConfiguration,
Configuration pipelineConfiguration,
List<RouteRule> routeRules,
ClassLoader classLoader) {
this.factoryConfiguration = factoryConfiguration;
this.pipelineConfiguration = pipelineConfiguration;
this.routeRules = routeRules;
this.classLoader = classLoader;
}

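For reference, a short sketch of how the two constructors are meant to be used; the variable names are illustrative, not from this PR. The three-argument form keeps its old signature and now simply defaults routeRules to an empty list:

// New overload: hand the parsed route rules to factories that need them.
FactoryHelper.DefaultContext withRoutes =
        new FactoryHelper.DefaultContext(factoryConfig, pipelineConfig, routeRules, classLoader);

// Existing call sites compile unchanged; routeRules defaults to new ArrayList<>().
FactoryHelper.DefaultContext legacy =
        new FactoryHelper.DefaultContext(factoryConfig, pipelineConfig, classLoader);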
PipelineOptions.java

@@ -45,6 +45,12 @@ public class PipelineOptions {
.defaultValue(1)
.withDescription("Parallelism of the pipeline");

public static final ConfigOption<RunTimeMode> PIPELINE_RUNTIME_MODE =
ConfigOptions.key("runtime-mode")
.enumType(RunTimeMode.class)
.defaultValue(RunTimeMode.STREAMING)
.withDescription("Run time mode of the pipeline");

public static final ConfigOption<SchemaChangeBehavior> PIPELINE_SCHEMA_CHANGE_BEHAVIOR =
ConfigOptions.key("schema.change.behavior")
.enumType(SchemaChangeBehavior.class)
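With PIPELINE_RUNTIME_MODE defined above (key "runtime-mode", default STREAMING), a pipeline definition can presumably opt into batch execution from YAML. A sketch, assuming the option sits in the usual top-level pipeline block:

pipeline:
  parallelism: 1
  # New in this PR; falls back to STREAMING when omitted.
  runtime-mode: BATCH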
RunTimeMode.java (new file)
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.pipeline;

import org.apache.flink.cdc.common.annotation.PublicEvolving;

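/** Runtime execution mode of a pipeline: unbounded {@link #STREAMING} (the default) or bounded {@link #BATCH}. */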
@PublicEvolving
public enum RunTimeMode {
STREAMING,
BATCH
}
FlinkPipelineComposer.java

@@ -17,10 +17,12 @@

package org.apache.flink.cdc.composer.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.RunTimeMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.source.DataSource;
@@ -111,6 +113,13 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
SchemaChangeBehavior schemaChangeBehavior =
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

boolean isBatchMode = false;
if (RunTimeMode.BATCH.equals(
pipelineDefConfig.get(PipelineOptions.PIPELINE_RUNTIME_MODE))) {
isBatchMode = true;
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
}

// Initialize translators
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
TransformTranslator transformTranslator = new TransformTranslator();
@@ -186,6 +195,7 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
schemaOperatorTranslator.translateRegular(
stream,
parallelism,
isBatchMode,
dataSink.getMetadataApplier()
.setAcceptedSchemaEvolutionTypes(
pipelineDef
@@ -199,13 +209,18 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
stream,
parallelism,
parallelism,
isBatchMode,
schemaOperatorIDGenerator.generate(),
dataSink.getDataChangeEventHashFunctionProvider(parallelism));
}

// Schema Operator -> Sink -> X
sinkTranslator.translate(
-pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
+pipelineDef.getSink(),
+stream,
+dataSink,
+isBatchMode,
+schemaOperatorIDGenerator.generate());
}

private void addFrameworkJars() {
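Earlier in this file the composer flips the underlying environment to bounded execution whenever runtime-mode is BATCH. In plain DataStream API terms, that line does the following (standalone sketch):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class BatchModeSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bounded execution: staged scheduling and blocking shuffles; checkpoint-based
        // fault tolerance does not apply in this mode.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    }
}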
DataSinkTranslator.java

@@ -33,6 +33,7 @@
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.runtime.operators.sink.DataBatchSinkFunctionOperator;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -46,6 +47,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;

@@ -82,20 +84,29 @@ public void translate(
DataStream<Event> input,
DataSink dataSink,
OperatorID schemaOperatorID) {
translate(sinkDef, input, dataSink, false, schemaOperatorID);
}

public void translate(
SinkDef sinkDef,
DataStream<Event> input,
DataSink dataSink,
boolean isBatchMode,
OperatorID schemaOperatorID) {
// Get sink provider
EventSinkProvider eventSinkProvider = dataSink.getEventSinkProvider();
String sinkName = generateSinkName(sinkDef);
if (eventSinkProvider instanceof FlinkSinkProvider) {
// Sink V2
FlinkSinkProvider sinkProvider = (FlinkSinkProvider) eventSinkProvider;
Sink<Event> sink = sinkProvider.getSink();
-sinkTo(input, sink, sinkName, schemaOperatorID);
+sinkTo(input, sink, sinkName, isBatchMode, schemaOperatorID);
} else if (eventSinkProvider instanceof FlinkSinkFunctionProvider) {
// SinkFunction
FlinkSinkFunctionProvider sinkFunctionProvider =
(FlinkSinkFunctionProvider) eventSinkProvider;
SinkFunction<Event> sinkFunction = sinkFunctionProvider.getSinkFunction();
-sinkTo(input, sinkFunction, sinkName, schemaOperatorID);
+sinkTo(input, sinkFunction, sinkName, isBatchMode, schemaOperatorID);
}
}

@@ -104,6 +115,7 @@ void sinkTo(
DataStream<Event> input,
Sink<Event> sink,
String sinkName,
boolean isBatchMode,
OperatorID schemaOperatorID) {
DataStream<Event> stream = input;
// Pre-write topology
@@ -112,22 +124,27 @@
}

if (sink instanceof TwoPhaseCommittingSink) {
-addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
+addCommittingTopology(sink, stream, sinkName, isBatchMode, schemaOperatorID);
} else {
stream.transform(
SINK_WRITER_PREFIX + sinkName,
CommittableMessageTypeInfo.noOutput(),
-new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
+new DataSinkWriterOperatorFactory<>(sink, isBatchMode, schemaOperatorID));
}
}

private void sinkTo(
DataStream<Event> input,
SinkFunction<Event> sinkFunction,
String sinkName,
boolean isBatchMode,
OperatorID schemaOperatorID) {
-DataSinkFunctionOperator sinkOperator =
-new DataSinkFunctionOperator(sinkFunction, schemaOperatorID);
+StreamSink<Event> sinkOperator;
+if (isBatchMode) {
+sinkOperator = new DataBatchSinkFunctionOperator(sinkFunction, schemaOperatorID);
+} else {
+sinkOperator = new DataSinkFunctionOperator(sinkFunction, schemaOperatorID);
+}
final StreamExecutionEnvironment executionEnvironment = input.getExecutionEnvironment();
PhysicalTransformation<Event> transformation =
new LegacySinkTransformation<>(
@@ -143,6 +160,7 @@ private <CommT> void addCommittingTopology(
Sink<Event> sink,
DataStream<Event> inputStream,
String sinkName,
boolean isBatchMode,
OperatorID schemaOperatorID) {
TypeInformation<CommittableMessage<CommT>> typeInformation =
CommittableMessageTypeInfo.of(() -> getCommittableSerializer(sink));
@@ -158,8 +176,7 @@ private <CommT> void addCommittingTopology(
((WithPreCommitTopology<Event, CommT>) sink).addPreCommitTopology(written);
}

-// TODO: Hard coding stream mode and checkpoint
-boolean isBatchMode = false;
+// TODO: Hard coding checkpoint
boolean isCheckpointingEnabled = true;
DataStream<CommittableMessage<CommT>> committed =
preCommitted.transform(
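Note the pattern used throughout this file: the pre-existing translate and sinkTo signatures remain as thin wrappers, so current callers compile unchanged while the composer passes the mode explicitly. For example:

// Legacy call site (streaming implied) ...
sinkTranslator.translate(sinkDef, stream, dataSink, schemaOperatorID);
// ... is now shorthand for:
sinkTranslator.translate(sinkDef, stream, dataSink, false, schemaOperatorID);

Typing the local variable as StreamSink<Event>, the common superclass of both sink function operators, is what lets a single code path wire either implementation into the LegacySinkTransformation.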
DataSourceTranslator.java

@@ -84,6 +84,7 @@ public DataSource createDataSource(
// Add source JAR to environment
FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory)
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));

return sourceFactory.createDataSource(
new FactoryHelper.DefaultContext(
sourceDef.getConfig(),
PartitioningTranslator.java

@@ -26,6 +26,7 @@
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEventKeySelector;
import org.apache.flink.cdc.runtime.partitioning.PostPartitionProcessor;
import org.apache.flink.cdc.runtime.partitioning.RegularPrePartitionBatchOperator;
import org.apache.flink.cdc.runtime.partitioning.RegularPrePartitionOperator;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.cdc.runtime.typeutils.PartitioningEventTypeInfo;
@@ -46,11 +47,34 @@ public DataStream<Event> translateRegular(
int downstreamParallelism,
OperatorID schemaOperatorID,
HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
return translateRegular(
input,
upstreamParallelism,
downstreamParallelism,
false,
schemaOperatorID,
hashFunctionProvider);
}

public DataStream<Event> translateRegular(
DataStream<Event> input,
int upstreamParallelism,
int downstreamParallelism,
boolean isBatchMode,
OperatorID schemaOperatorID,
HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
return input.transform(
"PrePartition",
isBatchMode ? "BatchPrePartition" : "PrePartition",
new PartitioningEventTypeInfo(),
-new RegularPrePartitionOperator(
-schemaOperatorID, downstreamParallelism, hashFunctionProvider))
+isBatchMode
+? new RegularPrePartitionBatchOperator(
+schemaOperatorID,
+downstreamParallelism,
+hashFunctionProvider)
+: new RegularPrePartitionOperator(
+schemaOperatorID,
+downstreamParallelism,
+hashFunctionProvider))
.setParallelism(upstreamParallelism)
.partitionCustom(new EventPartitioner(), new PartitioningEventKeySelector())
.map(new PostPartitionProcessor(), new EventTypeInfo())
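The partitioning translator follows the same compatibility pattern: the original translateRegular overload delegates with isBatchMode = false, and batch pipelines differ only in the flag passed from the composer. An illustrative call, mirroring the composer snippet above:

// Batch wiring of the pre-partition step: "BatchPrePartition" + RegularPrePartitionBatchOperator.
partitioningTranslator.translateRegular(
        stream, parallelism, parallelism, /* isBatchMode */ true,
        schemaOperatorIDGenerator.generate(),
        dataSink.getDataChangeEventHashFunctionProvider(parallelism));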