Skip to content

Commit

Permalink
skip resetting leader when resetting tso allocator
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Aug 6, 2024
1 parent c8ad186 commit 88bf631
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 7 deletions.
3 changes: 3 additions & 0 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
log.Info("the raftcluster is closed, stop to alloc nodes to all keyspace groups")
return
case <-ticker.C:
if m.nodesBalancer.Len() == 0 {
continue
}
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
// Start keepalive the Local TSO Allocator leadership and enable Local TSO service.
ctx, cancel := context.WithCancel(loopCtx)
defer cancel()
defer am.ResetAllocatorGroup(allocator.GetDCLocation())
defer am.ResetAllocatorGroup(allocator.GetDCLocation(), false)
// Maintain the Local TSO Allocator leader
go allocator.KeepAllocatorLeader(ctx)

Expand Down Expand Up @@ -785,7 +785,7 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
zap.String("dc-location", ag.dcLocation),
zap.String("name", am.member.Name()),
errs.ZapError(err))
am.ResetAllocatorGroup(ag.dcLocation)
am.ResetAllocatorGroup(ag.dcLocation, false)
return
}
}
Expand Down Expand Up @@ -1038,7 +1038,7 @@ func (am *AllocatorManager) PriorityChecker() {
log.Info("next leader key found, resign current leader",
logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
zap.Uint64("nextLeaderID", nextLeader))
am.ResetAllocatorGroup(allocatorGroup.dcLocation)
am.ResetAllocatorGroup(allocatorGroup.dcLocation, false)

Check warning on line 1041 in pkg/tso/allocator_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/allocator_manager.go#L1041

Added line #L1041 was not covered by tests
}
}
}
Expand Down Expand Up @@ -1132,13 +1132,13 @@ func (am *AllocatorManager) HandleRequest(ctx context.Context, dcLocation string

// ResetAllocatorGroup will reset the allocator's leadership and TSO initialized in memory.
// It usually should be called before re-triggering an Allocator leader campaign.
func (am *AllocatorManager) ResetAllocatorGroup(dcLocation string) {
func (am *AllocatorManager) ResetAllocatorGroup(dcLocation string, skipResetLeader bool) {
am.mu.Lock()
defer am.mu.Unlock()
if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist {
allocatorGroup.allocator.Reset()
// Reset if it still has the leadership. Otherwise the data race may occur because of the re-campaigning.
if allocatorGroup.leadership.Check() {
if !skipResetLeader && allocatorGroup.leadership.Check() {
allocatorGroup.leadership.Reset()
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
return
}
defer func() {
gta.am.ResetAllocatorGroup(GlobalDCLocation)
gta.am.ResetAllocatorGroup(GlobalDCLocation, false)
}()

tsoLabel := fmt.Sprintf("TSO Service Group %d", gta.getGroupID())
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,7 @@ func (s *Server) campaignLeader() {
return
}
defer func() {
s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation, false)
failpoint.Inject("updateAfterResetTSO", func() {
if err = allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
Expand Down

0 comments on commit 88bf631

Please sign in to comment.