Skip to content

Commit

Permalink
[INLONG-11658][Sort] Fix the NPE of the Kafka sink error log for some…
Browse files Browse the repository at this point in the history
… exceptions without metadata information (#11659)

* [INLONG-11658][Sort] Fix the NPE of the Kafka sink error log for some exceptions without metadata information
  • Loading branch information
vernedeng authored Jan 10, 2025
1 parent 3442ece commit 9030cf3
Showing 1 changed file with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
Expand Down Expand Up @@ -62,6 +63,8 @@ public class KafkaProducerCluster implements LifecycleAware {

private KafkaProducer<String, byte[]> producer;

private long configuredMaxPayloadSize = 8388608L;

public KafkaProducerCluster(
String workerName,
CacheClusterConfig cacheClusterConfig,
Expand Down Expand Up @@ -125,6 +128,7 @@ private void startByNodeConfig() {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, nodeConfig.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, nodeConfig.getClientId() + "-" + workerName);
LOG.info("init kafka client by node config info: " + props);
configuredMaxPayloadSize = Long.parseLong(props.getProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG));
producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
Preconditions.checkNotNull(producer);
} catch (Exception e) {
Expand Down Expand Up @@ -217,14 +221,25 @@ public boolean send(ProfileEvent profileEvent, Transaction tx) throws IOExceptio
sinkContext.addSendResultMetric(profileEvent, topic, true, sendTime);
profileEvent.ack();
} else {
if (ex instanceof UnknownTopicOrPartitionException

if (ex instanceof RecordTooLargeException) {
// for the message bigger than configuredMaxPayloadSize, just discard it;
// otherwise, retry and wait for the server side changes the limitation
if (record.value().length > configuredMaxPayloadSize) {
tx.commit();
profileEvent.ack();
} else {
tx.rollback();
}
} else if (ex instanceof UnknownTopicOrPartitionException
|| !(ex instanceof RetriableException)) {
// for non-retriable exception, just discard it
tx.commit();
profileEvent.ack();
} else {
tx.rollback();
}
LOG.error(String.format("send failed, topic is %s, partition is %s",
metadata.topic(), metadata.partition()), ex);
LOG.error(String.format("send failed, topic is %s", topic), ex);
sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
}
tx.close();
Expand Down

0 comments on commit 9030cf3

Please sign in to comment.