Skip to content

Commit

Permalink
Kafka debezium json supports automatic discovery of primary keys
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jun 25, 2024
1 parent 3baca1c commit eae77b9
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public abstract class RecordParser

/** JSON value of the record being parsed; assigned by {@code setRoot}. */
protected JsonNode root;

/** JSON key of the record being parsed; assigned by {@code setKey}. */
protected JsonNode key;

public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
this.typeMapping = typeMapping;
this.computedColumns = computedColumns;
Expand All @@ -85,6 +87,7 @@ public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumn
public Schema buildSchema(CdcSourceRecord record) {
try {
setRoot(record);
setKey(record);
if (isDDL()) {
return null;
}
Expand Down Expand Up @@ -175,7 +178,7 @@ protected void evalComputedColumns(
});
}

private List<String> extractPrimaryKeys() {
protected List<String> extractPrimaryKeys() {
ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
if (pkNames == null) {
return Collections.emptyList();
Expand Down Expand Up @@ -204,6 +207,10 @@ private RichCdcMultiplexRecord createRecord(
new CdcRecord(rowKind, data));
}

/**
 * Stores the key of the given record, cast to {@link JsonNode}, in the {@code key} field.
 *
 * @param record source record whose key is read via {@code getKey()}
 */
protected void setKey(CdcSourceRecord record) {
key = (JsonNode) record.getKey();
}

/**
 * Stores the value of the given record, cast to {@link JsonNode}, in the {@code root} field.
 *
 * @param record source record whose value is read via {@code getValue()}
 */
protected void setRoot(CdcSourceRecord record) {
root = (JsonNode) record.getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
Expand Down Expand Up @@ -135,6 +136,16 @@ protected void setRoot(CdcSourceRecord record) {
}
}

/**
 * Captures the message key, unwrapping the schema/payload envelope when one is present.
 *
 * <p>When the key node has a {@code FIELD_SCHEMA} field, the {@code FIELD_PAYLOAD} node is used
 * as the key; otherwise the key node itself is used. A record without a key leaves {@code key}
 * set to {@code null}, which {@code extractPrimaryKeys} treats as "no key available".
 *
 * @param record source record whose key is read via {@code getKey()}
 */
@Override
protected void setKey(CdcSourceRecord record) {
    JsonNode node = (JsonNode) record.getKey();
    if (node == null) {
        // No key on this record: reset the field instead of dereferencing null, so a key
        // left over from an earlier record is not reused.
        key = null;
        return;
    }
    key = node.has(FIELD_SCHEMA) ? node.get(FIELD_PAYLOAD) : node;
}

private void parseSchema(JsonNode schema) {
debeziumTypes.clear();
classNames.clear();
Expand Down Expand Up @@ -217,6 +228,16 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
return resultMap;
}

/**
 * Derives the primary key column names from the field names of the message key.
 *
 * <p>Falls back to the parent implementation when there is no key, or when the key exposes no
 * field names (for example a JSON null payload or an empty object), so that an unusable key
 * does not mask primary keys the parent could still find.
 *
 * @return primary key column names; the parent's result when the key yields none
 */
@Override
protected List<String> extractPrimaryKeys() {
    if (key != null) {
        List<String> primaryKeys = Lists.newArrayList();
        key.fieldNames().forEachRemaining(primaryKeys::add);
        if (!primaryKeys.isEmpty()) {
            return primaryKeys;
        }
    }
    return super.extractPrimaryKeys();
}

@Override
protected String primaryField() {
return FIELD_PRIMARY;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,18 @@ private boolean isRecordLine(String line) {
}
}

/**
 * Sends one keyed record to the given topic through {@code kafkaProducer}.
 *
 * @param topic destination topic
 * @param key record key
 * @param record record value
 * @param wait when {@code true}, blocks until the send future completes
 * @throws RuntimeException wrapping the failure if waiting is interrupted or the send fails
 */
private void send(String topic, String key, String record, boolean wait) {
    Future<RecordMetadata> sendFuture =
            kafkaProducer.send(new ProducerRecord<>(topic, key, record));
    if (!wait) {
        return;
    }
    try {
        sendFuture.get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    }
}

private void send(String topic, String record, boolean wait) {
Future<RecordMetadata> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
if (wait) {
Expand All @@ -339,6 +351,20 @@ private void send(String topic, String record, boolean wait) {
}
}

/**
 * Reads a classpath resource of {@code key;value} lines and sends each pair to Kafka.
 *
 * <p>Each line is split at the first {@code ;} only, so a value that itself contains
 * semicolons is kept intact. Lines without a separator, lines with an empty value part, and
 * lines whose key or value is rejected by {@code isRecordLine} are skipped.
 *
 * @param topic destination topic
 * @param wait forwarded to {@code send}; when {@code true} each send is awaited
 * @param resourceDirFormat classpath location of the data file (passed through
 *     {@code String.format} without arguments, as before)
 * @throws IllegalArgumentException if the resource cannot be found on the classpath
 * @throws Exception if the resource cannot be read
 */
void writeRecordsToKafka(String topic, boolean wait, String resourceDirFormat)
        throws Exception {
    String resourcePath = String.format(resourceDirFormat);
    URL url = KafkaCanalSyncTableActionITCase.class.getClassLoader().getResource(resourcePath);
    if (url == null) {
        // Fail with the offending path instead of an anonymous NullPointerException below.
        throw new IllegalArgumentException(
                "Test resource not found on classpath: " + resourcePath);
    }
    List<String> lines = Files.readAllLines(Paths.get(url.toURI()));
    lines.stream()
            // Limit 2: everything after the first ';' belongs to the value.
            .map(line -> line.split(";", 2))
            .filter(keyValues -> keyValues.length == 2 && !keyValues[1].isEmpty())
            .filter(keyValues -> isRecordLine(keyValues[0]) && isRecordLine(keyValues[1]))
            .forEach(keyValues -> this.send(topic, keyValues[0], keyValues[1], wait));
}

/** Kafka container extension for junit5. */
private static class KafkaContainerExtension extends KafkaContainer
implements BeforeAllCallback, AfterAllCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,16 @@ public void testMessageWithNullValue() throws Exception {
rowType,
Collections.singletonList("id"));
}

/**
 * Runs {@code testRecordWithPrimaryKeys} with the {@code DEBEZIUM} format, under
 * {@code @Timeout(120)}.
 */
@Timeout(120)
@Test
public void testRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
testRecordWithPrimaryKeys(DEBEZIUM);
}

/**
 * Runs {@code testSchemaIncludeRecordWithPrimaryKeys} with the {@code DEBEZIUM} format, under
 * {@code @Timeout(120)}.
 */
@Test
@Timeout(120)
public void testSchemaIncludeRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
testSchemaIncludeRecordWithPrimaryKeys(DEBEZIUM);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,78 @@ public void testSchemaIncludeRecord(String format) throws Exception {
waitForResult(expected, table, rowType, primaryKeys);
}

/**
 * Writes keyed records without an embedded schema to a fresh topic, runs the table sync
 * action, and waits for the expected rows with {@code id} as the primary key.
 *
 * @param format value format prefix; {@code "-json"} is appended to build the format option
 * @throws Exception if writing the fixture, running the action, or verification fails
 */
public void testRecordWithPrimaryKeys(String format) throws Exception {
    final String topic = "no_schema_include_with_primary_keys";
    createTestTopic(topic, 1, 1);

    try {
        writeRecordsToKafka(
                topic, false, "kafka/debezium/table/schema/primarykeys/debezium-data-1.txt");
    } catch (Exception cause) {
        throw new Exception("Failed to write debezium data to Kafka.", cause);
    }

    // Point the sync action at the topic populated above.
    Map<String, String> sourceConfig = getBasicKafkaConfig();
    sourceConfig.put(VALUE_FORMAT.key(), format + "-json");
    sourceConfig.put(TOPIC.key(), topic);
    KafkaSyncTableAction syncAction =
            syncTableActionBuilder(sourceConfig).withTableConfig(getBasicTableConfig()).build();
    runActionWithDefaultEnv(syncAction);

    FileStoreTable syncedTable = getFileStoreTable(tableName);

    // Every column is expected as a string here; only the key column is non-null.
    DataType[] columnTypes = {
        DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()
    };
    String[] columnNames = {"id", "name", "description", "weight"};
    RowType expectedRowType = RowType.of(columnTypes, columnNames);

    List<String> expectedPrimaryKeys = Collections.singletonList("id");
    List<String> expectedRows =
            Arrays.asList(
                    "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                    "+I[102, car battery, 12V car battery, 8.1]");
    waitForResult(expectedRows, syncedTable, expectedRowType, expectedPrimaryKeys);
}

/**
 * Writes a keyed record whose value embeds its schema to a fresh topic, runs the table sync
 * action, and waits for the expected typed row with {@code id} as the primary key.
 *
 * @param format value format prefix; {@code "-json"} is appended to build the format option
 * @throws Exception if writing the fixture, running the action, or verification fails
 */
public void testSchemaIncludeRecordWithPrimaryKeys(String format) throws Exception {
    final String topic = "schema_include_with_primary_keys";
    createTestTopic(topic, 1, 1);

    try {
        writeRecordsToKafka(
                topic,
                false,
                "kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt");
    } catch (Exception cause) {
        throw new Exception("Failed to write debezium data to Kafka.", cause);
    }

    // Point the sync action at the topic populated above.
    Map<String, String> sourceConfig = getBasicKafkaConfig();
    sourceConfig.put(VALUE_FORMAT.key(), format + "-json");
    sourceConfig.put(TOPIC.key(), topic);
    KafkaSyncTableAction syncAction =
            syncTableActionBuilder(sourceConfig).withTableConfig(getBasicTableConfig()).build();
    runActionWithDefaultEnv(syncAction);

    FileStoreTable syncedTable = getFileStoreTable(tableName);

    // Column types expected when the value carries its own schema.
    DataType[] columnTypes = {
        DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE()
    };
    String[] columnNames = {"id", "name", "description", "weight"};
    RowType expectedRowType = RowType.of(columnTypes, columnNames);

    List<String> expectedPrimaryKeys = Collections.singletonList("id");
    List<String> expectedRows =
            Collections.singletonList(
                    "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]");
    waitForResult(expectedRows, syncedTable, expectedRowType, expectedPrimaryKeys);
}

// TODO some types are different from mysql cdc; maybe need to fix
public void testAllTypesWithSchemaImpl(String format) throws Exception {
String topic = "schema_include_all_type";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{"schema": null ,"payload": {"id": 101}};{"schema":null, "payload":{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
{"schema": null ,"payload": {"id": 102}};{"schema":null, "payload":{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{"schema": null ,"payload": {"id": 101}};{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}

0 comments on commit eae77b9

Please sign in to comment.