Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: skip resetting leader when resetting tso allocator #8495

Merged
merged 3 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
log.Info("the raftcluster is closed, stop to alloc nodes to all keyspace groups")
return
case <-ticker.C:
if m.GetNodesCount() == 0 {
continue
}
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// Start keepalive the Local TSO Allocator leadership and enable Local TSO service.
ctx, cancel := context.WithCancel(loopCtx)
defer cancel()
defer am.ResetAllocatorGroup(allocator.GetDCLocation())
defer am.ResetAllocatorGroup(allocator.GetDCLocation(), false)
// Maintain the Local TSO Allocator leader
go allocator.KeepAllocatorLeader(ctx)

Expand Down Expand Up @@ -785,7 +785,7 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
zap.String("dc-location", ag.dcLocation),
zap.String("name", am.member.Name()),
errs.ZapError(err))
am.ResetAllocatorGroup(ag.dcLocation)
am.ResetAllocatorGroup(ag.dcLocation, false)
return
}
}
Expand Down Expand Up @@ -1038,7 +1038,7 @@ func (am *AllocatorManager) PriorityChecker() {
log.Info("next leader key found, resign current leader",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Uint64("nextLeaderID", nextLeader))
am.ResetAllocatorGroup(allocatorGroup.dcLocation)
am.ResetAllocatorGroup(allocatorGroup.dcLocation, false)
}
}
}
Expand Down Expand Up @@ -1132,13 +1132,13 @@ func (am *AllocatorManager) HandleRequest(ctx context.Context, dcLocation string

// ResetAllocatorGroup will reset the allocator's leadership and TSO initialized in memory.
// It usually should be called before re-triggering an Allocator leader campaign.
func (am *AllocatorManager) ResetAllocatorGroup(dcLocation string) {
func (am *AllocatorManager) ResetAllocatorGroup(dcLocation string, skipResetLeader bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is skipResetLeader true?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add a PR description.

am.mu.Lock()
defer am.mu.Unlock()
if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist {
allocatorGroup.allocator.Reset()
// Reset if it still has the leadership. Otherwise the data race may occur because of the re-campaigning.
if allocatorGroup.leadership.Check() {
if !skipResetLeader && allocatorGroup.leadership.Check() {
allocatorGroup.leadership.Reset()
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
return
}
defer func() {
gta.am.ResetAllocatorGroup(GlobalDCLocation)
gta.am.ResetAllocatorGroup(GlobalDCLocation, false)
}()

tsoLabel := fmt.Sprintf("TSO Service Group %d", gta.getGroupID())
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,7 @@ func (s *Server) campaignLeader() {
return
}
defer func() {
s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation, false)
failpoint.Inject("updateAfterResetTSO", func() {
if err = allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
Expand Down
Loading