diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
index 51a14534c4c9c..0be7febe329fa 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
@@ -41,6 +41,10 @@ public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object valu
         this.value = value;
     }
 
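+    /** Creates a record with a key but without a topic. */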
+    public CdcSourceRecord(@Nullable Object key, Object value) {
+        this(null, key, value);
+    }
+
     public CdcSourceRecord(Object value) {
         this(null, null, value);
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index ea9fa5492b7a2..fc9b803d9b005 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -76,6 +76,8 @@ public abstract class RecordParser
 
     protected JsonNode root;
 
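+    /** Key of the current record, e.g. the Kafka message key; null if the record has no key. */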
+    protected JsonNode key;
+
     public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
         this.typeMapping = typeMapping;
         this.computedColumns = computedColumns;
@@ -175,7 +177,7 @@ protected void evalComputedColumns(
                 });
     }
 
-    private List<String> extractPrimaryKeys() {
+    protected List<String> extractPrimaryKeys() {
         ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
         if (pkNames == null) {
             return Collections.emptyList();
@@ -204,8 +206,15 @@ private RichCdcMultiplexRecord createRecord(
                 new CdcRecord(rowKind, data));
     }
 
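+    /** Parses the source record's key as a {@link JsonNode}; format parsers may override this. */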
+    protected void setKey(CdcSourceRecord record) {
+        key = (JsonNode) record.getKey();
+    }
+
     protected void setRoot(CdcSourceRecord record) {
         root = (JsonNode) record.getValue();
+        if (record.getKey() != null) {
+            setKey(record);
+        } else {
+            // Reset the key from a previous record so stale keys are not reused.
+            key = null;
+        }
     }
 
     protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
index c2b6587547e40..971f9983afc2a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -28,6 +28,7 @@
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -135,6 +136,16 @@ protected void setRoot(CdcSourceRecord record) {
         }
+        // Also capture the message key (if any) so primary keys can be discovered from it.
+        if (record.getKey() != null) {
+            setKey(record);
+        } else {
+            key = null;
+        }
     }
 
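+    // A Debezium message key may be wrapped in a schema envelope ({"schema": ..., "payload": ...});
+    // in that case only the payload carries the primary key fields.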
+    @Override
+    protected void setKey(CdcSourceRecord record) {
+        JsonNode node = (JsonNode) record.getKey();
+        if (node.has(FIELD_SCHEMA)) {
+            key = node.get(FIELD_PAYLOAD);
+        } else {
+            key = node;
+        }
+    }
+
     private void parseSchema(JsonNode schema) {
         debeziumTypes.clear();
         classNames.clear();
@@ -217,6 +228,16 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
         return resultMap;
     }
 
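+    // Prefer primary keys discovered from the message key; if the record has no key,
+    // fall back to the primary-key field declared in the value.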
+    @Override
+    protected List<String> extractPrimaryKeys() {
+        if (key != null) {
+            List<String> primaryKeys = Lists.newArrayList();
+            key.fieldNames().forEachRemaining(primaryKeys::add);
+            return primaryKeys;
+        }
+        return super.extractPrimaryKeys();
+    }
+
     @Override
     protected String primaryField() {
         return FIELD_PRIMARY;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 4f0be0ef221e1..c48a269ee2613 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -85,7 +85,8 @@ public static KafkaSource<CdcSourceRecord> buildKafkaSource(Configuration kafkaC
         }
 
         kafkaSourceBuilder
-                .setValueOnlyDeserializer(new CdcJsonDeserializationSchema())
+                .setDeserializer(new KafkaKeyValueDeserializationSchema())
                 .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
 
         Properties properties = createKafkaProperties(kafkaConfig);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java
new file mode 100644
index 0000000000000..4fde7d49395d0
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/** A Kafka key-value deserialization schema for {@link CdcSourceRecord}. */
+public class KafkaKeyValueDeserializationSchema
+        implements KafkaRecordDeserializationSchema<CdcSourceRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KafkaKeyValueDeserializationSchema.class);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    public KafkaKeyValueDeserializationSchema() {
+        objectMapper
+                .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    @Override
+    public void deserialize(
+            ConsumerRecord<byte[], byte[]> consumerRecord, Collector<CdcSourceRecord> collector)
+            throws IOException {
+        if (consumerRecord == null || consumerRecord.value() == null) {
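+            // Skip null-value messages, which are usually produced by Debezium tombstone events. See
+            // https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events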
+            return;
+        }
+        try {
+            CdcSourceRecord record =
+                    new CdcSourceRecord(
+                            consumerRecord.key() == null
+                                    ? null
+                                    : objectMapper.readValue(consumerRecord.key(), JsonNode.class),
+                            objectMapper.readValue(consumerRecord.value(), JsonNode.class));
+            collector.collect(record);
+        } catch (Exception e) {
+            LOG.error("Invalid Json:\n{}", new String(consumerRecord.value()));
+            throw e;
+        }
+    }
+
+    @Override
+    public TypeInformation<CdcSourceRecord> getProducedType() {
+        return getForClass(CdcSourceRecord.class);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
deleted file mode 100644
index 5e6b96670bdbd..0000000000000
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * A class that wraps a {@link DeserializationSchema} as the value deserializer for a {@link
- * ConsumerRecord}.
- *
- * @param <T> the return type of the deserialization.
- */
-class KafkaValueOnlyDeserializationSchemaWrapper<T> implements KafkaRecordDeserializationSchema<T> {
-    private static final long serialVersionUID = 1L;
-    private final DeserializationSchema<T> deserializationSchema;
-    private static final Logger LOG =
-            LoggerFactory.getLogger(KafkaValueOnlyDeserializationSchemaWrapper.class);
-
-    KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
-        this.deserializationSchema = deserializationSchema;
-    }
-
-    @Override
-    public void open(DeserializationSchema.InitializationContext context) throws Exception {
-        deserializationSchema.open(context);
-    }
-
-    @Override
-    public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
-            throws IOException {
-        if (message.value() != null) {
-            deserializationSchema.deserialize(message.value(), out);
-        } else {
-            // see
-            // https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events
-            LOG.info(
-                    "Found null message value:\n{}\nThis message will be ignored. It might be produced by tombstone-event, "
-                            + "please check your Debezium and Kafka configuration.",
-                    message);
-        }
-    }
-
-    @Override
-    public TypeInformation<T> getProducedType() {
-        return deserializationSchema.getProducedType();
-    }
-}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index 66bdc1847c578..e016d576c69dc 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -172,7 +172,7 @@ public static PulsarSource<CdcSourceRecord> buildPulsarSource(Configuration puls
                 .setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
                 .setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
                 .setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
-                .setDeserializationSchema(new CdcJsonDeserializationSchema());
+                .setDeserializationSchema(new PulsarKeyValueDeserializationSchema());
 
         pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
         pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java
new file mode 100644
index 0000000000000..115dd8a7f2ec0
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.pulsar;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.pulsar.client.api.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/** A Pulsar key-value deserialization schema for {@link CdcSourceRecord}. */
+public class PulsarKeyValueDeserializationSchema
+        implements PulsarDeserializationSchema<CdcSourceRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PulsarKeyValueDeserializationSchema.class);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    public PulsarKeyValueDeserializationSchema() {
+        objectMapper
+                .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    @Override
+    public void deserialize(Message<byte[]> message, Collector<CdcSourceRecord> collector)
+            throws Exception {
+        if (message == null || message.getValue() == null) {
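+            // Skip null-value messages (e.g. Debezium tombstone events).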
+            return;
+        }
+        try {
+            CdcSourceRecord record =
+                    new CdcSourceRecord(
+                            message.getKey() == null
+                                    ? null
+                                    : objectMapper.readValue(message.getKey(), JsonNode.class),
+                            objectMapper.readValue(message.getValue(), JsonNode.class));
+            collector.collect(record);
+        } catch (Exception e) {
+            LOG.error("Invalid Json:\n{} \n{}", message.getKey(), new String(message.getValue()));
+            throw e;
+        }
+    }
+
+    @Override
+    public TypeInformation<CdcSourceRecord> getProducedType() {
+        return getForClass(CdcSourceRecord.class);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 7d8c78c2eddf8..72225e7df2a87 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -328,6 +328,18 @@ private boolean isRecordLine(String line) {
         }
     }
 
+    private void send(String topic, String key, String record, boolean wait) {
+        Future<RecordMetadata> sendFuture =
+                kafkaProducer.send(new ProducerRecord<>(topic, key, record));
+        if (wait) {
+            try {
+                sendFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     private void send(String topic, String record, boolean wait) {
         Future<RecordMetadata> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
         if (wait) {
@@ -339,6 +351,20 @@ private void send(String topic, String record, boolean wait) {
         }
     }
 
+    void writeRecordsToKafka(String topic, boolean wait, String resourcePath) throws Exception {
+        URL url = KafkaActionITCaseBase.class.getClassLoader().getResource(resourcePath);
+        List<String> lines = Files.readAllLines(Paths.get(url.toURI()));
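+        // Each record line is "<key json>;<value json>"; non-record lines are skipped.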
+        lines.stream()
+                .map(line -> line.split(";"))
+                .filter(keyValues -> (keyValues.length > 1))
+                .filter(keyValues -> isRecordLine(keyValues[0]) && isRecordLine(keyValues[1]))
+                .forEach(keyValues -> this.send(topic, keyValues[0], keyValues[1], wait));
+    }
+
     /** Kafka container extension for junit5. */
     private static class KafkaContainerExtension extends KafkaContainer
             implements BeforeAllCallback, AfterAllCallback {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 04dfb3769a615..3c3aae7f6fc7b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -147,4 +147,16 @@ public void testMessageWithNullValue() throws Exception {
                 rowType,
                 Collections.singletonList("id"));
     }
+
+    @Test
+    @Timeout(120)
+    public void testRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
+        testRecordWithPrimaryKeys(DEBEZIUM);
+    }
+
+    @Test
+    @Timeout(120)
+    public void testRecordIncludeSchemaAndAutoDiscoveryPrimaryKeys() throws Exception {
+        testSchemaIncludeRecordWithPrimaryKeys(DEBEZIUM);
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index 5f7df79e48ed3..adb85a2aa4292 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -569,6 +569,70 @@ public void testSchemaIncludeRecord(String format) throws Exception {
         waitForResult(expected, table, rowType, primaryKeys);
     }
 
+    public void testRecordWithPrimaryKeys(String format) throws Exception {
+        String topic = "no_schema_include_with_primary_keys";
+        createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(
+                topic, false, "kafka/debezium/table/schema/primarykeys/debezium-data-1.txt");
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+
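+        // Without a Debezium schema in the message, all column types are inferred as STRING.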
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    public void testSchemaIncludeRecordWithPrimaryKeys(String format) throws Exception {
+        String topic = "schema_include_with_primary_keys";
+        createTestTopic(topic, 1, 1);
+        writeRecordsToKafka(
+                topic,
+                false,
+                "kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt");
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+
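+        // With the Debezium schema included, column types are derived from it (INT, DOUBLE, ...).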
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.DOUBLE()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Collections.singletonList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
     // TODO some types are different from mysql cdc; maybe need to fix
     public void testAllTypesWithSchemaImpl(String format) throws Exception {
         String topic = "schema_include_all_type";
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt
new file mode 100644
index 0000000000000..9bb00c7786d8c
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"schema": null ,"payload": {"id": 101}};{"schema":null, "payload":{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
+{"schema": null ,"payload": {"id": 102}};{"schema":null, "payload":{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt
new file mode 100644
index 0000000000000..1feaafe9fc20b
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"schema": null ,"payload": {"id": 101}};{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}