From 6462be2dcc2280460d42fdc3769dd1cdfadd2ff9 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Wed, 28 Feb 2024 10:01:49 +0100 Subject: [PATCH] Perform cloud event deserialization in deserializer wrapper in order to handle failures --- .../messaging/kafka/IncomingKafkaRecord.java | 14 ++-- .../kafka/fault/DeserializerWrapper.java | 26 ++++++- .../messaging/kafka/impl/KafkaSource.java | 5 +- .../kafka/impl/ReactiveKafkaConsumer.java | 6 +- .../kafka/impl/ce/KafkaCloudEventHelper.java | 68 ++++++++++++------- .../ce/KafkaSourceWithCloudEventsTest.java | 63 +++++++++++++++++ 6 files changed, 142 insertions(+), 40 deletions(-) diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java index 6425943ade..38f9ae89e3 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java @@ -42,16 +42,18 @@ public IncomingKafkaRecord(ConsumerRecord record, boolean payloadSet = false; if (cloudEventEnabled) { // Cloud Event detection - KafkaCloudEventHelper.CloudEventMode mode = KafkaCloudEventHelper.getCloudEventMode(record); + KafkaCloudEventHelper.CloudEventMode mode = KafkaCloudEventHelper.getCloudEventMode(record.headers()); switch (mode) { case NOT_A_CLOUD_EVENT: break; case STRUCTURED: - CloudEventMetadata event = KafkaCloudEventHelper - .createFromStructuredCloudEvent(record); - meta.add(event); - payloadSet = true; - payload = event.getData(); + if (record.value() != null) { + CloudEventMetadata event = KafkaCloudEventHelper + .createFromStructuredCloudEvent(record); + meta.add(event); + payloadSet = true; + payload = event.getData(); + } break; case BINARY: meta.add(KafkaCloudEventHelper.createFromBinaryCloudEvent(record)); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/DeserializerWrapper.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/DeserializerWrapper.java index f08065ebb5..8f72141697 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/DeserializerWrapper.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/DeserializerWrapper.java @@ -15,6 +15,7 @@ import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler; import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions; import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging; +import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper; /** * Wraps a delegate deserializer to handle config and deserialization failures. @@ -31,14 +32,16 @@ public class DeserializerWrapper implements Deserializer { private final BiConsumer reportFailure; private final boolean failOnDeserializationErrorWithoutHandler; + private final boolean cloudEventsEnabled; public DeserializerWrapper(String className, boolean key, DeserializationFailureHandler failureHandler, - BiConsumer reportFailure, boolean failByDefault) { + BiConsumer reportFailure, boolean failByDefault, boolean cloudEventsEnabled) { this.delegate = createDelegateDeserializer(className); this.handleKeys = key; this.deserializationFailureHandler = failureHandler; this.reportFailure = reportFailure; this.failOnDeserializationErrorWithoutHandler = failByDefault; + this.cloudEventsEnabled = cloudEventsEnabled; } /** @@ -69,12 +72,13 @@ private Deserializer createDelegateDeserializer(String clazz) { @Override public T deserialize(String topic, byte[] data) { - return wrapDeserialize(() -> this.delegate.deserialize(topic, data), topic, null, data); + return wrapDeserialize(() -> handleCloudEvents(this.delegate.deserialize(topic, data), null), topic, null, data); } @Override public T deserialize(String topic, Headers headers, byte[] data) { - return wrapDeserialize(() -> this.delegate.deserialize(topic, headers, data), topic, headers, data); + return wrapDeserialize(() -> handleCloudEvents(this.delegate.deserialize(topic, headers, data), headers), topic, + headers, data); } /** @@ -124,6 +128,22 @@ private T wrapDeserialize(Supplier deserialize, String topic, Headers headers } } + private T handleCloudEvents(T payload, Headers headers) { + if (cloudEventsEnabled && !handleKeys && headers != null) { + KafkaCloudEventHelper.CloudEventMode mode = KafkaCloudEventHelper.getCloudEventMode(headers); + switch (mode) { + case STRUCTURED: + //noinspection unchecked + return (T) KafkaCloudEventHelper.parseStructuredContent(payload); + case BINARY: + return KafkaCloudEventHelper.checkBinaryRecord(payload, headers); + case NOT_A_CLOUD_EVENT: + return payload; + } + } + return payload; + } + @Override public void close() { // Be a bit more defensive here as close can be called after an instantiation failure. diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java index 06c6d35dd1..8519bd19d0 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java @@ -28,7 +28,6 @@ import io.smallrye.reactive.messaging.kafka.*; import io.smallrye.reactive.messaging.kafka.commit.ContextHolder; import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler; -import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue; import io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic; import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler; import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth; @@ -166,9 +165,7 @@ public KafkaSource(Vertx vertx, Multi> incomingMulti = multi.onItem().transformToUni(rec -> { IncomingKafkaRecord record = new IncomingKafkaRecord<>(rec, channel, index, commitHandler, failureHandler, isCloudEventEnabled, isTracingEnabled); - if ((failureHandler instanceof KafkaDeadLetterQueue) - && rec.headers() != null - && rec.headers().lastHeader(DESERIALIZATION_FAILURE_DLQ) != null) { + if (rec.headers() != null && rec.headers().lastHeader(DESERIALIZATION_FAILURE_DLQ) != null) { Header reasonMsgHeader = rec.headers().lastHeader(DESERIALIZATION_FAILURE_REASON); String message = reasonMsgHeader != null ? new String(reasonMsgHeader.value()) : null; RecordDeserializationException reason = new RecordDeserializationException( diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaConsumer.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaConsumer.java index 1c3e847f95..a145e7ff0b 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaConsumer.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaConsumer.java @@ -72,6 +72,7 @@ public ReactiveKafkaConsumer(KafkaConnectorIncomingConfiguration config, config.getLazyClient(), config.getPollTimeout(), config.getFailOnDeserializationFailure(), + config.getCloudEvents(), onConsumerCreated, reportFailure, context); @@ -84,6 +85,7 @@ public ReactiveKafkaConsumer(Map kafkaConfiguration, boolean lazyClient, int pollTimeout, boolean failOnDeserializationFailure, + boolean cloudEventsEnabled, java.util.function.Consumer> onConsumerCreated, BiConsumer reportFailure, Context context) { @@ -99,9 +101,9 @@ public ReactiveKafkaConsumer(Map kafkaConfiguration, } Deserializer keyDeserializer = new DeserializerWrapper<>(keyDeserializerCN, true, - keyDeserializationFailureHandler, reportFailure, failOnDeserializationFailure); + keyDeserializationFailureHandler, reportFailure, failOnDeserializationFailure, cloudEventsEnabled); Deserializer valueDeserializer = new DeserializerWrapper<>(valueDeserializerCN, false, - valueDeserializationFailureHandler, reportFailure, failOnDeserializationFailure); + valueDeserializationFailureHandler, reportFailure, failOnDeserializationFailure, cloudEventsEnabled); // Configure the underlying deserializers keyDeserializer.configure(kafkaConfiguration, true); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ce/KafkaCloudEventHelper.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ce/KafkaCloudEventHelper.java index 748138c8d4..5c5a4e9bf9 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ce/KafkaCloudEventHelper.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/ce/KafkaCloudEventHelper.java @@ -69,20 +69,7 @@ public static IncomingKafkaCloudEventMetadata createFromStructuredC ConsumerRecord record) { DefaultCloudEventMetadataBuilder builder = new DefaultCloudEventMetadataBuilder<>(); - JsonObject content; - if (record.value() instanceof JsonObject) { - content = (JsonObject) record.value(); - } else if (record.value() instanceof String) { - content = new JsonObject((String) record.value()); - } else if (record.value() instanceof byte[]) { - byte[] bytes = (byte[]) record.value(); - Buffer buffer = Buffer.buffer(bytes); - content = buffer.toJsonObject(); - } else { - throw new IllegalArgumentException( - "Invalid value type. Structured Cloud Event can only be created from String, JsonObject and byte[], found: " - + record.value().getClass()); - } + JsonObject content = (JsonObject) record.value(); // Required builder.withSpecVersion(content.getString(CloudEventMetadata.CE_ATTRIBUTE_SPEC_VERSION)); @@ -134,6 +121,30 @@ public static IncomingKafkaCloudEventMetadata createFromStructuredC new DefaultIncomingCloudEventMetadata<>(cloudEventMetadata)); } + public static JsonObject parseStructuredContent(Object value) { + JsonObject content; + if (value instanceof JsonObject) { + content = (JsonObject) value; + } else if (value instanceof String) { + content = new JsonObject((String) value); + } else if (value instanceof byte[]) { + byte[] bytes = (byte[]) value; + Buffer buffer = Buffer.buffer(bytes); + content = buffer.toJsonObject(); + } else { + throw new IllegalArgumentException( + "Invalid value type. Structured Cloud Event can only be created from String, JsonObject and byte[], found: " + + Optional.ofNullable(value).map(Object::getClass).orElse(null)); + } + // validate required source attribute + String source = content.getString(CloudEventMetadata.CE_ATTRIBUTE_SOURCE); + if (source == null) { + throw new IllegalArgumentException( + "The JSON value must contain the " + CloudEventMetadata.CE_ATTRIBUTE_SOURCE + " attribute"); + } + return content; + } + public static IncomingKafkaCloudEventMetadata createFromBinaryCloudEvent( ConsumerRecord record) { DefaultCloudEventMetadataBuilder builder = new DefaultCloudEventMetadataBuilder<>(); @@ -206,6 +217,15 @@ public static IncomingKafkaCloudEventMetadata createFromBinaryCloud new DefaultIncomingCloudEventMetadata<>(cloudEventMetadata)); } + public static T checkBinaryRecord(T payload, Headers headers) { + Header sourceHeader = headers.lastHeader(KAFKA_HEADER_FOR_SOURCE); + if (sourceHeader == null) { + throw new IllegalArgumentException( + "The Kafka record must contain the " + KAFKA_HEADER_FOR_SOURCE + " header"); + } + return payload; + } + @SuppressWarnings("rawtypes") public static ProducerRecord createBinaryRecord(Message message, String topic, OutgoingKafkaRecordMetadata outgoingMetadata, IncomingKafkaRecordMetadata incomingMetadata, @@ -438,32 +458,30 @@ public enum CloudEventMode { NOT_A_CLOUD_EVENT } - public static CloudEventMode getCloudEventMode(ConsumerRecord record) { - String contentType = getHeader(KAFKA_HEADER_CONTENT_TYPE, record); + public static CloudEventMode getCloudEventMode(Headers headers) { + String contentType = getHeader(KAFKA_HEADER_CONTENT_TYPE, headers); if (contentType != null && contentType.startsWith(CE_CONTENT_TYPE_PREFIX)) { return CloudEventMode.STRUCTURED; - } else if (containsAllMandatoryAttributes(record)) { + } else if (containsAllMandatoryAttributes(headers)) { return CloudEventMode.BINARY; } return CloudEventMode.NOT_A_CLOUD_EVENT; } - private static boolean containsAllMandatoryAttributes(ConsumerRecord record) { - return getHeader(KAFKA_HEADER_FOR_ID, record) != null - && getHeader(KAFKA_HEADER_FOR_SOURCE, record) != null - && getHeader(KAFKA_HEADER_FOR_TYPE, record) != null - && getHeader(KAFKA_HEADER_FOR_SPEC_VERSION, record) != null; + private static boolean containsAllMandatoryAttributes(Headers headers) { + return getHeader(KAFKA_HEADER_FOR_ID, headers) != null + && getHeader(KAFKA_HEADER_FOR_SOURCE, headers) != null + && getHeader(KAFKA_HEADER_FOR_TYPE, headers) != null + && getHeader(KAFKA_HEADER_FOR_SPEC_VERSION, headers) != null; } - private static String getHeader(String name, ConsumerRecord record) { - Headers headers = record.headers(); + private static String getHeader(String name, Headers headers) { for (Header header : headers) { if (header.key().equals(name)) { return new String(header.value(), StandardCharsets.UTF_8); } } return null; - } } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceWithCloudEventsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceWithCloudEventsTest.java index 42c55caab3..da78df2a25 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceWithCloudEventsTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSourceWithCloudEventsTest.java @@ -116,6 +116,69 @@ public void testReceivingStructuredCloudEventsWithStringDeserializer() { } + @SuppressWarnings("unchecked") + @Test + public void testReceivingStructuredCloudEventsWithNonValidJson() { + KafkaMapBasedConfig config = newCommonConfig(); + config.put("topic", topic); + config.put("value.deserializer", StringDeserializer.class.getName()); + config.put("failure-strategy", "ignore"); + config.put("fail-on-deserialization-failure", false); + config.put("channel-name", topic); + KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config); + source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories, + UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1); + + List> messages = new ArrayList<>(); + source.getStream().subscribe().with(messages::add); + + companion.produceStrings().fromRecords( + new ProducerRecord<>(topic, null, null, null, "{\"type\":", + Collections.singletonList( + new RecordHeader("content-type", + "application/cloudevents+json; charset=utf-8".getBytes()))), + new ProducerRecord<>(topic, null, null, null, new JsonObject() + .put("specversion", CloudEventMetadata.CE_VERSION_1_0) + .put("type", "type") + .put("id", "id") + .put("source", "test://test") + .put("subject", "foo") + .put("datacontenttype", "application/json") + .put("dataschema", "http://schema.io") + .put("time", "2020-07-23T09:12:34Z") + .put("data", new JsonObject().put("name", "neo")).encode(), + Collections.singletonList( + new RecordHeader("content-type", + "application/cloudevents+json; charset=utf-8".getBytes())))); + + await().atMost(2, TimeUnit.MINUTES).until(() -> messages.size() >= 1); + + Message message = messages.get(0); + IncomingKafkaCloudEventMetadata metadata = message + .getMetadata(IncomingKafkaCloudEventMetadata.class) + .orElse(null); + assertThat(metadata).isNotNull(); + assertThat(metadata.getSpecVersion()).isEqualTo(CloudEventMetadata.CE_VERSION_1_0); + assertThat(metadata.getType()).isEqualTo("type"); + assertThat(metadata.getId()).isEqualTo("id"); + assertThat(metadata.getSource()).isEqualTo(URI.create("test://test")); + assertThat(metadata.getSubject()).hasValue("foo"); + assertThat(metadata.getDataContentType()).hasValue("application/json"); + assertThat(metadata.getDataSchema()).hasValue(URI.create("http://schema.io")); + assertThat(metadata.getTimeStamp()).isNotEmpty(); + assertThat(metadata.getData().getString("name")).isEqualTo("neo"); + + // Extensions + assertThat(metadata.getKey()).isNull(); + // Rule 3.1 - partitionkey attribute + assertThat(metadata. getExtension("partitionkey")).isEmpty(); + assertThat(metadata.getTopic()).isEqualTo(topic); + + assertThat(message.getPayload()).isInstanceOf(JsonObject.class); + assertThat(((JsonObject) message.getPayload()).getString("name")).isEqualTo("neo"); + + } + @SuppressWarnings("unchecked") @Test public void testReceivingStructuredCloudEventsWithJsonObjectDeserializer() {