Skip to content

Commit

Permalink
[INLONG-11076][Sort] Discard unretryable data when send kafka failed (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng authored Sep 10, 2024
1 parent 6099a13 commit 033b1bf
Showing 1 changed file with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;

import com.google.common.base.Preconditions;
Expand All @@ -31,16 +30,21 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.datanucleus.util.StringUtils;
import org.slf4j.Logger;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Properties;

/** wrapper of kafka producer */
/**
* wrapper of kafka producer
*/
public class KafkaProducerCluster implements LifecycleAware {

public static final Logger LOG = InlongLoggerFactory.getLogger(KafkaProducerCluster.class);
Expand Down Expand Up @@ -71,7 +75,9 @@ public KafkaProducerCluster(
this.handler = sinkContext.createEventHandler();
}

/** start and init kafka producer */
/**
* start and init kafka producer
*/
@Override
public void start() {
if (CommonPropertiesHolder.useUnifiedConfiguration()) {
Expand Down Expand Up @@ -159,7 +165,9 @@ public Properties defaultKafkaProperties() {
return props;
}

/** stop and close kafka producer */
/**
* stop and close kafka producer
*/
@Override
public void stop() {
this.state = LifecycleState.STOP;
Expand All @@ -185,16 +193,16 @@ public LifecycleState getLifecycleState() {
/**
* Send data
*
* @param profileEvent data to send
* @return boolean
* @param profileEvent data to send
* @return boolean
* @throws IOException
*/
public boolean send(ProfileEvent profileEvent, Transaction tx) throws IOException {
String topic = profileEvent.getHeaders().get(Constants.TOPIC);
String topic = sinkContext.getTopic(profileEvent.getUid());
ProducerRecord<String, byte[]> record = handler.parse(sinkContext, profileEvent);
long sendTime = System.currentTimeMillis();
// check
if (record == null) {
if (record == null || StringUtils.isEmpty(topic)) {
tx.commit();
profileEvent.ack();
tx.close();
Expand All @@ -209,9 +217,14 @@ public boolean send(ProfileEvent profileEvent, Transaction tx) throws IOExceptio
sinkContext.addSendResultMetric(profileEvent, topic, true, sendTime);
profileEvent.ack();
} else {
if (ex instanceof UnknownTopicOrPartitionException
|| !(ex instanceof RetriableException)) {
tx.commit();
} else {
tx.rollback();
}
LOG.error(String.format("send failed, topic is %s, partition is %s",
metadata.topic(), metadata.partition()), ex);
tx.rollback();
sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime);
}
tx.close();
Expand Down

0 comments on commit 033b1bf

Please sign in to comment.