Skip to content

Commit

Permalink
Added test and fixes on the broker side to rearrange assignments duri…
Browse files Browse the repository at this point in the history
…ng partition expansion
  • Loading branch information
gitlw committed May 26, 2021
1 parent 3f16983 commit caddfe1
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
27 changes: 18 additions & 9 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ class KafkaController(val config: KafkaConfig,
// between the moment this broker started and right now when it becomes controller again.
loadMinIsrForTopics(controllerContext.allTopics)

rearrangePartitionReplicaAssignmentForNewTopics(controllerContext.allTopics.toSet)
rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet, true)
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
getReplicaAssignmentPolicyCompliant(controllerContext.allTopics.toSet).foreach {
case (topicPartition, replicaAssignment) =>
Expand Down Expand Up @@ -968,25 +968,33 @@ class KafkaController(val config: KafkaConfig,

// Rearrange partition and replica assignment for new topics that get assigned to
// maintenance brokers that do not take new partitions
private def rearrangePartitionReplicaAssignmentForNewTopics(topics: Set[String]) {
private def rearrangePartitionReplicaAssignmentForNewPartitions(topics: Set[String], onlyNewTopics: Boolean) {
try {
val noNewPartitionBrokers = partitionUnassignableBrokerIds
if (noNewPartitionBrokers.nonEmpty) {
val newTopics = zkClient.getPartitionNodeNonExistsTopics(topics.toSet)
val newTopicsToBeArranged = zkClient.getPartitionAssignmentForTopics(newTopics).filter {
case (_, partitionMap) =>
partitionMap.exists {
val topicsToCheck = if (onlyNewTopics)
zkClient.getPartitionNodeNonExistsTopics(topics.toSet)
else
topics

val topicsToBeRearranged = zkClient.getPartitionAssignmentForTopics(topicsToCheck.toSet).filter {
case (topic, partitionMap) =>
val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty)
val newPartitions = partitionMap.filter{case (partitionId, _) => partitionId >= existingAssignment.size}
newPartitions.exists {
case (_, assignedReplicas) =>
assignedReplicas.replicas.intersect(noNewPartitionBrokers).nonEmpty
}
}
newTopicsToBeArranged.foreach {
topicsToBeRearranged.foreach {
case (topic, partitionMap) =>
val numPartitions = partitionMap.size
val numReplica = partitionMap.head._2.replicas.size
val brokers = controllerContext.liveOrShuttingDownBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }.toSeq

val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokers.toSet, numPartitions, numReplica)
val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty)
val partitionsToAdd = numPartitions - existingAssignment.size
val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokers.toSet, partitionsToAdd, numReplica, -1, existingAssignment.size)
adminZkClient.writeTopicPartitionAssignment(topic, replicaAssignment.mapValues(ReplicaAssignment(_)).toMap, true)
info(s"Rearrange partition and replica assignment for topic [$topic]")
}
Expand Down Expand Up @@ -1697,7 +1705,7 @@ class KafkaController(val config: KafkaConfig,
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
rearrangePartitionReplicaAssignmentForNewTopics(newTopics)
rearrangePartitionReplicaAssignmentForNewPartitions(newTopics, true)

registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = getReplicaAssignmentPolicyCompliant(newTopics)
Expand Down Expand Up @@ -1755,6 +1763,7 @@ class KafkaController(val config: KafkaConfig,
}

if (!isActive) return
rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic), false)
val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class PreferredControllerTest extends ZooKeeperTestHarness {
val brokerConfigs = Seq((0, false), (1, true), (2, false))
createBrokersWithPreferredControllers(brokerConfigs, true)

val brokerList = TestUtils.bootstrapServers(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
TestUtils.waitUntilControllerElected(zkClient)
// create topic using admin client
val topic = "topic1"
Expand Down

0 comments on commit caddfe1

Please sign in to comment.