diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 27dced57791..b3034a86807 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -166,15 +166,7 @@ func (m *Participant) setLeader(member participant) { // unsetLeader unsets the member's leader. func (m *Participant) unsetLeader() { - var leader participant - switch m.serviceName { - case utils.TSOServiceName: - leader = &tsopb.Participant{} - case utils.SchedulingServiceName: - leader = &schedulingpb.Participant{} - case utils.ResourceManagerServiceName: - leader = &resource_manager.Participant{} - } + leader := NewParticipantByService(m.serviceName) m.leader.Store(leader) m.lastLeaderUpdatedTime.Store(time.Now()) } @@ -225,15 +217,7 @@ func (m *Participant) PreCheckLeader() error { // getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). func (m *Participant) getPersistentLeader() (participant, int64, error) { - var leader participant - switch m.serviceName { - case utils.TSOServiceName: - leader = &tsopb.Participant{} - case utils.SchedulingServiceName: - leader = &schedulingpb.Participant{} - case utils.ResourceManagerServiceName: - leader = &resource_manager.Participant{} - } + leader := NewParticipantByService(m.serviceName) ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { return nil, 0, err @@ -399,3 +383,16 @@ func (m *Participant) campaignCheck() bool { func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) { m.campaignChecker.Store(checker) } + +// NewParticipantByService creates a new participant by service name. +func NewParticipantByService(serviceName string) (p participant) { + switch serviceName { + case utils.TSOServiceName: + p = &tsopb.Participant{} + case utils.SchedulingServiceName: + p = &schedulingpb.Participant{} + case utils.ResourceManagerServiceName: + p = &resource_manager.Participant{} + } + return p +} diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 85af79203a4..4b67441a5ac 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -325,6 +325,12 @@ func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string { return path.Join(electionPath, utils.PrimaryKey) } +// SchedulingPrimaryPath returns the path of scheduling primary. +// Path: /ms/{cluster_id}/scheduling/primary +func SchedulingPrimaryPath(clusterID uint64) string { + return path.Join(SchedulingSvcRootPath(clusterID), utils.PrimaryKey) +} + // KeyspaceGroupsElectionPath returns the path of keyspace groups election. // default keyspace group: "/ms/{cluster_id}/tso/00000". // non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}". diff --git a/server/api/server.go b/server/api/server.go index 272e76cc60b..1d881022c04 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -20,6 +20,7 @@ import ( "github.com/gorilla/mux" tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" + mcs "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/serverapi" "github.com/tikv/pd/server" @@ -39,7 +40,9 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule( - apiPrefix+"/api/v1"+"/admin/reset-ts", tsoapi.APIPathPrefix+"/admin/reset-ts", "tso")), + apiPrefix+"/api/v1"+"/admin/reset-ts", + tsoapi.APIPathPrefix+"/admin/reset-ts", + mcs.TSOServiceName)), negroni.Wrap(r)), ) diff --git a/server/server.go b/server/server.go index b74ca5b57b3..2a076923caf 100644 --- a/server/server.go +++ b/server/server.go @@ -226,10 +226,11 @@ type Server struct { auditBackends []audit.Backend - registry *registry.ServiceRegistry - mode string - servicePrimaryMap sync.Map /* Store as map[string]string */ - tsoPrimaryWatcher *etcdutil.LoopWatcher + registry *registry.ServiceRegistry + mode string + servicePrimaryMap sync.Map /* Store as map[string]string */ + tsoPrimaryWatcher *etcdutil.LoopWatcher + schedulingPrimaryWatcher *etcdutil.LoopWatcher } // HandlerBuilder builds a server HTTP handler. @@ -617,7 +618,7 @@ func (s *Server) startServerLoop(ctx context.Context) { go s.encryptionKeyManagerLoop() if s.IsAPIServiceMode() { s.initTSOPrimaryWatcher() - s.tsoPrimaryWatcher.StartWatchLoop() + s.initSchedulingPrimaryWatcher() } } @@ -1962,8 +1963,20 @@ func (s *Server) initTSOPrimaryWatcher() { serviceName := mcs.TSOServiceName tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID) tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID) + s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey) + s.tsoPrimaryWatcher.StartWatchLoop() +} + +func (s *Server) initSchedulingPrimaryWatcher() { + serviceName := mcs.SchedulingServiceName + primaryKey := endpoint.SchedulingPrimaryPath(s.clusterID) + s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey) + s.schedulingPrimaryWatcher.StartWatchLoop() +} + +func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string) *etcdutil.LoopWatcher { putFn := func(kv *mvccpb.KeyValue) error { - primary := &tsopb.Participant{} // TODO: use Generics + primary := member.NewParticipantByService(serviceName) if err := proto.Unmarshal(kv.Value, primary); err != nil { return err } @@ -1971,7 +1984,7 @@ func (s *Server) initTSOPrimaryWatcher() { if len(listenUrls) > 0 { // listenUrls[0] is the primary service endpoint of the keyspace group s.servicePrimaryMap.Store(serviceName, listenUrls[0]) - log.Info("update tso primary", zap.String("primary", listenUrls[0])) + log.Info("update service primary", zap.String("service-name", serviceName), zap.String("primary", listenUrls[0])) } return nil } @@ -1981,16 +1994,17 @@ func (s *Server) initTSOPrimaryWatcher() { if ok { oldPrimary = v.(string) } - log.Info("delete tso primary", zap.String("old-primary", oldPrimary)) + log.Info("delete service primary", zap.String("service-name", serviceName), zap.String("old-primary", oldPrimary)) s.servicePrimaryMap.Delete(serviceName) return nil } - s.tsoPrimaryWatcher = etcdutil.NewLoopWatcher( + name := fmt.Sprintf("%s-primary-watcher", serviceName) + return etcdutil.NewLoopWatcher( s.serverLoopCtx, &s.serverLoopWg, s.client, - "tso-primary-watcher", - tsoServicePrimaryKey, + name, + primaryKey, putFn, deleteFn, func() error { return nil }, diff --git a/tests/cluster.go b/tests/cluster.go index 607955cc6a9..ce8293531cd 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -433,6 +433,11 @@ func (s *TestServer) GetTSOAllocatorManager() *tso.AllocatorManager { return s.server.GetTSOAllocatorManager() } +// GetServicePrimaryAddr returns the primary address of the service. +func (s *TestServer) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) { + return s.server.GetServicePrimaryAddr(ctx, serviceName) +} + // TestCluster is only for test. type TestCluster struct { config *clusterConfig diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 9b5371deb62..54994bbc34b 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/failpoint" "github.com/stretchr/testify/suite" + mcs "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" "go.uber.org/goleak" @@ -107,11 +108,21 @@ func (suite *serverTestSuite) TestPrimaryChange() { defer tc.Destroy() tc.WaitForPrimaryServing(re) primary := tc.GetPrimaryServer() - addr := primary.GetAddr() + oldPrimaryAddr := primary.GetAddr() re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) + testutil.Eventually(re, func() bool { + watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName) + return ok && oldPrimaryAddr == watchedAddr + }) + // transfer leader primary.Close() tc.WaitForPrimaryServing(re) primary = tc.GetPrimaryServer() - re.NotEqual(addr, primary.GetAddr()) + newPrimaryAddr := primary.GetAddr() + re.NotEqual(oldPrimaryAddr, newPrimaryAddr) re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) + testutil.Eventually(re, func() bool { + watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName) + return ok && newPrimaryAddr == watchedAddr + }) }