POC code to reduce produce latency
CCisGG committed Aug 2, 2023
1 parent 6753a37 commit 720360c
Showing 9 changed files with 62 additions and 15 deletions.
@@ -56,7 +56,8 @@ public static LiCombinedControlRequestData.LeaderAndIsrPartitionState transformL
.setReplicas(partitionState.replicas())
.setAddingReplicas(partitionState.addingReplicas())
.setRemovingReplicas(partitionState.removingReplicas())
.setIsNew(partitionState.isNew());
.setIsNew(partitionState.isNew())
.setBlockFollowerFromAddingBack(partitionState.blockFollowerFromAddingBack());
}

public static LeaderAndIsrRequestData.LeaderAndIsrPartitionState restoreLeaderAndIsrPartition(
@@ -71,7 +72,8 @@ public static LeaderAndIsrRequestData.LeaderAndIsrPartitionState restoreLeaderAn
.setReplicas(partitionState.replicas())
.setAddingReplicas(partitionState.addingReplicas())
.setRemovingReplicas(partitionState.removingReplicas())
.setIsNew(partitionState.isNew());
.setIsNew(partitionState.isNew())
.setBlockFollowerFromAddingBack(partitionState.blockFollowerFromAddingBack());
}

public static LiCombinedControlRequestData.UpdateMetadataPartitionState transformUpdateMetadataPartition(
@@ -127,7 +127,9 @@
{ "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", "ignorable": true,
"about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
{ "name": "IsNew", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
"about": "Whether the replica should have existed on the broker or not." }
"about": "Whether the replica should have existed on the broker or not." },
{ "name": "BlockFollowerFromAddingBack", "type": "bool", "tag": 0, "taggedVersions": "1+", "versions": "1+", "default": "false",
"about": "Whether the leader should block follower being adding back in a short period."}
]},
{ "name": "UpdateMetadataPartitionState", "versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0", "entityType": "topicName", "ignorable": true,
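The new BlockFollowerFromAddingBack field is declared as a tagged field (tag 0, taggedVersions "1+"), so it adds no bytes on the wire while it stays at its default and is skipped by readers that do not know the tag. A minimal sketch of building a LeaderAndIsr partition state that carries the flag, using the generated setters visible in the transformer diff above; the topic, partition, and epoch values are made-up examples, not from the patch:

import org.apache.kafka.common.message.LeaderAndIsrRequestData

object BlockFollowerFlagSketch {
  def example(): LeaderAndIsrRequestData.LeaderAndIsrPartitionState =
    new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
      .setTopicName("example-topic")        // illustrative values only
      .setPartitionIndex(0)
      .setControllerEpoch(5)
      .setLeader(1)
      .setLeaderEpoch(7)
      .setIsNew(false)
      .setBlockFollowerFromAddingBack(true) // tagged field: only written when non-default, ignored by old readers
}
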
14 changes: 11 additions & 3 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -552,6 +552,8 @@ class Partition(val topicPartition: TopicPartition,
highWatermarkCheckpoints: OffsetCheckpoints,
topicId: Option[Uuid]): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
info(s"Making leader to partition $partitionState")

// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionState.controllerEpoch
@@ -764,7 +766,10 @@ class Partition(val topicPartition: TopicPartition,
else
assignmentState = SimpleAssignmentState(assignment)

blockFollowersFromAddingBackToIsr(isr)
if (blockFollowerFromAddingBack) {
info(s"hgeng: block follower from adding to isr: $isr")
blockFollowersFromAddingBackToIsr(isr)
}
isrState = CommittedIsr(isr)
}

@@ -1392,9 +1397,12 @@ class Partition(val topicPartition: TopicPartition,

private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
if (expandIsrLocks.contains(newInSyncReplica)) {
if (expandIsrLocks(newInSyncReplica) + expandIsrLockTime < System.currentTimeMillis()) {
trace(s"Avoiding adding $newInSyncReplica to isr as it is locked since $expandIsrLocks(newInSyncReplica)")
if (System.currentTimeMillis() < expandIsrLocks(newInSyncReplica) + expandIsrLockTime) {
info(s"hgeng: Avoid adding $newInSyncReplica to isr as it is locked since ${expandIsrLocks(newInSyncReplica)}")
return
} else {
info(s"hgeng: Remove $newInSyncReplica from expandIsrLock map. Current time ${System.currentTimeMillis()}" +
s", lock time ${expandIsrLocks(newInSyncReplica)}")
expandIsrLocks.remove(newInSyncReplica)
}
}
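The expandIsr guard above implements a per-replica time lock: an entry in expandIsrLocks (presumably populated by blockFollowersFromAddingBackToIsr when a LeaderAndIsr request arrives with the new flag set) keeps a replica out of the ISR until the entry is older than expandIsrLockTime, at which point the stale entry is dropped. A condensed, self-contained sketch of that pattern; expandIsrLocks and expandIsrLockTime are the names from the patch, while the 60 000 ms window is an assumed value the diff does not show:

import scala.collection.mutable

object ExpandIsrLockSketch {
  private val expandIsrLockTime: Long = 60000L               // assumed window; not shown in the diff
  private val expandIsrLocks = mutable.Map.empty[Int, Long]  // replicaId -> time the lock was taken

  // Record that a follower should not be added back to the ISR for a while.
  def lock(replicaId: Int): Unit =
    expandIsrLocks(replicaId) = System.currentTimeMillis()

  // Mirrors the guard at the top of expandIsr(): skip expansion while the lock is fresh,
  // drop the entry once the window has passed.
  def mayExpandIsr(replicaId: Int): Boolean =
    expandIsrLocks.get(replicaId) match {
      case Some(lockedAt) if System.currentTimeMillis() < lockedAt + expandIsrLockTime =>
        false
      case Some(_) =>
        expandIsrLocks.remove(replicaId)
        true
      case None =>
        true
    }
}
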
@@ -664,6 +664,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
else 0

var blockFollowerAddingBack = false

val maxBrokerEpoch = controllerContext.maxBrokerEpoch
leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) =>
if (controllerContext.liveOrShuttingDownBrokerIds.contains(broker)) {
@@ -677,8 +679,14 @@
} else {
"become-follower"
}
if (state.blockFollowerFromAddingBack()) {
blockFollowerAddingBack = true
info("hgeng:Blocking follower from adding back is true")
}

if (stateChangeLog.isTraceEnabled)
stateChangeLog.trace(s"Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
stateChangeLog.trace(s"hgeng: Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition" +
s"blockFollowerAddingBack $blockFollowerAddingBack")
}
stateChangeLog.info(s"Sending LeaderAndIsr request to broker $broker with $numBecomeLeaders become-leader " +
s"and ${leaderAndIsrPartitionStates.size - numBecomeLeaders} become-follower partitions")
@@ -701,6 +709,9 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
controllerEpoch, brokerEpoch, maxBrokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava)
}

if (blockFollowerAddingBack) {
info(s"hgeng: LeaderAndIsrRequestBuilder is $leaderAndIsrRequestBuilder")
}

sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => {
val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1745,7 +1745,7 @@ class KafkaController(val config: KafkaConfig,

zkClient.recordBrokerShutdown(id, brokerEpoch, controllerContext.epochZkVersion)
controllerContext.shuttingDownBrokerIds += (id -> brokerEpoch)
info(s"Shutting down broker $id")
info(s"hgeng:Shutting down broker $id")

debug(s"All shutting down brokers: ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
@@ -1764,8 +1764,12 @@ class KafkaController(val config: KafkaConfig,
// If the broker is a follower, updates the isr in ZK and notifies the current leader
val followerReplicas = partitionsFollowedByBroker.map(partition => PartitionAndReplica(partition, id)).toSeq
controllerContext.replicasBeingShutdown ++= followerReplicas
info(s"hgeng: replicas being shutdown: ${controllerContext.replicasBeingShutdown}")
info("hgeng: Handling state changes")
replicaStateMachine.handleStateChanges(followerReplicas, OfflineReplica)
info("hgeng: finished handling changes")
controllerContext.replicasBeingShutdown --= followerReplicas
info(s"hgeng: replicas being shutdown: ${controllerContext.replicasBeingShutdown}")
trace(s"All leaders = ${controllerContext.partitionsLeadershipInfo.mkString(",")}")
if (shouldSkipShutdownSafetyCheck) {
// When skipping shutdown safety check, we allow the broker to shutdown even though it may be the leader for some partitions.
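During controlled shutdown the controller now remembers which follower replicas belong to the broker being shut down (controllerContext.replicasBeingShutdown) for the duration of the OfflineReplica transition, so downstream code such as the ZkReplicaStateMachine change below can apparently tell that an ISR shrink was shutdown-driven. A condensed sketch of that bookkeeping; PartitionAndReplica is stubbed to keep the sketch self-contained, and the try/finally is an added safety assumption (the patch clears the set unconditionally right after the transition):

import scala.collection.mutable

// Stand-in for kafka.controller.PartitionAndReplica, defined here only for this sketch.
final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

object ShutdownBookkeepingSketch {
  // In the patch this set lives in ControllerContext as replicasBeingShutdown.
  val replicasBeingShutdown: mutable.Set[PartitionAndReplica] = mutable.Set.empty

  def shutdownFollowers(followerReplicas: Seq[PartitionAndReplica])
                       (transitionToOffline: Seq[PartitionAndReplica] => Unit): Unit = {
    replicasBeingShutdown ++= followerReplicas          // mark before the OfflineReplica transition
    try transitionToOffline(followerReplicas)           // drives the LeaderAndIsr updates to the remaining replicas
    finally replicasBeingShutdown --= followerReplicas  // unmark afterwards, even if the transition fails
  }
}
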
@@ -289,10 +289,12 @@ class ZkReplicaStateMachine(config: KafkaConfig,
}
val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
stateLogger.info(s"hgeng: Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
val recipients = controllerContext.partitionReplicaAssignment(partition)
// possible PERF TODO? could add controllerContextSnapshot as 6th arg here, too:
stateLogger.info(s"hgeng: partition: $partition" +
s"replicaId: $replicaId, contains? ${controllerContext.replicasBeingShutdown.contains(PartitionAndReplica(partition, replicaId))}")
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
partition,
leaderIsrAndControllerEpoch,
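The recipients change above is behavioral: before the patch the replica being shut down was filtered out of the LeaderAndIsr recipients, afterwards every assigned replica receives the updated partition state, which presumably lets the shutting-down broker (and the leader, via the new flag) see the shutdown-driven ISR shrink. A toy illustration of the difference, with made-up replica ids:

object RecipientsSketch {
  val assignment = Seq(1, 2, 3)  // example replica assignment for one partition
  val replicaId  = 2             // replica on the broker being shut down

  val recipientsBefore: Seq[Int] = assignment.filterNot(_ == replicaId) // Seq(1, 3): pre-patch behavior
  val recipientsAfter: Seq[Int]  = assignment                           // Seq(1, 2, 3): patched behavior
}
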
11 changes: 10 additions & 1 deletion core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -54,7 +54,8 @@ class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
lockOpt: Option[Lock] = None)
lockOpt: Option[Lock] = None,
creationTime: Long = System.currentTimeMillis())
extends DelayedOperation(delayMs, lockOpt) {

import DelayedOperation._
@@ -126,6 +127,14 @@
* Upon completion, return the current response status along with the error code per partition
*/
override def onComplete(): Unit = {
val timeToComplete = System.currentTimeMillis() - creationTime
if (timeToComplete > 1000) {
info(s"Delayed request takes ${timeToComplete} to complete," +
s" partitions are ${produceMetadata.produceStatus.keysIterator.next()}")
info(s"Delayed request takes ${timeToComplete} to complete," +
s" partitions are ${produceMetadata.produceStatus.keySet}")
}

val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(responseStatus)
}
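The DelayedProduce change threads a creation timestamp through the constructor and logs any delayed produce that took more than a second to complete, presumably the latency symptom this POC is chasing. A condensed sketch of that instrumentation, with the patch's two info calls collapsed into one and partitions standing in for produceMetadata.produceStatus.keySet:

object DelayedProduceTimingSketch {
  // creationTime defaults to "now" when the operation is built, mirroring the new constructor parameter.
  def completeWithTiming(creationTime: Long, partitions: Set[String])(onComplete: () => Unit): Unit = {
    val timeToComplete = System.currentTimeMillis() - creationTime
    if (timeToComplete > 1000)
      println(s"Delayed request took $timeToComplete ms to complete, partitions: $partitions")
    onComplete()
  }
}
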
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -289,6 +289,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}

private def doHandleLeaderAndIsrRequest(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest): LeaderAndIsrResponse = {

leaderAndIsrRequest.partitionStates().forEach( state =>
info(s"hgeng: Handling LAIR request partition state ${state}")
)


replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
}
11 changes: 7 additions & 4 deletions core/src/main/scala/kafka/server/KafkaServer.scala
@@ -221,7 +221,7 @@
*/
override def startup(): Unit = {
try {
info("starting")
info("hgeng:1:25: starting")

if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
@@ -547,7 +548,8 @@ }
}

private def initZkClient(time: Time): Unit = {
info(s"Connecting to zookeeper on ${config.zkConnect}")
info("hgeng: init zk client")
info(s"hgeng fat: Connecting to zookeeper on ${config.zkConnect}")

val secureAclsEnabled = config.zkEnableSecureAcls
val isZkSecurityEnabled = JaasUtils.isZkSaslEnabled() || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig)
@@ -758,7 +759,7 @@ class KafkaServer(
// We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
// of time and try again for a configured number of retries. If all the attempt fails, we simply force
// the shutdown.
info("Starting controlled shutdown")
info("hgeng:Starting controlled shutdown")

_brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN

@@ -775,7 +776,7 @@
*/
override def shutdown(): Unit = {
try {
info("shutting down")
info("hgeng: shutting down")

if (isStartingUp.get)
throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
@@ -784,11 +785,13 @@ class KafkaServer(
// last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
info("hgeng: sending controlled shutdown")
CoreUtils.swallow(controlledShutdown(), this)
_brokerState = BrokerState.SHUTTING_DOWN

// This delay is to wait LeaderAndIsrRequest to remove the followers on this broker from their ISRs.
Thread.sleep(60000)
info("hgeng: received controlled shutdown success, and slept for 60 seconds")

if (healthCheckScheduler != null)
healthCheckScheduler.shutdown()
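The KafkaServer change orders controlled shutdown as: ask the controller to move leadership away, wait a fixed 60 seconds so the resulting LeaderAndIsrRequests can remove this broker's followers from their ISRs, and only then continue tearing the broker down. A condensed sketch of that ordering; the fixed sleep is the hard-coded POC value from the diff, and a production version would presumably make it configurable or event-driven:

object ControlledShutdownOrderingSketch {
  def shutdown(requestControlledShutdown: () => Unit, stopBrokerComponents: () => Unit): Unit = {
    requestControlledShutdown()  // controller moves leadership away from this broker
    Thread.sleep(60000)          // POC value from the diff: let LeaderAndIsrRequests shrink the ISRs first
    stopBrokerComponents()       // then stop schedulers, request handlers, log manager, ...
  }
}
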
