Kafka Debezium JSON supports automatic discovery of primary keys
sunxiaojian committed Jun 25, 2024
1 parent 6e2deb5 commit 54b4672
Showing 13 changed files with 358 additions and 84 deletions.
@@ -41,6 +41,10 @@ public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) {
this.value = value;
}

public CdcSourceRecord(Object key, Object value) {
this(null, key, value);
}

public CdcSourceRecord(Object value) {
this(null, null, value);
}
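For illustration, a minimal sketch of how a key-aware deserializer might call the new two-argument constructor; the objectMapper and consumerRecord names are hypothetical, not part of this commit:

    // Hypothetical caller: pair the parsed Kafka key with the Debezium value envelope.
    JsonNode keyNode = objectMapper.readTree(consumerRecord.key());     // e.g. {"id": 1}
    JsonNode valueNode = objectMapper.readTree(consumerRecord.value()); // Debezium payload
    CdcSourceRecord cdcRecord = new CdcSourceRecord(keyNode, valueNode); // topic stays null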
@@ -175,7 +175,7 @@ protected void evalComputedColumns(
});
}

private List<String> extractPrimaryKeys() {
protected List<String> extractPrimaryKeys() {
ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
if (pkNames == null) {
return Collections.emptyList();
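Making extractPrimaryKeys protected lets format-specific parsers override it while keeping this payload-based lookup as the fallback. A minimal Jackson sketch of that fallback path, assuming the primary-key array is named "pkNames" (the actual field name comes from each parser's primaryField()):

    ObjectMapper mapper = new ObjectMapper();
    JsonNode root = mapper.readTree("{\"pkNames\": [\"id\"], \"data\": {\"id\": 1}}");
    List<String> pkNames = new ArrayList<>();
    root.withArray("pkNames").forEach(n -> pkNames.add(n.asText())); // -> ["id"]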
@@ -28,6 +28,7 @@
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -82,6 +83,7 @@ public class DebeziumRecordParser extends RecordParser {
private final Map<String, String> debeziumTypes = new HashMap<>();
private final Map<String, String> classNames = new HashMap<>();
private final Map<String, Map<String, String>> parameters = new HashMap<>();
private final List<String> primaryKeys = Lists.newArrayList();

public DebeziumRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
super(typeMapping, computedColumns);
@@ -121,7 +123,7 @@ private JsonNode getBefore(String op) {
@Override
protected void setRoot(CdcSourceRecord record) {
JsonNode node = (JsonNode) record.getValue();

preparePrimaryKeys(record);
hasSchema = false;
if (node.has(FIELD_SCHEMA)) {
root = node.get(FIELD_PAYLOAD);
@@ -135,6 +137,18 @@ protected void setRoot(CdcSourceRecord record) {
}
}

/** Collects primary key field names from the Kafka record key, if one is present. */
protected void preparePrimaryKeys(CdcSourceRecord record) {
primaryKeys.clear();
if (record.getKey() == null) {
return;
}
JsonNode node = (JsonNode) record.getKey();
// Schema-wrapped keys nest the key fields under "payload"; unwrap before reading.
if (node.has(FIELD_SCHEMA)) {
node = node.get(FIELD_PAYLOAD);
}
node.fieldNames().forEachRemaining(primaryKeys::add);
}

private void parseSchema(JsonNode schema) {
debeziumTypes.clear();
classNames.clear();
@@ -217,6 +231,14 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
return resultMap;
}

@Override
protected List<String> extractPrimaryKeys() {
// Prefer primary keys discovered from the Kafka record key; otherwise fall
// back to the primary-key field in the payload.
if (!primaryKeys.isEmpty()) {
return primaryKeys;
}
return super.extractPrimaryKeys();
}

@Override
protected String primaryField() {
return FIELD_PRIMARY;
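To make the new flow concrete: Debezium writes the source table's primary key columns as the Kafka record key, optionally wrapped in a schema envelope. A hedged sketch of the discovery logic applied to a sample key (the sample value is illustrative; real keys mirror the table's key columns):

    // Sample Debezium JSON key: {"schema": {...}, "payload": {"id": 1}}
    ObjectMapper mapper = new ObjectMapper();
    JsonNode key = mapper.readTree("{\"schema\": {\"type\": \"struct\"}, \"payload\": {\"id\": 1}}");
    JsonNode node = key.has("schema") ? key.get("payload") : key; // unwrap the envelope
    List<String> primaryKeys = new ArrayList<>();
    node.fieldNames().forEachRemaining(primaryKeys::add); // -> ["id"]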
@@ -21,7 +21,6 @@
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.configuration.Configuration;
@@ -85,7 +84,8 @@ public static KafkaSource<CdcSourceRecord> buildKafkaSource(Configuration kafkaConfig) {
}

kafkaSourceBuilder
.setValueOnlyDeserializer(new CdcJsonDeserializationSchema())
.setDeserializer(new KafkaKeyValueDeserializationSchema())
.setGroupId(kafkaPropertiesGroupId(kafkaConfig));

Properties properties = createKafkaProperties(kafkaConfig);
@@ -325,13 +325,13 @@ private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper {
public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(pollTimeOutMills));
CdcJsonDeserializationSchema deserializationSchema = new CdcJsonDeserializationSchema();
KafkaKeyValueDeserializationSchema deserializationSchema =
new KafkaKeyValueDeserializationSchema();
return StreamSupport.stream(consumerRecords.records(topic).spliterator(), false)
.map(
consumerRecord -> {
try {
return deserializationSchema.deserialize(
consumerRecord.value());
return deserializationSchema.deserialize(consumerRecord);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.flink.action.cdc.CdcSourceRecord;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;

/** A Kafka key/value deserialization schema for {@link CdcSourceRecord}. */
public class KafkaKeyValueDeserializationSchema
implements KafkaRecordDeserializationSchema<CdcSourceRecord> {

private static final long serialVersionUID = 1L;

private static final Logger LOG =
LoggerFactory.getLogger(KafkaKeyValueDeserializationSchema.class);

private final ObjectMapper objectMapper = new ObjectMapper();

public KafkaKeyValueDeserializationSchema() {
objectMapper
.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

/** Deserializes a single record; returns null for null records and tombstones (null value). */
public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
if (record == null || record.value() == null) {
return null;
}
try {
JsonNode key =
record.key() != null
? objectMapper.readValue(record.key(), JsonNode.class)
: null;
return new CdcSourceRecord(
key, objectMapper.readValue(record.value(), JsonNode.class));
} catch (Exception e) {
LOG.error("Invalid Json:\n{}", new String(record.value(), StandardCharsets.UTF_8));
throw e;
}
}

@Override
public void deserialize(
ConsumerRecord<byte[], byte[]> record, Collector<CdcSourceRecord> collector)
throws IOException {
// deserialize(record) already logs and rethrows on invalid JSON, so a second
// try/catch here would only duplicate the logging.
CdcSourceRecord sourceRecord = deserialize(record);
if (sourceRecord != null) {
collector.collect(sourceRecord);
}
}

@Override
public TypeInformation<CdcSourceRecord> getProducedType() {
return getForClass(CdcSourceRecord.class);
}
}
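A minimal usage sketch for the schema above, built around a hand-constructed ConsumerRecord (topic, partition, and offset are illustrative, and the call sits in a context that may throw IOException):

    byte[] key = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);
    byte[] value = "{\"payload\": {\"after\": {\"id\": 1}}}".getBytes(StandardCharsets.UTF_8);
    ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 0, 0L, key, value);
    CdcSourceRecord cdcRecord = new KafkaKeyValueDeserializationSchema().deserialize(record);
    // cdcRecord.getKey() is the parsed key JsonNode; cdcRecord.getValue() the value envelope.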

This file was deleted.

@@ -21,7 +21,6 @@
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -172,7 +171,7 @@ public static PulsarSource<CdcSourceRecord> buildPulsarSource(Configuration pulsarConfig) {
.setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
.setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
.setDeserializationSchema(new CdcJsonDeserializationSchema());
.setDeserializationSchema(new PulsarKeyValueDeserializationSchema());

pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern);
@@ -387,12 +386,11 @@ private static class PulsarConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper {
public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
try {
Message<byte[]> message = consumer.receive(pollTimeOutMills, TimeUnit.MILLISECONDS);
CdcJsonDeserializationSchema deserializationSchema =
new CdcJsonDeserializationSchema();
PulsarKeyValueDeserializationSchema deserializationSchema =
new PulsarKeyValueDeserializationSchema();
return message == null
? Collections.emptyList()
: Collections.singletonList(
deserializationSchema.deserialize(message.getValue()));
: Collections.singletonList(deserializationSchema.deserialize(message));
} catch (IOException e) {
throw new RuntimeException(e);
}