diff --git a/pkg/basicserver/basic_server.go b/pkg/basicserver/basic_server.go index afb56c2edd9..28ba3ad08de 100644 --- a/pkg/basicserver/basic_server.go +++ b/pkg/basicserver/basic_server.go @@ -44,5 +44,5 @@ type Server interface { // IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. IsServing() bool // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. - AddServiceReadyCallback(callbacks ...func(context.Context)) + AddServiceReadyCallback(callbacks ...func(context.Context) error) } diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index a9e53f347fa..6d1b872575b 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -101,7 +101,7 @@ func (m *Manager) GetBasicServer() bs.Server { } // Init initializes the resource group manager. -func (m *Manager) Init(ctx context.Context) { +func (m *Manager) Init(ctx context.Context) error { // Todo: If we can modify following configs in the future, we should reload these configs. // Store the controller config into the storage. m.storage.SaveControllerConfig(m.controllerConfig) @@ -156,6 +156,7 @@ func (m *Manager) Init(ctx context.Context) { m.persistLoop(ctx) }() log.Info("resource group manager finishes initialization") + return nil } // AddResourceGroup puts a resource group. diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 40913c2611c..78685850e86 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -71,7 +71,7 @@ type Server struct { service *Service // primaryCallbacks will be called after the server becomes leader. 
- primaryCallbacks []func(context.Context) + primaryCallbacks []func(context.Context) error serviceRegister *discovery.ServiceRegister } @@ -232,7 +232,7 @@ func (s *Server) IsClosed() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 18cc55fbc16..19bd891196f 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -143,12 +143,11 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf // AllocID allocates a new ID. func (c *Cluster) AllocID() (uint64, error) { - cli := c.apiServerLeader.Load().(pdpb.PDClient) - if cli == nil { - c.checkMembershipCh <- struct{}{} - return 0, errors.New("API server leader is not found") + client, err := c.getAPIServerLeaderClient() + if err != nil { + return 0, err } - resp, err := cli.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) + resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) if err != nil { c.checkMembershipCh <- struct{}{} return 0, err @@ -156,6 +155,15 @@ func (c *Cluster) AllocID() (uint64, error) { return resp.GetId(), nil } +func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { + cli := c.apiServerLeader.Load() + if cli == nil { + c.checkMembershipCh <- struct{}{} + return nil, errors.New("API server leader is not found") + } + return cli.(pdpb.PDClient), nil +} + // SwitchAPIServerLeader switches the API server leader. 
func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { old := c.apiServerLeader.Load() diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index c6b5a550697..e0e5bb9b661 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/mcs/utils" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/configutil" "github.com/tikv/pd/pkg/utils/grpcutil" @@ -239,6 +240,18 @@ func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) { o.schedule.Store(cfg) } +// AdjustScheduleCfg adjusts the schedule config. +func (o *PersistConfig) AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) { + // In case we add new default schedulers. + for _, ps := range sc.DefaultSchedulers { + if slice.NoneOf(scheduleCfg.Schedulers, func(i int) bool { + return scheduleCfg.Schedulers[i].Type == ps.Type + }) { + scheduleCfg.Schedulers = append(scheduleCfg.Schedulers, ps) + } + } +} + // GetReplicationConfig returns replication configurations. 
func (o *PersistConfig) GetReplicationConfig() *sc.ReplicationConfig { return o.replication.Load().(*sc.ReplicationConfig) diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index c9010db69a3..e6e204b8631 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -94,6 +94,7 @@ func (cw *Watcher) initializeConfigWatcher() error { zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) return err } + cw.AdjustScheduleCfg(&cfg.Schedule) cw.SetClusterVersion(&cfg.ClusterVersion) cw.SetScheduleConfig(&cfg.Schedule) cw.SetReplicationConfig(&cfg.Replication) diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index cbc73f79dbf..1e3aea41aa5 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -87,7 +87,8 @@ type Server struct { checkMembershipCh chan struct{} // primaryCallbacks will be called after the server becomes leader. 
- primaryCallbacks []func(context.Context) + primaryCallbacks []func(context.Context) error + primaryExitCallbacks []func() // for service registry serviceID *discovery.ServiceRegistryEntry @@ -164,6 +165,9 @@ func (s *Server) updateAPIServerMemberLoop() { case <-ticker.C: case <-s.checkMembershipCh: } + if !s.IsServing() { + continue + } members, err := s.GetClient().MemberList(ctx) if err != nil { log.Warn("failed to list members", errs.ZapError(err)) @@ -247,9 +251,16 @@ func (s *Server) campaignLeader() { log.Info("triggering the primary callback functions") for _, cb := range s.primaryCallbacks { - cb(ctx) + if err := cb(ctx); err != nil { + log.Error("failed to trigger the primary callback functions", errs.ZapError(err)) + return + } } - + defer func() { + for _, cb := range s.primaryExitCallbacks { + cb() + } + }() s.participant.EnableLeader() log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) @@ -283,10 +294,6 @@ func (s *Server) Close() { utils.StopHTTPServer(s) utils.StopGRPCServer(s) s.GetListener().Close() - s.GetCoordinator().Stop() - s.ruleWatcher.Close() - s.configWatcher.Close() - s.metaWatcher.Close() s.serverLoopCancel() s.serverLoopWg.Wait() @@ -313,10 +320,15 @@ func (s *Server) IsClosed() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) } +// AddServiceExitCallback adds callbacks that are called when campaignLeader returns after the service-ready callbacks have succeeded, i.e. when the server stops serving as the primary. +func (s *Server) AddServiceExitCallback(callbacks ...func()) { + s.primaryExitCallbacks = append(s.primaryExitCallbacks, callbacks...) +} + // GetTLSConfig gets the security config. 
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { return &s.cfg.Security.TLSConfig @@ -381,20 +393,10 @@ func (s *Server) startServer() (err error) { ListenUrls: []string{s.cfg.AdvertiseListenAddr}, } s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election") - s.basicCluster = core.NewBasicCluster() - err = s.startWatcher() - if err != nil { - return err - } - s.storage = endpoint.NewStorageEndpoint( - kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil) - s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster) - s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh) - if err != nil { - return err - } s.service = &Service{Server: s} + s.AddServiceReadyCallback(s.startCluster) + s.AddServiceExitCallback(s.stopCluster) if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil { return err } @@ -406,7 +408,6 @@ func (s *Server) startServer() (err error) { go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener()) s.checkMembershipCh <- struct{}{} <-serverReadyChan - go s.GetCoordinator().RunUntilStop() // Run callbacks log.Info("triggering the start callback functions") @@ -429,6 +430,29 @@ func (s *Server) startServer() (err error) { return nil } +func (s *Server) startCluster(context.Context) error { + s.basicCluster = core.NewBasicCluster() + err := s.startWatcher() + if err != nil { + return err + } + s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) + s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster) + s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh) + if err != nil { + return err + } + go s.GetCoordinator().RunUntilStop() + return nil +} + +func (s *Server) stopCluster() { + s.GetCoordinator().Stop() + s.ruleWatcher.Close() + 
s.configWatcher.Close() + s.metaWatcher.Close() +} + func (s *Server) startWatcher() (err error) { s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster) if err != nil { diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 91d5e480eaa..40958ca463c 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -236,7 +236,7 @@ func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error { // AddServiceReadyCallback implements basicserver. // It adds callbacks when it's ready for providing tso service. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { // Do nothing here. The primary of each keyspace group assigned to this host // will respond to the requests accordingly. } diff --git a/server/server.go b/server/server.go index a72c1c23f0c..34a7883cccd 100644 --- a/server/server.go +++ b/server/server.go @@ -190,7 +190,7 @@ type Server struct { // startCallbacks will be called after the server is started. startCallbacks []func() // leaderCallbacks will be called after the server becomes leader. - leaderCallbacks []func(context.Context) + leaderCallbacks []func(context.Context) error // closeCallbacks will be called before the server is closed. closeCallbacks []func() @@ -1547,7 +1547,7 @@ func (s *Server) IsServing() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.leaderCallbacks = append(s.leaderCallbacks, callbacks...) 
} diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index ab612869893..9b5371deb62 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -67,13 +67,16 @@ func (suite *serverTestSuite) TearDownSuite() { func (suite *serverTestSuite) TestAllocID() { re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`)) tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) + time.Sleep(200 * time.Millisecond) id, err := tc.GetPrimaryServer().GetCluster().AllocID() re.NoError(err) re.NotEqual(uint64(0), id) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) } func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() { @@ -83,15 +86,32 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() { re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) + time.Sleep(200 * time.Millisecond) cluster := tc.GetPrimaryServer().GetCluster() id, err := cluster.AllocID() re.NoError(err) re.NotEqual(uint64(0), id) suite.cluster.ResignLeader() suite.cluster.WaitLeader() - time.Sleep(time.Second) + time.Sleep(200 * time.Millisecond) id1, err := cluster.AllocID() re.NoError(err) re.Greater(id1, id) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) } + +func (suite *serverTestSuite) TestPrimaryChange() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + primary := tc.GetPrimaryServer() + addr := primary.GetAddr() + re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) + primary.Close() + tc.WaitForPrimaryServing(re) + primary = 
tc.GetPrimaryServer() + re.NotEqual(addr, primary.GetAddr()) + re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) +}