Skip to content

Commit 55df049

Browse files
committed
CAMEL-17424: do handle Authorization/Authentication issues when failling to poll
1 parent cdeb997 commit 55df049

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
import org.apache.camel.component.kafka.KafkaFetchRecords;
2121
import org.apache.camel.component.kafka.PollExceptionStrategy;
2222
import org.apache.kafka.clients.consumer.Consumer;
23+
import org.apache.kafka.common.errors.AuthenticationException;
24+
import org.apache.kafka.common.errors.AuthorizationException;
2325
import org.slf4j.Logger;
2426
import org.slf4j.LoggerFactory;
2527

2628
public class BridgeErrorStrategy implements PollExceptionStrategy {
2729
private static final Logger LOG = LoggerFactory.getLogger(BridgeErrorStrategy.class);
2830
private final KafkaFetchRecords recordFetcher;
2931
private final Consumer<?, ?> consumer;
32+
private boolean continueFlag = true; // whether to continue polling or not
3033

3134
public BridgeErrorStrategy(KafkaFetchRecords recordFetcher, Consumer<?, ?> consumer) {
3235
this.recordFetcher = recordFetcher;
@@ -35,7 +38,7 @@ public BridgeErrorStrategy(KafkaFetchRecords recordFetcher, Consumer<?, ?> consu
3538

3639
@Override
3740
public boolean canContinue() {
38-
return true;
41+
return continueFlag;
3942
}
4043

4144
@Override
@@ -46,5 +49,9 @@ public void handle(long partitionLastOffset, Exception exception) {
4649
recordFetcher.getBridge().handleException(exception);
4750
// skip this poison message and seek to next message
4851
SeekUtil.seekToNextOffset(consumer, partitionLastOffset);
52+
53+
if (exception instanceof AuthenticationException || exception instanceof AuthorizationException) {
54+
continueFlag = false;
55+
}
4956
}
5057
}

0 commit comments

Comments
 (0)