Skip to content

Commit

Permalink
Check supplied topics without considering they are new or not
Browse files Browse the repository at this point in the history
  • Loading branch information
gitlw committed May 26, 2021
1 parent caddfe1 commit 38e9116
Showing 1 changed file with 4 additions and 9 deletions.
13 changes: 4 additions & 9 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ class KafkaController(val config: KafkaConfig,
// between the moment this broker started and right now when it becomes controller again.
loadMinIsrForTopics(controllerContext.allTopics)

rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet, true)
rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet)
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
getReplicaAssignmentPolicyCompliant(controllerContext.allTopics.toSet).foreach {
case (topicPartition, replicaAssignment) =>
Expand Down Expand Up @@ -968,15 +968,10 @@ class KafkaController(val config: KafkaConfig,

// Rearrange partition and replica assignment for new topics that get assigned to
// maintenance brokers that do not take new partitions
private def rearrangePartitionReplicaAssignmentForNewPartitions(topics: Set[String], onlyNewTopics: Boolean) {
private def rearrangePartitionReplicaAssignmentForNewPartitions(topicsToCheck: Set[String]) {
try {
val noNewPartitionBrokers = partitionUnassignableBrokerIds
if (noNewPartitionBrokers.nonEmpty) {
val topicsToCheck = if (onlyNewTopics)
zkClient.getPartitionNodeNonExistsTopics(topics.toSet)
else
topics

val topicsToBeRearranged = zkClient.getPartitionAssignmentForTopics(topicsToCheck.toSet).filter {
case (topic, partitionMap) =>
val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty)
Expand Down Expand Up @@ -1705,7 +1700,7 @@ class KafkaController(val config: KafkaConfig,
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
rearrangePartitionReplicaAssignmentForNewPartitions(newTopics, true)
rearrangePartitionReplicaAssignmentForNewPartitions(newTopics)

registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = getReplicaAssignmentPolicyCompliant(newTopics)
Expand Down Expand Up @@ -1763,7 +1758,7 @@ class KafkaController(val config: KafkaConfig,
}

if (!isActive) return
rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic), false)
rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic))
val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
Expand Down

0 comments on commit 38e9116

Please sign in to comment.