Skip to content

Commit

Permalink
Add log if process fetch takes more than replicaLagTimeMaxMs (#474)
Browse files Browse the repository at this point in the history
LI_DESCRIPTION = LIKAFKA-54083
EXIT_CRITERIA = N/A

Description

In the recent investigation of produce latency, we found there are strong relationship between produce latency and truncateAndFetch latency. However, it's hard to find why the truncateAndFetch can reach that high, and which partitions are causing the high truncateAndFetch latency.

In this commit, we passed through the replicaMaxLagMs config all the way to Fetcher thread, and log the fetch request if it takes longer than it. This could be very helpful for the produce latency investigation.
  • Loading branch information
CCisGG authored Aug 16, 2023
1 parent 73cdd9b commit ed9b33c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
16 changes: 14 additions & 2 deletions core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ abstract class AbstractAsyncFetcher(name: String,
failedPartitions: FailedPartitions,
fetchBackOffMs: Int = 0,
val brokerTopicStats: BrokerTopicStats, //BrokerTopicStats's lifecycle managed by ReplicaManager
fetcherEventBus: FetcherEventBus) extends FetcherEventProcessor with Logging {
fetcherEventBus: FetcherEventBus,
replicaMaxLagMs: Long) extends FetcherEventProcessor with Logging {

type FetchData = FetchResponseData.PartitionData
type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
Expand Down Expand Up @@ -166,10 +167,21 @@ abstract class AbstractAsyncFetcher(name: String,
trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
true
case Some(ReplicaFetch(sessionPartitions, fetchRequest)) =>
processFetchRequest(sessionPartitions, fetchRequest)
processFetchRequestAndMaybeLog(sessionPartitions, fetchRequest)
}
}

private def processFetchRequestAndMaybeLog(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
fetchRequest: FetchRequest.Builder): Boolean = {
val startTime = System.currentTimeMillis()
val result = processFetchRequest(sessionPartitions, fetchRequest)
val processFetchRequestTotalTime = System.currentTimeMillis() - startTime
if (processFetchRequestTotalTime >= replicaMaxLagMs) {
warn(s"Process fetch request took $processFetchRequestTotalTime ms. Fetch request: $fetchRequest")
}
result
}

// deal with partitions with errors, potentially due to leadership changes
private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition], methodName: String): Unit = {
if (partitions.nonEmpty) {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/AsyncReplicaFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class AsyncReplicaFetcher(name: String,
failedPartitions,
fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
replicaMgr.brokerTopicStats,
fetcherEventBus = fetcherEventBus) {
fetcherEventBus = fetcherEventBus,
replicaMaxLagMs = brokerConfig.replicaLagTimeMaxMs) {

private val replicaId = brokerConfig.brokerId
private val logContext = new LogContext(s"[ReplicaFetcher replicaId=$replicaId, leaderId=${sourceBroker.id}, " +
Expand Down

0 comments on commit ed9b33c

Please sign in to comment.