diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b546d638c620..74917f614e50 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -42,6 +42,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 
+import java.util.concurrent.{CompletableFuture, TimeUnit}
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
@@ -258,6 +259,9 @@ class Partition(val topicPartition: TopicPartition,
   val logSlowReplicationBucketLengthMs: Long = 60000
   val logSlowReplicationBucketMaxLogCount: Int = 300
 
+  // maybeShrinkIsr will block until the ISR update completes, or until this timeout expires
+  val isrUpdateTimeoutMs: Long = 5000
+
   // Logs belonging to this partition. Majority of time it will be only one log, but if log directory
   // is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy
   // completes and a switch to new location is performed.
@@ -982,37 +986,46 @@ class Partition(val topicPartition: TopicPartition,
     val needsIsrUpdate = !isrState.isInflight && inReadLock(leaderIsrUpdateLock) {
       needsShrinkIsr()
     }
-    val leaderHWIncremented = needsIsrUpdate && inWriteLock(leaderIsrUpdateLock) {
-      leaderLogIfLocal.exists { leaderLog =>
-        val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
-        if (outOfSyncReplicaIds.nonEmpty) {
-          val outOfSyncReplicaLog = outOfSyncReplicaIds.map { replicaId =>
-            val replica = getReplica(replicaId)
-            val logEndOffsetMessage = replica
-              .map(_.logEndOffset.toString)
-              .getOrElse("unknown")
-            val lastCaughtUpTimeMessage = replica.map(_.lastCaughtUpTimeMs.toString).getOrElse("unknown")
-            s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage, lastCaughtUpTime: $lastCaughtUpTimeMessage)"
-          }.mkString(" ")
-          val newIsrLog = (isrState.isr -- outOfSyncReplicaIds).mkString(",")
-          info(s"Shrinking ISR from ${isrState.isr.mkString(",")} to $newIsrLog. " +
-            s"Leader: (highWatermark: ${leaderLog.highWatermark}, " +
-            s"endOffset: ${leaderLog.logEndOffset}). " +
-            s"Out of sync replicas: $outOfSyncReplicaLog.")
-
-          shrinkIsr(outOfSyncReplicaIds)
-          // we may need to increment high watermark since ISR could be down to 1
-          maybeIncrementLeaderHW(leaderLog)
-        } else {
-          false
+    if (needsIsrUpdate) {
+      info(s"Needs to shrink ISR for ${topicPartition}")
+      val isrUpdateCompleteFuture: CompletableFuture[Boolean] = new CompletableFuture()
+
+      inWriteLock(leaderIsrUpdateLock) {
+        leaderLogIfLocal.foreach { leaderLog =>
+          val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
+          if (outOfSyncReplicaIds.nonEmpty) {
+            val outOfSyncReplicaLog = outOfSyncReplicaIds.map { replicaId =>
+              val replica = getReplica(replicaId)
+              val logEndOffsetMessage = replica
+                .map(_.logEndOffset.toString)
+                .getOrElse("unknown")
+              val lastCaughtUpTimeMessage = replica.map(_.lastCaughtUpTimeMs.toString).getOrElse("unknown")
+              s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage, lastCaughtUpTime: $lastCaughtUpTimeMessage)"
+            }.mkString(" ")
+            val newIsrLog = (isrState.isr -- outOfSyncReplicaIds).mkString(",")
+            info(s"Shrinking ISR from ${isrState.isr.mkString(",")} to $newIsrLog. " +
+              s"Leader: (highWatermark: ${leaderLog.highWatermark}, " +
+              s"endOffset: ${leaderLog.logEndOffset}). " +
+              s"Out of sync replicas: $outOfSyncReplicaLog.")
+
+            shrinkIsr(outOfSyncReplicaIds, isrUpdateCompleteFuture)
+          }
         }
       }
-    }
-    // some delayed operations may be unblocked after HW changed
-    if (leaderHWIncremented)
-      tryCompleteDelayedRequests()
+      try {
+        // some delayed operations may be unblocked after HW changed
+        if (isrUpdateCompleteFuture.get(isrUpdateTimeoutMs, TimeUnit.MILLISECONDS)) {
+          info(s"Completed ISR shrink and incremented HW for partition ${topicPartition}." +
+            s" Current isr is ${isrState}. Trying to complete delayed requests")
+          tryCompleteDelayedRequests()
+        }
+      } catch {
+        case e: Exception =>
+          error("Failed to complete shrinking isr with exception.", e)
+      }
+    }
   }
 
   /**
@@ -1432,15 +1445,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
+  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int], isrUpdateCompleteFuture: CompletableFuture[Boolean] = new CompletableFuture[Boolean]): Unit = {
     // This is called from maybeShrinkIsr which holds the ISR write lock
     if (!isrState.isInflight) {
       // When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
       // We update pendingInSyncReplicaIds here simply to prevent any further ISR updates from occurring until we get
       // the next LeaderAndIsr
-      sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas))
+      sendAlterIsrRequest(PendingShrinkIsr(isrState.isr, outOfSyncReplicas), isrUpdateCompleteFuture)
     } else {
       trace(s"ISR update in-flight, not removing out-of-sync replicas $outOfSyncReplicas")
+      isrUpdateCompleteFuture.complete(false)
     }
   }
 
@@ -1457,7 +1471,7 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
+  private def sendAlterIsrRequest(proposedIsrState: IsrState, isrUpdateCompleteFuture: CompletableFuture[Boolean] = new CompletableFuture[Boolean]): Unit = {
     val isrToSend: Set[Int] = proposedIsrState match {
       case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
       case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
@@ -1467,7 +1481,18 @@
     }
 
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
-    val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState), controllerEpoch)
+    val alterIsrItem = AlterIsrItem(
+      topicPartition,
+      newLeaderAndIsr,
+      result => {
+        try {
+          isrUpdateCompleteFuture.complete(handleAlterIsrResponse(proposedIsrState)(result))
+        } catch {
+          case e: Exception => isrUpdateCompleteFuture.completeExceptionally(e)
+        }
+      },
+      controllerEpoch
+    )
 
     val oldState = isrState
     isrState = proposedIsrState
@@ -1489,15 +1514,17 @@ class Partition(val topicPartition: TopicPartition,
    * give up. This leaves [[Partition.isrState]] in an in-flight state (either pending shrink or pending expand).
    * Since our error was non-retryable we are okay staying in this state until we see new metadata from UpdateMetadata
    * or LeaderAndIsr
+   *
+   * @return true if the leader HW was incremented after handling the AlterIsr response
    */
-  private def handleAlterIsrResponse(proposedIsrState: IsrState)(result: Either[Errors, LeaderAndIsr]): Unit = {
+  private def handleAlterIsrResponse(proposedIsrState: IsrState)(result: Either[Errors, LeaderAndIsr]): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       if (isrState != proposedIsrState) {
         // This means isrState was updated through leader election or some other mechanism before we got the AlterIsr
         // response. We don't know what happened on the controller exactly, but we do know this response is out of date
        // so we ignore it.
         warn(s"Ignoring failed ISR update to $proposedIsrState since we have already updated state to $isrState")
-        return
+        return false
       }
 
       result match {
@@ -1533,12 +1560,20 @@ class Partition(val topicPartition: TopicPartition,
             info(s"ISR updated to ${isrState.isr.mkString(",")} and version updated to [$zkVersion]")
             proposedIsrState match {
               case PendingExpandIsr(_, _) => isrChangeListener.markExpand()
-              case PendingShrinkIsr(_, _) => isrChangeListener.markShrink()
+              case PendingShrinkIsr(_, _) =>
+                isrChangeListener.markShrink()
+                leaderLogIfLocal.foreach { leaderLog =>
+                  // Try to increment the leader HW now that the ISR has shrunk
+                  val leaderHWIncremented = maybeIncrementLeaderHW(leaderLog)
+                  info(s"HW incremented: ${leaderHWIncremented}, for partition ${topicPartition} after shrinking ISR to ${isrState.isr.mkString(",")}")
+                  return leaderHWIncremented
+                }
               case _ => // nothing to do, shouldn't get here
             }
           }
       }
     }
+    false
   }
 
   override def equals(that: Any): Boolean = that match {
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index b8fab54b9f10..e520f0ad0396 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -273,10 +273,10 @@ class PartitionLockTest extends Logging {
       alterIsrManager,
       transferLeaderManager) {
 
-      override def shrinkIsr(newIsr: Set[Int]): Unit = {
+      override def shrinkIsr(newIsr: Set[Int], isrUpdateCompleteFuture: CompletableFuture[Boolean]): Unit = {
         shrinkIsrSemaphore.acquire()
         try {
-          super.shrinkIsr(newIsr)
+          super.shrinkIsr(newIsr, isrUpdateCompleteFuture)
         } finally {
           shrinkIsrSemaphore.release()
         }