diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go
index a04d7392426..a51d86c87a2 100644
--- a/pkg/keyspace/tso_keyspace_group.go
+++ b/pkg/keyspace/tso_keyspace_group.go
@@ -181,6 +181,9 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
 			log.Info("the raftcluster is closed, stop to alloc nodes to all keyspace groups")
 			return
 		case <-ticker.C:
+			if m.nodesBalancer.Len() == 0 {
+				continue
+			}
 		}
 		groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
 		if err != nil {
diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go
index 62a4fb97a57..902a43541d6 100644
--- a/pkg/tso/allocator_manager.go
+++ b/pkg/tso/allocator_manager.go
@@ -660,7 +660,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
 	// Start keepalive the Local TSO Allocator leadership and enable Local TSO service.
 	ctx, cancel := context.WithCancel(loopCtx)
 	defer cancel()
-	defer am.ResetAllocatorGroup(allocator.GetDCLocation())
+	defer am.ResetAllocatorGroup(allocator.GetDCLocation(), false)
 
 	// Maintain the Local TSO Allocator leader
 	go allocator.KeepAllocatorLeader(ctx)
@@ -785,7 +785,7 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
 			zap.String("dc-location", ag.dcLocation),
 			zap.String("name", am.member.Name()),
 			errs.ZapError(err))
-		am.ResetAllocatorGroup(ag.dcLocation)
+		am.ResetAllocatorGroup(ag.dcLocation, false)
 		return
 	}
 }
@@ -1038,7 +1038,7 @@ func (am *AllocatorManager) PriorityChecker() {
 			log.Info("next leader key found, resign current leader",
 				logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
 				zap.Uint64("nextLeaderID", nextLeader))
-			am.ResetAllocatorGroup(allocatorGroup.dcLocation)
+			am.ResetAllocatorGroup(allocatorGroup.dcLocation, false)
 		}
 	}
 }
@@ -1132,13 +1132,13 @@ func (am *AllocatorManager) HandleRequest(ctx context.Context, dcLocation string
 
 // ResetAllocatorGroup will reset the allocator's leadership and TSO initialized in memory.
 // It usually should be called before re-triggering an Allocator leader campaign.
-func (am *AllocatorManager) ResetAllocatorGroup(dcLocation string) {
+func (am *AllocatorManager) ResetAllocatorGroup(dcLocation string, skipResetLeader bool) {
 	am.mu.Lock()
 	defer am.mu.Unlock()
 	if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist {
 		allocatorGroup.allocator.Reset()
 		// Reset if it still has the leadership. Otherwise the data race may occur because of the re-campaigning.
-		if allocatorGroup.leadership.Check() {
+		if !skipResetLeader && allocatorGroup.leadership.Check() {
 			allocatorGroup.leadership.Reset()
 		}
 	}
diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go
index f90dc5f26fe..cf82d58f884 100644
--- a/pkg/tso/global_allocator.go
+++ b/pkg/tso/global_allocator.go
@@ -617,7 +617,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
 		return
 	}
 	defer func() {
-		gta.am.ResetAllocatorGroup(GlobalDCLocation)
+		gta.am.ResetAllocatorGroup(GlobalDCLocation, false)
 	}()
 
 	tsoLabel := fmt.Sprintf("TSO Service Group %d", gta.getGroupID())
diff --git a/server/server.go b/server/server.go
index 7e0ee36278f..c10d15e90b1 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1746,7 +1746,7 @@ func (s *Server) campaignLeader() {
 		return
 	}
 	defer func() {
-		s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
+		s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation, false)
 		failpoint.Inject("updateAfterResetTSO", func() {
 			if err = allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
 				log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
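
For context, a minimal Go sketch of what the new skipResetLeader flag controls. The package name, helper name, and the module path github.com/tikv/pd are assumptions for illustration; only AllocatorManager, ResetAllocatorGroup, and GlobalDCLocation come from this diff, and every call site touched by the patch passes false, which preserves the previous behavior.

package tsoexample

import "github.com/tikv/pd/pkg/tso"

// resetGlobalAllocator illustrates the two modes of the extended signature:
//   - skipResetLeader = false: reset the in-memory allocator state and, if the
//     leadership is still held, resign it (the original behavior).
//   - skipResetLeader = true: reset only the in-memory allocator state and
//     leave the leadership untouched, so the caller can keep acting as leader.
func resetGlobalAllocator(am *tso.AllocatorManager, skipResetLeader bool) {
	am.ResetAllocatorGroup(tso.GlobalDCLocation, skipResetLeader)
}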