@fenil25 fenil25 commented Oct 22, 2025

This PR changes how we close the coordinator in incremental cooperative rebalancing (ICR) mode.

Initially, leaderPartition was checked only when assigning new partitions to a task during rebalancing.
When closing the partitions for a task, the coordinator was killed. This did not work with incremental cooperative rebalancing, and PR #12372 fixed that issue.

However, we are currently too conservative when checking whether a task holds the leader partition. While Kafka Connect consumers are rebalancing, partition 0 can be assigned to task A, and we start the coordinator there. During a later rebalance, partition 0 can move from task A to task B. When close() is called on A, the group is still rebalancing, so we never close the coordinator.
This PR removes the requirement that the consumer group be stable when opening or closing partitions. Even during continuous rebalancing, Kafka always provides the new partitions in the open and close calls, so we don't have to check whether the consumer group is stable when closing the coordinator.

Without this change, there is a race condition during rebalancing that can leave multiple coordinators running. This PR fixes the issue.

PR #13756 addresses some of the scenarios in which there can be multiple coordinators.
This PR handles the rebalancing case.
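
For readers who want the idea in code, here is a minimal, dependency-free sketch of a leader check that does not consult the consumer group state. It assumes the leader is the task holding the lowest-ordered partition across the whole group (the description above only says partition 0). `Tp` and `LeaderElectionSketch` are hypothetical stand-ins and are not the actual CommitterImpl code.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for Kafka's TopicPartition so the sketch needs no dependencies.
final class Tp {
  final String topic;
  final int partition;

  Tp(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof Tp && ((Tp) o).topic.equals(topic) && ((Tp) o).partition == partition;
  }

  @Override
  public int hashCode() {
    return Objects.hash(topic, partition);
  }
}

final class LeaderElectionSketch {
  // Orders partitions by topic name, then by partition number.
  private static final Comparator<Tp> ORDER =
      Comparator.comparing((Tp tp) -> tp.topic).thenComparingInt(tp -> tp.partition);

  // True when `assigned` contains the lowest-ordered partition of the whole group.
  // There is deliberately no "group is stable" input: the answer depends only on
  // the partitions handed to the open or close call (assumption of this sketch).
  static boolean hasLeaderPartition(
      Collection<List<Tp>> groupAssignments, Collection<Tp> assigned) {
    return groupAssignments.stream()
        .flatMap(List::stream)
        .min(ORDER)
        .map(assigned::contains)
        .orElse(false);
  }
}
```

With a group whose members hold partitions 1 and 0 of the same topic, only the task whose open or close call carries partition 0 sees `true`, regardless of whether a rebalance is in progress.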

@fenil25 fenil25 changed the title from "always stop the coordinator if it has leader partition" to "Always stop the coordinator if it has leader partition" Oct 22, 2025
fenil25 commented Oct 22, 2025

@kumarpritam863 @bryanck I would appreciate a review! 🙇

}

private boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions) {
private boolean hasLeaderPartition(
Contributor

Maybe change the parameter name to onlyIfStable to be a little bit more clear? Alternatively the method could return an enum indicating the state.

Contributor Author

Good idea. Updated. ✅

@fenil25 fenil25 requested a review from bryanck October 22, 2025 21:44
bryanck commented Oct 23, 2025

@kumarpritam863 Do you have a minute to take a look at this PR?

@github-actions github-actions bot added the Specification label Oct 24, 2025
@fenil25 fenil25 changed the title from "Always stop the coordinator if it has leader partition" to "Don't ensure consumer group is stable when checking for leader partitions as some race conditions can allow dual coordinators" Oct 24, 2025
fenil25 commented Oct 24, 2025

I realized that if we always close the coordinator when a partition is revoked but still check for stability in the open call for new partitions, a race condition can also leave us with no coordinators.
This change ensures we never check for stability and simply let Kafka Connect provide the list of partitions in the open and close calls when electing the leader partition for coordinators.

@kumarpritam863 let me know your thoughts! Thank you. 🙇
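
The three outcomes discussed in this thread can be reproduced with a small simulation. This is only a sketch under simplifying assumptions: `Task` and `coordinatorsAfterMove` are hypothetical names, each task tracks nothing but whether its coordinator is running, and the leader partition moves from task A to task B exactly once.

```java
final class CoordinatorRaceSketch {
  // Hypothetical model of one sink task; it only tracks whether its coordinator runs.
  static final class Task {
    boolean coordinatorRunning;

    // Start the coordinator if this call assigns the leader partition and,
    // when the stability check is enabled, the group is currently stable.
    void open(boolean getsLeaderPartition, boolean groupStable, boolean checkStability) {
      if (getsLeaderPartition && (!checkStability || groupStable)) {
        coordinatorRunning = true;
      }
    }

    // Stop the coordinator if this call revokes the leader partition and,
    // when the stability check is enabled, the group is currently stable.
    void close(boolean losesLeaderPartition, boolean groupStable, boolean checkStability) {
      if (losesLeaderPartition && (!checkStability || groupStable)) {
        coordinatorRunning = false;
      }
    }
  }

  // Moves the leader partition from task A to task B and returns how many
  // coordinators are running afterwards.
  static int coordinatorsAfterMove(
      boolean checkOnOpen, boolean checkOnClose, boolean stableAtClose, boolean stableAtOpen) {
    Task a = new Task();
    Task b = new Task();
    a.open(true, true, checkOnOpen); // initial assignment while the group is stable
    a.close(true, stableAtClose, checkOnClose); // partition revoked from A
    b.open(true, stableAtOpen, checkOnOpen); // partition assigned to B
    return (a.coordinatorRunning ? 1 : 0) + (b.coordinatorRunning ? 1 : 0);
  }
}
```

In this model, checking stability on both calls with an unstable group at close time and a stable group at open time, `coordinatorsAfterMove(true, true, false, true)`, leaves 2 coordinators. Checking only on open while the group is still rebalancing, `coordinatorsAfterMove(true, false, false, false)`, leaves 0. With no stability checks, every timing leaves exactly 1.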

@kumarpritam863
Contributor

@fenil25 Thank you so much for this thoughtful change, I really appreciate your effort!
Just wanted to share that this was already discussed with @bryanck a while back, and we'd agreed that the check in CommitterImpl is indeed redundant. I was about to make this change but was a little busy, and a bunch of resiliency tests were still pending to cover some scenarios. The client can't guarantee server-side invariants, so removing it makes perfect sense.
Also, if you look at Kafka's consumer coordinator class, it does check stability before proceeding with the open and close calls.
Your change aligns perfectly with that direction.
Before going ahead with this, it would be nice to add some unit and integration tests.

private CommitterImpl committer;
private List<MemberDescription> members;
private List<TopicPartition> leaderAssignments;
private List<TopicPartition> nonLeaderAssignments;
Contributor

I don't think these need to be member variables; I feel it would be better to initialize them in the test.

Contributor Author

I use these variables in both tests, so I added them in the Before call to avoid code repetition. I can initialize them individually in each test instead.

@bryanck bryanck self-requested a review November 3, 2025 18:29
fenil25 commented Nov 4, 2025

Perfect. Thanks a lot for the insights! Really appreciate your eyes. I added some tests 🙇‍♂️

bryanck commented Nov 4, 2025

LGTM. Thanks @fenil25 for the contribution, and @kumarpritam863 for the review!

@bryanck bryanck changed the title from "Don't ensure consumer group is stable when checking for leader partitions as some race conditions can allow dual coordinators" to "Don't check that consumer group is stable for coordinator open/close" Nov 4, 2025
@bryanck bryanck changed the title from "Don't check that consumer group is stable for coordinator open/close" to "Kafka Connect: Don't check that consumer group is stable for coordinator leader election" Nov 4, 2025
@bryanck bryanck merged commit c235305 into apache:main Nov 4, 2025
12 checks passed
Labels

KAFKACONNECT, Specification
