Skip to content

Commit

Permalink
Add log if process fetch takes more than replicaLagTimeMaxMs
Browse files Browse the repository at this point in the history
  • Loading branch information
CCisGG committed Aug 16, 2023
1 parent 73cdd9b commit 50e372f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
16 changes: 14 additions & 2 deletions core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ abstract class AbstractAsyncFetcher(name: String,
failedPartitions: FailedPartitions,
fetchBackOffMs: Int = 0,
val brokerTopicStats: BrokerTopicStats, //BrokerTopicStats's lifecycle managed by ReplicaManager
fetcherEventBus: FetcherEventBus) extends FetcherEventProcessor with Logging {
fetcherEventBus: FetcherEventBus,
replicaMaxLagMs: Long) extends FetcherEventProcessor with Logging {

type FetchData = FetchResponseData.PartitionData
type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
Expand Down Expand Up @@ -166,10 +167,21 @@ abstract class AbstractAsyncFetcher(name: String,
trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
true
case Some(ReplicaFetch(sessionPartitions, fetchRequest)) =>
processFetchRequest(sessionPartitions, fetchRequest)
processFetchRequestAndMaybeLog(sessionPartitions, fetchRequest)
}
}

private def processFetchRequestAndMaybeLog(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
fetchRequest: FetchRequest.Builder): Boolean = {
val startTime = System.currentTimeMillis()
val result = processFetchRequest(sessionPartitions, fetchRequest)
val processFetchRequestTotalTime = System.currentTimeMillis() - startTime
if (processFetchRequestTotalTime >= replicaMaxLagMs) {
warn(s"Process fetch request took $processFetchRequestTotalTime ms. Fetch request: $fetchRequest")
}
result
}

// deal with partitions with errors, potentially due to leadership changes
private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition], methodName: String): Unit = {
if (partitions.nonEmpty) {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/AsyncReplicaFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class AsyncReplicaFetcher(name: String,
failedPartitions,
fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
replicaMgr.brokerTopicStats,
fetcherEventBus = fetcherEventBus) {
fetcherEventBus = fetcherEventBus,
replicaMaxLagMs = brokerConfig.replicaLagTimeMaxMs) {

private val replicaId = brokerConfig.brokerId
private val logContext = new LogContext(s"[ReplicaFetcher replicaId=$replicaId, leaderId=${sourceBroker.id}, " +
Expand Down

0 comments on commit 50e372f

Please sign in to comment.