Skip to content

Commit

Permalink
Merge pull request #464 from embano1/issue-272
Browse files Browse the repository at this point in the history
Add `JSONPathDetailTypeMapper`
  • Loading branch information
embano1 authored Jan 3, 2025
2 parents e218665 + 19f29f4 commit cef53eb
Show file tree
Hide file tree
Showing 8 changed files with 387 additions and 27 deletions.
110 changes: 89 additions & 21 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class EventBridgeSinkConfig extends AbstractConfig {
"The JSON Path to offload record value";
public static final String AWS_OFFLOADING_DEFAULT_FIELDREF_DEFAULT =
S3EventBridgeEventDetailValueOffloading.JSON_PATH_PREFIX;
public static final String AWS_DETAIL_TYPES_MAPPER_JSON_PATH_MAPPER_FIELDREF =
"aws.eventbridge.detail.types.jsonpathmapper.fieldref";

private static final String AWS_DETAIL_TYPES_MAPPER_CLASS_DEFAULT =
"software.amazon.event.kafkaconnector.mapping.DefaultDetailTypeMapper";
Expand All @@ -85,6 +87,9 @@ public class EventBridgeSinkConfig extends AbstractConfig {
+ "Can be defined per topic e.g., 'topic1:MyDetailType, topic2:MyDetailType', as a single expression "
+ "with a dynamic '${topic}' placeholder for all topics e.g., 'my-detail-type-${topic}', "
+ "or as a static value without additional topic information for all topics e.g., 'my-detail-type'.";
private static final String AWS_DETAIL_TYPES_MAPPER_JSON_PATH_MAPPER_FIELDREF_DOC =
"The JSON Path to extract the detail type from the record when using JsonPathDetailTypeMapper. "
+ "Must be a definite path.";
private static final String AWS_DETAIL_TYPES_MAPPER_DOC =
"Define a custom implementation class for the DetailTypeMapper interface to customize the mapping of Kafka topics or records to the EventBridge detail-type. Define full class path e.g. software.amazon.event.kafkaconnector.mapping.DefaultDetailTypeMapper.";

Expand Down Expand Up @@ -236,6 +241,12 @@ private static void addParams(final ConfigDef configDef) {
AWS_DETAIL_TYPES_MAPPER_CLASS_DEFAULT,
Importance.MEDIUM,
AWS_DETAIL_TYPES_MAPPER_DOC);
configDef.define(
AWS_DETAIL_TYPES_MAPPER_JSON_PATH_MAPPER_FIELDREF,
Type.STRING,
"",
Importance.MEDIUM,
AWS_DETAIL_TYPES_MAPPER_JSON_PATH_MAPPER_FIELDREF_DOC);
configDef.define(
AWS_OFFLOADING_DEFAULT_S3_ENDPOINT_URI,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private DetailTypeMapper getDetailTypeMapper(EventBridgeSinkConfig config) {
return detailTypeMapper;
} catch (Exception e) {
// This will already be verified in the Config Validator
throw new RuntimeException("Topic to Detail-Type Mapper Class can't be loaded.");
throw new RuntimeException("Topic to Detail-Type Mapper Class can't be loaded: ", e);
}
}

Expand All @@ -85,7 +85,7 @@ private TimeMapper getTimeMapper(EventBridgeSinkConfig config) {
return (TimeMapper) constructor.newInstance();
} catch (Exception e) {
// This will already be verified in the Config Validator
throw new RuntimeException("Time Mapper Class can't be loaded.");
throw new RuntimeException("Time Mapper Class can't be loaded: ", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package software.amazon.event.kafkaconnector.mapping;

import static com.jayway.jsonpath.Configuration.defaultConfiguration;
import static com.jayway.jsonpath.Option.SUPPRESS_EXCEPTIONS;
import static java.lang.String.format;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG;
import static software.amazon.event.kafkaconnector.EventBridgeSinkConfig.AWS_DETAIL_TYPES_MAPPER_JSON_PATH_MAPPER_FIELDREF;

import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import software.amazon.event.kafkaconnector.EventBridgeSinkConfig;
import software.amazon.event.kafkaconnector.logging.ContextAwareLoggerFactory;

public class JsonPathDetailTypeMapper implements DetailTypeMapper {
private static final Logger log =
ContextAwareLoggerFactory.getLogger(JsonPathDetailTypeMapper.class);

private static final Configuration jsonPathConfiguration =
defaultConfiguration()
.addOptions(
// suppress exception otherwise com.jayway.jsonpath.ReadContext#read throws an
// exception if JSON path could not be found
SUPPRESS_EXCEPTIONS);

private String jsonPath;
private final JsonConverter jsonConverter = new JsonConverter();

public JsonPathDetailTypeMapper() {
this.jsonConverter.configure(singletonMap(SCHEMAS_ENABLE_CONFIG, "false"), false);
}

@Override
public String getDetailType(SinkRecord record) {
if (record == null) {
throw new IllegalArgumentException("SinkRecord is null. Unable to extract detail type.");
}

if (record.topic() == null || record.topic().trim().isEmpty()) {
throw new IllegalArgumentException(
"SinkRecord topic is null or empty but is required for fallback logic.");
}

var topic = record.topic();
try {
var jsonBytes = jsonConverter.fromConnectData(topic, record.valueSchema(), record.value());
// super defensive, because null here should never be the case
var jsonString = new String(jsonBytes).trim();
if (jsonBytes == null || jsonString.isEmpty()) {
log.error(
"Record value conversion to JSON bytes returned null or empty string for record '{}', using topic '{}' as"
+ " fallback",
record,
topic);
return topic;
}

var extractedValue = JsonPath.using(jsonPathConfiguration).parse(jsonString).read(jsonPath);

if (extractedValue == null) {
log.warn(
"Parsed JSON value is null for JSON path '{}' and record '{}', using topic '{}' as fallback",
jsonPath,
record,
topic);
return topic;
}

if (!(extractedValue instanceof String)) {
log.warn(
"Parsed JSON value is not of type String for for JSON path '{}' and record '{}', using topic '{}' as fallback",
jsonPath,
record,
topic);
return topic;
}

if (((String) extractedValue).trim().isEmpty()) {
log.warn(
"Parsed JSON value is empty String for JSON path '{}' and record '{}', using topic '{}' as fallback",
jsonPath,
record,
topic);
return topic;
}

log.trace(
"Successfully extracted detail type '{}' for JSON path '{}' and record '{}'",
extractedValue,
jsonPath,
record);
return (String) extractedValue;
} catch (Exception e) {
log.error(
"Could not extract JSON value for JSON path '{}' and record '{}', using topic '{}' as fallback",
jsonPath,
record,
topic);
return topic;
}
}

@Override
public void configure(EventBridgeSinkConfig config) {
if (config == null) {
throw new IllegalArgumentException("EventBridgeSinkConfig cannot be null.");
}

var jsonPath = config.getString(AWS_DETAIL_TYPES_MAPPER_JSON_PATH_MAPPER_FIELDREF);

if (jsonPath == null || jsonPath.trim().isEmpty()) {
throw new IllegalArgumentException(
"JSON path configuration must be provided and cannot be empty.");
}

var path = JsonPath.compile(jsonPath);
if (!path.isDefinite()) {
throw new IllegalArgumentException(
format("JSON path must be definite but '%s' is not", jsonPath));
}

this.jsonPath = path.getPath();
log.info("JsonPathDetailTypeMapper configured successfully with JSON path '{}'", this.jsonPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ public class S3EventBridgeEventDetailValueOffloading
private static final Configuration jsonPathConfiguration =
defaultConfiguration()
.addOptions(
SUPPRESS_EXCEPTIONS // suppress exception otherwise
// com.jayway.jsonpath.ReadContext#read throws an exception if JSON path could not
// be found
);
// suppress exception otherwise com.jayway.jsonpath.ReadContext#read throws an
// exception if JSON path could not be found
SUPPRESS_EXCEPTIONS);

private final String bucketName;
private final S3AsyncClient client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package software.amazon.event.kafkaconnector.mapping;

import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.event.kafkaconnector.mapping.TestDetailTypeMapper.DETAIL_TYPE;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -42,6 +43,21 @@ public void mapWithSpecifiedTimeMapper() {
assertThat(putEventsRequestEntry.getValue().time()).isEqualTo("1981-12-24T00:00:00Z");
}

@Test
@DisplayName(
"Mapping with DetailTypeMapperClass parameter defined should give custom detail type value")
public void mapWithSpecifiedDetailTypeMapper() {
var props = new HashMap<>(defaultConfig());
props.put(
"aws.eventbridge.detail.types.mapper.class",
"software.amazon.event.kafkaconnector.mapping.TestDetailTypeMapper");
var config = new EventBridgeSinkConfig(props);
var mapper = new DefaultEventBridgeMapper(config);
var result = mapper.map(List.of(defaultSinkRecord()));
var putEventsRequestEntry = result.success.stream().findFirst().get();
assertThat(putEventsRequestEntry.getValue().detailType()).isEqualTo(DETAIL_TYPE);
}

@NotNull
private static SinkRecord defaultSinkRecord() {
return new SinkRecord(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package software.amazon.event.kafkaconnector.mapping;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static software.amazon.event.kafkaconnector.EventBridgeSinkConfig.AWS_DETAIL_TYPES_MAPPER_JSON_PATH_MAPPER_FIELDREF;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.event.kafkaconnector.EventBridgeSinkConfig;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class JsonPathDetailTypeMapperTest {
private static final String TEST_TOPIC = "test-topic";
private static final String TEST_PATH = "$.event-type";
private DetailTypeMapper mapper;

@BeforeEach
void resetMapper() {
this.mapper = new JsonPathDetailTypeMapper();
}

private SinkRecord createSinkRecord(String topic, Map<String, Object> value) {
return new SinkRecord(
topic, 0, null, "key-1", null, value, 0L, 1234568790123L, TimestampType.CREATE_TIME);
}

private void configureMapperWithPath(String jsonPath) {
var config = new HashMap<>(defaultConfig());
if (jsonPath != null) {
config.put(AWS_DETAIL_TYPES_MAPPER_JSON_PATH_MAPPER_FIELDREF, jsonPath);
}
mapper.configure(new EventBridgeSinkConfig(config));
}

@Test
@DisplayName("Exception when record has no topic")
void exceptionWhenRecordHasNoTopic() {
configureMapperWithPath(TEST_PATH);
var invalidRecord = createSinkRecord("", Map.of("somekey", "somevalue"));

var exception =
assertThrows(IllegalArgumentException.class, () -> mapper.getDetailType(invalidRecord));
assertThat(exception.getMessage()).contains("SinkRecord topic is null or empty");
}

@ParameterizedTest
@DisplayName("Uses topic as fallback when JSON path value is invalid")
@MethodSource("provideFallbackTestCases")
void returnsTopicAsFallback(Map<String, Object> value) {
configureMapperWithPath(TEST_PATH);

var record = createSinkRecord(TEST_TOPIC, value);
assertThat(mapper.getDetailType(record)).isEqualTo(TEST_TOPIC);
}

private static Stream<Map<String, Object>> provideFallbackTestCases() {
return Stream.of(
Map.of("some-key", "some-value"), // path not found
Map.of("event-type", Map.of("key2", "val2")), // not a string
Map.of("event-type", "") // empty string
);
}

@Test
@DisplayName("Uses value from JSON path as detail type")
void returnsJsonPathValue() {
configureMapperWithPath(TEST_PATH);

var record = createSinkRecord(TEST_TOPIC, Map.of("event-type", "test.event.v0"));
assertThat(mapper.getDetailType(record)).isEqualTo("test.event.v0");
}

@Test
@DisplayName("Exception when providing empty JSON path")
void configExceptionWhenProvidingEmptyJsonPath() {
var exception =
assertThrows(IllegalArgumentException.class, () -> configureMapperWithPath(null));
assertThat(exception.getMessage()).contains("JSON path configuration must be provided");
}

@Test
@DisplayName("Exception when providing non-definite JSON path")
void configExceptionWhenProvidingNonDefiniteJsonPath() {
var exception =
assertThrows(
IllegalArgumentException.class, () -> configureMapperWithPath("$..somenestedkey"));

assertThat(exception.getMessage()).contains("JSON path must be definite");
}

@NotNull
private static Map<Object, Object> defaultConfig() {
return Map.of(
"aws.eventbridge.retries.max", 10,
"aws.eventbridge.connector.id", "testConnectorId",
"aws.eventbridge.region", "us-east-1",
"aws.eventbridge.eventbus.arn", "arn:aws:events:us-east-1:000000000000:event-bus/e2e",
"aws.eventbridge.detail.types", "test-${topic}");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package software.amazon.event.kafkaconnector.mapping;

import org.apache.kafka.connect.sink.SinkRecord;
import software.amazon.event.kafkaconnector.EventBridgeSinkConfig;

public class TestDetailTypeMapper implements DetailTypeMapper {
public static final String DETAIL_TYPE = "TestDetailType";

@Override
public String getDetailType(SinkRecord record) {
return DETAIL_TYPE;
}

@Override
public void configure(EventBridgeSinkConfig eventBridgeSinkConfig) {}
}

0 comments on commit cef53eb

Please sign in to comment.