diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index d3c06ad2cc8..6d939fde540 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -21,7 +21,7 @@ import ( // Discover is used to get all the service instances of the specified service name. func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) { - key := discoveryPath(clusterID, serviceName) + "/" + key := ServicePath(clusterID, serviceName) + "/" endKey := clientv3.GetPrefixRangeEnd(key) withRange := clientv3.WithRange(endKey) diff --git a/pkg/mcs/discovery/key_path.go b/pkg/mcs/discovery/key_path.go index 0e53b21c9fe..4eb339dd5db 100644 --- a/pkg/mcs/discovery/key_path.go +++ b/pkg/mcs/discovery/key_path.go @@ -24,15 +24,17 @@ const ( registryKey = "registry" ) -func registryPath(clusterID, serviceName, serviceAddr string) string { +// RegistryPath returns the full path to store microservice addresses. +func RegistryPath(clusterID, serviceName, serviceAddr string) string { return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/") } -func discoveryPath(clusterID, serviceName string) string { +// ServicePath returns the path to store microservice addresses. +func ServicePath(clusterID, serviceName string) string { return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/") } // TSOPath returns the path to store TSO addresses. func TSOPath(clusterID uint64) string { - return discoveryPath(strconv.FormatUint(clusterID, 10), "tso") + "/" + return ServicePath(strconv.FormatUint(clusterID, 10), "tso") + "/" } diff --git a/pkg/mcs/discovery/register.go b/pkg/mcs/discovery/register.go index 617c1520b8d..3e08d9b49cf 100644 --- a/pkg/mcs/discovery/register.go +++ b/pkg/mcs/discovery/register.go @@ -41,7 +41,7 @@ type ServiceRegister struct { // NewServiceRegister creates a new ServiceRegister. func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister { cctx, cancel := context.WithCancel(ctx) - serviceKey := registryPath(clusterID, serviceName, serviceAddr) + serviceKey := RegistryPath(clusterID, serviceName, serviceAddr) return &ServiceRegister{ ctx: cctx, cancel: cancel, diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 1b4b5c36374..a0904f4dc7b 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -539,7 +539,8 @@ func (s *Server) startServer() (err error) { tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID) s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( - s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg) + s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, + discovery.TSOPath(s.clusterID), legacySvcRootPath, tsoSvcRootPath, s.cfg) if err := s.keyspaceGroupManager.Initialize(); err != nil { return err } diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index 21a4a655afe..c87cec16a64 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -76,5 +76,8 @@ const ( DefaultKeyspaceGroupReplicaCount = 2 // DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica. + // It's used in keyspace group primary weighted-election to balance primaries' distribution. + // Among multiple replicas of a keyspace group, the higher the priority, the more likely + // the replica is to be elected as primary. DefaultKeyspaceGroupReplicaPriority = 0 ) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 6ed925f971a..a82376430fa 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net/http" "path" "regexp" @@ -27,6 +28,7 @@ import ( "time" perrors "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/election" @@ -55,6 +57,10 @@ const ( // mergingCheckInterval is the interval for merging check to see if the keyspace groups // merging process could be moved forward. mergingCheckInterval = 5 * time.Second + // defaultPrimaryPriorityCheckInterval is the default interval for checking if the priorities + // of the primaries on this TSO server/pod have changed. A goroutine will periodically check + // do this check and re-distribute the primaries if necessary. + defaultPrimaryPriorityCheckInterval = 10 * time.Second ) type state struct { @@ -153,6 +159,41 @@ func (s *state) getKeyspaceGroupMetaWithCheck( mcsutils.DefaultKeyspaceGroupID, nil } +func (s *state) getNextPrimaryToReset( + groupID int, localAddress string, +) (member ElectionMember, kg *endpoint.KeyspaceGroup, localPriority, nextGroupID int) { + s.RLock() + defer s.RUnlock() + + // Both s.ams and s.kgs are arrays with the fixed size defined by the const value MaxKeyspaceGroupCountInUse. + groupSize := int(mcsutils.MaxKeyspaceGroupCountInUse) + groupID %= groupSize + for j := 0; j < groupSize; groupID, j = (groupID+1)%groupSize, j+1 { + am := s.ams[groupID] + kg := s.kgs[groupID] + if am != nil && kg != nil && am.GetMember().IsLeader() { + maxPriority := math.MinInt32 + localPriority := math.MaxInt32 + for _, member := range kg.Members { + if member.Priority > maxPriority { + maxPriority = member.Priority + } + if member.Address == localAddress { + localPriority = member.Priority + } + } + + if localPriority < maxPriority { + // return here and reset the primary outside of the critical section + // as resetting the primary may take some time. + return am.GetMember(), kg, localPriority, (groupID + 1) % groupSize + } + } + } + + return nil, nil, 0, groupID +} + // kgPrimaryPathBuilder builds the path for keyspace group primary election. // default keyspace group: "/ms/{cluster_id}/tso/00000/primary". // non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary". @@ -198,6 +239,10 @@ type KeyspaceGroupManager struct { // which participate in the election of its keyspace group's primary, in the format of // "electionNamePrefix:keyspace-group-id" electionNamePrefix string + // tsoServiceKey is the path for storing the registered tso servers. + // Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress} + // Value: discover.ServiceRegistryEntry + tsoServiceKey string // legacySvcRootPath defines the legacy root path for all etcd paths which derives from // the PD/API service. It's in the format of "/pd/{cluster_id}". // The main paths for different usages include: @@ -238,17 +283,26 @@ type KeyspaceGroupManager struct { loadKeyspaceGroupsBatchSize int64 loadFromEtcdMaxRetryTimes int - // compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the - // keyspace group membership path. + // compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id + // in the keyspace group membership path. compiledKGMembershipIDRegexp *regexp.Regexp // groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry. groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup groupWatcher *etcdutil.LoopWatcher - primaryPathBuilder *kgPrimaryPathBuilder - // mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group. mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc + + primaryPathBuilder *kgPrimaryPathBuilder + primaryPriorityCheckInterval time.Duration + + // tsoNodes is the registered tso servers. + tsoNodes sync.Map // store as map[string]struct{} + // serviceRegistryMap stores the mapping from the service registry key to the service address. + // Note: it is only used in tsoNodesWatcher. + serviceRegistryMap map[string]string + // tsoNodesWatcher is the watcher for the registered tso servers. + tsoNodesWatcher *etcdutil.LoopWatcher } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. @@ -258,6 +312,7 @@ func NewKeyspaceGroupManager( etcdClient *clientv3.Client, httpClient *http.Client, electionNamePrefix string, + tsoServiceKey string, legacySvcRootPath string, tsoSvcRootPath string, cfg ServiceConfig, @@ -270,16 +325,19 @@ func NewKeyspaceGroupManager( ctx, cancel := context.WithCancel(ctx) kgm := &KeyspaceGroupManager{ - ctx: ctx, - cancel: cancel, - tsoServiceID: tsoServiceID, - etcdClient: etcdClient, - httpClient: httpClient, - electionNamePrefix: electionNamePrefix, - legacySvcRootPath: legacySvcRootPath, - tsoSvcRootPath: tsoSvcRootPath, - cfg: cfg, - groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup), + ctx: ctx, + cancel: cancel, + tsoServiceID: tsoServiceID, + etcdClient: etcdClient, + httpClient: httpClient, + electionNamePrefix: electionNamePrefix, + tsoServiceKey: tsoServiceKey, + legacySvcRootPath: legacySvcRootPath, + tsoSvcRootPath: tsoSvcRootPath, + primaryPriorityCheckInterval: defaultPrimaryPriorityCheckInterval, + cfg: cfg, + groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup), + serviceRegistryMap: make(map[string]string), } kgm.legacySvcStorage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) @@ -296,6 +354,100 @@ func NewKeyspaceGroupManager( // Initialize this KeyspaceGroupManager func (kgm *KeyspaceGroupManager) Initialize() error { + if err := kgm.InitializeTSOServerWatchLoop(); err != nil { + log.Error("failed to initialize tso server watch loop", zap.Error(err)) + kgm.Close() // Close the manager to clean up the allocated resources. + return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) + } + if err := kgm.InitializeGroupWatchLoop(); err != nil { + log.Error("failed to initialize group watch loop", zap.Error(err)) + kgm.Close() // Close the manager to clean up the loaded keyspace groups. + return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) + } + + kgm.wg.Add(1) + go kgm.primaryPriorityCheckLoop() + + return nil +} + +// Close this KeyspaceGroupManager +func (kgm *KeyspaceGroupManager) Close() { + log.Info("closing keyspace group manager") + + // Note: don't change the order. We need to cancel all service loops in the keyspace group manager + // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups + // during critical periods such as service shutdown and online keyspace group, while the former requires + // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is + // added/initialized after that. + kgm.cancel() + kgm.wg.Wait() + kgm.state.deinitialize() + + log.Info("keyspace group manager closed") +} + +// GetServiceConfig returns the service config. +func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig { + return kgm.cfg +} + +// InitializeTSOServerWatchLoop initializes the watch loop monitoring the path for storing the +// registered tso servers. +// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress} +// Value: discover.ServiceRegistryEntry +func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { + tsoServiceEndKey := clientv3.GetPrefixRangeEnd(kgm.tsoServiceKey) + "/" + + putFn := func(kv *mvccpb.KeyValue) error { + s := &discovery.ServiceRegistryEntry{} + if err := json.Unmarshal(kv.Value, s); err != nil { + log.Warn("failed to unmarshal service registry entry", + zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) + return err + } + kgm.tsoNodes.Store(s.ServiceAddr, struct{}{}) + kgm.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr + return nil + } + deleteFn := func(kv *mvccpb.KeyValue) error { + key := string(kv.Key) + if serviceAddr, ok := kgm.serviceRegistryMap[key]; ok { + delete(kgm.serviceRegistryMap, key) + kgm.tsoNodes.Delete(serviceAddr) + return nil + } + return perrors.Errorf("failed to find the service address for key %s", key) + } + + kgm.tsoNodesWatcher = etcdutil.NewLoopWatcher( + kgm.ctx, + &kgm.wg, + kgm.etcdClient, + "tso-nodes-watcher", + kgm.tsoServiceKey, + putFn, + deleteFn, + func() error { return nil }, + clientv3.WithRange(tsoServiceEndKey), + ) + + kgm.wg.Add(1) + go kgm.tsoNodesWatcher.StartWatchLoop() + + if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil { + log.Error("failed to load the registered tso servers", errs.ZapError(err)) + return err + } + + return nil +} + +// InitializeGroupWatchLoop initializes the watch loop monitoring the path for storing keyspace group +// membership/distribution metadata. +// Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group} +// Value: endpoint.KeyspaceGroup +func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { rootPath := kgm.legacySvcRootPath startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/") endKey := strings.Join( @@ -375,20 +527,66 @@ func (kgm *KeyspaceGroupManager) Initialize() error { return nil } -// Close this KeyspaceGroupManager -func (kgm *KeyspaceGroupManager) Close() { - log.Info("closing keyspace group manager") +func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { + defer logutil.LogPanic() + defer kgm.wg.Done() - // Note: don't change the order. We need to cancel all service loops in the keyspace group manager - // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups - // during critical periods such as service shutdown and online keyspace group, while the former requires - // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is - // added/initialized after that. - kgm.cancel() - kgm.wg.Wait() - kgm.state.deinitialize() + failpoint.Inject("fastPrimaryPriorityCheck", func() { + kgm.primaryPriorityCheckInterval = 200 * time.Millisecond + }) - log.Info("keyspace group manager closed") + ctx, cancel := context.WithCancel(kgm.ctx) + defer cancel() + groupID := 0 + for { + select { + case <-ctx.Done(): + log.Info("exit primary priority check loop") + return + case <-time.After(kgm.primaryPriorityCheckInterval): + // Every primaryPriorityCheckInterval, we only reset the primary of one keyspace group + member, kg, localPriority, nextGroupID := kgm.getNextPrimaryToReset(groupID, kgm.tsoServiceID.ServiceAddr) + if member != nil { + aliveTSONodes := make(map[string]struct{}) + kgm.tsoNodes.Range(func(key, _ interface{}) bool { + aliveTSONodes[key.(string)] = struct{}{} + return true + }) + if len(aliveTSONodes) == 0 { + log.Warn("no alive tso node", zap.String("local-address", kgm.tsoServiceID.ServiceAddr)) + continue + } + // If there is a alive member with higher priority, reset the leader. + resetLeader := false + for _, member := range kg.Members { + if member.Priority <= localPriority { + continue + } + if _, ok := aliveTSONodes[member.Address]; ok { + resetLeader = true + break + } + } + if resetLeader { + select { + case <-ctx.Done(): + default: + member.ResetLeader() + log.Info("reset primary", + zap.String("local-address", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("keyspace-group-id", kg.ID), + zap.Int("local-priority", localPriority)) + } + } else { + log.Warn("no need to reset primary as the replicas with higher priority are offline", + zap.String("local-address", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("keyspace-group-id", kg.ID), + zap.Int("local-priority", localPriority)) + } + } + groupID = nextGroupID + } + } } func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool { diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 1e7d072ade3..2e03418bae7 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -22,7 +22,6 @@ import ( "path" "reflect" "sort" - "strconv" "strings" "sync" "testing" @@ -36,8 +35,9 @@ import ( "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/tsoutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/goleak" @@ -51,6 +51,7 @@ type keyspaceGroupManagerTestSuite struct { suite.Suite ctx context.Context cancel context.CancelFunc + ClusterID uint64 backendEndpoints string etcdClient *clientv3.Client clean func() @@ -64,13 +65,23 @@ func TestKeyspaceGroupManagerTestSuite(t *testing.T) { func (suite *keyspaceGroupManagerTestSuite) SetupSuite() { t := suite.T() suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.ClusterID = rand.Uint64() suite.backendEndpoints, suite.etcdClient, suite.clean = startEmbeddedEtcd(t) + suite.cfg = suite.createConfig() +} + +func (suite *keyspaceGroupManagerTestSuite) TearDownSuite() { + suite.clean() + suite.cancel() +} - suite.cfg = &TestServiceConfig{ - Name: "tso-test-name", +func (suite *keyspaceGroupManagerTestSuite) createConfig() *TestServiceConfig { + addr := tempurl.Alloc() + return &TestServiceConfig{ + Name: "tso-test-name-default", BackendEndpoints: suite.backendEndpoints, - ListenAddr: "http://127.0.0.1:3379", - AdvertiseListenAddr: "http://127.0.0.1:3379", + ListenAddr: addr, + AdvertiseListenAddr: addr, LeaderLease: mcsutils.DefaultLeaderLease, LocalTSOEnabled: false, TSOUpdatePhysicalInterval: 50 * time.Millisecond, @@ -80,11 +91,6 @@ func (suite *keyspaceGroupManagerTestSuite) SetupSuite() { } } -func (suite *keyspaceGroupManagerTestSuite) TearDownSuite() { - suite.clean() - suite.cancel() -} - // TestNewKeyspaceGroupManager tests the initialization of KeyspaceGroupManager. // It should initialize the allocator manager with the desired configurations and parameters. func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { @@ -92,11 +98,15 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} guid := uuid.New().String() + tsoServiceKey := discovery.ServicePath(guid, "tso") + "/" legacySvcRootPath := path.Join("/pd", guid) tsoSvcRootPath := path.Join("/ms", guid, "tso") electionNamePrefix := "tso-server-" + guid - kgm := suite.newKeyspaceGroupManager(tsoServiceID, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath) + kgm := NewKeyspaceGroupManager( + suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, + tsoServiceKey, legacySvcRootPath, tsoSvcRootPath, suite.cfg) + defer kgm.Close() err := kgm.Initialize() re.NoError(err) @@ -116,8 +126,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { re.Equal(legacySvcRootPath, am.rootPath) re.Equal(time.Duration(mcsutils.DefaultLeaderLease)*time.Second, am.saveInterval) re.Equal(time.Duration(50)*time.Millisecond, am.updatePhysicalInterval) - - kgm.Close() } // TestLoadKeyspaceGroupsAssignment tests the loading of the keyspace group assignment. @@ -174,8 +182,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() { defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the timeout to 1 second and inject the delayLoad to return 3 seconds to let // the loading sleep 3 seconds. @@ -197,8 +205,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTem defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the max retry times to 3 and inject the loadTemporaryFail to return 2 to let // loading from etcd fail 2 times but the whole initialization still succeeds. @@ -219,8 +227,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsFailed() { defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the max retry times to 3 and inject the loadTemporaryFail to return 3 to let // loading from etcd fail 3 times which should cause the whole initialization to fail. @@ -388,9 +396,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestGetKeyspaceGroupMetaWithCheck() // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(0), []uint32{0, 1, 2}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0, 1, 2}) err = mgr.Initialize() re.NoError(err) @@ -461,14 +468,12 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 1, 2}) + suite.ctx, suite.etcdClient, mcsutils.DefaultKeyspaceGroupID, rootPath, + []string{svcAddr}, []int{0}, []uint32{mcsutils.DefaultKeyspaceID, 1, 2}) // Create keyspace group 3 which contains keyspace 3, 4. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(3), []uint32{3, 4}) + suite.ctx, suite.etcdClient, uint32(3), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{3, 4}) err = mgr.Initialize() re.NoError(err) @@ -536,14 +541,12 @@ func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() { // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 10, 20}) + suite.ctx, suite.etcdClient, mcsutils.DefaultKeyspaceGroupID, + rootPath, []string{svcAddr}, []int{0}, []uint32{mcsutils.DefaultKeyspaceID, 10, 20}) // Create keyspace group 1 which contains keyspace 3, 4. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(1), []uint32{11, 21}) + suite.ctx, suite.etcdClient, uint32(1), rootPath, + []string{svcAddr}, []int{0}, []uint32{11, 21}) err = mgr.Initialize() re.NoError(err) @@ -591,9 +594,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembers // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(0), []uint32{0, 1, 2}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0, 1, 2}) err := mgr.Initialize() re.NoError(err) @@ -681,15 +683,6 @@ func (suite *keyspaceGroupManagerTestSuite) applyEtcdEvents( } } -func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( - tsoServiceID *discovery.ServiceRegistryEntry, - electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string, -) *KeyspaceGroupManager { - return NewKeyspaceGroupManager( - suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, - legacySvcRootPath, tsoSvcRootPath, suite.cfg) -} - // runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment. func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( re *require.Assertions, @@ -727,10 +720,16 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( expectedGroupIDs = append(expectedGroupIDs, uint32(j)) mux.Unlock() } + + svcAddrs := make([]string, 0) + if assignToMe { + svcAddrs = append(svcAddrs, mgr.tsoServiceID.ServiceAddr) + } else { + svcAddrs = append(svcAddrs, uuid.NewString()) + } addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, - assignToMe, mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(j), []uint32{uint32(j)}) + suite.ctx, suite.etcdClient, uint32(j), mgr.legacySvcRootPath, + svcAddrs, []int{0}, []uint32{uint32(j)}) } }(i) } @@ -756,19 +755,27 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( func (suite *keyspaceGroupManagerTestSuite) newUniqueKeyspaceGroupManager( loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value ) *KeyspaceGroupManager { - tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} - uniqueID := memberutil.GenerateUniqueID(uuid.New().String()) - uniqueStr := strconv.FormatUint(uniqueID, 10) + return suite.newKeyspaceGroupManager(loadKeyspaceGroupsBatchSize, uuid.New().String(), suite.cfg) +} + +func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( + loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value + uniqueStr string, + cfg *TestServiceConfig, +) *KeyspaceGroupManager { + tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()} + tsoServiceKey := discovery.ServicePath(uniqueStr, "tso") + "/" legacySvcRootPath := path.Join("/pd", uniqueStr) tsoSvcRootPath := path.Join("/ms", uniqueStr, "tso") - electionNamePrefix := "kgm-test-" + uniqueStr - - keyspaceGroupManager := suite.newKeyspaceGroupManager(tsoServiceID, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath) + electionNamePrefix := "kgm-test-" + cfg.GetAdvertiseListenAddr() + kgm := NewKeyspaceGroupManager( + suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, + tsoServiceKey, legacySvcRootPath, tsoSvcRootPath, cfg) if loadKeyspaceGroupsBatchSize != 0 { - keyspaceGroupManager.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize + kgm.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize } - return keyspaceGroupManager + return kgm } // putKeyspaceGroupToEtcd puts a keyspace group to etcd. @@ -805,19 +812,21 @@ func deleteKeyspaceGroupInEtcd( // addKeyspaceGroupAssignment adds a keyspace group assignment to etcd. func addKeyspaceGroupAssignment( - ctx context.Context, etcdClient *clientv3.Client, - assignToMe bool, rootPath, svcAddr string, - groupID uint32, keyspaces []uint32, + ctx context.Context, + etcdClient *clientv3.Client, + groupID uint32, + rootPath string, + svcAddrs []string, + priorites []int, + keyspaces []uint32, ) error { - var location string - if assignToMe { - location = svcAddr - } else { - location = uuid.NewString() + members := make([]endpoint.KeyspaceGroupMember, len(svcAddrs)) + for i, svcAddr := range svcAddrs { + members[i] = endpoint.KeyspaceGroupMember{Address: svcAddr, Priority: priorites[i]} } group := &endpoint.KeyspaceGroup{ ID: groupID, - Members: []endpoint.KeyspaceGroupMember{{Address: location}}, + Members: members, Keyspaces: keyspaces, } @@ -990,3 +999,185 @@ func (suite *keyspaceGroupManagerTestSuite) TestGroupSplitUpdateRetry() { return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs) }) } + +// TestPrimaryPriorityChange tests the case that the primary priority of a keyspace group changes +// and the locations of the primaries should be updated accordingly. +func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck", `return(true)`)) + defer func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck")) + }() + + var err error + defaultPriority := mcsutils.DefaultKeyspaceGroupReplicaPriority + uniqueStr := uuid.New().String() + rootPath := path.Join("/pd", uniqueStr) + cfg1 := suite.createConfig() + cfg2 := suite.createConfig() + svcAddr1 := cfg1.GetAdvertiseListenAddr() + svcAddr2 := cfg2.GetAdvertiseListenAddr() + + // Register TSO server 1 + err = suite.registerTSOServer(re, uniqueStr, svcAddr1, cfg1) + re.NoError(err) + defer func() { + re.NoError(suite.deregisterTSOServer(uniqueStr, svcAddr1)) + }() + + // Create three keyspace groups on two TSO servers with default replica priority. + ids := []uint32{0, mcsutils.MaxKeyspaceGroupCountInUse / 2, mcsutils.MaxKeyspaceGroupCountInUse - 1} + for _, id := range ids { + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, id, rootPath, + []string{svcAddr1, svcAddr2}, []int{defaultPriority, defaultPriority}, []uint32{id}) + } + + // Create the first TSO server which loads all three keyspace groups created above. + // All primaries should be on the first TSO server. + mgr1 := suite.newKeyspaceGroupManager(1, uniqueStr, cfg1) + re.NotNil(mgr1) + defer mgr1.Close() + err = mgr1.Initialize() + re.NoError(err) + // Wait until all keyspace groups are ready for serving tso requests. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr1, mgr1, mgr1}, ids) + + // We increase the priority of the TSO server 2 which hasn't started yet. The primaries + // on the TSO server 1 shouldn't move. + for _, id := range ids { + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, id, rootPath, + []string{svcAddr1, svcAddr2}, []int{defaultPriority, defaultPriority + 1}, []uint32{id}) + } + + // And the primaries on TSO Server 1 should continue to serve TSO requests without any failures. + for i := 0; i < 100; i++ { + for _, id := range ids { + _, keyspaceGroupBelongTo, err := mgr1.HandleTSORequest(id, id, GlobalDCLocation, 1) + re.NoError(err) + re.Equal(id, keyspaceGroupBelongTo) + } + } + + // Continually sending TSO requests to the TSO server 1 to make sure the primaries will move back + // to it at the end of test + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + checkTSO(ctx, re, &wg, mgr1, ids) + + // Create the Second TSO server. + err = suite.registerTSOServer(re, uniqueStr, svcAddr2, cfg2) + re.NoError(err) + mgr2 := suite.newKeyspaceGroupManager(1, uniqueStr, cfg2) + re.NotNil(mgr2) + err = mgr2.Initialize() + re.NoError(err) + // All primaries should eventually move to the second TSO server because of the higher priority. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr2, mgr2, mgr2}, ids) + + // Shutdown the second TSO server. + mgr2.Close() + re.NoError(suite.deregisterTSOServer(uniqueStr, svcAddr2)) + // The primaries should move back to the first TSO server. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr1, mgr1, mgr1}, ids) + + // Restart the Second TSO server. + err = suite.registerTSOServer(re, uniqueStr, svcAddr2, cfg2) + re.NoError(err) + defer func() { + re.NoError(suite.deregisterTSOServer(uniqueStr, svcAddr2)) + }() + mgr2 = suite.newKeyspaceGroupManager(1, uniqueStr, cfg2) + re.NotNil(mgr2) + defer mgr2.Close() + err = mgr2.Initialize() + re.NoError(err) + // All primaries should eventually move to the second TSO server because of the higher priority. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr2, mgr2, mgr2}, ids) + + mgrs := []*KeyspaceGroupManager{mgr2, mgr2, mgr2} + for i, id := range ids { + // Set the keyspace group replica on the first TSO server to have higher priority. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, id, rootPath, + []string{svcAddr1, svcAddr2}, []int{defaultPriority - 1, defaultPriority - 2}, []uint32{id}) + // The primary of this keyspace group should move back to the first TSO server. + mgrs[i] = mgr1 + waitForPrimariesServing(re, mgrs, ids) + } + + cancel() + wg.Wait() +} + +// Register TSO server. +func (suite *keyspaceGroupManagerTestSuite) registerTSOServer( + re *require.Assertions, clusterID, svcAddr string, cfg *TestServiceConfig, +) error { + // Register TSO server 1 + serviceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()} + serializedEntry, err := serviceID.Serialize() + re.NoError(err) + serviceKey := discovery.RegistryPath(clusterID, mcsutils.TSOServiceName, svcAddr) + _, err = suite.etcdClient.Put(suite.ctx, serviceKey, serializedEntry) + return err +} + +// Deregister TSO server. +func (suite *keyspaceGroupManagerTestSuite) deregisterTSOServer(clusterID, svcAddr string) error { + serviceKey := discovery.RegistryPath(clusterID, mcsutils.TSOServiceName, svcAddr) + if _, err := suite.etcdClient.Delete(suite.ctx, serviceKey); err != nil { + return err + } + return nil +} + +func checkTSO( + ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, + mgr *KeyspaceGroupManager, ids []uint32, +) { + wg.Add(len(ids)) + for _, id := range ids { + go func(id uint32) { + defer wg.Done() + var ts, lastTS uint64 + for { + select { + case <-ctx.Done(): + // Make sure the lastTS is not empty + re.NotEmpty(lastTS) + return + default: + } + respTS, respGroupID, err := mgr.HandleTSORequest(id, id, GlobalDCLocation, 1) + // omit the error check since there are many kinds of errors during primaries movement + if err != nil { + continue + } + re.Equal(id, respGroupID) + ts = tsoutil.ComposeTS(respTS.Physical, respTS.Logical) + re.Less(lastTS, ts) + lastTS = ts + } + }(id) + } +} + +func waitForPrimariesServing( + re *require.Assertions, mgrs []*KeyspaceGroupManager, ids []uint32, +) { + testutil.Eventually(re, func() bool { + for i := 0; i < 100; i++ { + for j, id := range ids { + if member, err := mgrs[j].GetElectionMember(id, id); err != nil || !member.IsLeader() { + return false + } + if _, _, err := mgrs[j].HandleTSORequest(id, id, GlobalDCLocation, 1); err != nil { + return false + } + } + } + return true + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) +} diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go index 3cda8c1888a..65a6bf293c3 100644 --- a/tests/integrations/mcs/cluster.go +++ b/tests/integrations/mcs/cluster.go @@ -202,7 +202,8 @@ func (tc *TestTSOCluster) GetServers() map[string]*tso.Server { func (tc *TestTSOCluster) GetKeyspaceGroupMember() (members []endpoint.KeyspaceGroupMember) { for _, server := range tc.servers { members = append(members, endpoint.KeyspaceGroupMember{ - Address: server.GetAddr(), + Address: server.GetAddr(), + Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority, }) } return diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index d265d8fc73b..98c6b90ca28 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -497,7 +497,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) // Init api server config but not start. - tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) { + tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{ "keyspace_a", "keyspace_b", } @@ -536,7 +536,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { // Wait pd clients are ready. testutil.Eventually(re, func() bool { count := 0 - clients.Range(func(key, value interface{}) bool { + clients.Range(func(_, _ interface{}) bool { count++ return true })