Kafka Debezium JSON supports automatic discovery of primary keys
sunxiaojian committed Jun 25, 2024
1 parent 3baca1c commit cf92903
Showing 13 changed files with 399 additions and 3 deletions.
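
In outline: Debezium writes a table's primary-key columns into the Kafka message key, so the connector can discover primary keys automatically by reading the key's field names and appending them to the message value before it reaches the CDC record parser. A minimal before/after sketch (payloads are illustrative; the appended field is named by the FIELD_PRIMARY constant, whose literal value does not appear in this diff):

// Kafka message key, as produced by Debezium for a table with primary key (id):
//   {"id": 1}
// Incoming message value:
//   {"before": null, "after": {"id": 1, "name": "a"}, "op": "c"}
// Value after the rewrite, as handed to the CDC parser:
//   {"before": null, "after": {"id": 1, "name": "a"}, "op": "c", <FIELD_PRIMARY>: ["id"]}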
----------------------------------------
@@ -35,6 +35,7 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

@@ -130,6 +131,13 @@ public static <T> T fromJson(String json, TypeReference<T> typeReference) {
}
}

public static <T> ObjectNode setNode(ObjectNode node, String fieldName, T value) {
    JsonNode nodeValue = OBJECT_MAPPER_INSTANCE.valueToTree(value);
    node.set(fieldName, nodeValue);
    return node;
}
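
A short usage sketch for the new helper (the field name and values here are hypothetical, not part of this commit):

// Assumes JsonSerdeUtil is on the classpath; "pk-names" is an illustrative field name.
ObjectNode value = JsonSerdeUtil.fromJson("{\"id\": 1}", ObjectNode.class);
JsonSerdeUtil.setNode(value, "pk-names", java.util.Arrays.asList("id"));
// value now serializes as {"id":1,"pk-names":["id"]}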

public static <T> T fromJson(String json, Class<T> clazz) {
try {
return OBJECT_MAPPER_INSTANCE.reader().readValue(json, clazz);
----------------------------------------
@@ -87,7 +87,6 @@ private static void sleepSafely(int duration) {

/** Wrap the consumer for different message queues. */
public interface ConsumerWrapper extends AutoCloseable {

List<CdcSourceRecord> getRecords(int pollTimeOutMills);

String topic();
----------------------------------------
@@ -76,6 +76,8 @@ public abstract class RecordParser

protected JsonNode root;

protected JsonNode key;

public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
this.typeMapping = typeMapping;
this.computedColumns = computedColumns;
@@ -85,6 +87,7 @@ public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
public Schema buildSchema(CdcSourceRecord record) {
try {
setRoot(record);
setKey(record);
if (isDDL()) {
return null;
}
@@ -175,7 +178,7 @@ protected void evalComputedColumns(
});
}

private List<String> extractPrimaryKeys() {
protected List<String> extractPrimaryKeys() {
ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
if (pkNames == null) {
return Collections.emptyList();
@@ -204,6 +207,10 @@ private RichCdcMultiplexRecord createRecord(
new CdcRecord(rowKind, data));
}

protected void setKey(CdcSourceRecord record) {
key = (JsonNode) record.getKey();
}

protected void setRoot(CdcSourceRecord record) {
root = (JsonNode) record.getValue();
}
----------------------------------------
@@ -28,6 +28,7 @@
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -135,6 +136,22 @@ protected void setRoot(CdcSourceRecord record) {
}
}

@Override
protected void setKey(CdcSourceRecord record) {
    JsonNode node = (JsonNode) record.getKey();
    if (node == null) {
        // No key: tombstone event or value-only deserialization; leave schema state as-is.
        key = null;
        return;
    }
    hasSchema = false;
    if (node.has(FIELD_SCHEMA)) {
        key = node.get(FIELD_PAYLOAD);
        JsonNode schema = node.get(FIELD_SCHEMA);
        if (!isNull(schema)) {
            parseSchema(schema);
            hasSchema = true;
        }
    } else {
        key = node;
    }
}
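
For context, the branch above distinguishes the two shapes a Debezium JSON key can take; the schema envelope is present only when the key converter has schemas enabled (payloads are illustrative):

// key.converter schemas enabled:
//   {"schema": {"type": "struct", "fields": [...]}, "payload": {"id": 1}}
// key.converter schemas disabled:
//   {"id": 1}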

private void parseSchema(JsonNode schema) {
debeziumTypes.clear();
classNames.clear();
@@ -217,6 +234,16 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
return resultMap;
}

@Override
protected List<String> extractPrimaryKeys() {
if (key != null) {
List<String> primaryKeys = Lists.newArrayList();
key.fieldNames().forEachRemaining(primaryKeys::add);
return primaryKeys;
}
return super.extractPrimaryKeys();
}

@Override
protected String primaryField() {
return FIELD_PRIMARY;
----------------------------------------
@@ -19,14 +19,20 @@
package org.apache.paimon.flink.action.cdc.format.debezium;

import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import io.debezium.data.Bits;
import io.debezium.data.geometry.Geometry;
@@ -46,16 +52,67 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Base64;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_PAYLOAD;
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_PRIMARY;
import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_SCHEMA;

/**
 * Utils to handle the 'schema' field in Debezium JSON. TODO: These methods duplicate a lot of
 * code from MySqlRecordParser and need refactoring.
 */
public class DebeziumSchemaUtils {

/**
 * Rewrite the message value. For Debezium JSON, append the primary-key field names extracted
 * from the message key; other formats are passed through unchanged.
 */
public static String rewriteValue(Pair<String, String> message, String format) {
    DataFormat dataFormat = DataFormat.fromConfigString(format);
    String value = message.getValue();
    switch (dataFormat) {
        case DEBEZIUM_JSON:
            if (StringUtils.isBlank(message.getKey())) {
                return value;
            }
            return extractPrimaryKeys(Pair.of(message.getKey(), value));
        default:
            return value;
    }
}

/** Extract the primary-key field names from the message key and append them to the value. */
public static String extractPrimaryKeys(Pair<String, String> record) {
    String key = record.getKey();
    if (StringUtils.isBlank(key)) {
        return record.getValue();
    }
    try {
        List<String> primaryKeys = Lists.newArrayList();
        JsonNode keyNode = JsonSerdeUtil.fromJson(key, JsonNode.class);
        // The key is wrapped as {"schema": ..., "payload": {...}} when the Debezium
        // key converter includes schemas; otherwise the key itself is the payload.
        JsonNode payload = keyNode.has(FIELD_SCHEMA) ? keyNode.get(FIELD_PAYLOAD) : keyNode;
        payload.fieldNames().forEachRemaining(primaryKeys::add);

        // Append the primary-key names to the value.
        ObjectNode valueNode = JsonSerdeUtil.fromJson(record.getValue(), ObjectNode.class);
        JsonSerdeUtil.setNode(valueNode, FIELD_PRIMARY, primaryKeys);
        return JsonSerdeUtil.writeValueAsString(valueNode);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(
                "An error occurred while attaching the Debezium primary keys to the value", e);
    }
}
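
A usage sketch for the utilities above (JSON payloads are illustrative):

String key = "{\"id\": 1}";
String value = "{\"after\": {\"id\": 1, \"name\": \"a\"}, \"op\": \"c\"}";
// Appends the key's field names to the value under FIELD_PRIMARY.
String rewritten = DebeziumSchemaUtils.extractPrimaryKeys(Pair.of(key, value));
// Formats other than Debezium JSON pass through rewriteValue(...) unchanged.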

/** Transform raw string value according to schema. */
public static String transformRawValue(
@Nullable String rawValue,
----------------------------------------
@@ -87,7 +87,6 @@ public static KafkaSource<CdcSourceRecord> buildKafkaSource(Configuration kafkaConfig) {
kafkaSourceBuilder
.setValueOnlyDeserializer(new CdcJsonDeserializationSchema())
.setGroupId(kafkaPropertiesGroupId(kafkaConfig));

Properties properties = createKafkaProperties(kafkaConfig);

StartupMode startupMode =
----------------------------------------
@@ -0,0 +1,79 @@ (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.utils.Pair;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Deserialization for kafka key and value. */
public class KafkaKeyValueDeserializationSchema
implements KafkaRecordDeserializationSchema<String> {

private static final Logger LOG =
LoggerFactory.getLogger(KafkaKeyValueDeserializationSchema.class);

private final String charset;
private final String format;

public KafkaKeyValueDeserializationSchema(String format) {
this(StandardCharsets.UTF_8.name(), format);
}

public KafkaKeyValueDeserializationSchema(String charset, String format) {
this.charset = Preconditions.checkNotNull(charset);
this.format = format;
}

@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> collector)
throws IOException {
if (record.value() != null) {
String value = new String(record.value(), Charset.forName(charset));
String key =
record.key() != null
? new String(record.key(), Charset.forName(charset))
: null;
collector.collect(DebeziumSchemaUtils.rewriteValue(Pair.of(key, value), format));
} else {
// see
// https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events
LOG.info(
        "Found null message value:\n{}\nThis message will be ignored. It might be produced by a tombstone event; "
                + "please check your Debezium and Kafka configuration.",
        record);
}
}

@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
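
A minimal wiring sketch, assuming the Kafka sync action switches to this key-aware schema when the configured format is Debezium JSON ("debezium-json" as the format string, plus the broker, topic, and group id, are assumptions for illustration):

// Uses Flink's standard KafkaSource builder; imports omitted for brevity.
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // illustrative
                .setTopics("orders") // illustrative
                .setGroupId("paimon-cdc") // illustrative
                .setDeserializer(new KafkaKeyValueDeserializationSchema("debezium-json"))
                .build();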
----------------------------------------
@@ -0,0 +1,59 @@ (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.pulsar;

import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.utils.Pair;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.Message;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Deserialization for Pulsar key and value. */
public class PulsarKeyValueDeserializationSchema implements PulsarDeserializationSchema<String> {

private final String charset;
private final String format;

public PulsarKeyValueDeserializationSchema(String format) {
this(StandardCharsets.UTF_8.name(), format);
}

public PulsarKeyValueDeserializationSchema(String charset, String format) {
this.charset = Preconditions.checkNotNull(charset);
this.format = format;
}

@Override
public void deserialize(Message<byte[]> message, Collector<String> collector) throws Exception {
String value = new String(message.getValue(), Charset.forName(charset));
collector.collect(
DebeziumSchemaUtils.rewriteValue(Pair.of(message.getKey(), value), format));
}

@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}
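
Similarly for Pulsar, a sketch of constructing the schema (the format string is an assumption; the instance plugs into the Pulsar source builder via setDeserializationSchema):

PulsarDeserializationSchema<String> schema =
        new PulsarKeyValueDeserializationSchema("debezium-json");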
----------------------------------------
@@ -339,6 +339,29 @@ private void send(String topic, String record, boolean wait) {
}
}

void writeRecordsToKafka(String topic, Map<String, String> data) throws Exception {
    Properties producerProperties = getStandardProps();
    producerProperties.setProperty("retries", "0");
    producerProperties.put(
            "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProperties.put(
            "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    try (KafkaProducer<String, String> kafkaProducer =
            new KafkaProducer<>(producerProperties)) {
        for (Map.Entry<String, String> entry : data.entrySet()) {
            try {
                // Parse key and value first so that malformed records are skipped.
                objectMapper.readTree(entry.getKey());
                objectMapper.readTree(entry.getValue());
                if (!StringUtils.isEmpty(entry.getValue())) {
                    kafkaProducer.send(
                            new ProducerRecord<>(topic, entry.getKey(), entry.getValue()));
                }
            } catch (Exception e) {
                // Ignore records whose key or value is not valid JSON.
            }
        }
    }
}
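
An illustrative call feeding one Debezium-style record through this helper (topic name and payloads are hypothetical):

Map<String, String> data = new HashMap<>();
data.put("{\"id\": 1}", "{\"after\": {\"id\": 1, \"name\": \"a\"}, \"op\": \"c\"}");
writeRecordsToKafka("test_debezium_topic", data);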

/** Kafka container extension for junit5. */
private static class KafkaContainerExtension extends KafkaContainer
implements BeforeAllCallback, AfterAllCallback {
----------------------------------------
@@ -147,4 +147,16 @@ public void testMessageWithNullValue() throws Exception {
rowType,
Collections.singletonList("id"));
}

@Test
@Timeout(120)
public void testRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
testRecordWithPrimaryKeys(DEBEZIUM);
}

@Test
@Timeout(120)
public void testSchemaIncludeRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
testSchemaIncludeRecordWithPrimaryKeys(DEBEZIUM);
}
}
(Remaining changed files not shown.)
