From cf92903bbd779225ac0cdc3d032602a716dfe947 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Mon, 29 Jan 2024 12:02:34 +0800 Subject: [PATCH] Kafka debezium json supports automatic discovery of primary keys --- .../apache/paimon/utils/JsonSerdeUtil.java | 8 ++ .../action/cdc/MessageQueueSchemaUtils.java | 1 - .../flink/action/cdc/format/RecordParser.java | 9 +- .../format/debezium/DebeziumRecordParser.java | 27 ++++++ .../format/debezium/DebeziumSchemaUtils.java | 57 ++++++++++++ .../action/cdc/kafka/KafkaActionUtils.java | 1 - .../KafkaKeyValueDeserializationSchema.java | 79 ++++++++++++++++ .../PulsarKeyValueDeserializationSchema.java | 59 ++++++++++++ .../cdc/kafka/KafkaActionITCaseBase.java | 23 +++++ .../KafkaDebeziumSyncTableActionITCase.java | 12 +++ .../cdc/kafka/KafkaSyncTableActionITCase.java | 89 +++++++++++++++++++ .../schema/primarykeys/debezium-data-1.txt | 19 ++++ .../debezium-data-with-schema-1.txt | 18 ++++ 13 files changed, 399 insertions(+), 3 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java index 43d16c04c27fc..1a9c07ccb2b1a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java @@ -35,6 +35,7 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
@@ -130,6 +131,21 @@ public static <T> T fromJson(String json, TypeReference<T> typeReference) {
         }
     }
 
+    /**
+     * Converts {@code value} to a JSON tree and stores it in {@code node} under {@code fieldName},
+     * replacing any field that already has that name.
+     *
+     * @param node the object node to modify in place
+     * @param fieldName name of the field to set
+     * @param value value to convert with the shared object mapper
+     * @return the same {@code node} instance
+     */
+    public static <T> ObjectNode setNode(ObjectNode node, String fieldName, T value) {
+        JsonNode nodeValue = OBJECT_MAPPER_INSTANCE.valueToTree(value);
+        node.set(fieldName, nodeValue);
+        return node;
+    }
+
     public static <T> T fromJson(String json, Class<T> clazz) {
         try {
             return OBJECT_MAPPER_INSTANCE.reader().readValue(json, clazz);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
index c517670e40cf8..109cf971af4ae 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
@@ -87,7 +87,6 @@ private static void sleepSafely(int duration) {
 
     /** Wrap the consumer for different message queues. 
*/ public interface ConsumerWrapper extends AutoCloseable { - List getRecords(int pollTimeOutMills); String topic(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java index ea9fa5492b7a2..790b8a5c53a5c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java @@ -76,6 +76,8 @@ public abstract class RecordParser protected JsonNode root; + protected JsonNode key; + public RecordParser(TypeMapping typeMapping, List computedColumns) { this.typeMapping = typeMapping; this.computedColumns = computedColumns; @@ -85,6 +87,7 @@ public RecordParser(TypeMapping typeMapping, List computedColumn public Schema buildSchema(CdcSourceRecord record) { try { setRoot(record); + setKey(record); if (isDDL()) { return null; } @@ -175,7 +178,7 @@ protected void evalComputedColumns( }); } - private List extractPrimaryKeys() { + protected List extractPrimaryKeys() { ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class); if (pkNames == null) { return Collections.emptyList(); @@ -204,6 +207,10 @@ private RichCdcMultiplexRecord createRecord( new CdcRecord(rowKind, data)); } + protected void setKey(CdcSourceRecord record) { + key = (JsonNode) record.getKey(); + } + protected void setRoot(CdcSourceRecord record) { root = (JsonNode) record.getValue(); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java index c2b6587547e40..2d37eefe7ee5a 100644 --- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -28,6 +28,7 @@
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -135,6 +136,26 @@ protected void setRoot(CdcSourceRecord record) {
         }
     }
 
+    /**
+     * Stores the message key, unwrapping {@code payload} when the key is a {@code schema}/{@code
+     * payload} envelope. A missing key is stored as {@code null}.
+     *
+     * <p>The key schema is deliberately not passed to {@code parseSchema} and {@code hasSchema}
+     * is not touched: {@code parseSchema} clears {@code debeziumTypes} and {@code classNames},
+     * and {@code buildSchema} calls this method right after {@code setRoot}, so doing either
+     * here would overwrite state that presumably describes the value.
+     */
+    @Override
+    protected void setKey(CdcSourceRecord record) {
+        JsonNode node = (JsonNode) record.getKey();
+        if (isNull(node)) {
+            // Messages may carry no key; extractPrimaryKeys() then falls back to the parent.
+            key = null;
+            return;
+        }
+        key = node.has(FIELD_SCHEMA) ? node.get(FIELD_PAYLOAD) : node;
+    }
+
     private void parseSchema(JsonNode schema) {
         debeziumTypes.clear();
         classNames.clear();
@@ -217,6 +238,21 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
         return resultMap;
     }
 
+    /**
+     * Returns the field names of the message key as primary keys, falling back to the parent
+     * implementation when the key is absent, JSON {@code null}, or has no fields.
+     */
+    @Override
+    protected List<String> extractPrimaryKeys() {
+        if (!isNull(key)) {
+            List<String> primaryKeys = Lists.newArrayList(key.fieldNames());
+            if (!primaryKeys.isEmpty()) {
+                return primaryKeys;
+            }
+        }
+        return super.extractPrimaryKeys();
+    }
+
     @Override
     protected String primaryField() {
         return FIELD_PRIMARY;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
index 1aab6653d4d4f..332b8fa5b3f22 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
@@ -19,14 +19,20 @@
 package org.apache.paimon.flink.action.cdc.format.debezium;
 
 import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import io.debezium.data.Bits;
 import io.debezium.data.geometry.Geometry;
@@ -46,9 +52,13 @@
 import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.Base64;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_PAYLOAD;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_PRIMARY;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_SCHEMA;
 
 /**
  * Utils to handle 'schema' field in debezium Json. TODO: The methods have many duplicate codes with
@@ -56,6 +66,61 @@
  */
 public class DebeziumSchemaUtils {
 
+    /**
+     * Rewrites a message value for the given format. For {@code DEBEZIUM_JSON} the field names
+     * of the message key are attached to the value as primary keys; any other format returns
+     * the value unchanged.
+     *
+     * @param message pair of (message key, message value); the key may be {@code null} or blank
+     * @param format configured value format, resolved with {@code DataFormat.fromConfigString}
+     * @return the value, rewritten where applicable
+     */
+    public static String rewriteValue(Pair<String, String> message, String format) {
+        // extractPrimaryKeys returns the value untouched when the key is blank.
+        if (DataFormat.fromConfigString(format) == DataFormat.DEBEZIUM_JSON) {
+            return extractPrimaryKeys(message);
+        }
+        return message.getValue();
+    }
+
+    /**
+     * Appends the field names of the message key to the value under {@code FIELD_PRIMARY}.
+     *
+     * <p>The key may be a plain JSON object or a {@code schema}/{@code payload} envelope. The
+     * value is returned unchanged when the key is blank, or when its payload is missing, JSON
+     * {@code null}, not an object, or an object without fields.
+     *
+     * @param record pair of (message key, message value)
+     * @return the value JSON with the key field names attached, or the original value
+     * @throws RuntimeException if JSON processing of the rewritten value fails
+     */
+    public static String extractPrimaryKeys(Pair<String, String> record) {
+        String key = record.getKey();
+        String value = record.getValue();
+        if (StringUtils.isBlank(key)) {
+            return value;
+        }
+        try {
+            JsonNode keyNode = JsonSerdeUtil.fromJson(key, JsonNode.class);
+            JsonNode payload = keyNode.has(FIELD_SCHEMA) ? keyNode.get(FIELD_PAYLOAD) : keyNode;
+            if (payload == null || !payload.isObject() || payload.size() == 0) {
+                // Nothing usable in the key, e.g. {"schema": ..., "payload": null}.
+                return value;
+            }
+            List<String> primaryKeys = Lists.newArrayList(payload.fieldNames());
+            ObjectNode valueNode = JsonSerdeUtil.fromJson(value, ObjectNode.class);
+
+            // NOTE(review): the names are attached to the top-level value object; confirm the
+            // record parser reads FIELD_PRIMARY from there when the value is an envelope too.
+            JsonSerdeUtil.setNode(valueNode, FIELD_PRIMARY, primaryKeys);
+            return JsonSerdeUtil.writeValueAsString(valueNode);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(
+                    "An error occurred when automatically attaching the debezium primary keys to Value",
+                    e);
+        }
+    }
+
     /** Transform raw string value according to schema. 
*/
     public static String transformRawValue(
             @Nullable String rawValue,
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 4f0be0ef221e1..c425cb6544b15 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -87,7 +87,6 @@ public static KafkaSource buildKafkaSource(Configuration kafkaC
         kafkaSourceBuilder
                 .setValueOnlyDeserializer(new CdcJsonDeserializationSchema())
                 .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
-
         Properties properties = createKafkaProperties(kafkaConfig);
 
         StartupMode startupMode =
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java
new file mode 100644
index 0000000000000..0bfe9ef6ff194
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Deserialization for kafka key and value.
+ *
+ * <p>Decodes the key and the value of a record with the configured charset and emits the value
+ * rewritten by {@link DebeziumSchemaUtils#rewriteValue}. Records without a value are logged and
+ * skipped.
+ */
+public class KafkaKeyValueDeserializationSchema
+        implements KafkaRecordDeserializationSchema<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KafkaKeyValueDeserializationSchema.class);
+
+    /** Charset name; a {@link Charset} cannot be stored because it is not serializable. */
+    private final String charset;
+
+    private final String format;
+
+    public KafkaKeyValueDeserializationSchema(String format) {
+        this(StandardCharsets.UTF_8.name(), format);
+    }
+
+    /**
+     * @param charset name of the charset used to decode keys and values
+     * @param format value format passed to {@link DebeziumSchemaUtils#rewriteValue}
+     * @throws IllegalArgumentException if the charset name is illegal or not supported
+     */
+    public KafkaKeyValueDeserializationSchema(String charset, String format) {
+        this.charset = Preconditions.checkNotNull(charset);
+        this.format = format;
+        // Fail fast on an invalid charset instead of failing on the first record.
+        Charset.forName(charset);
+    }
+
+    @Override
+    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> collector)
+            throws IOException {
+        if (record.value() == null) {
+            // see
+            // https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events
+            LOG.info(
+                    "Found null message value:\n{}\nThis message will be ignored. It might be produced by tombstone-event, "
+                            + "please check your Debezium and Kafka configuration.",
+                    record);
+            return;
+        }
+        Charset decoder = Charset.forName(charset);
+        String value = new String(record.value(), decoder);
+        String key = record.key() != null ? new String(record.key(), decoder) : null;
+        collector.collect(DebeziumSchemaUtils.rewriteValue(Pair.of(key, value), format));
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java
new file mode 100644
index 0000000000000..55ad8a9fdcf3d
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.pulsar;
+
+import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Deserialization for Pulsar key and value.
+ *
+ * <p>Decodes the message value with the configured charset and emits the value rewritten by
+ * {@link DebeziumSchemaUtils#rewriteValue}, passing the message key along. Messages without a
+ * value are logged and skipped.
+ */
+public class PulsarKeyValueDeserializationSchema implements PulsarDeserializationSchema<String> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PulsarKeyValueDeserializationSchema.class);
+
+    /** Charset name; a {@link Charset} cannot be stored because it is not serializable. */
+    private final String charset;
+
+    private final String format;
+
+    public PulsarKeyValueDeserializationSchema(String format) {
+        this(StandardCharsets.UTF_8.name(), format);
+    }
+
+    /**
+     * @param charset name of the charset used to decode values
+     * @param format value format passed to {@link DebeziumSchemaUtils#rewriteValue}
+     * @throws IllegalArgumentException if the charset name is illegal or not supported
+     */
+    public PulsarKeyValueDeserializationSchema(String charset, String format) {
+        this.charset = Preconditions.checkNotNull(charset);
+        this.format = format;
+        // Fail fast on an invalid charset instead of failing on the first message.
+        Charset.forName(charset);
+    }
+
+    @Override
+    public void deserialize(Message<byte[]> message, Collector<String> collector) throws Exception {
+        byte[] rawValue = message.getValue();
+        if (rawValue == null) {
+            // new String(null, ...) would throw; skip the message like the Kafka schema does.
+            LOG.info(
+                    "Found null message value in message {}. This message will be ignored.",
+                    message.getMessageId());
+            return;
+        }
+        String value = new String(rawValue, Charset.forName(charset));
+        collector.collect(
+                DebeziumSchemaUtils.rewriteValue(Pair.of(message.getKey(), value), format));
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 7d8c78c2eddf8..4726a4206623c 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -339,6 +339,29 @@ private void send(String topic, String record, 
boolean wait) {
         }
     }
 
+    /** Sends every key/value pair that parses as JSON to the topic and skips the others. */
+    void writeRecordsToKafka(String topic, Map<String, String> data) throws Exception {
+        Properties producerProperties = getStandardProps();
+        producerProperties.setProperty("retries", "0");
+        String serializer = "org.apache.kafka.common.serialization.StringSerializer";
+        producerProperties.put("key.serializer", serializer);
+        producerProperties.put("value.serializer", serializer);
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
+            for (Map.Entry<String, String> entry : data.entrySet()) {
+                String value = entry.getValue();
+                try {
+                    objectMapper.readTree(entry.getKey());
+                    objectMapper.readTree(value);
+                } catch (Exception ignored) {
+                    continue; // not valid JSON, e.g. a line of the license header: skip it
+                }
+                if (!StringUtils.isEmpty(value)) {
+                    producer.send(new ProducerRecord<>(topic, entry.getKey(), value));
+                }
+            }
+        }
+    }
+
     /** Kafka container extension for junit5. */
     private static class KafkaContainerExtension extends KafkaContainer
             implements BeforeAllCallback, AfterAllCallback {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 04dfb3769a615..a4fbca868b9f9 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -147,4 +147,16 @@ public void testMessageWithNullValue() throws Exception {
                 rowType,
                 Collections.singletonList("id"));
     }
+
+    @Test
+    @Timeout(120)
+    public void testRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
+        testRecordWithPrimaryKeys(DEBEZIUM);
+    }
+
+    @Test
+    @Timeout(120)
+    public void testSchemaIncludeRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
+        
testSchemaIncludeRecordWithPrimaryKeys(DEBEZIUM);
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index 5f7df79e48ed3..841a380ef0c5f 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -33,6 +33,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -569,6 +570,89 @@ public void testSchemaIncludeRecord(String format) throws Exception {
         waitForResult(expected, table, rowType, primaryKeys);
     }
 
+    public void testRecordWithPrimaryKeys(String format) throws Exception {
+        String topic = "no_schema_include_with_primary_keys";
+        createTestTopic(topic, 1, 1);
+
+        writeKeyValueRecordsToKafka(
+                topic, "kafka/debezium/table/schema/primarykeys/debezium-data-1.txt");
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    public void testSchemaIncludeRecordWithPrimaryKeys(String format) throws Exception {
+        String topic = "schema_include_with_primary_keys";
+        createTestTopic(topic, 1, 1);
+
+        writeKeyValueRecordsToKafka(
+                topic, "kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt");
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.DOUBLE()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Collections.singletonList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    /**
+     * Reads {@code key;value} lines from the given resource and writes them to the topic in file
+     * order. Lines without a {@code ;} separator are skipped.
+     */
+    private void writeKeyValueRecordsToKafka(String topic, String resource) throws Exception {
+        // LinkedHashMap keeps the records in the order they appear in the file.
+        Map<String, String> keyValues = new LinkedHashMap<>();
+        for (String line : readLines(resource)) {
+            // Limit 2: only the first ';' separates key and value, the value may contain more.
+            String[] splitLines = line.split(";", 2);
+            if (splitLines.length > 1) {
+                keyValues.put(splitLines[0], splitLines[1]);
+            }
+        }
+        try {
+            writeRecordsToKafka(topic, keyValues);
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+    }
+
     // TODO some types are different from mysql cdc; maybe need to fix
     public void testAllTypesWithSchemaImpl(String format) throws Exception {
         String topic = "schema_include_all_type";
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt new file mode 100644 index 0000000000000..9bb00c7786d8c --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{"schema": null ,"payload": {"id": 101}};{"schema":null, "payload":{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}} +{"schema": null ,"payload": {"id": 102}};{"schema":null, "payload":{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt new file mode 100644 index 0000000000000..1feaafe9fc20b --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"schema": null ,"payload": {"id": 101}};{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium
.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}