Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply DLQ strategy on deserialization failure #2256

Merged
merged 1 commit into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions documentation/src/main/docs/kafka/receiving-kafka-records.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,20 @@ produce a `null` value. To enable this behavior, set the
`mp.messaging.incoming.$channel.fail-on-deserialization-failure`
attribute to `false`.

If the `fail-on-deserialization-failure` attribute is set to `false` and
the `failure-strategy` attribute is `dead-letter-queue`, the failed record
will be sent to the corresponding *dead letter queue* topic.
The forwarded record will have the original key and value,
and the following headers set:

- `deserialization-failure-reason`: The deserialization failure message
- `deserialization-failure-cause`: The deserialization failure cause if any
- `deserialization-failure-key`: Whether the deserialization failure happened on a key
- `deserialization-failure-topic`: The topic of the incoming message when a deserialization failure happens
- `deserialization-failure-deserializer`: The class name of the underlying deserializer
- `deserialization-failure-key-data`: If applicable, the key data that could not be deserialized
- `deserialization-failure-value-data`: If applicable, the value data that could not be deserialized

## Receiving Cloud Events

The Kafka connector supports [Cloud Events](https://cloudevents.io/).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,26 @@ public interface DeserializationFailureHandler<T> {
*/
String DESERIALIZATION_FAILURE_DATA = "deserialization-failure-data";

/**
* Header name passing the key data that was not able to be deserialized.
*/
String DESERIALIZATION_FAILURE_KEY_DATA = "deserialization-failure-key-data";

/**
* Header name passing the value data that was not able to be deserialized.
*/
String DESERIALIZATION_FAILURE_VALUE_DATA = "deserialization-failure-value-data";

/**
* Header name passing the class name of the underlying deserializer.
*/
String DESERIALIZATION_FAILURE_DESERIALIZER = "deserialization-failure-deserializer";

/**
* Header name flagging that the failed record should be routed to the dead letter queue.
* Removed by the failure handler before the record is produced, to avoid unconditional
* re-routing on the next consume.
*/
String DESERIALIZATION_FAILURE_DLQ = "deserialization-failure-dlq";

byte[] TRUE_VALUE = "true".getBytes(StandardCharsets.UTF_8);

/**
Expand Down Expand Up @@ -111,6 +126,7 @@ static Headers addFailureDetailsToHeaders(String deserializer, String topic, boo

if (headers != null) {
headers.add(DESERIALIZATION_FAILURE_DESERIALIZER, deserializer.getBytes(StandardCharsets.UTF_8));
headers.add(DESERIALIZATION_FAILURE_DLQ, TRUE_VALUE);
headers.add(DESERIALIZATION_FAILURE_TOPIC, topic.getBytes(StandardCharsets.UTF_8));

if (isKey) {
Expand All @@ -124,6 +140,12 @@ static Headers addFailureDetailsToHeaders(String deserializer, String topic, boo
headers.add(DESERIALIZATION_FAILURE_CAUSE, cause.getBytes(StandardCharsets.UTF_8));
}
if (data != null) {
if (isKey) {
headers.add(DESERIALIZATION_FAILURE_KEY_DATA, data);
} else {
headers.add(DESERIALIZATION_FAILURE_VALUE_DATA, data);
}
// keep the legacy header for backward compatibility
headers.add(DESERIALIZATION_FAILURE_DATA, data);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.addFailureDetailsToHeaders;

import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -114,6 +116,9 @@ private T wrapDeserialize(Supplier<T> deserialize, String topic, Headers headers
}
throw new KafkaException(e);
}
// insert failure details into the headers
addFailureDetailsToHeaders(delegate.getClass().getName(), topic, handleKeys, headers, data, e);
// fallback to null
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
Expand Down Expand Up @@ -91,9 +92,10 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
(String) deadQueueProducerConfig.get(KEY_SERIALIZER_CLASS_CONFIG),
(String) deadQueueProducerConfig.get(VALUE_SERIALIZER_CLASS_CONFIG));

var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>();
// fire producer event (e.g. bind metrics)
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(deadQueueProducerConfig,
deadQueueTopic, 10000, false, null, null, null,
deadQueueTopic, 10000, false, null, dlqSerializationHandler, dlqSerializationHandler,
(p, c) -> kafkaCDIEvents.producer().fire(p));

return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, producer, reportFailure);
Expand Down Expand Up @@ -153,6 +155,8 @@ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reaso
if (outgoing != null && outgoing.getHeaders() != null) {
outgoing.getHeaders().forEach(header -> dead.headers().add(header));
}
// remove DESERIALIZATION_FAILURE_DLQ header to prevent unconditional DLQ on next consume
dead.headers().remove(DESERIALIZATION_FAILURE_DLQ);
log.messageNackedDeadLetter(channel, topic);
return producer.send(dead)
.onFailure().invoke(t -> reportFailure.accept((Throwable) t, true))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DATA;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DESERIALIZER;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_IS_KEY;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_KEY_DATA;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_VALUE_DATA;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.TRUE_VALUE;

import java.util.Arrays;
import java.util.Objects;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;

/**
 * Serialization handler used by the dead-letter-queue (and delayed-retry) producers.
 * <p>
 * When the incoming record previously failed deserialization, the original raw bytes that could
 * not be deserialized are forwarded as-is to the dead letter topic, instead of serializing the
 * (possibly {@code null}) deserialized payload. Otherwise the regular serialization is applied.
 */
public class KafkaDeadLetterSerializationHandler<T> implements SerializationFailureHandler<T> {

    /**
     * Decorates serialization for records forwarded to the dead letter topic.
     *
     * @param serialization the lazy regular serialization result
     * @param topic the target topic
     * @param isKey whether the key (rather than the value) is being serialized
     * @param serializer the class name of the underlying serializer
     * @param data the payload being serialized
     * @param headers the record headers, possibly carrying deserialization failure details
     * @return the bytes to produce for this key/value
     */
    @Override
    public byte[] decorateSerialization(Uni<byte[]> serialization, String topic, boolean isKey, String serializer, T data,
            Headers headers) {
        // only intervene when the record previously failed deserialization
        if (headers.lastHeader(DESERIALIZATION_FAILURE_DESERIALIZER) != null) {
            // legacy header carrying the raw bytes of whichever part (key or value) failed
            Header dataHeader = headers.lastHeader(DESERIALIZATION_FAILURE_DATA);
            if (dataHeader != null) {
                Header isKeyHeader = headers.lastHeader(DESERIALIZATION_FAILURE_IS_KEY);
                boolean failureOnKey = isKeyHeader != null && Arrays.equals(isKeyHeader.value(), TRUE_VALUE);
                if (isKey) {
                    if (failureOnKey) {
                        Header keyDataHeader = headers.lastHeader(DESERIALIZATION_FAILURE_KEY_DATA);
                        // fall back to the legacy data header when the key-specific header is absent
                        return Objects.requireNonNullElse(keyDataHeader, dataHeader).value();
                    }
                } else {
                    Header valueDataHeader = headers.lastHeader(DESERIALIZATION_FAILURE_VALUE_DATA);
                    if (valueDataHeader != null) {
                        return valueDataHeader.value();
                    }
                    // BUGFIX: the legacy data header holds the VALUE bytes only when the failure
                    // was NOT on the key. Previously, on a key-only failure, the key's raw bytes
                    // leaked into the serialized value via this fallback.
                    if (!failureOnKey) {
                        return dataHeader.value();
                    }
                }
            }
        }
        // no usable failure headers: perform the regular serialization
        return SerializationFailureHandler.super.decorateSerialization(serialization, topic, isKey, serializer, data, headers);
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
import static io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.createDeserializationFailureHandler;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
Expand Down Expand Up @@ -118,9 +119,10 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

log.delayedRetryTopic(config.getChannel(), retryTopics, maxRetries, retryTimeout, deadQueueTopic);

var dlqSerializationHandler = new KafkaDeadLetterSerializationHandler<>();
// fire producer event (e.g. bind metrics)
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(delayedRetryTopicProducerConfig,
retryTopics.get(0), 10000, false, null, null, null,
retryTopics.get(0), 10000, false, null, dlqSerializationHandler, dlqSerializationHandler,
(p, c) -> kafkaCDIEvents.producer().fire(p));

Map<String, Object> retryConsumerConfig = new HashMap<>(consumer.configuration());
Expand Down Expand Up @@ -269,6 +271,8 @@ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reaso
if (outgoing != null && outgoing.getHeaders() != null) {
outgoing.getHeaders().forEach(header -> retry.headers().add(header));
}
// remove DESERIALIZATION_FAILURE_DLQ header to prevent unconditional DLQ on next consume
retry.headers().remove(DESERIALIZATION_FAILURE_DLQ);
log.delayedRetryNack(channel, topic);
return producer.send(retry)
.onFailure().invoke(t -> reportFailure.accept((Throwable) t, true))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.smallrye.reactive.messaging.kafka.impl;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_REASON;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions.ex;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
import static io.smallrye.reactive.messaging.kafka.impl.RebalanceListeners.findMatchingListener;
Expand All @@ -20,6 +22,8 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.header.Header;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
Expand All @@ -34,6 +38,7 @@
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth;
Expand Down Expand Up @@ -161,6 +166,16 @@ public KafkaSource(Vertx vertx,
Multi<IncomingKafkaRecord<K, V>> incomingMulti = multi.onItem().transformToUni(rec -> {
IncomingKafkaRecord<K, V> record = new IncomingKafkaRecord<>(rec, channel, index, commitHandler,
failureHandler, isCloudEventEnabled, isTracingEnabled);
if ((failureHandler instanceof KafkaDeadLetterQueue)
&& rec.headers() != null
&& rec.headers().lastHeader(DESERIALIZATION_FAILURE_DLQ) != null) {
Header reasonMsgHeader = rec.headers().lastHeader(DESERIALIZATION_FAILURE_REASON);
String message = reasonMsgHeader != null ? new String(reasonMsgHeader.value()) : null;
RecordDeserializationException reason = new RecordDeserializationException(
TopicPartitions.getTopicPartition(record), record.getOffset(), message, null);
return failureHandler.handle(record, reason, record.getMetadata())
.onItem().transform(ignore -> null);
}
return commitHandler.received(record);
}).concatenate();

Expand Down
Loading
Loading