Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Perform cloud event deserialization in deserializer wrapper in order to handle failures #2502

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,18 @@ public IncomingKafkaRecord(ConsumerRecord<K, T> record,
boolean payloadSet = false;
if (cloudEventEnabled) {
// Cloud Event detection
KafkaCloudEventHelper.CloudEventMode mode = KafkaCloudEventHelper.getCloudEventMode(record);
KafkaCloudEventHelper.CloudEventMode mode = KafkaCloudEventHelper.getCloudEventMode(record.headers());
switch (mode) {
case NOT_A_CLOUD_EVENT:
break;
case STRUCTURED:
CloudEventMetadata<T> event = KafkaCloudEventHelper
.createFromStructuredCloudEvent(record);
meta.add(event);
payloadSet = true;
payload = event.getData();
if (record.value() != null) {
CloudEventMetadata<T> event = KafkaCloudEventHelper
.createFromStructuredCloudEvent(record);
meta.add(event);
payloadSet = true;
payload = event.getData();
}
break;
case BINARY:
meta.add(KafkaCloudEventHelper.createFromBinaryCloudEvent(record));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;

/**
* Wraps a delegate deserializer to handle config and deserialization failures.
Expand All @@ -31,14 +32,16 @@ public class DeserializerWrapper<T> implements Deserializer<T> {
private final BiConsumer<Throwable, Boolean> reportFailure;

private final boolean failOnDeserializationErrorWithoutHandler;
private final boolean cloudEventsEnabled;

/**
 * Creates a wrapper around a delegate deserializer so that deserialization and
 * configuration failures can be intercepted and routed to the failure handler.
 *
 * @param className fully-qualified class name of the delegate {@code Deserializer} to instantiate
 * @param key {@code true} when this wrapper handles record keys, {@code false} for record values
 * @param failureHandler handler invoked on deserialization failure, may be {@code null} when none is configured
 * @param reportFailure callback used to report a failure; the boolean flag indicates whether the failure is fatal
 * @param failByDefault whether to fail when a deserialization error occurs and no failure handler is configured
 * @param cloudEventsEnabled whether Cloud Events support is enabled for the channel; when {@code true},
 *        Cloud Event parsing/validation is performed inside {@link #deserialize} so its failures
 *        also go through the failure-handling path
 */
public DeserializerWrapper(String className, boolean key, DeserializationFailureHandler<T> failureHandler,
        BiConsumer<Throwable, Boolean> reportFailure, boolean failByDefault, boolean cloudEventsEnabled) {
    this.delegate = createDelegateDeserializer(className);
    this.handleKeys = key;
    this.deserializationFailureHandler = failureHandler;
    this.reportFailure = reportFailure;
    this.failOnDeserializationErrorWithoutHandler = failByDefault;
    this.cloudEventsEnabled = cloudEventsEnabled;
}

/**
Expand Down Expand Up @@ -69,12 +72,13 @@ private Deserializer<T> createDelegateDeserializer(String clazz) {

/**
 * Deserializes a record payload when no headers are available.
 * Cloud Event mode detection requires headers, so {@code handleCloudEvents} receives
 * {@code null} headers here and returns the value unchanged.
 *
 * @param topic the topic the data comes from
 * @param data the serialized bytes, may be {@code null}
 * @return the deserialized value, or the failure-handler fallback on error
 */
@Override
public T deserialize(String topic, byte[] data) {
    return wrapDeserialize(() -> handleCloudEvents(this.delegate.deserialize(topic, data), null), topic, null, data);
}

/**
 * Deserializes a record payload and, when Cloud Events support is enabled, parses/validates
 * the Cloud Event content inside the wrapper. Doing this here (rather than later in the
 * record pipeline) means malformed Cloud Events trigger the same failure handling as any
 * other deserialization error.
 *
 * @param topic the topic the data comes from
 * @param headers the record headers, used for Cloud Event mode detection
 * @param data the serialized bytes, may be {@code null}
 * @return the deserialized (and possibly Cloud-Event-validated) value, or the failure-handler fallback on error
 */
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
    return wrapDeserialize(() -> handleCloudEvents(this.delegate.deserialize(topic, headers, data), headers), topic,
            headers, data);
}

/**
Expand Down Expand Up @@ -124,6 +128,22 @@ private T wrapDeserialize(Supplier<T> deserialize, String topic, Headers headers
}
}

/**
 * Applies Cloud Event parsing/validation to a freshly deserialized value.
 * Only value deserializers participate (keys are never Cloud Event payloads), and only
 * when Cloud Events support is enabled and headers are available for mode detection.
 *
 * @param payload the value produced by the delegate deserializer
 * @param headers the record headers, may be {@code null} (no-headers deserialize path)
 * @return the validated payload; for structured mode, the parsed JSON content
 */
private T handleCloudEvents(T payload, Headers headers) {
    // Keys and header-less records bypass Cloud Event processing entirely.
    if (!cloudEventsEnabled || handleKeys || headers == null) {
        return payload;
    }
    KafkaCloudEventHelper.CloudEventMode mode = KafkaCloudEventHelper.getCloudEventMode(headers);
    if (mode == KafkaCloudEventHelper.CloudEventMode.STRUCTURED) {
        // Structured mode: the whole payload is the Cloud Event JSON envelope.
        //noinspection unchecked
        return (T) KafkaCloudEventHelper.parseStructuredContent(payload);
    }
    if (mode == KafkaCloudEventHelper.CloudEventMode.BINARY) {
        // Binary mode: payload is the data; mandatory ce_* headers are checked.
        return KafkaCloudEventHelper.checkBinaryRecord(payload, headers);
    }
    return payload;
}

@Override
public void close() {
// Be a bit more defensive here as close can be called after an instantiation failure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.smallrye.reactive.messaging.kafka.*;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth;
Expand Down Expand Up @@ -166,9 +165,7 @@ public KafkaSource(Vertx vertx,
Multi<IncomingKafkaRecord<K, V>> incomingMulti = multi.onItem().transformToUni(rec -> {
IncomingKafkaRecord<K, V> record = new IncomingKafkaRecord<>(rec, channel, index, commitHandler,
failureHandler, isCloudEventEnabled, isTracingEnabled);
if ((failureHandler instanceof KafkaDeadLetterQueue)
&& rec.headers() != null
&& rec.headers().lastHeader(DESERIALIZATION_FAILURE_DLQ) != null) {
if (rec.headers() != null && rec.headers().lastHeader(DESERIALIZATION_FAILURE_DLQ) != null) {
Header reasonMsgHeader = rec.headers().lastHeader(DESERIALIZATION_FAILURE_REASON);
String message = reasonMsgHeader != null ? new String(reasonMsgHeader.value()) : null;
RecordDeserializationException reason = new RecordDeserializationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public ReactiveKafkaConsumer(KafkaConnectorIncomingConfiguration config,
config.getLazyClient(),
config.getPollTimeout(),
config.getFailOnDeserializationFailure(),
config.getCloudEvents(),
onConsumerCreated,
reportFailure,
context);
Expand All @@ -84,6 +85,7 @@ public ReactiveKafkaConsumer(Map<String, Object> kafkaConfiguration,
boolean lazyClient,
int pollTimeout,
boolean failOnDeserializationFailure,
boolean cloudEventsEnabled,
java.util.function.Consumer<Consumer<K, V>> onConsumerCreated,
BiConsumer<Throwable, Boolean> reportFailure,
Context context) {
Expand All @@ -99,9 +101,9 @@ public ReactiveKafkaConsumer(Map<String, Object> kafkaConfiguration,
}

Deserializer<K> keyDeserializer = new DeserializerWrapper<>(keyDeserializerCN, true,
keyDeserializationFailureHandler, reportFailure, failOnDeserializationFailure);
keyDeserializationFailureHandler, reportFailure, failOnDeserializationFailure, cloudEventsEnabled);
Deserializer<V> valueDeserializer = new DeserializerWrapper<>(valueDeserializerCN, false,
valueDeserializationFailureHandler, reportFailure, failOnDeserializationFailure);
valueDeserializationFailureHandler, reportFailure, failOnDeserializationFailure, cloudEventsEnabled);

// Configure the underlying deserializers
keyDeserializer.configure(kafkaConfiguration, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,7 @@ public static <T, K> IncomingKafkaCloudEventMetadata<K, T> createFromStructuredC
ConsumerRecord<K, T> record) {
DefaultCloudEventMetadataBuilder<T> builder = new DefaultCloudEventMetadataBuilder<>();

JsonObject content;
if (record.value() instanceof JsonObject) {
content = (JsonObject) record.value();
} else if (record.value() instanceof String) {
content = new JsonObject((String) record.value());
} else if (record.value() instanceof byte[]) {
byte[] bytes = (byte[]) record.value();
Buffer buffer = Buffer.buffer(bytes);
content = buffer.toJsonObject();
} else {
throw new IllegalArgumentException(
"Invalid value type. Structured Cloud Event can only be created from String, JsonObject and byte[], found: "
+ record.value().getClass());
}
JsonObject content = (JsonObject) record.value();

// Required
builder.withSpecVersion(content.getString(CloudEventMetadata.CE_ATTRIBUTE_SPEC_VERSION));
Expand Down Expand Up @@ -134,6 +121,30 @@ public static <T, K> IncomingKafkaCloudEventMetadata<K, T> createFromStructuredC
new DefaultIncomingCloudEventMetadata<>(cloudEventMetadata));
}

/**
 * Parses the value of a structured-mode Cloud Event record into a {@link JsonObject} and
 * verifies that the mandatory {@code source} attribute is present.
 *
 * @param value the record value; must be a {@link JsonObject}, a {@link String} or a {@code byte[]}
 * @return the parsed JSON content of the Cloud Event
 * @throws IllegalArgumentException if the value has an unsupported type or the
 *         {@code source} attribute is missing
 */
public static JsonObject parseStructuredContent(Object value) {
    final JsonObject content;
    if (value instanceof JsonObject) {
        content = (JsonObject) value;
    } else if (value instanceof String) {
        content = new JsonObject((String) value);
    } else if (value instanceof byte[]) {
        content = Buffer.buffer((byte[]) value).toJsonObject();
    } else {
        // Null values also land here: Optional maps a null class to null in the message.
        throw new IllegalArgumentException(
                "Invalid value type. Structured Cloud Event can only be created from String, JsonObject and byte[], found: "
                        + Optional.ofNullable(value).map(Object::getClass).orElse(null));
    }
    // validate required source attribute
    if (content.getString(CloudEventMetadata.CE_ATTRIBUTE_SOURCE) == null) {
        throw new IllegalArgumentException(
                "The JSON value must contain the " + CloudEventMetadata.CE_ATTRIBUTE_SOURCE + " attribute");
    }
    return content;
}

public static <T, K> IncomingKafkaCloudEventMetadata<K, T> createFromBinaryCloudEvent(
ConsumerRecord<?, T> record) {
DefaultCloudEventMetadataBuilder<T> builder = new DefaultCloudEventMetadataBuilder<>();
Expand Down Expand Up @@ -206,6 +217,15 @@ public static <T, K> IncomingKafkaCloudEventMetadata<K, T> createFromBinaryCloud
new DefaultIncomingCloudEventMetadata<>(cloudEventMetadata));
}

/**
 * Validates that a binary-mode Cloud Event record carries the mandatory {@code source}
 * header, returning the payload unchanged when the check passes.
 *
 * @param payload the deserialized record payload
 * @param headers the record headers
 * @param <T> the payload type
 * @return the given payload, untouched
 * @throws IllegalArgumentException if the source header is absent
 */
public static <T> T checkBinaryRecord(T payload, Headers headers) {
    if (headers.lastHeader(KAFKA_HEADER_FOR_SOURCE) == null) {
        throw new IllegalArgumentException(
                "The Kafka record must contain the " + KAFKA_HEADER_FOR_SOURCE + " header");
    }
    return payload;
}

@SuppressWarnings("rawtypes")
public static ProducerRecord<?, ?> createBinaryRecord(Message<?> message, String topic,
OutgoingKafkaRecordMetadata<?> outgoingMetadata, IncomingKafkaRecordMetadata<?, ?> incomingMetadata,
Expand Down Expand Up @@ -438,32 +458,30 @@ public enum CloudEventMode {
NOT_A_CLOUD_EVENT
}

public static CloudEventMode getCloudEventMode(ConsumerRecord<?, ?> record) {
String contentType = getHeader(KAFKA_HEADER_CONTENT_TYPE, record);
public static CloudEventMode getCloudEventMode(Headers headers) {
String contentType = getHeader(KAFKA_HEADER_CONTENT_TYPE, headers);
if (contentType != null && contentType.startsWith(CE_CONTENT_TYPE_PREFIX)) {
return CloudEventMode.STRUCTURED;
} else if (containsAllMandatoryAttributes(record)) {
} else if (containsAllMandatoryAttributes(headers)) {
return CloudEventMode.BINARY;
}
return CloudEventMode.NOT_A_CLOUD_EVENT;
}

private static boolean containsAllMandatoryAttributes(ConsumerRecord<?, ?> record) {
return getHeader(KAFKA_HEADER_FOR_ID, record) != null
&& getHeader(KAFKA_HEADER_FOR_SOURCE, record) != null
&& getHeader(KAFKA_HEADER_FOR_TYPE, record) != null
&& getHeader(KAFKA_HEADER_FOR_SPEC_VERSION, record) != null;
private static boolean containsAllMandatoryAttributes(Headers headers) {
return getHeader(KAFKA_HEADER_FOR_ID, headers) != null
&& getHeader(KAFKA_HEADER_FOR_SOURCE, headers) != null
&& getHeader(KAFKA_HEADER_FOR_TYPE, headers) != null
&& getHeader(KAFKA_HEADER_FOR_SPEC_VERSION, headers) != null;
}

private static String getHeader(String name, ConsumerRecord<?, ?> record) {
Headers headers = record.headers();
private static String getHeader(String name, Headers headers) {
for (Header header : headers) {
if (header.key().equals(name)) {
return new String(header.value(), StandardCharsets.UTF_8);
}
}
return null;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,69 @@ public void testReceivingStructuredCloudEventsWithStringDeserializer() {

}

@SuppressWarnings("unchecked")
@Test
public void testReceivingStructuredCloudEventsWithNonValidJson() {
// Channel configured to tolerate bad records: deserialization failures are not fatal
// and the "ignore" failure strategy drops failed records instead of stopping the stream.
KafkaMapBasedConfig config = newCommonConfig();
config.put("topic", topic);
config.put("value.deserializer", StringDeserializer.class.getName());
config.put("failure-strategy", "ignore");
config.put("fail-on-deserialization-failure", false);
config.put("channel-name", topic);
KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(config);
source = new KafkaSource<>(vertx, UUID.randomUUID().toString(), ic, commitHandlerFactories, failureHandlerFactories,
UnsatisfiedInstance.instance(), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(), -1);

// Collect every message emitted by the channel.
List<Message<?>> messages = new ArrayList<>();
source.getStream().subscribe().with(messages::add);

// Produce two records, both marked as structured Cloud Events via the content-type header:
// 1) a truncated, non-parseable JSON body — expected to fail deserialization and be handled
//    by the wrapper/failure strategy rather than crash the channel;
// 2) a well-formed Cloud Event envelope with all required and optional attributes.
companion.produceStrings().fromRecords(
new ProducerRecord<>(topic, null, null, null, "{\"type\":",
Collections.singletonList(
new RecordHeader("content-type",
"application/cloudevents+json; charset=utf-8".getBytes()))),
new ProducerRecord<>(topic, null, null, null, new JsonObject()
.put("specversion", CloudEventMetadata.CE_VERSION_1_0)
.put("type", "type")
.put("id", "id")
.put("source", "test://test")
.put("subject", "foo")
.put("datacontenttype", "application/json")
.put("dataschema", "http://schema.io")
.put("time", "2020-07-23T09:12:34Z")
.put("data", new JsonObject().put("name", "neo")).encode(),
Collections.singletonList(
new RecordHeader("content-type",
"application/cloudevents+json; charset=utf-8".getBytes()))));

// Wait until at least one message survives; the invalid record should have been dropped.
// NOTE(review): this asserts on messages.get(0) — it assumes the ignored/failed record never
// reaches the subscriber as a message; confirm against the "ignore" strategy semantics.
await().atMost(2, TimeUnit.MINUTES).until(() -> messages.size() >= 1);

Message<?> message = messages.get(0);
// The surviving message must carry Cloud Event metadata matching the produced envelope.
IncomingKafkaCloudEventMetadata<String, JsonObject> metadata = message
.getMetadata(IncomingKafkaCloudEventMetadata.class)
.orElse(null);
assertThat(metadata).isNotNull();
assertThat(metadata.getSpecVersion()).isEqualTo(CloudEventMetadata.CE_VERSION_1_0);
assertThat(metadata.getType()).isEqualTo("type");
assertThat(metadata.getId()).isEqualTo("id");
assertThat(metadata.getSource()).isEqualTo(URI.create("test://test"));
assertThat(metadata.getSubject()).hasValue("foo");
assertThat(metadata.getDataContentType()).hasValue("application/json");
assertThat(metadata.getDataSchema()).hasValue(URI.create("http://schema.io"));
assertThat(metadata.getTimeStamp()).isNotEmpty();
assertThat(metadata.getData().getString("name")).isEqualTo("neo");

// Extensions
assertThat(metadata.getKey()).isNull();
// Rule 3.1 - partitionkey attribute
assertThat(metadata.<String> getExtension("partitionkey")).isEmpty();
assertThat(metadata.getTopic()).isEqualTo(topic);

// The payload exposed to the application is the "data" member of the envelope, as JSON.
assertThat(message.getPayload()).isInstanceOf(JsonObject.class);
assertThat(((JsonObject) message.getPayload()).getString("name")).isEqualTo("neo");

}

@SuppressWarnings("unchecked")
@Test
public void testReceivingStructuredCloudEventsWithJsonObjectDeserializer() {
Expand Down
Loading