From ed9b33c367c0c4a4e6050e5ee05eda75a6b00083 Mon Sep 17 00:00:00 2001
From: Hao Geng
Date: Wed, 16 Aug 2023 13:45:44 -0700
Subject: [PATCH] Add log if process fetch takes more than replicaLagTimeMaxMs (#474)

LI_DESCRIPTION = LIKAFKA-54083
EXIT_CRITERIA = N/A

Description

In a recent investigation of produce latency, we found a strong relationship
between produce latency and truncateAndFetch latency. However, it was hard to
determine why truncateAndFetch latency could get that high, or which partitions
were causing it. In this commit, we pass the replicaMaxLagMs config all the way
down to the fetcher thread and log the fetch request whenever processing it
takes longer than that threshold. This should be very helpful for investigating
produce latency.
---
 .../kafka/server/AbstractAsyncFetcher.scala      | 16 ++++++++++++++--
 .../scala/kafka/server/AsyncReplicaFetcher.scala |  3 ++-
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala b/core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala
index 25d092a738d8..83044fa49ace 100755
--- a/core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala
+++ b/core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala
@@ -88,7 +88,8 @@ abstract class AbstractAsyncFetcher(name: String,
                                      failedPartitions: FailedPartitions,
                                      fetchBackOffMs: Int = 0,
                                      val brokerTopicStats: BrokerTopicStats, //BrokerTopicStats's lifecycle managed by ReplicaManager
-                                     fetcherEventBus: FetcherEventBus) extends FetcherEventProcessor with Logging {
+                                     fetcherEventBus: FetcherEventBus,
+                                     replicaMaxLagMs: Long) extends FetcherEventProcessor with Logging {
 
   type FetchData = FetchResponseData.PartitionData
   type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
@@ -166,10 +167,21 @@
         trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
         true
       case Some(ReplicaFetch(sessionPartitions, fetchRequest)) =>
-        processFetchRequest(sessionPartitions, fetchRequest)
+        processFetchRequestAndMaybeLog(sessionPartitions, fetchRequest)
     }
   }
 
+  private def processFetchRequestAndMaybeLog(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
+                                             fetchRequest: FetchRequest.Builder): Boolean = {
+    val startTime = System.currentTimeMillis()
+    val result = processFetchRequest(sessionPartitions, fetchRequest)
+    val processFetchRequestTotalTime = System.currentTimeMillis() - startTime
+    if (processFetchRequestTotalTime >= replicaMaxLagMs) {
+      warn(s"Process fetch request took $processFetchRequestTotalTime ms. Fetch request: $fetchRequest")
+    }
+    result
+  }
+
   // deal with partitions with errors, potentially due to leadership changes
   private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition], methodName: String): Unit = {
     if (partitions.nonEmpty) {
diff --git a/core/src/main/scala/kafka/server/AsyncReplicaFetcher.scala b/core/src/main/scala/kafka/server/AsyncReplicaFetcher.scala
index 07db194ea77d..1983d25a5886 100644
--- a/core/src/main/scala/kafka/server/AsyncReplicaFetcher.scala
+++ b/core/src/main/scala/kafka/server/AsyncReplicaFetcher.scala
@@ -66,7 +66,8 @@ class AsyncReplicaFetcher(name: String,
                                 failedPartitions,
                                 fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                 replicaMgr.brokerTopicStats,
-                                fetcherEventBus = fetcherEventBus) {
+                                fetcherEventBus = fetcherEventBus,
+                                replicaMaxLagMs = brokerConfig.replicaLagTimeMaxMs) {
 
   private val replicaId = brokerConfig.brokerId
   private val logContext = new LogContext(s"[ReplicaFetcher replicaId=$replicaId, leaderId=${sourceBroker.id}, " +