feat: kafka sync supports canal-json #1848

Closed
@@ -120,7 +120,7 @@ public void buildStatementWrapper() throws SQLException {
// restoration
statementWrapper = new RestoreWrapperProxy(dbConn, jdbcDialect, false);
} else {
if (useAbstractColumn || CollectionUtils.isEmpty(jdbcConfig.getUniqueKey())) {
if (CollectionUtils.isEmpty(jdbcConfig.getUniqueKey())) {
// sync or sql appendOnly
FieldNamedPreparedStatement fieldNamedPreparedStatement =
FieldNamedPreparedStatement.prepareStatement(
@@ -131,12 +131,12 @@ public void buildStatementWrapper() throws SQLException {
new SimpleStatementWrapper(fieldNamedPreparedStatement, rowConverter);
} else {
// sql retract
buildRetractStatementExecutor();
buildRetractStatementExecutor(useAbstractColumn);
}
}
}

private void buildRetractStatementExecutor() throws SQLException {
private void buildRetractStatementExecutor(boolean useAbstractColumn) throws SQLException {
SimpleStatementWrapper deleteExecutor =
new SimpleStatementWrapper(
FieldNamedPreparedStatement.prepareStatement(
@@ -164,7 +164,10 @@ private void buildRetractStatementExecutor() throws SQLException {
upsertExecutor,
deleteExecutor,
JdbcUtil.getKeyExtractor(
columnNameList, jdbcConfig.getUniqueKey(), keyRowType, false));
columnNameList,
jdbcConfig.getUniqueKey(),
keyRowType,
useAbstractColumn));
}

private JdbcBatchStatementWrapper<RowData> getInsertOrUpdateExecutor() throws SQLException {
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.kafka.converter;

import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.connector.kafka.conf.KafkaConfig;
import com.dtstack.chunjun.element.AbstractBaseColumn;
import com.dtstack.chunjun.element.ColumnRowData;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static java.lang.String.format;

public class KafkaCanalSyncConverter extends KafkaSyncConverter {

private static final String FIELD_OLD = "old";
private static final String FIELD_TYPE = "type";
private static final String FIELD_DATA = "data";
private static final String OP_INSERT = "INSERT";
private static final String OP_UPDATE = "UPDATE";
private static final String OP_DELETE = "DELETE";
private static final String OP_CREATE = "CREATE";

private static final boolean IGNORE_PARSE_ERRORS = false;

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/** kafka config */
private final KafkaConfig kafkaConfig;

public KafkaCanalSyncConverter(RowType rowType, KafkaConfig kafkaConfig) {
super(rowType, kafkaConfig);
this.commonConfig = this.kafkaConfig = kafkaConfig;
List<TypeConfig> typeList =
kafkaConfig.getColumn().stream()
.map(FieldConfig::getType)
.collect(Collectors.toList());
this.toInternalConverters = new ArrayList<>();
for (TypeConfig s : typeList) {
toInternalConverters.add(
wrapIntoNullableInternalConverter(createInternalConverter(s.getType())));
}
}

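/**
* Parses one canal-json message into a list of rows: INSERT and DELETE messages map to one row
* per element of "data", while an UPDATE message maps to UPDATE_BEFORE/UPDATE_AFTER pairs.
*/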
@Override
public List<RowData> toInternalList(ConsumerRecord<byte[], byte[]> input) throws Exception {
List<RowData> rowDataList = new ArrayList<>();
try {
// parsing failures are handled by the catch below, subject to IGNORE_PARSE_ERRORS
JsonNode root = OBJECT_MAPPER.readTree(input.value());
String type = root.get(FIELD_TYPE).textValue();
ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
if (OP_INSERT.equals(type)) {
fillRowDataList(data, RowKind.INSERT, rowDataList);
} else if (OP_UPDATE.equals(type)) {
ArrayNode old = (ArrayNode) root.get(FIELD_OLD);
fillRowDataList(old, data, rowDataList);
} else if (OP_DELETE.equals(type)) {
fillRowDataList(data, RowKind.DELETE, rowDataList);
} else {
if (!IGNORE_PARSE_ERRORS) {
throw new IOException(
format(
"Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
type, new String(input.value())));
}
}
} catch (Throwable t) {
// a big try catch to protect the processing.
if (!IGNORE_PARSE_ERRORS) {
throw new IOException(
format("Corrupt Canal JSON message '%s'.", new String(input.value())), t);
}
}
return rowDataList;
}

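/**
* Builds UPDATE_BEFORE rows from the "old" array and UPDATE_AFTER rows from "data". Canal only
* writes changed columns into "old", so missing columns fall back to the after-image value.
*/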
private void fillRowDataList(ArrayNode old, ArrayNode data, List<RowData> rowDataList)
throws Exception {
List<RowData> updateBefore = new ArrayList<>();
List<RowData> updateAfter = new ArrayList<>();
fillRowDataList(data, RowKind.UPDATE_AFTER, updateAfter);
for (int i = 0; i < old.size(); i++) {
JsonNode item = old.get(i);
ColumnRowData columnRowData = new ColumnRowData(kafkaConfig.getColumn().size());
columnRowData.setRowKind(RowKind.UPDATE_BEFORE);
for (int j = 0; j < kafkaConfig.getColumn().size(); j++) {
JsonNode field = item.get(kafkaConfig.getColumn().get(j).getName());
if (Objects.isNull(field)) {
columnRowData.addField(((ColumnRowData) updateAfter.get(i)).getField(j));
} else {
AbstractBaseColumn baseColumn =
(AbstractBaseColumn)
toInternalConverters.get(j).deserialize(field.textValue());
columnRowData.addField(
assembleFieldProps(kafkaConfig.getColumn().get(j), baseColumn));
}
}
updateBefore.add(columnRowData);
}
rowDataList.addAll(updateBefore);
rowDataList.addAll(updateAfter);
}

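/** Converts every element of the "data" array into a row with the given RowKind. */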
private void fillRowDataList(ArrayNode data, RowKind rowKind, List<RowData> rowDataList)
throws Exception {
for (int i = 0; i < data.size(); i++) {
JsonNode item = data.get(i);
ColumnRowData columnRowData = new ColumnRowData(kafkaConfig.getColumn().size());
columnRowData.setRowKind(rowKind);
for (int j = 0; j < kafkaConfig.getColumn().size(); j++) {
AbstractBaseColumn baseColumn =
(AbstractBaseColumn)
toInternalConverters
.get(j)
.deserialize(
item.get(kafkaConfig.getColumn().get(j).getName())
.textValue());
columnRowData.addField(
assembleFieldProps(kafkaConfig.getColumn().get(j), baseColumn));
}
rowDataList.add(columnRowData);
}
}
}
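For context on the UPDATE handling above, here is a minimal, standalone sketch (not part of this PR) of how the UPDATE_BEFORE image is reconstructed from a canal-json message. The class name, the message content, and the column list (id, name) are made up for illustration; only Jackson is used. Canal puts the after-image into "data" and only the changed columns into "old", so any column missing from "old" takes its before value from the after-image:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

public class CanalJsonUpdateSketch {
    public static void main(String[] args) throws Exception {
        // A canal-json UPDATE message: "data" holds the after-image, "old" holds only the
        // columns whose value changed (here only "name").
        String message =
                "{\"type\":\"UPDATE\","
                        + "\"data\":[{\"id\":\"1\",\"name\":\"after\"}],"
                        + "\"old\":[{\"name\":\"before\"}]}";
        String[] columns = {"id", "name"};

        JsonNode root = new ObjectMapper().readTree(message);
        ArrayNode data = (ArrayNode) root.get("data");
        ArrayNode old = (ArrayNode) root.get("old");

        for (int i = 0; i < data.size(); i++) {
            JsonNode after = data.get(i);
            JsonNode before = old.get(i);
            StringBuilder updateBefore = new StringBuilder("UPDATE_BEFORE");
            StringBuilder updateAfter = new StringBuilder("UPDATE_AFTER");
            for (String column : columns) {
                JsonNode oldField = before.get(column);
                // Column missing from "old" -> its before value equals the after-image value.
                String beforeValue =
                        oldField == null ? after.get(column).textValue() : oldField.textValue();
                updateBefore.append(' ').append(column).append('=').append(beforeValue);
                updateAfter.append(' ').append(column).append('=').append(after.get(column).textValue());
            }
            System.out.println(updateBefore); // UPDATE_BEFORE id=1 name=before
            System.out.println(updateAfter); // UPDATE_AFTER id=1 name=after
        }
    }
}

As in fillRowDataList(old, data, rowDataList) above, all UPDATE_BEFORE rows are emitted before the corresponding UPDATE_AFTER rows.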
@@ -30,6 +30,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* Date: 2021/03/04 Company: www.dtstack.com
@@ -45,13 +46,16 @@ public class RowDeserializationSchema extends DynamicKafkaDeserializationSchema
/** kafka conf */
private final KafkaConfig kafkaConfig;

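/** whether the payload is a CDC format (e.g. canal-json) that may yield several rows per record */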
private final boolean isCdc;

public RowDeserializationSchema(
KafkaConfig kafkaConfig,
AbstractRowConverter<ConsumerRecord<byte[], byte[]>, Object, byte[], String>
converter) {
AbstractRowConverter<ConsumerRecord<byte[], byte[]>, Object, byte[], String> converter,
boolean isCdc) {
super(1, null, null, null, null, false, null, null, false);
this.kafkaConfig = kafkaConfig;
this.converter = converter;
this.isCdc = isCdc;
}

@Override
@@ -69,7 +73,14 @@ public void open(DeserializationSchema.InitializationContext context) {
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector) {
try {
beforeDeserialize(record);
collector.collect(converter.toInternal(record));
if (isCdc) {
List<RowData> rowDataList = converter.toInternalList(record);
for (RowData rowData : rowDataList) {
collector.collect(rowData);
}
} else {
collector.collect(converter.toInternal(record));
}
} catch (Exception e) {
String data = null;
if (record.value() != null) {
@@ -22,6 +22,7 @@
import com.dtstack.chunjun.config.SyncConfig;
import com.dtstack.chunjun.connector.kafka.adapter.StartupModeAdapter;
import com.dtstack.chunjun.connector.kafka.conf.KafkaConfig;
import com.dtstack.chunjun.connector.kafka.converter.KafkaCanalSyncConverter;
import com.dtstack.chunjun.connector.kafka.converter.KafkaRawTypeMapping;
import com.dtstack.chunjun.connector.kafka.converter.KafkaSyncConverter;
import com.dtstack.chunjun.connector.kafka.enums.StartupMode;
@@ -117,9 +118,16 @@ public DataStream<RowData> createSource() {
props.putAll(kafkaConfig.getConsumerSettings());
RowType rowType =
TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply);
DynamicKafkaDeserializationSchema deserializationSchema =
new RowDeserializationSchema(
kafkaConfig, new KafkaSyncConverter(rowType, kafkaConfig));
DynamicKafkaDeserializationSchema deserializationSchema;
if (StringUtils.equalsIgnoreCase(kafkaConfig.getCodec(), "canal-json")) {
deserializationSchema =
new RowDeserializationSchema(
kafkaConfig, new KafkaCanalSyncConverter(rowType, kafkaConfig), true);
} else {
deserializationSchema =
new RowDeserializationSchema(
kafkaConfig, new KafkaSyncConverter(rowType, kafkaConfig), false);
}
KafkaConsumerWrapper consumer =
new KafkaConsumerWrapper(topics, deserializationSchema, props);
switch (kafkaConfig.getMode()) {
@@ -39,6 +39,7 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -188,6 +189,10 @@ protected ISerializationConverter<SinkT> wrapIntoNullableExternalConverter(
*/
public abstract RowData toInternal(SourceT input) throws Exception;

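/**
* Converts one source record into several internal rows. Only converters for formats that carry
* multiple rows per record (such as canal-json) need to override this.
*
* @param input input
* @return list of RowData
*/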
public List<RowData> toInternalList(SourceT input) throws Exception {
throw new RuntimeException("Subclasses that emit multiple rows per record must override toInternalList");
}

/**
* @param input input
* @return RowData