Kafka Debezium JSON supports automatic discovery of primary keys
sunxiaojian committed Jun 25, 2024
1 parent 3baca1c commit 47e7472
Showing 13 changed files with 338 additions and 74 deletions.
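
For orientation: in Debezium's Kafka topics, the message key carries the table's primary-key columns, so a table keyed on id produces keys like {"id": 42} (values illustrative). This commit threads that key through the CDC source records and parsers so synchronization actions can discover primary keys automatically instead of requiring them to be supplied by hand.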
@@ -41,6 +41,10 @@ public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) {
this.value = value;
}

public CdcSourceRecord(Object key, Object value) {
this(null, key, value);
}

public CdcSourceRecord(Object value) {
this(null, null, value);
}
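
A minimal usage sketch for the new two-argument constructor (the JSON payloads below are illustrative, not from the commit):

ObjectMapper mapper = new ObjectMapper();
// readTree throws JsonProcessingException; handle or declare it.
JsonNode key = mapper.readTree("{\"id\": 1}");
JsonNode value = mapper.readTree("{\"after\": {\"id\": 1, \"name\": \"paimon\"}}");
// Delegates to (topic = null, key, value).
CdcSourceRecord record = new CdcSourceRecord(key, value);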
@@ -76,6 +76,8 @@ public abstract class RecordParser

protected JsonNode root;

protected JsonNode key;

public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
this.typeMapping = typeMapping;
this.computedColumns = computedColumns;
@@ -175,7 +177,7 @@ protected void evalComputedColumns(
});
}

private List<String> extractPrimaryKeys() {
protected List<String> extractPrimaryKeys() {
ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
if (pkNames == null) {
return Collections.emptyList();
@@ -204,8 +206,15 @@ private RichCdcMultiplexRecord createRecord(
new CdcRecord(rowKind, data));
}

protected void setKey(CdcSourceRecord record) {
key = (JsonNode) record.getKey();
}

protected void setRoot(CdcSourceRecord record) {
root = (JsonNode) record.getValue();
if (record.getKey() != null) {
setKey(record);
}
}

protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
@@ -28,6 +28,7 @@
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -135,6 +136,16 @@ protected void setRoot(CdcSourceRecord record) {
}
}

@Override
protected void setKey(CdcSourceRecord record) {
JsonNode node = (JsonNode) record.getKey();
if (node.has(FIELD_SCHEMA)) {
key = node.get(FIELD_PAYLOAD);
} else {
key = node;
}
}

private void parseSchema(JsonNode schema) {
debeziumTypes.clear();
classNames.clear();
@@ -217,6 +228,16 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
return resultMap;
}

@Override
protected List<String> extractPrimaryKeys() {
if (key != null) {
List<String> primaryKeys = Lists.newArrayList();
key.fieldNames().forEachRemaining(primaryKeys::add);
return primaryKeys;
}
return super.extractPrimaryKeys();
}

@Override
protected String primaryField() {
return FIELD_PRIMARY;
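
To make the discovery path concrete, here is a hedged sketch of what the two overrides above do for a table keyed on order_id (message contents illustrative):

ObjectMapper mapper = new ObjectMapper();
// A Debezium message key, here with the optional schema envelope:
JsonNode node = mapper.readTree(
        "{\"schema\": {\"type\": \"struct\"}, \"payload\": {\"order_id\": 42}}");
// setKey(): unwrap the payload when a schema envelope is present.
JsonNode key = node.has("schema") ? node.get("payload") : node;
// extractPrimaryKeys(): the key's field names are the primary keys.
List<String> primaryKeys = new ArrayList<>();
key.fieldNames().forEachRemaining(primaryKeys::add);
// primaryKeys now holds ["order_id"]. With no message key, the parser
// falls back to the value's pkNames via super.extractPrimaryKeys().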
@@ -85,7 +85,8 @@ public static KafkaSource<CdcSourceRecord> buildKafkaSource(Configuration kafkaConfig) {
}

kafkaSourceBuilder
.setValueOnlyDeserializer(new CdcJsonDeserializationSchema())
.setDeserializer(new KafkaKeyValueDeserializationSchema())
.setGroupId(kafkaPropertiesGroupId(kafkaConfig));

Properties properties = createKafkaProperties(kafkaConfig);
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.flink.action.cdc.CdcSourceRecord;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;

/** A Kafka key-value deserialization schema for {@link CdcSourceRecord}. */
public class KafkaKeyValueDeserializationSchema
implements KafkaRecordDeserializationSchema<CdcSourceRecord> {

private static final long serialVersionUID = 1L;

private static final Logger LOG =
        LoggerFactory.getLogger(KafkaKeyValueDeserializationSchema.class);

private final ObjectMapper objectMapper = new ObjectMapper();

public KafkaKeyValueDeserializationSchema() {
objectMapper
.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

@Override
public void deserialize(
ConsumerRecord<byte[], byte[]> consumerRecord, Collector<CdcSourceRecord> collector)
throws IOException {
if (consumerRecord == null || consumerRecord.value() == null) {
return;
}
try {
CdcSourceRecord record =
new CdcSourceRecord(
consumerRecord.key() == null
? null
: objectMapper.readValue(consumerRecord.key(), JsonNode.class),
objectMapper.readValue(consumerRecord.value(), JsonNode.class));
if (consumerRecord.key() != null) {
    LOG.debug("key:\n{}", new String(consumerRecord.key()));
    LOG.debug("value:\n{}", new String(consumerRecord.value()));
}
collector.collect(record);
} catch (Exception e) {
LOG.error("Invalid Json:\n{}", new String(consumerRecord.value()));
throw e;
}
}

@Override
public TypeInformation<CdcSourceRecord> getProducedType() {
return getForClass(CdcSourceRecord.class);
}
}
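
A usage sketch, assuming the builder wiring shown earlier in the Kafka action utils (bootstrap servers, topic, and group id are placeholders):

KafkaSource<CdcSourceRecord> source =
        KafkaSource.<CdcSourceRecord>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setTopics("orders") // placeholder
                .setGroupId("paimon-cdc") // placeholder
                .setDeserializer(new KafkaKeyValueDeserializationSchema())
                .build();

With this wiring, both the message key and value arrive on each CdcSourceRecord as JsonNode instances, which is what the key-based primary-key discovery relies on.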

This file was deleted.

@@ -172,7 +172,7 @@ public static PulsarSource<CdcSourceRecord> buildPulsarSource(Configuration pulsarConfig) {
.setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
.setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
.setDeserializationSchema(new CdcJsonDeserializationSchema());
.setDeserializationSchema(new PulsarKeyValueDeserializationSchema());

pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern);
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.pulsar;

import org.apache.paimon.flink.action.cdc.CdcSourceRecord;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;

/** A Pulsar key-value deserialization schema for {@link CdcSourceRecord}. */
public class PulsarKeyValueDeserializationSchema
implements PulsarDeserializationSchema<CdcSourceRecord> {

private static final long serialVersionUID = 1L;

private static final Logger LOG =
        LoggerFactory.getLogger(PulsarKeyValueDeserializationSchema.class);

private final ObjectMapper objectMapper = new ObjectMapper();

public PulsarKeyValueDeserializationSchema() {
objectMapper
.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

@Override
public void deserialize(Message<byte[]> message, Collector<CdcSourceRecord> collector)
throws Exception {
if (message == null || message.getValue() == null) {
return;
}
try {
CdcSourceRecord record =
new CdcSourceRecord(
message.getKey() == null
? null
: objectMapper.readValue(message.getKey(), JsonNode.class),
objectMapper.readValue(message.getValue(), JsonNode.class));
collector.collect(record);
} catch (Exception e) {
LOG.error("Invalid Json:\n{} \n{}", message.getKey(), new String(message.getValue()));
throw e;
}
}

@Override
public TypeInformation<CdcSourceRecord> getProducedType() {
return getForClass(CdcSourceRecord.class);
}
}
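
The equivalent Pulsar wiring, sketched with placeholder URLs and names (mirroring the builder chain shown in the Pulsar action utils diff above):

PulsarSource<CdcSourceRecord> source =
        PulsarSource.<CdcSourceRecord>builder()
                .setServiceUrl("pulsar://localhost:6650") // placeholder
                .setAdminUrl("http://localhost:8080") // placeholder
                .setSubscriptionName("paimon-cdc") // placeholder
                .setTopics("orders") // placeholder
                .setDeserializationSchema(new PulsarKeyValueDeserializationSchema())
                .build();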
@@ -328,6 +328,18 @@ private boolean isRecordLine(String line) {
}
}

private void send(String topic, String key, String record, boolean wait) {
Future<RecordMetadata> sendFuture =
kafkaProducer.send(new ProducerRecord<>(topic, key, record));
if (wait) {
try {
sendFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

private void send(String topic, String record, boolean wait) {
Future<RecordMetadata> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
if (wait) {
@@ -339,6 +351,20 @@ private void send(String topic, String record, boolean wait) {
}
}

void writeRecordsToKafka(String topic, boolean wait, String resourceDirFormat)
throws Exception {
URL url =
        KafkaCanalSyncTableActionITCase.class
                .getClassLoader()
                .getResource(resourceDirFormat);
List<String> lines = Files.readAllLines(Paths.get(url.toURI()));
lines.stream()
.map(line -> line.split(";"))
.filter(keyValues -> (keyValues.length > 1))
.filter(keyValues -> isRecordLine(keyValues[0]) && isRecordLine(keyValues[1]))
.forEach(keyValues -> this.send(topic, keyValues[0], keyValues[1], wait));
}
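
writeRecordsToKafka expects each test-resource line to hold the message key and value separated by a semicolon; a hypothetical line for a Debezium topic might look like:

{"id": 1};{"before": null, "after": {"id": 1, "name": "one"}, "op": "c"}

keyValues[0] becomes the Kafka message key and keyValues[1] the value, so the key-based primary-key discovery path is exercised end to end.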

/** Kafka container extension for junit5. */
private static class KafkaContainerExtension extends KafkaContainer
implements BeforeAllCallback, AfterAllCallback {
@@ -147,4 +147,16 @@ public void testMessageWithNullValue() throws Exception {
rowType,
Collections.singletonList("id"));
}

@Test
@Timeout(120)
public void testRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
testRecordWithPrimaryKeys(DEBEZIUM);
}

@Test
@Timeout(120)
public void testRecordIncludeSchemaAndAutoDiscoveryPrimaryKeys() throws Exception {
testSchemaIncludeRecordWithPrimaryKeys(DEBEZIUM);
}
}