Skip to content

Commit

Permalink
Avoid handling StopReplicaRequest if no replicas to stop
Browse files Browse the repository at this point in the history
  • Loading branch information
CCisGG committed Aug 15, 2023
1 parent 09c0281 commit ecd7a23
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object LiDecomposedControlRequestUtils {
val stopReplicaRequests = if (stopReplicaRequestVersion == 3) {
extractStopReplicaRequestWithUngroupedPartitions(request, brokerEpoch, stopReplicaRequestVersion)
} else {
List(extractStopReplicaRequestWithTopicStates(request, stopReplicaRequestVersion))
extractStopReplicaRequestWithTopicStates(request, stopReplicaRequestVersion)
}

LiDecomposedControlRequest(leaderAndIsrRequest, updateMetadataRequest, stopReplicaRequests)
Expand Down Expand Up @@ -118,7 +118,11 @@ object LiDecomposedControlRequestUtils {
}
}

private def extractStopReplicaRequestWithTopicStates(request: LiCombinedControlRequest, stopReplicaRequestVersion: Short): StopReplicaRequest = {
private def extractStopReplicaRequestWithTopicStates(request: LiCombinedControlRequest, stopReplicaRequestVersion: Short): List[StopReplicaRequest] = {
if (request.stopReplicaTopicStates().isEmpty) {
return List.empty
}

// for StopReplicaRequests versions 4+, the deletePartitions flag on the top level is not used
val defaultDeletePartitions = false
val stopReplicaTopicStates = new util.ArrayList[StopReplicaTopicState]()
Expand All @@ -137,9 +141,9 @@ object LiDecomposedControlRequestUtils {
.setPartitionStates(partitionStates))
}
}
new StopReplicaRequest.Builder(stopReplicaRequestVersion, request.controllerId(), request.controllerEpoch(),
List(new StopReplicaRequest.Builder(stopReplicaRequestVersion, request.controllerId(), request.controllerEpoch(),
request.brokerEpoch(),
request.maxBrokerEpoch(), defaultDeletePartitions, stopReplicaTopicStates).build()
request.maxBrokerEpoch(), defaultDeletePartitions, stopReplicaTopicStates).build())
}

// extractStopReplicaRequestWithUngroupedPartitions could possible return two StopReplicaRequests
Expand Down

0 comments on commit ecd7a23

Please sign in to comment.