diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 21486978f829..1445daa240a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -240,7 +240,7 @@ public class KafkaProducer implements Producer { private static final String JMX_PREFIX = "kafka.producer"; public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread"; public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics"; - private static Sensor msgSendLatencySensor = null; + private final Sensor msgSendLatencySensor; private final String clientId; // Visible for testing @@ -982,7 +982,7 @@ private Future doSend(ProducerRecord record, Callback call log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); } // producer callback will make sure to call both 'callback' and interceptor callback - Callback interceptCallback = new InterceptorCallback<>(callback, startTime, time, this.interceptors, tp); + Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, startTime, time, this.msgSendLatencySensor); if (transactionManager != null && transactionManager.isTransactional()) { transactionManager.failIfNotReadyForSend(); @@ -999,7 +999,7 @@ private Future doSend(ProducerRecord record, Callback call log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition); } // producer callback will make sure to call both 'callback' and interceptor callback - interceptCallback = new InterceptorCallback<>(callback, startTime, time, this.interceptors, tp); + interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, startTime, time, this.msgSendLatencySensor); result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs); @@ -1366,9 +1366,6 @@ private static class ClusterAndWaitTime { } } - public static void recordMsgProducingLatency(long start, long now) { - msgSendLatencySensor.record(now - start, now); - } private static class FutureFailure implements Future { @@ -1416,16 +1413,20 @@ private static class InterceptorCallback implements Callback { private final Time time; private final long startTime; - private InterceptorCallback(Callback userCallback, ProducerInterceptors interceptors, TopicPartition tp) { - this(userCallback, Long.MIN_VALUE, null, interceptors, tp); - } + private final Sensor msgSendLatencySensor; + - private InterceptorCallback(Callback userCallback, long startTime, Time time, ProducerInterceptors interceptors, TopicPartition tp) { + private InterceptorCallback(Callback userCallback, ProducerInterceptors interceptors, TopicPartition tp, long startTime, Time time, Sensor msgSendLatencySensor) { this.userCallback = userCallback; this.interceptors = interceptors; this.tp = tp; this.time = time; this.startTime = startTime; + this.msgSendLatencySensor = msgSendLatencySensor; + } + + public void recordMsgProducingLatency(long start, long now) { + msgSendLatencySensor.record(now - start, now); } public void onCompletion(RecordMetadata metadata, Exception exception) {