From 47e74725adb4bcc5eaaa76f467f943ec1181d699 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Mon, 29 Jan 2024 12:02:34 +0800 Subject: [PATCH] Kafka debezium json supports automatic discovery of primary keys --- .../flink/action/cdc/CdcSourceRecord.java | 4 + .../flink/action/cdc/format/RecordParser.java | 11 ++- .../format/debezium/DebeziumRecordParser.java | 21 +++++ .../action/cdc/kafka/KafkaActionUtils.java | 3 +- .../KafkaKeyValueDeserializationSchema.java | 84 +++++++++++++++++++ ...ValueOnlyDeserializationSchemaWrapper.java | 71 ---------------- .../action/cdc/pulsar/PulsarActionUtils.java | 2 +- .../PulsarKeyValueDeserializationSchema.java | 77 +++++++++++++++++ .../cdc/kafka/KafkaActionITCaseBase.java | 26 ++++++ .../KafkaDebeziumSyncTableActionITCase.java | 12 +++ .../cdc/kafka/KafkaSyncTableActionITCase.java | 64 ++++++++++++++ .../schema/primarykeys/debezium-data-1.txt | 19 +++++ .../debezium-data-with-schema-1.txt | 18 ++++ 13 files changed, 338 insertions(+), 74 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java delete mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java index 51a14534c4c9c..0be7febe329fa 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java @@ -41,6 +41,10 @@ public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object valu this.value = value; } + public CdcSourceRecord(Object key, Object value) { + this(null, key, value); + } + public CdcSourceRecord(Object value) { this(null, null, value); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java index ea9fa5492b7a2..fc9b803d9b005 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java @@ -76,6 +76,8 @@ public abstract class RecordParser protected JsonNode root; + protected JsonNode key; + public RecordParser(TypeMapping typeMapping, List computedColumns) { this.typeMapping = typeMapping; this.computedColumns = computedColumns; @@ -175,7 +177,7 @@ protected void evalComputedColumns( }); } - private List extractPrimaryKeys() { + protected List extractPrimaryKeys() { ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class); if (pkNames == null) { return Collections.emptyList(); @@ -204,8 +206,15 @@ private RichCdcMultiplexRecord createRecord( new CdcRecord(rowKind, data)); } + protected void setKey(CdcSourceRecord record) { + key = (JsonNode) record.getKey(); + } + protected void setRoot(CdcSourceRecord record) { root = (JsonNode) record.getValue(); + if (record.getKey() != null) { + setKey(record); + } } protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java index c2b6587547e40..971f9983afc2a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java @@ -28,6 +28,7 @@ import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -135,6 +136,16 @@ protected void setRoot(CdcSourceRecord record) { } } + @Override + protected void setKey(CdcSourceRecord record) { + JsonNode node = (JsonNode) record.getKey(); + if (node.has(FIELD_SCHEMA)) { + key = node.get(FIELD_PAYLOAD); + } else { + key = node; + } + } + private void parseSchema(JsonNode schema) { debeziumTypes.clear(); classNames.clear(); @@ -217,6 +228,16 @@ protected Map extractRowData(JsonNode record, RowType.Builder ro return resultMap; } + @Override + protected List extractPrimaryKeys() { + if (key != null) { + List primaryKeys = Lists.newArrayList(); + key.fieldNames().forEachRemaining(primaryKeys::add); + return primaryKeys; + } + return super.extractPrimaryKeys(); + } + @Override protected String primaryField() { return FIELD_PRIMARY; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index 4f0be0ef221e1..c48a269ee2613 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -85,7 +85,8 @@ public static KafkaSource buildKafkaSource(Configuration kafkaC } kafkaSourceBuilder - .setValueOnlyDeserializer(new CdcJsonDeserializationSchema()) + .setDeserializer(new KafkaKeyValueDeserializationSchema()) + // .setValueOnlyDeserializer(new CdcJsonDeserializationSchema()) .setGroupId(kafkaPropertiesGroupId(kafkaConfig)); Properties properties = createKafkaProperties(kafkaConfig); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java new file mode 100644 index 0000000000000..4fde7d49395d0 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.kafka; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.util.Collector; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; + +/** A kafka key value deserialization schema for {@link CdcSourceRecord}. */ +public class KafkaKeyValueDeserializationSchema + implements KafkaRecordDeserializationSchema { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(CdcJsonDeserializationSchema.class); + + private final ObjectMapper objectMapper = new ObjectMapper(); + + public KafkaKeyValueDeserializationSchema() { + objectMapper + .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public void deserialize( + ConsumerRecord consumerRecord, Collector collector) + throws IOException { + if (consumerRecord == null || consumerRecord.value() == null) { + return; + } + try { + CdcSourceRecord record = + new CdcSourceRecord( + consumerRecord.key() == null + ? null + : objectMapper.readValue(consumerRecord.key(), JsonNode.class), + objectMapper.readValue(consumerRecord.value(), JsonNode.class)); + if (consumerRecord.key() != null) { + LOG.info("key :\n{}", new String(consumerRecord.key())); + LOG.info("value :\n{}", new String(consumerRecord.value())); + } + collector.collect(record); + } catch (Exception e) { + LOG.error("Invalid Json:\n{}", new String(consumerRecord.value())); + throw e; + } + } + + @Override + public TypeInformation getProducedType() { + return getForClass(CdcSourceRecord.class); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java deleted file mode 100644 index 5e6b96670bdbd..0000000000000 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.action.cdc.kafka; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; -import org.apache.flink.util.Collector; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * A class that wraps a {@link DeserializationSchema} as the value deserializer for a {@link - * ConsumerRecord}. - * - * @param the return type of the deserialization. - */ -class KafkaValueOnlyDeserializationSchemaWrapper implements KafkaRecordDeserializationSchema { - private static final long serialVersionUID = 1L; - private final DeserializationSchema deserializationSchema; - private static final Logger LOG = - LoggerFactory.getLogger(KafkaValueOnlyDeserializationSchemaWrapper.class); - - KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema deserializationSchema) { - this.deserializationSchema = deserializationSchema; - } - - @Override - public void open(DeserializationSchema.InitializationContext context) throws Exception { - deserializationSchema.open(context); - } - - @Override - public void deserialize(ConsumerRecord message, Collector out) - throws IOException { - if (message.value() != null) { - deserializationSchema.deserialize(message.value(), out); - } else { - // see - // https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events - LOG.info( - "Found null message value:\n{}\nThis message will be ignored. It might be produced by tombstone-event, " - + "please check your Debezium and Kafka configuration.", - message); - } - } - - @Override - public TypeInformation getProducedType() { - return deserializationSchema.getProducedType(); - } -} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java index 66bdc1847c578..e016d576c69dc 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java @@ -172,7 +172,7 @@ public static PulsarSource buildPulsarSource(Configuration puls .setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL)) .setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL)) .setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME)) - .setDeserializationSchema(new CdcJsonDeserializationSchema()); + .setDeserializationSchema(new PulsarKeyValueDeserializationSchema()); pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics); pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java new file mode 100644 index 0000000000000..115dd8a7f2ec0 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.pulsar; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.util.Collector; +import org.apache.pulsar.client.api.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; + +/** A pulsar key value deserialization schema for {@link CdcSourceRecord}. */ +public class PulsarKeyValueDeserializationSchema + implements PulsarDeserializationSchema { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(CdcJsonDeserializationSchema.class); + + private final ObjectMapper objectMapper = new ObjectMapper(); + + public PulsarKeyValueDeserializationSchema() { + objectMapper + .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @Override + public void deserialize(Message message, Collector collector) + throws Exception { + if (message == null || message.getValue() == null) { + return; + } + try { + CdcSourceRecord record = + new CdcSourceRecord( + message.getKey() == null + ? null + : objectMapper.readValue(message.getKey(), JsonNode.class), + objectMapper.readValue(message.getValue(), JsonNode.class)); + collector.collect(record); + } catch (Exception e) { + LOG.error("Invalid Json:\n{} \n{}", message.getKey(), new String(message.getValue())); + throw e; + } + } + + @Override + public TypeInformation getProducedType() { + return getForClass(CdcSourceRecord.class); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java index 7d8c78c2eddf8..72225e7df2a87 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java @@ -328,6 +328,18 @@ private boolean isRecordLine(String line) { } } + private void send(String topic, String key, String record, boolean wait) { + Future sendFuture = + kafkaProducer.send(new ProducerRecord<>(topic, key, record)); + if (wait) { + try { + sendFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + } + private void send(String topic, String record, boolean wait) { Future sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record)); if (wait) { @@ -339,6 +351,20 @@ private void send(String topic, String record, boolean wait) { } } + void writeRecordsToKafka(String topic, boolean wait, String resourceDirFormat) + throws Exception { + URL url = + KafkaCanalSyncTableActionITCase.class + .getClassLoader() + .getResource(String.format(resourceDirFormat)); + List lines = Files.readAllLines(Paths.get(url.toURI())); + lines.stream() + .map(line -> line.split(";")) + .filter(keyValues -> (keyValues.length > 1)) + .filter(keyValues -> isRecordLine(keyValues[0]) && isRecordLine(keyValues[1])) + .forEach(keyValues -> this.send(topic, keyValues[0], keyValues[1], wait)); + } + /** Kafka container extension for junit5. */ private static class KafkaContainerExtension extends KafkaContainer implements BeforeAllCallback, AfterAllCallback { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java index 04dfb3769a615..3c3aae7f6fc7b 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java @@ -147,4 +147,16 @@ public void testMessageWithNullValue() throws Exception { rowType, Collections.singletonList("id")); } + + @Timeout(120) + @Test + public void testRecordAndAutoDiscoveryPrimaryKeys() throws Exception { + testRecordWithPrimaryKeys(DEBEZIUM); + } + + @Test + @Timeout(120) + public void testRecordIncludeSchemaAndAutoDiscoveryPrimaryKeys() throws Exception { + testSchemaIncludeRecordWithPrimaryKeys(DEBEZIUM); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java index 5f7df79e48ed3..adb85a2aa4292 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java @@ -569,6 +569,70 @@ public void testSchemaIncludeRecord(String format) throws Exception { waitForResult(expected, table, rowType, primaryKeys); } + public void testRecordWithPrimaryKeys(String format) throws Exception { + String topic = "no_schema_include_with_primary_keys"; + createTestTopic(topic, 1, 1); + writeRecordsToKafka( + topic, false, "kafka/debezium/table/schema/primarykeys/debezium-data-1.txt"); + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); + kafkaConfig.put(TOPIC.key(), topic); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build(); + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys = Collections.singletonList("id"); + List expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14]", + "+I[102, car battery, 12V car battery, 8.1]"); + waitForResult(expected, table, rowType, primaryKeys); + } + + public void testSchemaIncludeRecordWithPrimaryKeys(String format) throws Exception { + String topic = "schema_include_with_primary_keys"; + createTestTopic(topic, 1, 1); + writeRecordsToKafka( + topic, + false, + "kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt"); + + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); + kafkaConfig.put(TOPIC.key(), topic); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build(); + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.DOUBLE() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys = Collections.singletonList("id"); + List expected = + Collections.singletonList( + "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]"); + waitForResult(expected, table, rowType, primaryKeys); + } + // TODO some types are different from mysql cdc; maybe need to fix public void testAllTypesWithSchemaImpl(String format) throws Exception { String topic = "schema_include_all_type"; diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt new file mode 100644 index 0000000000000..9bb00c7786d8c --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"schema": null ,"payload": {"id": 101}};{"schema":null, "payload":{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}} +{"schema": null ,"payload": {"id": 102}};{"schema":null, "payload":{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt new file mode 100644 index 0000000000000..1feaafe9fc20b --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"schema": null ,"payload": {"id": 101}};{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}