diff --git a/clients/src/main/java/org/apache/kafka/common/utils/LiCombinedControlTransformer.java b/clients/src/main/java/org/apache/kafka/common/utils/LiCombinedControlTransformer.java index be54b5338077..382265206ff9 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/LiCombinedControlTransformer.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/LiCombinedControlTransformer.java @@ -56,7 +56,8 @@ public static LiCombinedControlRequestData.LeaderAndIsrPartitionState transformL .setReplicas(partitionState.replicas()) .setAddingReplicas(partitionState.addingReplicas()) .setRemovingReplicas(partitionState.removingReplicas()) - .setIsNew(partitionState.isNew()); + .setIsNew(partitionState.isNew()) + .setBlockFollowerFromAddingBack(partitionState.blockFollowerFromAddingBack()); } public static LeaderAndIsrRequestData.LeaderAndIsrPartitionState restoreLeaderAndIsrPartition( @@ -71,7 +72,8 @@ public static LeaderAndIsrRequestData.LeaderAndIsrPartitionState restoreLeaderAn .setReplicas(partitionState.replicas()) .setAddingReplicas(partitionState.addingReplicas()) .setRemovingReplicas(partitionState.removingReplicas()) - .setIsNew(partitionState.isNew()); + .setIsNew(partitionState.isNew()) + .setBlockFollowerFromAddingBack(partitionState.blockFollowerFromAddingBack()); } public static LiCombinedControlRequestData.UpdateMetadataPartitionState transformUpdateMetadataPartition( diff --git a/clients/src/main/resources/common/message/LiCombinedControlRequest.json b/clients/src/main/resources/common/message/LiCombinedControlRequest.json index 015f8a93163b..079846cc7fa9 100644 --- a/clients/src/main/resources/common/message/LiCombinedControlRequest.json +++ b/clients/src/main/resources/common/message/LiCombinedControlRequest.json @@ -127,7 +127,9 @@ { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." }, { "name": "IsNew", "type": "bool", "versions": "0+", "default": "false", "ignorable": true, - "about": "Whether the replica should have existed on the broker or not." } + "about": "Whether the replica should have existed on the broker or not." }, + { "name": "BlockFollowerFromAddingBack", "type": "bool", "tag": 0, "taggedVersions": "1+", "versions": "1+", "default": "false", + "about": "Whether the leader should block follower being adding back in a short period."} ]}, { "name": "UpdateMetadataPartitionState", "versions": "0+", "fields": [ { "name": "TopicName", "type": "string", "versions": "0", "entityType": "topicName", "ignorable": true, diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 76d5a64dabaa..ac6212c08963 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -552,6 +552,8 @@ class Partition(val topicPartition: TopicPartition, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Boolean = { val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { + info(s"Making leader to partition $partitionState") + // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = partitionState.controllerEpoch @@ -764,7 +766,10 @@ class Partition(val topicPartition: TopicPartition, else assignmentState = SimpleAssignmentState(assignment) - blockFollowersFromAddingBackToIsr(isr) + if (blockFollowerFromAddingBack) { + info(s"hgeng: block follower from adding to isr: $isr") + blockFollowersFromAddingBackToIsr(isr) + } isrState = CommittedIsr(isr) } @@ -1392,9 +1397,12 @@ class Partition(val topicPartition: TopicPartition, private[cluster] def expandIsr(newInSyncReplica: Int): Unit = { if (expandIsrLocks.contains(newInSyncReplica)) { - if (expandIsrLocks(newInSyncReplica) + expandIsrLockTime < System.currentTimeMillis()) { - trace(s"Avoiding adding $newInSyncReplica to isr as it is locked since $expandIsrLocks(newInSyncReplica)") + if (System.currentTimeMillis() < expandIsrLocks(newInSyncReplica) + expandIsrLockTime) { + info(s"hgeng: Avoid adding $newInSyncReplica to isr as it is locked since ${expandIsrLocks(newInSyncReplica)}") + return } else { + info(s"hgeng: Remove $newInSyncReplica from expandIsrLock map. Current time ${System.currentTimeMillis()}" + + s", lock time ${expandIsrLocks(newInSyncReplica)}") expandIsrLocks.remove(newInSyncReplica) } } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index dc8cf457754b..d0c388f2b5bb 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -664,6 +664,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1 else 0 + var blockFollowerAddingBack = false + val maxBrokerEpoch = controllerContext.maxBrokerEpoch leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) => if (controllerContext.liveOrShuttingDownBrokerIds.contains(broker)) { @@ -677,8 +679,14 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, } else { "become-follower" } + if (state.blockFollowerFromAddingBack()) { + blockFollowerAddingBack = true + info("hgeng:Blocking follower from adding back is true") + } + if (stateChangeLog.isTraceEnabled) - stateChangeLog.trace(s"Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition") + stateChangeLog.trace(s"hgeng: Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition" + + s"blockFollowerAddingBack $blockFollowerAddingBack") } stateChangeLog.info(s"Sending LeaderAndIsr request to broker $broker with $numBecomeLeaders become-leader " + s"and ${leaderAndIsrPartitionStates.size - numBecomeLeaders} become-follower partitions") @@ -701,6 +709,9 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, controllerEpoch, brokerEpoch, maxBrokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava) } + if (blockFollowerAddingBack) { + info(s"hgeng: LeaderAndIsrRequestBuilder is $leaderAndIsrRequestBuilder") + } sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => { val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse] diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index ac88a63ace42..4cd1c369aafa 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1745,7 +1745,7 @@ class KafkaController(val config: KafkaConfig, zkClient.recordBrokerShutdown(id, brokerEpoch, controllerContext.epochZkVersion) controllerContext.shuttingDownBrokerIds += (id -> brokerEpoch) - info(s"Shutting down broker $id") + info(s"hgeng:Shutting down broker $id") debug(s"All shutting down brokers: ${controllerContext.shuttingDownBrokerIds.mkString(",")}") debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}") @@ -1764,8 +1764,12 @@ class KafkaController(val config: KafkaConfig, // If the broker is a follower, updates the isr in ZK and notifies the current leader val followerReplicas = partitionsFollowedByBroker.map(partition => PartitionAndReplica(partition, id)).toSeq controllerContext.replicasBeingShutdown ++= followerReplicas + info(s"hgeng: replicas being shutdown: ${controllerContext.replicasBeingShutdown}") + info("hgeng: Handling state changes") replicaStateMachine.handleStateChanges(followerReplicas, OfflineReplica) + info("hgeng: finished handling changes") controllerContext.replicasBeingShutdown --= followerReplicas + info(s"hgeng: replicas being shutdown: ${controllerContext.replicasBeingShutdown}") trace(s"All leaders = ${controllerContext.partitionsLeadershipInfo.mkString(",")}") if (shouldSkipShutdownSafetyCheck) { // When skipping shutdown safety check, we allow the broker to shutdown even though it may be the leader for some partitions. diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 6f8bfc9982f3..b246d395d5ba 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -289,10 +289,12 @@ class ZkReplicaStateMachine(config: KafkaConfig, } val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition)) updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) => - stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica") + stateLogger.info(s"hgeng: Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica") if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) { - val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId) + val recipients = controllerContext.partitionReplicaAssignment(partition) // possible PERF TODO? could add controllerContextSnapshot as 6th arg here, too: + stateLogger.info(s"hgeng: partition: $partition" + + s"replicaId: $replicaId, contains? ${controllerContext.replicasBeingShutdown.contains(PartitionAndReplica(partition, replicaId))}") controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients, partition, leaderIsrAndControllerEpoch, diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index ced8d5f53a35..a35bfa356c91 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -54,7 +54,8 @@ class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - lockOpt: Option[Lock] = None) + lockOpt: Option[Lock] = None, + creationTime: Long = System.currentTimeMillis()) extends DelayedOperation(delayMs, lockOpt) { import DelayedOperation._ @@ -126,6 +127,14 @@ class DelayedProduce(delayMs: Long, * Upon completion, return the current response status along with the error code per partition */ override def onComplete(): Unit = { + val timeToComplete = System.currentTimeMillis() - creationTime + if (timeToComplete > 1000) { + info(s"Delayed request takes ${timeToComplete} to complete," + + s" partitions are ${produceMetadata.produceStatus.keysIterator.next()}") + info(s"Delayed request takes ${timeToComplete} to complete," + + s" partitions are ${produceMetadata.produceStatus.keySet}") + } + val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus } responseCallback(responseStatus) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2afd58cc8803..4a09218db62b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -289,6 +289,12 @@ class KafkaApis(val requestChannel: RequestChannel, } private def doHandleLeaderAndIsrRequest(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest): LeaderAndIsrResponse = { + + leaderAndIsrRequest.partitionStates().forEach( state => + info(s"hgeng: Handling LAIR request partition state ${state}") + ) + + replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _)) } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 9c3a8ee544fc..688e5102685f 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -221,7 +221,7 @@ class KafkaServer( */ override def startup(): Unit = { try { - info("starting") + info("hgeng:1:25: starting") if (isShuttingDown.get) throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!") @@ -547,7 +547,8 @@ class KafkaServer( } private def initZkClient(time: Time): Unit = { - info(s"Connecting to zookeeper on ${config.zkConnect}") + info("hgeng: init zk client") + info(s"hgeng fat: Connecting to zookeeper on ${config.zkConnect}") val secureAclsEnabled = config.zkEnableSecureAcls val isZkSecurityEnabled = JaasUtils.isZkSaslEnabled() || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig) @@ -758,7 +759,7 @@ class KafkaServer( // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period // of time and try again for a configured number of retries. If all the attempt fails, we simply force // the shutdown. - info("Starting controlled shutdown") + info("hgeng:Starting controlled shutdown") _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN @@ -775,7 +776,7 @@ class KafkaServer( */ override def shutdown(): Unit = { try { - info("shutting down") + info("hgeng: shutting down") if (isStartingUp.get) throw new IllegalStateException("Kafka server is still starting up, cannot shut down!") @@ -784,11 +785,13 @@ class KafkaServer( // last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to // `true` at the end of this method. if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) { + info("hgeng: sending controlled shutdown") CoreUtils.swallow(controlledShutdown(), this) _brokerState = BrokerState.SHUTTING_DOWN // This delay is to wait LeaderAndIsrRequest to remove the followers on this broker from their ISRs. Thread.sleep(60000) + info("hgeng: received controlled shutdown success, and slept for 60 seconds") if (healthCheckScheduler != null) healthCheckScheduler.shutdown()