Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jun 26, 2024
1 parent 54b4672 commit 8b54215
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,8 @@ public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws
record.key() != null
? objectMapper.readValue(record.key(), JsonNode.class)
: null;
CdcSourceRecord sourceRecord =
new CdcSourceRecord(
key, objectMapper.readValue(record.value(), JsonNode.class));
return sourceRecord;
JsonNode value = objectMapper.readValue(record.value(), JsonNode.class);
return new CdcSourceRecord(key, value);
} catch (Exception e) {
LOG.error("Invalid Json:\n{}", new String(record.value()));
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,8 @@ public CdcSourceRecord deserialize(Message<byte[]> message) throws IOException {
message.getKey() != null
? objectMapper.readValue(message.getKey(), JsonNode.class)
: null;
CdcSourceRecord record =
new CdcSourceRecord(
key, objectMapper.readValue(message.getValue(), JsonNode.class));
return record;
JsonNode value = objectMapper.readValue(message.getValue(), JsonNode.class);
return new CdcSourceRecord(key, value);
} catch (Exception e) {
LOG.error("Invalid Json:\n{} \n{}", message.getKey(), new String(message.getValue()));
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ private void send(String topic, String record, boolean wait) {
}
}

void writeRecordsToKafka(String topic, boolean wait, String resourceDirFormat)
void writeRecordsToKafkaWithKey(String topic, boolean wait, String resourceDirFormat)
throws Exception {
URL url =
KafkaCanalSyncTableActionITCase.class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ public void testSchemaIncludeRecord(String format) throws Exception {
public void testRecordWithPrimaryKeys(String format) throws Exception {
String topic = "no_schema_include_with_primary_keys";
createTestTopic(topic, 1, 1);
writeRecordsToKafka(
writeRecordsToKafkaWithKey(
topic, false, "kafka/debezium/table/schema/primarykeys/debezium-data-1.txt");
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
Expand Down Expand Up @@ -603,7 +603,7 @@ public void testRecordWithPrimaryKeys(String format) throws Exception {
public void testSchemaIncludeRecordWithPrimaryKeys(String format) throws Exception {
String topic = "schema_include_with_primary_keys";
createTestTopic(topic, 1, 1);
writeRecordsToKafka(
writeRecordsToKafkaWithKey(
topic,
false,
"kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt");
Expand Down

0 comments on commit 8b54215

Please sign in to comment.