Fix compiler warnings
Related to #3019

* Add `@SuppressWarnings("serial")` to `KafkaTemplate.SkipAbortException`
* Move the logic in `FailedBatchProcessor.seekOrRecover()` out of the `finally` block.
Essentially, swallow the commit exception and follow up with the seeks (see the sketch below).

(cherry picked from commit 7b0cd0f)
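To illustrate the second bullet: a minimal standalone sketch (invented class and method names, not the project's code) of the pattern being removed. A `finally` block that completes abruptly discards any exception raised in the `try` block, and javac flags it under `-Xlint:finally` as "finally clause cannot complete normally":

    // FinallyWarningDemo.java -- compile with: javac -Xlint:finally FinallyWarningDemo.java
    public class FinallyWarningDemo {

        static String before() {
            try {
                throw new IllegalStateException("commit failed"); // silently discarded below
            }
            finally {
                // warning: [finally] finally clause cannot complete normally
                return "seeked"; // this return masks the IllegalStateException
            }
        }

        static String after() {
            try {
                throw new IllegalStateException("commit failed");
            }
            catch (RuntimeException ex) {
                // Swallow deliberately and fall through, mirroring the structure
                // the commit adopts in seekOrRecover().
            }
            return "seeked"; // reached normally; no warning, no masked exception
        }

        public static void main(String[] args) {
            System.out.println(before()); // prints "seeked"; the exception vanished
            System.out.println(after());  // prints "seeked"; control flow is explicit
        }
    }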
artembilan authored and spring-builds committed Mar 5, 2024
1 parent 276275b commit 47ba5da
Showing 1 changed file with 50 additions and 47 deletions.
spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java
@@ -44,12 +44,12 @@
  * the listener throws a {@link BatchListenerFailedException}, the offsets prior to the
  * failed record are committed and the remaining records have seeks performed. When the
  * retries are exhausted, the failed record is sent to the recoverer instead of being
- * included in the seeks. If other exceptions are thrown processing is delegated to the
- * fallback handler.
+ * included in the seeks. If other exceptions are thrown, the fallback handler takes the processing.
  *
  * @author Gary Russell
  * @author Francois Rosiere
  * @author Wang Zhiyang
+ * @author Artem Bilan
  * @since 2.8
  *
  */
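(For context, a hedged sketch of the listener-side contract this Javadoc describes. The listener id, topic, and process() method are invented; BatchListenerFailedException is the spring-kafka type referenced above, and the container factory is assumed to be in batch mode.)

    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.listener.BatchListenerFailedException;

    public class BatchListenerSketch {

        @KafkaListener(id = "batchDemo", topics = "demo-topic") // illustrative names
        public void onBatch(List<ConsumerRecord<String, String>> records) {
            for (ConsumerRecord<String, String> record : records) {
                try {
                    process(record); // hypothetical business logic
                }
                catch (RuntimeException ex) {
                    // Tells the error handler exactly which record failed: offsets before it
                    // are committed; it and the records after it are re-sought, or the failed
                    // record goes to the recoverer once retries are exhausted.
                    throw new BatchListenerFailedException("processing failed", ex, record);
                }
            }
        }

        private void process(ConsumerRecord<String, String> record) {
            // ... application logic ...
        }
    }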
@@ -63,10 +63,10 @@ public abstract class FailedBatchProcessor extends FailedRecordProcessor {
      * Construct an instance with the provided properties.
      * @param recoverer the recoverer.
      * @param backOff the back off.
-     * @param fallbackHandler the fall back handler.
+     * @param fallbackHandler the fallback handler.
      */
     public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
-            CommonErrorHandler fallbackHandler) {
+            CommonErrorHandler fallbackHandler) {
 
         this(recoverer, backOff, null, fallbackHandler);
     }
@@ -76,11 +76,11 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
      * @param recoverer the recoverer.
      * @param backOff the back off.
      * @param backOffHandler the {@link BackOffHandler}
-     * @param fallbackHandler the fall back handler.
+     * @param fallbackHandler the fallback handler.
      * @since 2.9
      */
     public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
-            @Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {
+            @Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {
 
         super(recoverer, backOff, backOffHandler);
         this.fallbackBatchHandler = fallbackHandler;
@@ -103,7 +103,7 @@ public void setLogLevel(Level logLevel) {
     }
 
     /**
-     * Set to false to not reclassify the exception if different from the previous
+     * Set to {@code false} to not reclassify the exception if different from the previous
      * failure. If the changed exception is classified as retryable, the existing back off
      * sequence is used; a new sequence is not started. Default true. Only applies when
      * the fallback batch error handler (for exceptions other than
@@ -195,7 +195,7 @@ private void fallback(Exception thrownException, ConsumerRecords<?, ?> data, Con
         this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener);
     }
 
-    private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
+    private int findIndex(ConsumerRecords<?, ?> data, @Nullable ConsumerRecord<?, ?> record) {
         if (record == null) {
             return -1;
         }
@@ -229,57 +229,60 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
                 remaining.add(datum);
             }
         }
 
         try {
-            if (offsets.size() > 0) {
+            if (!offsets.isEmpty()) {
                 commit(consumer, container, offsets);
             }
         }
-        finally {
-            if (isSeekAfterError()) {
-                if (remaining.size() > 0) {
-                    SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
-                            getFailureTracker(), this.logger, getLogLevel());
-                    ConsumerRecord<?, ?> recovered = remaining.get(0);
-                    commit(consumer, container,
-                            Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
-                                    ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
-                    if (remaining.size() > 1) {
-                        throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
-                    }
-                }
-                return ConsumerRecords.empty();
-            }
-            else {
-                if (remaining.size() > 0) {
-                    try {
-                        if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
-                                consumer)) {
-                            remaining.remove(0);
-                        }
-                    }
-                    catch (Exception e) {
-                        if (SeekUtils.isBackoffException(thrownException)) {
-                            this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
-                                    + " included in remaining due to retry back off " + thrownException);
-                        }
-                        else {
-                            this.logger.error(e, KafkaUtils.format(remaining.get(0))
-                                    + " included in remaining due to " + thrownException);
-                        }
-                    }
-                }
-                if (remaining.isEmpty()) {
-                    return ConsumerRecords.empty();
-                }
-                Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
-                remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
-                        tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
-                return new ConsumerRecords<>(remains);
-            }
-        }
+        catch (Exception ex) {
+            // Ignore and follow with seek below
+        }
+
+        if (isSeekAfterError()) {
+            if (!remaining.isEmpty()) {
+                SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
+                        getFailureTracker(), this.logger, getLogLevel());
+                ConsumerRecord<?, ?> recovered = remaining.get(0);
+                commit(consumer, container,
+                        Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
+                                ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
+                if (remaining.size() > 1) {
+                    throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
+                }
+            }
+            return ConsumerRecords.empty();
+        }
+        else {
+            if (!remaining.isEmpty()) {
+                try {
+                    if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
+                            consumer)) {
+                        remaining.remove(0);
+                    }
+                }
+                catch (Exception e) {
+                    if (SeekUtils.isBackoffException(thrownException)) {
+                        this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
+                                + " included in remaining due to retry back off " + thrownException);
+                    }
+                    else {
+                        this.logger.error(e, KafkaUtils.format(remaining.get(0))
+                                + " included in remaining due to " + thrownException);
+                    }
+                }
+            }
+            if (remaining.isEmpty()) {
+                return ConsumerRecords.empty();
+            }
+            Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
+            remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
+                    tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
+            return new ConsumerRecords<>(remains);
+        }
     }
 
-    private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
+    private static void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
             Map<TopicPartition, OffsetAndMetadata> offsets) {
 
         ContainerProperties properties = container.getContainerProperties();
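The essence of the hunk above, as a simplified stand-in (hypothetical helper names, not the actual class): the commit failure no longer escapes into a finally block whose own return/throw masked it; it is swallowed explicitly and the seek logic runs as straight-line code.

    // Sketch only: commitCompletedOffsets(), seekRemainingRecords() and
    // remainingAsRecords() stand in for the real calls shown in the diff.
    ConsumerRecords<K, V> seekOrRecoverSketch() {
        try {
            commitCompletedOffsets(); // commit offsets of the records before the failure
        }
        catch (Exception ex) {
            // Swallowed: the seeks below replay the uncommitted records anyway, so a
            // failed commit only means those records are delivered again (at-least-once).
        }
        if (seekAfterError) {
            seekRemainingRecords();         // SeekUtils.seekOrRecover(...) in the real code
            return ConsumerRecords.empty(); // records are redelivered after the seeks
        }
        return remainingAsRecords(); // retry only the failed record and those after it
    }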
@@ -296,7 +296,7 @@ private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
     }
 
     @Nullable
-    private BatchListenerFailedException getBatchListenerFailedException(Throwable throwableArg) {
+    private static BatchListenerFailedException getBatchListenerFailedException(@Nullable Throwable throwableArg) {
         if (throwableArg == null || throwableArg instanceof BatchListenerFailedException) {
             return (BatchListenerFailedException) throwableArg;
         }
