From c278061bb8b94aa331e457c99a1d1b2d27ce2a7a Mon Sep 17 00:00:00 2001 From: Jens Rathsman Date: Mon, 12 Aug 2024 11:14:23 +0200 Subject: [PATCH] feat: Added TimeMapper Interface and Default Implementation fix some sonarlint issue test timeMapper class override Code review changes Update DefaultEventBridgeMapperTest.java Test displaynames Signed-off-by: Jens Rathsman Update DefaultEventBridgeMapperTest.java Signed-off-by: Jens Rathsman Update src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultTimeMapper.java Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com> Signed-off-by: Jens Rathsman Update src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com> Signed-off-by: Jens Rathsman Code review changes plus test that makes more sense Code review change: Private internals in SinkRecordJsonMapper formatting Update src/test/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapperTest.java Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com> Signed-off-by: Jens Rathsman --- .../kafkaconnector/EventBridgeSinkConfig.java | 21 ++++- .../EventBridgeSinkConfigValidator.java | 10 +- .../mapping/DefaultEventBridgeMapper.java | 94 ++++--------------- .../mapping/DefaultTimeMapper.java | 19 ++++ .../mapping/SinkRecordJsonMapper.java | 89 ++++++++++++++++++ .../kafkaconnector/mapping/TimeMapper.java | 12 +++ .../mapping/DefaultEventBridgeMapperTest.java | 68 ++++++++++++++ .../mapping/TestTimeMapper.java | 24 +++++ 8 files changed, 257 insertions(+), 80 deletions(-) create mode 100644 src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultTimeMapper.java create mode 100644 src/main/java/software/amazon/event/kafkaconnector/mapping/SinkRecordJsonMapper.java create mode 100644 src/main/java/software/amazon/event/kafkaconnector/mapping/TimeMapper.java create mode 100644 src/test/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapperTest.java create mode 100644 src/test/java/software/amazon/event/kafkaconnector/mapping/TestTimeMapper.java diff --git a/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java b/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java index ca501107..ab37b0a5 100644 --- a/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java +++ b/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfig.java @@ -36,6 +36,7 @@ public class EventBridgeSinkConfig extends AbstractConfig { static final String AWS_ROLE_EXTERNAL_ID_CONFIG = "aws.eventbridge.iam.external.id"; static final String AWS_DETAIL_TYPES_CONFIG = "aws.eventbridge.detail.types"; static final String AWS_DETAIL_TYPES_MAPPER_CLASS = "aws.eventbridge.detail.types.mapper.class"; + static final String AWS_TIME_MAPPER_CLASS = "aws.eventbridge.time.mapper.class"; static final String AWS_EVENTBUS_RESOURCES_CONFIG = "aws.eventbridge.eventbus.resources"; static final String AWS_OFFLOADING_DEFAULT_S3_BUCKET = "aws.eventbridge.offloading.default.s3.bucket"; @@ -76,10 +77,14 @@ public class EventBridgeSinkConfig extends AbstractConfig { + "Can be defined per topic e.g., 'topic1:MyDetailType, topic2:MyDetailType', as a single expression " + "with a dynamic '${topic}' placeholder for all topics e.g., 'my-detail-type-${topic}', " + "or as a static value without additional topic information for all topics e.g., 'my-detail-type'."; - private static final String AWS_DETAIL_TYPES_MAPPER_DOC = "Define a custom implementation class for the DetailTypeMapper interface to customize the mapping of Kafka topics or records to the EventBridge detail-type. Define full class path e.g. software.amazon.event.kafkaconnector.mapping.DefaultDetailTypeMapper."; + private static final String AWS_TIME_MAPPER_CLASS_DEFAULT = + "software.amazon.event.kafkaconnector.mapping.DefaultTimeMapper"; + private static final String AWS_TIME_MAPPER_DOC = + "Provide a custom implementation class for the TimeMapper interface to customize the mapping of records to EventBridge metadata field 'time' e.g. 'software.amazon.event.kafkaconnector.mapping.DefaultTimeMapper'."; + private static final String AWS_EVENTBUS_RESOURCES_DOC = "An optional comma-separated list of strings to add to " + "the resources field in the outgoing EventBridge events."; @@ -99,6 +104,7 @@ public class EventBridgeSinkConfig extends AbstractConfig { public Map detailTypeByTopic; public String detailType; public String detailTypeMapperClass; + public String timeMapperClass; public String offloadingDefaultS3Bucket; public String offloadingDefaultFieldRef; @@ -116,6 +122,7 @@ public EventBridgeSinkConfig(final Map originalProps) { this.retriesDelay = getInt(AWS_RETRIES_DELAY_CONFIG); this.resources = getList(AWS_EVENTBUS_RESOURCES_CONFIG); this.detailTypeMapperClass = getString(AWS_DETAIL_TYPES_MAPPER_CLASS); + this.timeMapperClass = getString(AWS_TIME_MAPPER_CLASS); this.offloadingDefaultS3Bucket = getString(AWS_OFFLOADING_DEFAULT_S3_BUCKET); this.offloadingDefaultFieldRef = getString(AWS_OFFLOADING_DEFAULT_FIELDREF); @@ -132,7 +139,7 @@ public EventBridgeSinkConfig(final Map originalProps) { "EventBridge properties: connectorId={} eventBusArn={} eventBusRegion={} eventBusEndpointURI={} " + "eventBusMaxRetries={} eventBusRetriesDelay={} eventBusResources={} " + "eventBusEndpointID={} roleArn={} roleSessionName={} roleExternalID={} " - + "offloadingDefaultS3Bucket={} offloadingDefaultFieldRef={}", + + "offloadingDefaultS3Bucket={} offloadingDefaultFieldRef={} detailTypeMapperClass={} timeMapperClass={}", connectorId, eventBusArn, region, @@ -145,7 +152,9 @@ public EventBridgeSinkConfig(final Map originalProps) { connectorId, externalId, offloadingDefaultS3Bucket, - offloadingDefaultFieldRef); + offloadingDefaultFieldRef, + detailTypeMapperClass, + timeMapperClass); } private static ConfigDef createConfigDef() { @@ -189,6 +198,12 @@ private static void addParams(final ConfigDef configDef) { AWS_DETAIL_TYPES_DEFAULT, Importance.MEDIUM, AWS_DETAIL_TYPES_DOC); + configDef.define( + AWS_TIME_MAPPER_CLASS, + Type.STRING, + AWS_TIME_MAPPER_CLASS_DEFAULT, + Importance.MEDIUM, + AWS_TIME_MAPPER_DOC); configDef.define( AWS_EVENTBUS_RESOURCES_CONFIG, Type.LIST, diff --git a/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfigValidator.java b/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfigValidator.java index dac7c69c..9d9e2b90 100644 --- a/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfigValidator.java +++ b/src/main/java/software/amazon/event/kafkaconnector/EventBridgeSinkConfigValidator.java @@ -90,7 +90,13 @@ public static void validate(ConfigValue configValue, EnvVarGetter getenv) { case AWS_DETAIL_TYPES_MAPPER_CLASS: { - validateDetailTypeMapperClass(configValue); + validateClassExists(configValue); + break; + } + + case AWS_TIME_MAPPER_CLASS: + { + validateClassExists(configValue); break; } @@ -115,7 +121,7 @@ private static void validateConnectorId(ConfigValue configValue) { } } - private static void validateDetailTypeMapperClass(ConfigValue configValue) { + private static void validateClassExists(ConfigValue configValue) { var mapperClass = (String) configValue.value(); try { Class.forName(mapperClass); diff --git a/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapper.java b/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapper.java index 470e7c92..6bbcb673 100644 --- a/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapper.java +++ b/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapper.java @@ -4,20 +4,13 @@ */ package software.amazon.event.kafkaconnector.mapping; -import static java.util.Collections.singletonMap; import static java.util.stream.Collectors.partitioningBy; import static java.util.stream.Collectors.toList; import static software.amazon.event.kafkaconnector.EventBridgeResult.Error.reportOnly; import static software.amazon.event.kafkaconnector.EventBridgeResult.failure; import static software.amazon.event.kafkaconnector.EventBridgeResult.success; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import java.io.IOException; import java.util.List; -import org.apache.kafka.connect.header.Header; -import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry; import software.amazon.event.kafkaconnector.EventBridgeResult; @@ -28,14 +21,17 @@ public class DefaultEventBridgeMapper implements EventBridgeMapper { private static final String sourcePrefix = "kafka-connect."; private final EventBridgeSinkConfig config; - private final JsonConverter jsonConverter = new JsonConverter(); - private final ObjectMapper objectMapper = new ObjectMapper(); + + private final SinkRecordJsonMapper jsonMapper = new SinkRecordJsonMapper(); + private final DetailTypeMapper detailTypeMapper; + private final TimeMapper timeMapper; public DefaultEventBridgeMapper(EventBridgeSinkConfig config) { - jsonConverter.configure(singletonMap("schemas.enable", "false"), false); + this.config = config; this.detailTypeMapper = getDetailTypeMapper(config); + this.timeMapper = getTimeMapper(config); } public EventBridgeMappingResult map(List records) { @@ -61,77 +57,14 @@ private EventBridgeResult createPutEventsEntry(SinkRecord .source(sourcePrefix + config.connectorId) .detailType(detailTypeMapper.getDetailType(record)) .resources(config.resources) - .detail(createJsonPayload(record)) + .detail(jsonMapper.createJsonPayload(record)) + .time(timeMapper.getTime(record)) .build()); } catch (Exception e) { return failure(record, reportOnly("Cannot convert Kafka record to EventBridge.", e)); } } - private String createJsonPayload(SinkRecord record) throws IOException { - var root = objectMapper.createObjectNode(); - root.put("topic", record.topic()); - root.put("partition", record.kafkaPartition()); - root.put("offset", record.kafkaOffset()); - root.put("timestamp", record.timestamp()); - root.put("timestampType", record.timestampType().toString()); - root.set("headers", createHeaderArray(record)); - - if (record.key() == null) { - root.set("key", null); - } else { - root.set( - "key", - createJSONFromByteArray( - jsonConverter.fromConnectData(record.topic(), record.keySchema(), record.key()))); - } - - // tombstone handling - if (record.value() == null) { - root.set("value", null); - } else { - root.set( - "value", - createJSONFromByteArray( - jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()))); - } - - return root.toString(); - } - - /** - * This method serializes Kafka message headers to JSON. - * - * @param record Kafka record to be sent to EventBridge - * @return headers to be added to EventBridge message - * @throws IOException - */ - private ArrayNode createHeaderArray(SinkRecord record) throws IOException { - var headersArray = objectMapper.createArrayNode(); - - for (Header header : record.headers()) { - var headerItem = objectMapper.createObjectNode(); - headerItem.set( - header.key(), - createJSONFromByteArray( - jsonConverter.fromConnectHeader( - record.topic(), header.key(), header.schema(), header.value()))); - headersArray.add(headerItem); - } - return headersArray; - } - - /** - * This method converts the byteArray which is returned by the {@link JsonConverter} to JSON. - * - * @param jsonBytes - byteArray to convert to JSON - * @return the JSON representation of jsonBytes - * @throws IOException - */ - private JsonNode createJSONFromByteArray(byte[] jsonBytes) throws IOException { - return objectMapper.readTree(jsonBytes); - } - private DetailTypeMapper getDetailTypeMapper(EventBridgeSinkConfig config) { try { var myClass = Class.forName(config.detailTypeMapperClass); @@ -144,4 +77,15 @@ private DetailTypeMapper getDetailTypeMapper(EventBridgeSinkConfig config) { throw new RuntimeException("Topic to Detail-Type Mapper Class can't be loaded."); } } + + private TimeMapper getTimeMapper(EventBridgeSinkConfig config) { + try { + var myClass = Class.forName(config.timeMapperClass); + var constructor = myClass.getDeclaredConstructor(); + return (TimeMapper) constructor.newInstance(); + } catch (Exception e) { + // This will already be verified in the Config Validator + throw new RuntimeException("Time Mapper Class can't be loaded."); + } + } } diff --git a/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultTimeMapper.java b/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultTimeMapper.java new file mode 100644 index 00000000..e02a42bd --- /dev/null +++ b/src/main/java/software/amazon/event/kafkaconnector/mapping/DefaultTimeMapper.java @@ -0,0 +1,19 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +package software.amazon.event.kafkaconnector.mapping; + +import java.time.Instant; +import org.apache.kafka.connect.sink.SinkRecord; + +public class DefaultTimeMapper implements TimeMapper { + + @Override + public Instant getTime(SinkRecord sinkRecord) { + // As described in AWS documentation + // https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEventsRequestEntry.html + // If no timestamp is provided, the timestamp of the PutEvents call is used. + return null; + } +} diff --git a/src/main/java/software/amazon/event/kafkaconnector/mapping/SinkRecordJsonMapper.java b/src/main/java/software/amazon/event/kafkaconnector/mapping/SinkRecordJsonMapper.java new file mode 100644 index 00000000..76b12137 --- /dev/null +++ b/src/main/java/software/amazon/event/kafkaconnector/mapping/SinkRecordJsonMapper.java @@ -0,0 +1,89 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +package software.amazon.event.kafkaconnector.mapping; + +import static java.util.Collections.singletonMap; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import java.io.IOException; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.sink.SinkRecord; + +public class SinkRecordJsonMapper { + private final JsonConverter jsonConverter = new JsonConverter(); + private final ObjectMapper objectMapper = new ObjectMapper(); + + public SinkRecordJsonMapper() { + jsonConverter.configure(singletonMap("schemas.enable", "false"), false); + } + + public String createJsonPayload(SinkRecord sinkRecord) throws IOException { + var root = objectMapper.createObjectNode(); + root.put("topic", sinkRecord.topic()); + root.put("partition", sinkRecord.kafkaPartition()); + root.put("offset", sinkRecord.kafkaOffset()); + root.put("timestamp", sinkRecord.timestamp()); + root.put("timestampType", sinkRecord.timestampType().toString()); + root.set("headers", createHeaderArray(sinkRecord)); + + if (sinkRecord.key() == null) { + root.set("key", null); + } else { + root.set( + "key", + createJSONFromByteArray( + jsonConverter.fromConnectData( + sinkRecord.topic(), sinkRecord.keySchema(), sinkRecord.key()))); + } + + // tombstone handling + if (sinkRecord.value() == null) { + root.set("value", null); + } else { + root.set( + "value", + createJSONFromByteArray( + jsonConverter.fromConnectData( + sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value()))); + } + return root.toString(); + } + + /** + * This method serializes Kafka message headers to JSON. + * + * @param sinkRecord Kafka record to be sent to EventBridge + * @return headers to be added to EventBridge message + * @throws IOException + */ + private ArrayNode createHeaderArray(SinkRecord sinkRecord) throws IOException { + var headersArray = objectMapper.createArrayNode(); + + for (Header header : sinkRecord.headers()) { + var headerItem = objectMapper.createObjectNode(); + headerItem.set( + header.key(), + createJSONFromByteArray( + jsonConverter.fromConnectHeader( + sinkRecord.topic(), header.key(), header.schema(), header.value()))); + headersArray.add(headerItem); + } + return headersArray; + } + + /** + * This method converts the byteArray which is returned by the {@link JsonConverter} to JSON. + * + * @param jsonBytes - byteArray to convert to JSON + * @return the JSON representation of jsonBytes + * @throws IOException + */ + private JsonNode createJSONFromByteArray(byte[] jsonBytes) throws IOException { + return objectMapper.readTree(jsonBytes); + } +} diff --git a/src/main/java/software/amazon/event/kafkaconnector/mapping/TimeMapper.java b/src/main/java/software/amazon/event/kafkaconnector/mapping/TimeMapper.java new file mode 100644 index 00000000..9cbedb11 --- /dev/null +++ b/src/main/java/software/amazon/event/kafkaconnector/mapping/TimeMapper.java @@ -0,0 +1,12 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +package software.amazon.event.kafkaconnector.mapping; + +import java.time.Instant; +import org.apache.kafka.connect.sink.SinkRecord; + +public interface TimeMapper { + Instant getTime(SinkRecord sinkRecord); +} diff --git a/src/test/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapperTest.java b/src/test/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapperTest.java new file mode 100644 index 00000000..70022d47 --- /dev/null +++ b/src/test/java/software/amazon/event/kafkaconnector/mapping/DefaultEventBridgeMapperTest.java @@ -0,0 +1,68 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +package software.amazon.event.kafkaconnector.mapping; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.sink.SinkRecord; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import software.amazon.event.kafkaconnector.EventBridgeSinkConfig; + +public class DefaultEventBridgeMapperTest { + + @Test + @DisplayName("Mapping without timeMapperClass parameter defined should give time null") + public void mapWithDefaultTimeMapper() { + var config = new EventBridgeSinkConfig(defaultConfig()); + var mapper = new DefaultEventBridgeMapper(config); + var result = mapper.map(List.of(defaultSinkRecord())); + var putEventsRequestEntry = result.success.stream().findFirst().get(); + assertThat(putEventsRequestEntry.getValue().time()).isNull(); + } + + @Test + @DisplayName("Mapping with timeMapperClass parameter defined should give a time value") + public void mapWithSpecifiedTimeMapper() { + var props = new HashMap<>(defaultConfig()); + props.put( + "aws.eventbridge.time.mapper.class", + "software.amazon.event.kafkaconnector.mapping.TestTimeMapper"); + var config = new EventBridgeSinkConfig(props); + var mapper = new DefaultEventBridgeMapper(config); + var result = mapper.map(List.of(defaultSinkRecord())); + var putEventsRequestEntry = result.success.stream().findFirst().get(); + assertThat(putEventsRequestEntry.getValue().time()).isEqualTo("1981-12-24T00:00:00Z"); + } + + @NotNull + private static SinkRecord defaultSinkRecord() { + return new SinkRecord( + "topic-1", + 0, + null, + "key-1", + null, + Map.of("timeField", "1981-12-24T00:00:00Z"), + 0L, + 1234568790123L, + TimestampType.CREATE_TIME); + } + + @NotNull + private static Map defaultConfig() { + return Map.of( + "aws.eventbridge.retries.max", 10, + "aws.eventbridge.connector.id", "testConnectorId", + "aws.eventbridge.region", "us-east-1", + "aws.eventbridge.eventbus.arn", "arn:aws:events:us-east-1:000000000000:event-bus/e2e", + "aws.eventbridge.detail.types", "test-${topic}"); + } +} diff --git a/src/test/java/software/amazon/event/kafkaconnector/mapping/TestTimeMapper.java b/src/test/java/software/amazon/event/kafkaconnector/mapping/TestTimeMapper.java new file mode 100644 index 00000000..ebcdc681 --- /dev/null +++ b/src/test/java/software/amazon/event/kafkaconnector/mapping/TestTimeMapper.java @@ -0,0 +1,24 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +package software.amazon.event.kafkaconnector.mapping; + +import com.jayway.jsonpath.JsonPath; +import java.io.IOException; +import java.time.Instant; +import org.apache.kafka.connect.sink.SinkRecord; + +public class TestTimeMapper implements TimeMapper { + + @Override + public Instant getTime(SinkRecord record) { + try { + String jsonPayload = new SinkRecordJsonMapper().createJsonPayload(record); + String fieldValue = JsonPath.read(jsonPayload, "$.value.timeField").toString(); + return Instant.parse(fieldValue); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +}