Skip to content

Commit

Permalink
DefaultErrorHandler#handleBatchAndReturnRemaining recovered invalid
Browse files Browse the repository at this point in the history
DefaultErrorHandler#handleBatchAndReturnRemaining recovered invalid and infinite loop when kafka listener threw BatchListenerFailedException and error record is last one
  • Loading branch information
Zhiyang.Wang1 committed Dec 22, 2023
1 parent b1e7623 commit 2740b07
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -120,7 +118,7 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange)
@Override
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
notRetryable.forEach(ex -> handler.addNotRetryableExceptions(ex));
notRetryable.forEach(handler::addNotRetryableExceptions);
}
}

Expand Down Expand Up @@ -178,7 +176,6 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
else {
return String.format("Record not found in batch, index %d out of bounds (0, %d); "
+ "re-seeking batch", index, data.count() - 1);

}
});
fallback(thrownException, data, consumer, container, invokeListener);
Expand All @@ -201,11 +198,9 @@ private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
return -1;
}
int i = 0;
Iterator<?> iterator = data.iterator();
while (iterator.hasNext()) {
ConsumerRecord<?, ?> candidate = (ConsumerRecord<?, ?>) iterator.next();
if (candidate.topic().equals(record.topic()) && candidate.partition() == record.partition()
&& candidate.offset() == record.offset()) {
for (ConsumerRecord<?, ?> datum : data) {
if (datum.topic().equals(record.topic()) && datum.partition() == record.partition()
&& datum.offset() == record.offset()) {
break;
}
i++;
Expand All @@ -220,29 +215,25 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
if (data == null) {
return ConsumerRecords.empty();
}
Iterator<?> iterator = data.iterator();
List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
int index = indexArg;
while (iterator.hasNext()) {
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) iterator.next();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<?, ?> datum : data) {
if (index-- > 0) {
toCommit.add(record);
offsets.compute(new TopicPartition(datum.topic(), datum.partition()),
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, datum.offset() + 1));
}
else {
remaining.add(record);
remaining.add(datum);
}
}
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
if (offsets.size() > 0) {
commit(consumer, container, offsets);
}
if (isSeekAfterError()) {
if (remaining.size() > 0) {
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
getFailureTracker()::recovered, this.logger, getLogLevel());
getFailureTracker(), this.logger, getLogLevel());
ConsumerRecord<?, ?> recovered = remaining.get(0);
commit(consumer, container,
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
Expand All @@ -254,35 +245,33 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
return ConsumerRecords.empty();
}
else {
if (indexArg == 0) {
return (ConsumerRecords<K, V>) data; // first record just rerun the whole thing
}
else {
try {
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
consumer)) {
remaining.remove(0);
}
}
catch (Exception e) {
try {
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
consumer)) {
remaining.remove(0);
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
tp -> new ArrayList<ConsumerRecord<K, V>>()).add((ConsumerRecord<K, V>) rec));
return new ConsumerRecords<>(remains);
}
catch (Exception e) {
}
if (remaining.isEmpty()) {
return ConsumerRecords.empty();
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
return new ConsumerRecords<>(remains);
}
}

private void commit(Consumer<?, ?> consumer, MessageListenerContainer container, Map<TopicPartition, OffsetAndMetadata> offsets) {
private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
Map<TopicPartition, OffsetAndMetadata> offsets) {

boolean syncCommits = container.getContainerProperties().isSyncCommits();
Duration timeout = container.getContainerProperties().getSyncCommitTimeout();
if (syncCommits) {
consumer.commitSync(offsets, timeout);
ContainerProperties properties = container.getContainerProperties();
if (properties.isSyncCommits()) {
consumer.commitSync(offsets, properties.getSyncCommitTimeout());
}
else {
OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
OffsetCommitCallback commitCallback = properties.getCommitCallback();
if (commitCallback == null) {
commitCallback = LOGGING_COMMIT_CALLBACK;
}
Expand All @@ -304,8 +293,8 @@ private BatchListenerFailedException getBatchListenerFailedException(Throwable t
throwable = throwable.getCause();
checked.add(throwable);

if (throwable instanceof BatchListenerFailedException) {
target = (BatchListenerFailedException) throwable;
if (throwable instanceof BatchListenerFailedException batchListenerFailedException) {
target = batchListenerFailedException;
break;
}
}
Expand Down
Loading

0 comments on commit 2740b07

Please sign in to comment.