Commit

[Source code reading 1.1.0] Flink CDC sync
liujian committed Dec 27, 2024
1 parent 83ea986 commit 598ecb8
Showing 6 changed files with 23 additions and 4 deletions.
@@ -73,6 +73,7 @@
/**
* KafkaSinkBuilder
**/
//todo kafka sink
public class KafkaSinkBuilder extends AbstractSinkBuilder implements Serializable {

public static final String KEY_WORD = "datastream-kafka";
@@ -100,14 +101,15 @@ public String getHandle() {
public SinkBuilder create(FlinkCDCConfig config) {
return new KafkaSinkBuilder(config);
}

//todo entry point of the sink logic
@Override
public DataStreamSource build(
CDCBuilder cdcBuilder,
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
Properties kafkaProducerConfig = getProperties();
//todo if a topic is configured explicitly, send everything to that topic
if (Asserts.isNotNullString(config.getSink().get("topic"))) {
org.apache.flink.connector.kafka.sink.KafkaSinkBuilder<String> kafkaSinkBuilder =
KafkaSink.<String>builder()
@@ -129,6 +131,7 @@ public DataStreamSource build(
KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
dataStreamSource.sinkTo(kafkaSink);
} else {
//todo no explicit topic configured (sink.topic); each table's change log is written to a topic named after its schema and table
Map<Table, OutputTag<String>> tagMap = new LinkedHashMap<>();
Map<String, Table> tableMap = new LinkedHashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
@@ -149,6 +152,7 @@ public DataStreamSource build(
List<Table> tableList = schema.getTables().stream()
.sorted(Comparator.comparing(Table::getName))
.collect(Collectors.toList());
//todo build an OutputTag per table
for (Table table : tableList) {
String sinkTableName = getSinkTableName(table);
OutputTag<String> outputTag = new OutputTag<String>(sinkTableName) {};
@@ -168,14 +172,17 @@ public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Co
tableMap.get(source.get(schemaFieldName).toString() + "."
+ source.get("table").toString());
OutputTag<String> outputTag = tagMap.get(table);
//todo route each table's change-log records to its own OutputTag
ctx.output(outputTag, result);
} catch (Exception e) {
out.collect(objectMapper.writeValueAsString(map));
}
}
});
tagMap.forEach((k, v) -> {
//todo resolve the sink topic from the table name
String topic = getSinkTableName(k);
//todo build a separate KafkaSink per topic
org.apache.flink.connector.kafka.sink.KafkaSinkBuilder<String> kafkaSinkBuilder =
KafkaSink.<String>builder()
.setBootstrapServers(config.getSink().get("brokers"))
@@ -195,6 +202,7 @@ public void processElement(Map map, ProcessFunction<Map, String>.Context ctx, Co
kafkaProducerConfig.getProperty("transactional.id") + "-" + topic);
}
KafkaSink<String> kafkaSink = kafkaSinkBuilder.build();
//todo write each side output to its corresponding topic
process.getSideOutput(v).rebalance().sinkTo(kafkaSink).name(topic);
});
}
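
As a companion to the hunk above, here is a minimal, self-contained sketch of the side-output fan-out pattern it annotates: a ProcessFunction routes each record to the OutputTag of its table, and one KafkaSink per tag writes to a topic named after that table. The SideOutputFanOutSketch class, the "db.table|" record prefix, and the broker address are illustrative assumptions, not Dinky code.

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputFanOutSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical stand-in for the CDC stream: records prefixed with "db.table|".
        DataStream<String> changeLog = env.fromElements(
                "db1.orders|{\"op\":\"c\"}", "db1.users|{\"op\":\"u\"}", "db1.orders|{\"op\":\"d\"}");

        // One OutputTag per destination topic, mirroring tagMap in KafkaSinkBuilder.
        Map<String, OutputTag<String>> tags = new LinkedHashMap<>();
        for (String table : new String[] {"db1.orders", "db1.users"}) {
            tags.put(table, new OutputTag<String>(table) {});
        }

        // Route every record to the side output of its table; records without a known
        // table fall through to the main output, like the catch branch in the hunk above.
        SingleOutputStreamOperator<String> routed =
                changeLog.process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        int idx = value.indexOf('|');
                        OutputTag<String> tag = idx < 0 ? null : tags.get(value.substring(0, idx));
                        if (tag != null) {
                            ctx.output(tag, value);
                        } else {
                            out.collect(value);
                        }
                    }
                });

        // One KafkaSink per tag, each writing to the topic named after the table.
        tags.forEach((topic, tag) -> {
            KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers("localhost:9092") // assumed broker address
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic(topic)
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .build();
            routed.getSideOutput(tag).rebalance().sinkTo(sink).name(topic);
        });

        env.execute("side-output fan-out sketch");
    }
}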
@@ -97,7 +97,7 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
jdbcProperties.setProperty(key, value);
}
});

//todo Flink CDC connector (MySqlSource builder)
MySqlSourceBuilder<String> sourceBuilder = MySqlSource.<String>builder()
.hostname(config.getHostname())
.port(config.getPort())
@@ -106,6 +106,7 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {

String database = config.getDatabase();
if (Asserts.isNotNullString(database)) {
//todo multiple databases can be synchronized
String[] databases = database.split(FlinkParamConstant.SPLIT);
sourceBuilder.databaseList(databases);
} else {
@@ -114,15 +115,18 @@

List<String> schemaTableNameList = config.getSchemaTableNameList();
if (Asserts.isNotNullCollection(schemaTableNameList)) {
//todo multiple tables can be synchronized
sourceBuilder.tableList(schemaTableNameList.toArray(new String[0]));
} else {
sourceBuilder.tableList();
}

sourceBuilder.deserializer(new JsonDebeziumDeserializationSchema());
//todo set Debezium properties
sourceBuilder.debeziumProperties(debeziumProperties);
//todo set JDBC properties
sourceBuilder.jdbcProperties(jdbcProperties);

//todo startup mode
if (Asserts.isNotNullString(config.getStartupMode())) {
switch (config.getStartupMode().toLowerCase()) {
case "initial":
@@ -205,7 +209,7 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {
if (Asserts.isEqualsIgnoreCase(schemaChanges, "true")) {
sourceBuilder.includeSchemaChanges(true);
}

//todo MySQL CDC source
return env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
}

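
For context, here is a standalone sketch of the MySqlSource configuration this hunk steps through, assuming the flink-cdc 2.x connector (com.ververica packages); the host, credentials, and database/table names are placeholders.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MysqlCdcSourceSketch {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")                    // placeholder connection settings
                .port(3306)
                .username("root")
                .password("secret")
                .databaseList("db1", "db2")               // multiple databases can be synchronized
                .tableList("db1.orders", "db2.users")     // multiple tables can be synchronized
                .deserializer(new JsonDebeziumDeserializationSchema()) // change logs as JSON strings
                .startupOptions(StartupOptions.initial()) // snapshot first, then read the binlog
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print();
        env.execute("mysql cdc source sketch");
    }
}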
@@ -48,8 +48,10 @@ public static String pretreatStatement(Executor executor, String statement) {
public static FlinkInterceptorResult build(Executor executor, String statement) {
boolean noExecute = false;
TableResult tableResult = null;
//todo parse the statement into an Operation
Operation operation = Operations.buildOperation(statement);
if (Asserts.isNotNull(operation)) {
//todo execute the Operation
tableResult = operation.execute(executor);
noExecute = operation.noExecute();
}
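
The interceptor above delegates to an Operation registry. Below is a hedged sketch of that keyword-dispatch pattern under simplified contracts: the OperationDispatchSketch class and its Operation interface are illustrative, and Dinky's real interface also takes an Executor and returns a TableResult.

import java.util.Arrays;
import java.util.List;

public class OperationDispatchSketch {

    // Simplified Operation contract (the real one takes an Executor and returns a TableResult).
    interface Operation {
        String getHandle();                  // keyword the operation claims, e.g. "EXECUTE CDCSOURCE"
        Operation create(String statement);  // bind a concrete statement
        void execute();
    }

    // Prototype list scanned in order; the first keyword match wins.
    private static final List<Operation> PROTOTYPES = Arrays.asList(new CreateCdcSourceOp(null));

    static Operation buildOperation(String statement) {
        String upper = statement.trim().toUpperCase();
        for (Operation prototype : PROTOTYPES) {
            if (upper.startsWith(prototype.getHandle())) {
                return prototype.create(statement);
            }
        }
        return null; // no operation claims the statement; it runs as ordinary SQL
    }

    static class CreateCdcSourceOp implements Operation {
        private final String statement;

        CreateCdcSourceOp(String statement) {
            this.statement = statement;
        }

        public String getHandle() {
            return "EXECUTE CDCSOURCE";
        }

        public Operation create(String statement) {
            return new CreateCdcSourceOp(statement);
        }

        public void execute() {
            System.out.println("would build a CDC pipeline for: " + statement);
        }
    }

    public static void main(String[] args) {
        Operation op = buildOperation("EXECUTE CDCSOURCE demo WITH ('connector' = 'mysql-cdc')");
        if (op != null) {
            op.execute();
        }
    }
}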
@@ -43,6 +43,7 @@ public static Map<String, List<String>> generateParser(String sql) {
sql = sql.replace("\r\n", " ").replace("\n", " ") + " ENDOFSQL";
if (contains(sql, "(insert\\s+into)(.+)(select)(.+)(from)(.+)")) {
tmp = new InsertSelectSqlParser(sql);
//todo parse EXECUTE CDCSOURCE statements
} else if (contains(sql, "(execute\\s+cdcsource)")) {
tmp = new CreateCDCSourceSqlParser(sql);
} else if (contains(sql, "(select)(.+)(from)(.+)")) {
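
The parser above classifies statements with regular expressions. A plausible reading of the contains helper, shown as a runnable sketch (the SqlClassifySketch class and this exact implementation are assumptions):

import java.util.regex.Pattern;

public class SqlClassifySketch {

    // Case-insensitive regex search anywhere in the statement.
    static boolean contains(String sql, String regex) {
        return Pattern.compile(regex, Pattern.CASE_INSENSITIVE).matcher(sql).find();
    }

    public static void main(String[] args) {
        String sql = "EXECUTE CDCSOURCE demo WITH ('connector' = 'mysql-cdc')";
        System.out.println(contains(sql, "(execute\\s+cdcsource)"));                      // true
        System.out.println(contains(sql, "(insert\\s+into)(.+)(select)(.+)(from)(.+)"));  // false
    }
}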
@@ -52,6 +52,7 @@
*
* @since 2022/1/29 23:25
*/
//todo entry point for EXECUTE CDCSOURCE
public class CreateCDCSourceOperation extends AbstractOperation implements Operation {

private static final String KEY_WORD = "EXECUTE CDCSOURCE";
@@ -188,8 +189,10 @@ public TableResult execute(Executor executor) {
streamExecutionEnvironment.enableCheckpointing(config.getCheckpoint());
logger.info("Set checkpoint: {}", config.getCheckpoint());
}
//todo source: read the change data
DataStreamSource<String> streamSource = cdcBuilder.build(streamExecutionEnvironment);
logger.info("Build {} successful...", config.getType());
//todo sink: consume the change data
sinkBuilder.build(
cdcBuilder, streamExecutionEnvironment, executor.getCustomTableEnvironment(), streamSource);
logger.info("Build CDCSOURCE Task successful!");
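
To summarize the execute() flow annotated above, here is a runnable sketch of the source-then-sink wiring under simplified builder contracts; the CdcPipelineWiringSketch class and its two interfaces are stand-ins for Dinky's CDCBuilder and SinkBuilder, whose real build methods also take a CustomTableEnvironment.

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CdcPipelineWiringSketch {

    interface CDCBuilder {
        DataStreamSource<String> build(StreamExecutionEnvironment env);
    }

    interface SinkBuilder {
        void build(CDCBuilder cdcBuilder, StreamExecutionEnvironment env, DataStreamSource<String> source);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // the 'checkpoint' config in the hunk above

        // Dummy builders; the real ones are MysqlCDCBuilder and KafkaSinkBuilder.
        CDCBuilder cdcBuilder = e -> e.fromElements("{\"op\":\"c\"}");
        SinkBuilder sinkBuilder = (c, e, s) -> s.print();

        DataStreamSource<String> stream = cdcBuilder.build(env); // source: read the change data
        sinkBuilder.build(cdcBuilder, env, stream);              // sink: consume the change data
        env.execute("CDCSOURCE wiring sketch");
    }
}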
@@ -30,6 +30,7 @@
/** @since 0.6.8 */
public class CdcSourceTests {


@Ignore
@Test
public void printTest() throws Exception {