
fix message loss between poll() and close() (#110)
* fix message loss between poll() and close()

poll() keeps a cache of exceptions per partition and throws them in the subsequent poll().
Previously, when poll() hit an exception it seeked past the offset that caused it, assuming the next poll() would always be called and would throw the exception. This behavior can lead to message loss if poll() is never called again and the offsets (which have already moved forward by 1) are committed.

The fix stores the current offset and the resume offset (current + 1) along with the exception. On
exception, it seeks back to the current offset (the one that hit the exception).
On the next poll(), it seeks past the failing offset while throwing the exception to the user. If the
user decides to ignore the exception, the position has already moved past the failing offset and
normal processing continues.
If the previous poll() hit an exception, the following poll() only throws if an unpaused partition had
an exception; otherwise it clears the cached exceptions but keeps the offsets unchanged.
On seek*(), any cached record processing exceptions are cleared.
On exit, the user can safely commit the offsets it consumed (excluding the offset that hit an exception)
without losing messages.
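For illustration only (not part of the commit), a minimal sketch of how a caller might rely on the new contract: catch the processing exception to skip the failing record, keep polling, and commit on shutdown without losing messages. The method name consumeUntilShutdown and the helpers shutdownRequested() and process() are placeholders; imports of the LiKafka consumer/exception classes shown in this diff, plus java.time.Duration, java.util.Collections and org.apache.kafka.clients.consumer.*, are omitted for brevity.

// Usage sketch only -- assumes a LiKafkaConsumer configured like the ones in the tests below.
void consumeUntilShutdown(LiKafkaConsumer<byte[], byte[]> consumer, String topic) {
  consumer.subscribe(Collections.singleton(topic));
  try {
    while (!shutdownRequested()) {  // shutdownRequested() is a placeholder for the app's own flag
      ConsumerRecords<byte[], byte[]> records;
      try {
        records = consumer.poll(Duration.ofMillis(1000));
      } catch (ConsumerRecordsProcessingException e) {
        // The poll() that throws has already seeked past the record that failed, so continuing
        // skips exactly that record; whether to skip or stop is an application decision.
        continue;
      }
      for (ConsumerRecord<byte[], byte[]> record : records) {
        process(record);  // process() is a placeholder for application logic
      }
    }
  } finally {
    // If the last poll() hit an exception and was never retried, the position still points at
    // the failing record, so committing here does not move past undelivered messages.
    consumer.commitSync();
    consumer.close();
  }
}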
abhishekmendhekar authored Mar 10, 2019
1 parent dba98bc commit 64c130b
Showing 5 changed files with 298 additions and 30 deletions.
@@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -973,8 +974,149 @@ public void testSearchOffsetByTimestamp() {
}
}

private LiKafkaConsumer<byte[], byte[]> createConsumerForExceptionProcessingTest() {
Properties props = new Properties();
// All the consumers should have the same group id.
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testCommit");
// Make sure we start to consume from the beginning.
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

return new LiKafkaConsumerImpl<>(getConsumerProperties(props),
new ByteArrayDeserializer(),
new Deserializer<byte[]>() {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {

}

@Override
public byte[] deserialize(String topic, byte[] data) {
// Throw exception when deserializing
if (new String(data).startsWith(KafkaTestUtils.EXCEPTION_MESSAGE)) {
throw new SerializationException();
}
return data;
}

@Override
public void close() {

}
}, new DefaultSegmentDeserializer(), new NoOpAuditor<>());
}

private void testExceptionProcessingByFunction(String topic, LiKafkaConsumer<byte[], byte[]> consumer,
BiConsumer<LiKafkaConsumer<byte[], byte[]>, TopicPartition> testFunction) throws Exception {
try {
consumer.subscribe(Collections.singleton(topic));
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(1000));
}
assertEquals(records.count(), 4, "Four messages should be returned");
assertEquals(records.iterator().next().offset(), 2L, "The offset of the first message should be 2.");
assertEquals(consumer.position(new TopicPartition(topic, 0)), 7L, "The position should be 7");

testFunction.accept(consumer, new TopicPartition(topic, 0));
} finally {
consumer.close();
}
}

@Test
- public void testExceptionInProcessing() {
+ public void testExceptionHandlingAndProcessing() {
List<BiConsumer<LiKafkaConsumer<byte[], byte[]>, TopicPartition>> testFuncList = new ArrayList<>(6);
testFuncList.add((consumer, tp) -> {
try {
consumer.poll(Duration.ofMillis(1000));
fail("Should have thrown exception.");
} catch (ConsumerRecordsProcessingException crpe) {
// expected
}

assertEquals(consumer.position(tp), 8L, "The position should be 8");
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
while (records.isEmpty()) {
records = consumer.poll(1000);
}
assertEquals(records.count(), 1, "There should be one message left.");
});

testFuncList.add((consumer, tp) -> {
consumer.pause(Collections.singleton(tp));
consumer.poll(Duration.ofMillis(1000));
consumer.resume(Collections.singleton(tp));
assertEquals(consumer.position(tp), 7L, "The position should be 7");

ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
while (records.isEmpty()) {
records = consumer.poll(Duration.ofMillis(1000));
}
} catch (ConsumerRecordsProcessingException crpe) {
// expected
}
assertEquals(consumer.position(tp), 8L, "The position should be 8");
});

testFuncList.add((consumer, tp) -> {
consumer.seek(tp, 6);
try {
consumer.poll(Duration.ofMillis(1000));
} catch (Exception e) {
fail("Unexpected exception");
}
assertEquals(consumer.position(tp), 7L, "The position should be 7");
});

testFuncList.add((consumer, tp) -> {
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(4L)));
consumer.seekToCommitted(Collections.singleton(tp));
try {
consumer.poll(Duration.ofMillis(1000));
} catch (Exception e) {
fail("Unexpected exception");
}
assertEquals(consumer.position(tp), 7L, "The position should be 7");
});

testFuncList.add((consumer, tp) -> {
consumer.seekToBeginning(Collections.singleton(tp));
try {
consumer.poll(Duration.ofMillis(1000));
} catch (Exception e) {
fail("Unexpected exception");
}
assertEquals(consumer.position(tp), 7L, "The position should be 7");
});

testFuncList.add((consumer, tp) -> {
consumer.seekToEnd(Collections.singleton(tp));
try {
consumer.poll(Duration.ofMillis(1000));
} catch (Exception e) {
fail("Unexpected exception");
}
assertEquals(consumer.position(tp), 10L, "The position should be 10");
});

testFuncList.forEach(
testFunc -> {
String topic = UUID.randomUUID().toString();
produceSyntheticMessages(topic);
LiKafkaConsumer<byte[], byte[]> consumer = createConsumerForExceptionProcessingTest();
try {
testExceptionProcessingByFunction(topic, consumer, testFunc);
} catch (Exception e) {
fail("failed with unexpected exception");
}
}
);
}

@Test
public void testExceptionInProcessingLargeMessage() {
String topic = "testExceptionInProcessing";
produceSyntheticMessages(topic);
Properties props = new Properties();
@@ -984,7 +1126,7 @@ public void testExceptionInProcessing() {
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

LiKafkaConsumer<byte[], byte[]> consumer =
- new LiKafkaConsumerImpl<byte[], byte[]>(getConsumerProperties(props),
+ new LiKafkaConsumerImpl<>(getConsumerProperties(props),
new ByteArrayDeserializer(),
new Deserializer<byte[]>() {
int numMessages = 0;
@@ -1016,7 +1158,7 @@ public void close() {
}
assertEquals(records.count(), 1, "Only the first message should be returned");
assertEquals(records.iterator().next().offset(), 2L, "The offset of the first message should be 2.");
- assertEquals(consumer.position(new TopicPartition(topic, 0)), 5L, "The position should be 5");
+ assertEquals(consumer.position(new TopicPartition(topic, 0)), 4L, "The position should be 4");

try {
consumer.poll(1000);
@@ -1072,7 +1214,7 @@ public void testGiganticLargeMessages() throws Exception {
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Only fetch one record at a time.
props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
- // No auto commmit
+ // No auto commit
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Not enough memory to assemble anything
props.setProperty(LiKafkaConsumerConfig.MESSAGE_ASSEMBLER_BUFFER_CAPACITY_CONFIG, "" + (MAX_SEGMENT_SIZE + 1));
@@ -1145,7 +1287,7 @@ public void testExceptionOnLargeMsgDropped() throws Exception {
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Only fetch one record at a time.
props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
- // No auto commmit
+ // No auto commit
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Not enough memory to assemble anything
props.setProperty(LiKafkaConsumerConfig.MESSAGE_ASSEMBLER_BUFFER_CAPACITY_CONFIG, "" + (MAX_SEGMENT_SIZE + 1));
@@ -1518,8 +1660,7 @@ private void produceSyntheticMessages(String topic) {
List<ProducerRecord<byte[], byte[]>> m3Segs = splitter.split(topic, SYNTHETIC_PARTITION_0, messageId3, message3.getBytes());
// M4, 1 segment
UUID messageId4 = LiKafkaClientsUtils.randomUUID();
- String message4 = KafkaTestUtils.getRandomString(MAX_SEGMENT_SIZE / 2);
-
+ String message4 = KafkaTestUtils.getExceptionString(MAX_SEGMENT_SIZE / 2);
List<ProducerRecord<byte[], byte[]>> m4Segs = splitter.split(topic, SYNTHETIC_PARTITION_0, messageId4, message4.getBytes());
// M5, 2 segments
UUID messageId5 = LiKafkaClientsUtils.randomUUID();
@@ -22,6 +22,7 @@ public class KafkaTestUtils {
private final static AtomicBoolean SHUTDOWN_HOOK_INSTALLED = new AtomicBoolean(false);
private final static Thread SHUTDOWN_HOOK;
private final static List<File> FILES_TO_CLEAN_UP = Collections.synchronizedList(new ArrayList<>());
public final static String EXCEPTION_MESSAGE = "DESERIALIZATION_EXCEPTION_";

static {
SHUTDOWN_HOOK = new Thread(() -> {
@@ -120,11 +121,21 @@ public static void quietly(Task task) {
public static String getRandomString(int length) {
char[] chars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
Random random = new Random();
- StringBuilder stringBuiler = new StringBuilder();
+ StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < length; i++) {
- stringBuiler.append(chars[Math.abs(random.nextInt()) % 16]);
+ stringBuilder.append(chars[Math.abs(random.nextInt()) % 16]);
}
- return stringBuiler.toString();
+ return stringBuilder.toString();
}

public static String getExceptionString(int length) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(EXCEPTION_MESSAGE);
for (int i = EXCEPTION_MESSAGE.length(); i < length; i++) {
stringBuilder.append('X');
}

return stringBuilder.toString();
}

@FunctionalInterface
@@ -241,18 +241,23 @@ public ConsumerRecords<K, V> poll(long timeout) {
long now = System.currentTimeMillis();
long deadline = now + timeout;
do {
- if (_lastProcessedResult != null && _lastProcessedResult.exception() != null) {
- ConsumerRecordsProcessingException e = _lastProcessedResult.exception();
- _lastProcessedResult = null;
- throw e;
+ ConsumerRecordsProcessingException crpe;
+
+ // throw the exception to the user if any currently active (un-paused) topic-partition has an exception
+ Set<TopicPartition> unPausedTopicPartitions = new HashSet<>(_kafkaConsumer.assignment());
+ unPausedTopicPartitions.removeAll(_kafkaConsumer.paused());
+ crpe = handleRecordProcessingException(unPausedTopicPartitions);
+ if (crpe != null) {
+ throw crpe;
}

if (_autoCommitEnabled && now > _lastAutoCommitMs + _autoCommitInterval) {
commitAsync();
_lastAutoCommitMs = now;
}
ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
try {
- rawRecords = _kafkaConsumer.poll(deadline - now);
+ rawRecords = _kafkaConsumer.poll(Duration.ofMillis(deadline - now));
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
handleInvalidOffsetException(oe);
}
@@ -262,11 +267,14 @@
// Clear the internal reference.
_lastProcessedResult.clearRecords();
// Rewind offset if there are processing exceptions.
- if (_lastProcessedResult.exception() != null) {
- for (Map.Entry<TopicPartition, Long> entry : _lastProcessedResult.resumeOffsets().entrySet()) {
- TopicPartition tp = entry.getKey();
- Long offset = entry.getValue();
- _kafkaConsumer.seek(tp, offset);
+ seekToCurrentOffsetsOnRecordProcessingExceptions();
+
+ // This is an optimization: if no records were processed,
+ // try to throw the exception in the current poll().
+ if (processedRecords.isEmpty()) {
+ crpe = handleRecordProcessingException(null);
+ if (crpe != null) {
+ throw crpe;
}
}
now = System.currentTimeMillis();
@@ -344,6 +352,9 @@ private void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets,

@Override
public void seek(TopicPartition partition, long offset) {
// current offsets are being moved so don't throw cached exceptions in poll.
clearRecordProcessingException();

// The offset seeks is a complicated case, there are four situations to be handled differently.
// 1. Before the earliest consumed message. An OffsetNotTrackedException will be thrown in this case.
// 2. At or after the earliest consumed message but before the first delivered message. We will seek to the earliest
@@ -399,6 +410,9 @@ public void seek(TopicPartition partition, long offset) {

@Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
// current offsets are being moved so don't throw cached exceptions in poll.
clearRecordProcessingException();

_kafkaConsumer.seekToBeginning(partitions);
for (TopicPartition tp : partitions) {
_consumerRecordsProcessor.clear(tp);
Expand All @@ -409,6 +423,9 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {

@Override
public void seekToEnd(Collection<TopicPartition> partitions) {
// current offsets are being moved so don't throw cached exceptions in poll.
clearRecordProcessingException();

_kafkaConsumer.seekToEnd(partitions);
for (TopicPartition tp : partitions) {
_consumerRecordsProcessor.clear(tp);
Expand All @@ -420,6 +437,9 @@ public void seekToEnd(Collection<TopicPartition> partitions) {

@Override
public void seekToCommitted(Collection<TopicPartition> partitions) {
// current offsets are being moved so don't throw cached exceptions in poll.
clearRecordProcessingException();

for (TopicPartition tp : partitions) {
OffsetAndMetadata offsetAndMetadata = _kafkaConsumer.committed(tp);
if (offsetAndMetadata == null) {
@@ -461,6 +481,48 @@ private long positionMain(TopicPartition partition, Duration timeout) {
}
}

private ConsumerRecordsProcessingException handleRecordProcessingException(Collection<TopicPartition> topicPartitions) {
if (_lastProcessedResult == null || !_lastProcessedResult.hasException()) {
return null;
}

ConsumerRecordsProcessingException crpe = null;
if (topicPartitions == null || topicPartitions.isEmpty()) {
// seek past offset for all topic-partitions that hit an exception
_lastProcessedResult.offsets().forEach((tp, o) -> _kafkaConsumer.seek(tp, o.getResumeOffset()));
crpe = _lastProcessedResult.exception();
} else {
// seek past offset for topic-partition in the collection that hit an exception
if (_lastProcessedResult.hasError(topicPartitions)) {
Map<TopicPartition, ConsumerRecordsProcessResult<K, V>.OffsetPair> offsets = _lastProcessedResult.offsets();
topicPartitions.forEach(tp -> {
if (offsets.containsKey(tp)) {
_kafkaConsumer.seek(tp, offsets.get(tp).getResumeOffset());
}
});
crpe = _lastProcessedResult.exception(topicPartitions);
}

// if the topic-partitions don't have an exception, just drop the cached exceptions and move on
}
_lastProcessedResult = null;
return crpe;
}

private void seekToCurrentOffsetsOnRecordProcessingExceptions() {
// seek to offset which had an exception
if (_lastProcessedResult != null && _lastProcessedResult.hasException()) {
_lastProcessedResult.offsets().forEach((k, v) -> _kafkaConsumer.seek(k, v.getCurrentOffset()));
}
}

private void clearRecordProcessingException() {
if (_lastProcessedResult != null && _lastProcessedResult.hasException()) {
LOG.warn("Clearing all Record Processing Exceptions", _lastProcessedResult.exception());
_lastProcessedResult = null;
}
}

/**
* We don't let the underlying open source consumer reset offsets so we need to do that here.
*/

