diff --git a/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapper.java b/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapper.java index 3af53404..bde560e3 100644 --- a/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapper.java +++ b/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapper.java @@ -73,12 +73,12 @@ private String createJsonPayload(SinkRecord record) throws IOException { root.put("offset", record.kafkaOffset()); root.put("timestamp", record.timestamp()); root.put("timestampType", record.timestampType().toString()); - root.putIfAbsent("headers", createHeaderArray(record)); + root.set("headers", createHeaderArray(record)); if (record.key() == null) { - root.putIfAbsent("key", null); + root.set("key", null); } else { - root.putIfAbsent( + root.set( "key", createJSONFromByteArray( jsonConverter.fromConnectData(record.topic(), record.keySchema(), record.key()))); @@ -86,9 +86,9 @@ private String createJsonPayload(SinkRecord record) throws IOException { // tombstone handling if (record.value() == null) { - root.putIfAbsent("value", null); + root.set("value", null); } else { - root.putIfAbsent( + root.set( "value", createJSONFromByteArray( jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()))); @@ -109,7 +109,7 @@ private ArrayNode createHeaderArray(SinkRecord record) throws IOException { for (Header header : record.headers()) { var headerItem = objectMapper.createObjectNode(); - headerItem.putIfAbsent( + headerItem.set( header.key(), createJSONFromByteArray( jsonConverter.fromConnectHeader(