From 83369eb08afce224e5bd623993ca24a43907d441 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 11 Sep 2023 14:36:10 +0800 Subject: [PATCH 1/7] mcs: use memory storage (#7061) ref tikv/pd#5839 Signed-off-by: Ryan Leung --- pkg/basicserver/basic_server.go | 2 +- pkg/mcs/resourcemanager/server/manager.go | 3 +- pkg/mcs/resourcemanager/server/server.go | 4 +- pkg/mcs/scheduling/server/cluster.go | 18 +++-- pkg/mcs/scheduling/server/config/config.go | 13 ++++ pkg/mcs/scheduling/server/config/watcher.go | 1 + pkg/mcs/scheduling/server/server.go | 66 +++++++++++++------ pkg/mcs/tso/server/server.go | 2 +- server/server.go | 4 +- .../mcs/scheduling/server_test.go | 22 ++++++- 10 files changed, 101 insertions(+), 34 deletions(-) diff --git a/pkg/basicserver/basic_server.go b/pkg/basicserver/basic_server.go index afb56c2edd9..28ba3ad08de 100644 --- a/pkg/basicserver/basic_server.go +++ b/pkg/basicserver/basic_server.go @@ -44,5 +44,5 @@ type Server interface { // IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. IsServing() bool // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. - AddServiceReadyCallback(callbacks ...func(context.Context)) + AddServiceReadyCallback(callbacks ...func(context.Context) error) } diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index a9e53f347fa..6d1b872575b 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -101,7 +101,7 @@ func (m *Manager) GetBasicServer() bs.Server { } // Init initializes the resource group manager. -func (m *Manager) Init(ctx context.Context) { +func (m *Manager) Init(ctx context.Context) error { // Todo: If we can modify following configs in the future, we should reload these configs. // Store the controller config into the storage. m.storage.SaveControllerConfig(m.controllerConfig) @@ -156,6 +156,7 @@ func (m *Manager) Init(ctx context.Context) { m.persistLoop(ctx) }() log.Info("resource group manager finishes initialization") + return nil } // AddResourceGroup puts a resource group. diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 40913c2611c..78685850e86 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -71,7 +71,7 @@ type Server struct { service *Service // primaryCallbacks will be called after the server becomes leader. - primaryCallbacks []func(context.Context) + primaryCallbacks []func(context.Context) error serviceRegister *discovery.ServiceRegister } @@ -232,7 +232,7 @@ func (s *Server) IsClosed() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 18cc55fbc16..19bd891196f 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -143,12 +143,11 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf // AllocID allocates a new ID. func (c *Cluster) AllocID() (uint64, error) { - cli := c.apiServerLeader.Load().(pdpb.PDClient) - if cli == nil { - c.checkMembershipCh <- struct{}{} - return 0, errors.New("API server leader is not found") + client, err := c.getAPIServerLeaderClient() + if err != nil { + return 0, err } - resp, err := cli.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) + resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) if err != nil { c.checkMembershipCh <- struct{}{} return 0, err @@ -156,6 +155,15 @@ func (c *Cluster) AllocID() (uint64, error) { return resp.GetId(), nil } +func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { + cli := c.apiServerLeader.Load() + if cli == nil { + c.checkMembershipCh <- struct{}{} + return nil, errors.New("API server leader is not found") + } + return cli.(pdpb.PDClient), nil +} + // SwitchAPIServerLeader switches the API server leader. func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { old := c.apiServerLeader.Load() diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index c6b5a550697..e0e5bb9b661 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/mcs/utils" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/configutil" "github.com/tikv/pd/pkg/utils/grpcutil" @@ -239,6 +240,18 @@ func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) { o.schedule.Store(cfg) } +// AdjustScheduleCfg adjusts the schedule config. +func (o *PersistConfig) AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) { + // In case we add new default schedulers. + for _, ps := range sc.DefaultSchedulers { + if slice.NoneOf(scheduleCfg.Schedulers, func(i int) bool { + return scheduleCfg.Schedulers[i].Type == ps.Type + }) { + scheduleCfg.Schedulers = append(scheduleCfg.Schedulers, ps) + } + } +} + // GetReplicationConfig returns replication configurations. func (o *PersistConfig) GetReplicationConfig() *sc.ReplicationConfig { return o.replication.Load().(*sc.ReplicationConfig) diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index c9010db69a3..e6e204b8631 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -94,6 +94,7 @@ func (cw *Watcher) initializeConfigWatcher() error { zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) return err } + cw.AdjustScheduleCfg(&cfg.Schedule) cw.SetClusterVersion(&cfg.ClusterVersion) cw.SetScheduleConfig(&cfg.Schedule) cw.SetReplicationConfig(&cfg.Replication) diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index cbc73f79dbf..1e3aea41aa5 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -87,7 +87,8 @@ type Server struct { checkMembershipCh chan struct{} // primaryCallbacks will be called after the server becomes leader. - primaryCallbacks []func(context.Context) + primaryCallbacks []func(context.Context) error + primaryExitCallbacks []func() // for service registry serviceID *discovery.ServiceRegistryEntry @@ -164,6 +165,9 @@ func (s *Server) updateAPIServerMemberLoop() { case <-ticker.C: case <-s.checkMembershipCh: } + if !s.IsServing() { + continue + } members, err := s.GetClient().MemberList(ctx) if err != nil { log.Warn("failed to list members", errs.ZapError(err)) @@ -247,9 +251,16 @@ func (s *Server) campaignLeader() { log.Info("triggering the primary callback functions") for _, cb := range s.primaryCallbacks { - cb(ctx) + if err := cb(ctx); err != nil { + log.Error("failed to trigger the primary callback functions", errs.ZapError(err)) + return + } } - + defer func() { + for _, cb := range s.primaryExitCallbacks { + cb() + } + }() s.participant.EnableLeader() log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) @@ -283,10 +294,6 @@ func (s *Server) Close() { utils.StopHTTPServer(s) utils.StopGRPCServer(s) s.GetListener().Close() - s.GetCoordinator().Stop() - s.ruleWatcher.Close() - s.configWatcher.Close() - s.metaWatcher.Close() s.serverLoopCancel() s.serverLoopWg.Wait() @@ -313,10 +320,15 @@ func (s *Server) IsClosed() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) } +// AddServiceExitCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. +func (s *Server) AddServiceExitCallback(callbacks ...func()) { + s.primaryExitCallbacks = append(s.primaryExitCallbacks, callbacks...) +} + // GetTLSConfig gets the security config. func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { return &s.cfg.Security.TLSConfig @@ -381,20 +393,10 @@ func (s *Server) startServer() (err error) { ListenUrls: []string{s.cfg.AdvertiseListenAddr}, } s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election") - s.basicCluster = core.NewBasicCluster() - err = s.startWatcher() - if err != nil { - return err - } - s.storage = endpoint.NewStorageEndpoint( - kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil) - s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster) - s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh) - if err != nil { - return err - } s.service = &Service{Server: s} + s.AddServiceReadyCallback(s.startCluster) + s.AddServiceExitCallback(s.stopCluster) if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil { return err } @@ -406,7 +408,6 @@ func (s *Server) startServer() (err error) { go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener()) s.checkMembershipCh <- struct{}{} <-serverReadyChan - go s.GetCoordinator().RunUntilStop() // Run callbacks log.Info("triggering the start callback functions") @@ -429,6 +430,29 @@ func (s *Server) startServer() (err error) { return nil } +func (s *Server) startCluster(context.Context) error { + s.basicCluster = core.NewBasicCluster() + err := s.startWatcher() + if err != nil { + return err + } + s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) + s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster) + s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh) + if err != nil { + return err + } + go s.GetCoordinator().RunUntilStop() + return nil +} + +func (s *Server) stopCluster() { + s.GetCoordinator().Stop() + s.ruleWatcher.Close() + s.configWatcher.Close() + s.metaWatcher.Close() +} + func (s *Server) startWatcher() (err error) { s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster) if err != nil { diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 91d5e480eaa..40958ca463c 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -236,7 +236,7 @@ func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error { // AddServiceReadyCallback implements basicserver. // It adds callbacks when it's ready for providing tso service. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { // Do nothing here. The primary of each keyspace group assigned to this host // will respond to the requests accordingly. } diff --git a/server/server.go b/server/server.go index a72c1c23f0c..34a7883cccd 100644 --- a/server/server.go +++ b/server/server.go @@ -190,7 +190,7 @@ type Server struct { // startCallbacks will be called after the server is started. startCallbacks []func() // leaderCallbacks will be called after the server becomes leader. - leaderCallbacks []func(context.Context) + leaderCallbacks []func(context.Context) error // closeCallbacks will be called before the server is closed. closeCallbacks []func() @@ -1547,7 +1547,7 @@ func (s *Server) IsServing() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.leaderCallbacks = append(s.leaderCallbacks, callbacks...) } diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index ab612869893..9b5371deb62 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -67,13 +67,16 @@ func (suite *serverTestSuite) TearDownSuite() { func (suite *serverTestSuite) TestAllocID() { re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`)) tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) + time.Sleep(200 * time.Millisecond) id, err := tc.GetPrimaryServer().GetCluster().AllocID() re.NoError(err) re.NotEqual(uint64(0), id) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) } func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() { @@ -83,15 +86,32 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() { re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) + time.Sleep(200 * time.Millisecond) cluster := tc.GetPrimaryServer().GetCluster() id, err := cluster.AllocID() re.NoError(err) re.NotEqual(uint64(0), id) suite.cluster.ResignLeader() suite.cluster.WaitLeader() - time.Sleep(time.Second) + time.Sleep(200 * time.Millisecond) id1, err := cluster.AllocID() re.NoError(err) re.Greater(id1, id) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) } + +func (suite *serverTestSuite) TestPrimaryChange() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + primary := tc.GetPrimaryServer() + addr := primary.GetAddr() + re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) + primary.Close() + tc.WaitForPrimaryServing(re) + primary = tc.GetPrimaryServer() + re.NotEqual(addr, primary.GetAddr()) + re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) +} From d03f485c952463bf54f5cb137766884f2f706219 Mon Sep 17 00:00:00 2001 From: Hu# Date: Mon, 11 Sep 2023 15:42:13 +0800 Subject: [PATCH 2/7] *: check raftcluster nil (#7054) close tikv/pd#7053 Signed-off-by: husharp Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/api/min_resolved_ts.go | 4 +-- server/grpc_service.go | 4 ++- server/handler.go | 14 +++++--- server/server.go | 60 +++++++++++++++++++++++------------ tests/server/api/api_test.go | 40 +++++++++++++++++++++++ 5 files changed, 95 insertions(+), 27 deletions(-) diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go index ef05e91b9f7..1edf924370f 100644 --- a/server/api/min_resolved_ts.go +++ b/server/api/min_resolved_ts.go @@ -53,7 +53,7 @@ type minResolvedTS struct { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /min-resolved-ts/{store_id} [get] func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *http.Request) { - c := h.svr.GetRaftCluster() + c := getCluster(r) idStr := mux.Vars(r)["store_id"] storeID, err := strconv.ParseUint(idStr, 10, 64) if err != nil { @@ -84,7 +84,7 @@ func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *h // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /min-resolved-ts [get] func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) { - c := h.svr.GetRaftCluster() + c := getCluster(r) scopeMinResolvedTS := c.GetMinResolvedTS() persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval diff --git a/server/grpc_service.go b/server/grpc_service.go index 0563371cdc3..973c45a622f 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1600,7 +1600,6 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB if rc == nil { return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil } - _, err := rc.HandleBatchReportSplit(request) if err != nil { return &pdpb.ReportBatchSplitResponse{ @@ -2089,6 +2088,9 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S return rsp.(*pdpb.SplitAndScatterRegionsResponse), err } rc := s.GetRaftCluster() + if rc == nil { + return &pdpb.SplitAndScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil + } splitFinishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit())) scatterFinishedPercentage, err := scatterRegions(rc, newRegionIDs, request.GetGroup(), int(request.GetRetryLimit()), false) if err != nil { diff --git a/server/handler.go b/server/handler.go index 2e4b88b20e2..adc1e8ecd31 100644 --- a/server/handler.go +++ b/server/handler.go @@ -961,14 +961,20 @@ func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _ // SetStoreLimitScene sets the limit values for different scenes func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) { - cluster := h.s.GetRaftCluster() - cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType) + rc := h.s.GetRaftCluster() + if rc == nil { + return + } + rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType) } // GetStoreLimitScene returns the limit values for different scenes func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene { - cluster := h.s.GetRaftCluster() - return cluster.GetStoreLimiter().StoreLimitScene(limitType) + rc := h.s.GetRaftCluster() + if rc == nil { + return nil + } + return rc.GetStoreLimiter().StoreLimitScene(limitType) } // GetProgressByID returns the progress details for a given store ID. diff --git a/server/server.go b/server/server.go index 34a7883cccd..b74ca5b57b3 100644 --- a/server/server.go +++ b/server/server.go @@ -1023,18 +1023,18 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { } old := s.persistOptions.GetReplicationConfig() if cfg.EnablePlacementRules != old.EnablePlacementRules { - raftCluster := s.GetRaftCluster() - if raftCluster == nil { + rc := s.GetRaftCluster() + if rc == nil { return errs.ErrNotBootstrapped.GenWithStackByArgs() } if cfg.EnablePlacementRules { // initialize rule manager. - if err := raftCluster.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { + if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { return err } } else { // NOTE: can be removed after placement rules feature is enabled by default. - for _, s := range raftCluster.GetStores() { + for _, s := range rc.GetStores() { if !s.IsRemoved() && s.IsTiFlash() { return errors.New("cannot disable placement rules with TiFlash nodes") } @@ -1044,8 +1044,12 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { var rule *placement.Rule if cfg.EnablePlacementRules { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } // replication.MaxReplicas won't work when placement rule is enabled and not only have one default rule. - defaultRule := s.GetRaftCluster().GetRuleManager().GetRule("pd", "default") + defaultRule := rc.GetRuleManager().GetRule("pd", "default") CheckInDefaultRule := func() error { // replication config won't work when placement rule is enabled and exceeds one default rule @@ -1071,7 +1075,11 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { if rule != nil { rule.Count = int(cfg.MaxReplicas) rule.LocationLabels = cfg.LocationLabels - if err := s.GetRaftCluster().GetRuleManager().SetRule(rule); err != nil { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if err := rc.GetRuleManager().SetRule(rule); err != nil { log.Error("failed to update rule count", errs.ZapError(err)) return err @@ -1083,7 +1091,11 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { s.persistOptions.SetReplicationConfig(old) if rule != nil { rule.Count = int(old.MaxReplicas) - if e := s.GetRaftCluster().GetRuleManager().SetRule(rule); e != nil { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if e := rc.GetRuleManager().SetRule(rule); e != nil { log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e)) } } @@ -1371,18 +1383,18 @@ func (s *Server) GetServerOption() *config.PersistOptions { // GetMetaRegions gets meta regions from cluster. func (s *Server) GetMetaRegions() []*metapb.Region { - cluster := s.GetRaftCluster() - if cluster != nil { - return cluster.GetMetaRegions() + rc := s.GetRaftCluster() + if rc != nil { + return rc.GetMetaRegions() } return nil } // GetRegions gets regions from cluster. func (s *Server) GetRegions() []*core.RegionInfo { - cluster := s.GetRaftCluster() - if cluster != nil { - return cluster.GetRegions() + rc := s.GetRaftCluster() + if rc != nil { + return rc.GetRegions() } return nil } @@ -1519,9 +1531,9 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro } log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) - cluster := s.GetRaftCluster() - if cluster != nil { - err := cluster.GetReplicationMode().UpdateConfig(cfg) + rc := s.GetRaftCluster() + if rc != nil { + err := rc.GetReplicationMode().UpdateConfig(cfg) if err != nil { log.Warn("failed to update replication mode", errs.ZapError(err)) // revert to old config @@ -1992,7 +2004,11 @@ func (s *Server) RecoverAllocID(ctx context.Context, id uint64) error { // GetExternalTS returns external timestamp. func (s *Server) GetExternalTS() uint64 { - return s.GetRaftCluster().GetExternalTS() + rc := s.GetRaftCluster() + if rc == nil { + return 0 + } + return rc.GetExternalTS() } // SetExternalTS returns external timestamp. @@ -2002,14 +2018,18 @@ func (s *Server) SetExternalTS(externalTS, globalTS uint64) error { log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS)) return errors.New(desc) } - currentExternalTS := s.GetRaftCluster().GetExternalTS() + c := s.GetRaftCluster() + if c == nil { + return errs.ErrNotBootstrapped.FastGenByArgs() + } + currentExternalTS := c.GetExternalTS() if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 { desc := "the external timestamp should be larger than current external timestamp" log.Error(desc, zap.Uint64("request", externalTS), zap.Uint64("current", currentExternalTS)) return errors.New(desc) } - s.GetRaftCluster().SetExternalTS(externalTS) - return nil + + return c.SetExternalTS(externalTS) } // IsLocalTSOEnabled returns if the local TSO is enabled. diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 623af9b0a82..61d47d7790c 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -807,6 +807,46 @@ func TestRemovingProgress(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) } +func TestSendApiWhenRestartRaftCluster(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) { + conf.Replication.MaxReplicas = 1 + }) + re.NoError(err) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + re.NoError(err) + leader := cluster.GetServer(cluster.WaitLeader()) + + grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr()) + clusterID := leader.GetClusterID() + req := &pdpb.BootstrapRequest{ + Header: testutil.NewRequestHeader(clusterID), + Store: &metapb.Store{Id: 1, Address: "127.0.0.1:0"}, + Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}}, + } + resp, err := grpcPDClient.Bootstrap(context.Background(), req) + re.NoError(err) + re.Nil(resp.GetHeader().GetError()) + + // Mock restart raft cluster + rc := leader.GetRaftCluster() + re.NotNil(rc) + rc.Stop() + + // Mock client-go will still send request + output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError) + re.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first") + + err = rc.Start(leader.GetServer()) + re.NoError(err) + rc = leader.GetRaftCluster() + re.NotNil(rc) +} + func TestPreparingProgress(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) From 68df4b6dec7ae2930077a5d1ca54a1aaab772493 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Mon, 11 Sep 2023 17:13:42 +0800 Subject: [PATCH 3/7] test: fix unstable test TestScheduler (#7074) close tikv/pd#7062 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- tests/pdctl/scheduler/scheduler_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index e1c0a210c01..3e89d8a59aa 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/versioninfo" @@ -127,7 +128,8 @@ func TestScheduler(t *testing.T) { pdctl.MustPutStore(re, leaderServer.GetServer(), store) } - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b")) + // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. + pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) time.Sleep(3 * time.Second) // scheduler show command From 58113f8d17ccec4690445dad2e47cadbacd42da3 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 11 Sep 2023 17:48:14 +0800 Subject: [PATCH 4/7] *: split the `TestScheduler` (#7064) ref tikv/pd#7062 Signed-off-by: Ryan Leung --- tests/pdctl/scheduler/scheduler_test.go | 270 ++++++++++++++---------- 1 file changed, 156 insertions(+), 114 deletions(-) diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 3e89d8a59aa..a0447642cb6 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/spf13/cobra" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/pkg/schedule/config" @@ -67,25 +68,6 @@ func TestScheduler(t *testing.T) { }, } - mustExec := func(args []string, v interface{}) string { - output, err := pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - if v == nil { - return string(output) - } - re.NoError(json.Unmarshal(output, v)) - return "" - } - - mightExec := func(args []string, v interface{}) { - output, err := pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - if v == nil { - return - } - json.Unmarshal(output, v) - } - mustUsage := func(args []string) { output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) @@ -94,10 +76,10 @@ func TestScheduler(t *testing.T) { checkSchedulerCommand := func(args []string, expected map[string]bool) { if args != nil { - mustExec(args, nil) + mustExec(re, cmd, args, nil) } var schedulers []string - mustExec([]string{"-u", pdAddr, "scheduler", "show"}, &schedulers) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers) for _, scheduler := range schedulers { re.True(expected[scheduler]) } @@ -105,23 +87,13 @@ func TestScheduler(t *testing.T) { checkSchedulerConfigCommand := func(args []string, expectedConfig map[string]interface{}, schedulerName string) { if args != nil { - mustExec(args, nil) + mustExec(re, cmd, args, nil) } configInfo := make(map[string]interface{}) - mustExec([]string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo) re.Equal(expectedConfig, configInfo) } - checkSchedulerDescribeCommand := func(schedulerName, expectedStatus, expectedSummary string) { - result := make(map[string]interface{}) - testutil.Eventually(re, func() bool { - mightExec([]string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result) - return len(result) != 0 - }, testutil.WithTickInterval(50*time.Millisecond)) - re.Equal(expectedStatus, result["status"]) - re.Equal(expectedSummary, result["summary"]) - } - leaderServer := cluster.GetServer(cluster.GetLeader()) re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { @@ -137,29 +109,21 @@ func TestScheduler(t *testing.T) { "balance-region-scheduler": true, "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, } checkSchedulerCommand(nil, expected) - echo := mustExec([]string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil) - re.Contains(echo, "Success!") - checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ") - // scheduler delete command args := []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"} expected = map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, } checkSchedulerCommand(args, expected) - checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "") - schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler"} for idx := range schedulers { @@ -168,7 +132,6 @@ func TestScheduler(t *testing.T) { expected = map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, schedulers[idx]: true, "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, @@ -185,7 +148,6 @@ func TestScheduler(t *testing.T) { expected = map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, schedulers[idx]: true, "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, @@ -201,7 +163,6 @@ func TestScheduler(t *testing.T) { expected = map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, } @@ -212,7 +173,6 @@ func TestScheduler(t *testing.T) { expected = map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, schedulers[idx]: true, "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, @@ -224,7 +184,6 @@ func TestScheduler(t *testing.T) { expected = map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, schedulers[idx]: true, "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, @@ -240,7 +199,6 @@ func TestScheduler(t *testing.T) { expected = map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, schedulers[idx]: true, "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, @@ -256,7 +214,6 @@ func TestScheduler(t *testing.T) { expected = map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, } @@ -267,25 +224,23 @@ func TestScheduler(t *testing.T) { checkSchedulerCommand([]string{"-u", pdAddr, "scheduler", "add", "shuffle-region-scheduler"}, map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, "shuffle-region-scheduler": true, "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, }) var roles []string - mustExec([]string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) re.Equal([]string{"leader", "follower", "learner"}, roles) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) re.Equal([]string{"learner"}, roles) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles) re.Equal([]string{"learner"}, roles) // test grant hot region scheduler config checkSchedulerCommand([]string{"-u", pdAddr, "scheduler", "add", "grant-hot-region-scheduler", "1", "1,2,3"}, map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, - "split-bucket-scheduler": true, "shuffle-region-scheduler": true, "grant-hot-region-scheduler": true, "transfer-witness-leader-scheduler": true, @@ -296,30 +251,30 @@ func TestScheduler(t *testing.T) { "store-id": []interface{}{float64(1), float64(2), float64(3)}, "store-leader-id": float64(1), } - mustExec([]string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) re.Equal(expected3, conf3) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) expected3["store-leader-id"] = float64(2) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) re.Equal(expected3, conf3) // test balance region config - echo = mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) re.Contains(echo, "Success!") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) re.Contains(echo, "Success!") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) re.NotContains(echo, "Success!") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) re.Contains(echo, "Success!") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) re.Contains(echo, "Success!") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) re.Contains(echo, "404") // test hot region config - echo = mustExec([]string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil) re.Contains(echo, "[404] scheduler not found") expected1 := map[string]interface{}{ "min-hot-byte-rate": float64(100), @@ -344,63 +299,63 @@ func TestScheduler(t *testing.T) { "split-thresholds": 0.2, } var conf map[string]interface{} - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "list"}, &conf) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "list"}, &conf) re.Equal(expected1, conf) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "show"}, &conf) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "show"}, &conf) re.Equal(expected1, conf) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) expected1["src-tolerance-ratio"] = 1.02 var conf1 map[string]interface{} - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) expected1["read-priorities"] = []interface{}{"byte", "key"} - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) expected1["read-priorities"] = []interface{}{"key", "byte"} - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) // write-priorities is divided into write-leader-priorities and write-peer-priorities - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) expected1["rank-formula-version"] = "v2" - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) expected1["rank-formula-version"] = "v1" - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) expected1["forbid-rw-type"] = "read" - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) // test compatibility @@ -412,69 +367,68 @@ func TestScheduler(t *testing.T) { } re.Equal("5.2.0", leaderServer.GetClusterVersion().String()) // After upgrading, we should not use query. - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(conf1["read-priorities"], []interface{}{"key", "byte"}) // cannot set qps as write-peer-priorities - echo = mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities") - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(conf1["write-peer-priorities"], []interface{}{"byte", "key"}) // test remove and add - echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) re.Contains(echo, "Success") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil) re.Contains(echo, "Success") // test balance leader config conf = make(map[string]interface{}) conf1 = make(map[string]interface{}) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "show"}, &conf) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "show"}, &conf) re.Equal(4., conf["batch"]) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) - mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1) re.Equal(3., conf1["batch"]) - echo = mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) re.NotContains(echo, "Success!") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) re.Contains(echo, "Success!") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) re.Contains(echo, "404") re.Contains(echo, "PD:scheduler:ErrSchedulerNotFound]scheduler not found") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, nil) re.Contains(echo, "404") re.Contains(echo, "scheduler not found") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) re.Contains(echo, "Success!") // test show scheduler with paused and disabled status. checkSchedulerWithStatusCommand := func(args []string, status string, expected []string) { if args != nil { - mustExec(args, nil) + mustExec(re, cmd, args, nil) } var schedulers []string - mustExec([]string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers) re.Equal(expected, schedulers) } mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"}) - mustExec([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) checkSchedulerWithStatusCommand(nil, "paused", []string{ "balance-leader-scheduler", }) result := make(map[string]interface{}) testutil.Eventually(re, func() bool { - mightExec([]string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result) + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result) return len(result) != 0 && result["status"] == "paused" && result["summary"] == "" }, testutil.WithWaitFor(30*time.Second)) mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"}) - mustExec([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) checkSchedulerWithStatusCommand(nil, "paused", nil) - checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "") // set label scheduler to disabled manually. - echo = mustExec([]string{"-u", pdAddr, "scheduler", "add", "label-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "label-scheduler"}, nil) re.Contains(echo, "Success!") cfg := leaderServer.GetServer().GetScheduleConfig() origin := cfg.Schedulers @@ -488,3 +442,91 @@ func TestScheduler(t *testing.T) { re.NoError(err) checkSchedulerWithStatusCommand(nil, "disabled", nil) } + +func TestSchedulerDiagnostic(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + err = cluster.RunInitialServers() + re.NoError(err) + cluster.WaitLeader() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := pdctlCmd.GetRootCmd() + + checkSchedulerDescribeCommand := func(schedulerName, expectedStatus, expectedSummary string) { + result := make(map[string]interface{}) + testutil.Eventually(re, func() bool { + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result) + return len(result) != 0 + }, testutil.WithTickInterval(50*time.Millisecond)) + re.Equal(expectedStatus, result["status"]) + re.Equal(expectedSummary, result["summary"]) + } + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + leaderServer := cluster.GetServer(cluster.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + for _, store := range stores { + pdctl.MustPutStore(re, leaderServer.GetServer(), store) + } + + // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. + pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) + time.Sleep(3 * time.Second) + + echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil) + re.Contains(echo, "Success!") + checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ") + + // scheduler delete command + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + + checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "") + + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "") +} + +func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v interface{}) string { + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + if v == nil { + return string(output) + } + re.NoError(json.Unmarshal(output, v)) + return "" +} + +func mightExec(re *require.Assertions, cmd *cobra.Command, args []string, v interface{}) { + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + if v == nil { + return + } + json.Unmarshal(output, v) +} From ea28fe83ddbbffc57eaff23b1e7147ff92b8aee8 Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Tue, 12 Sep 2023 15:46:41 +0800 Subject: [PATCH 5/7] fix wrong legend name (#7080) close tikv/pd#7081 Signed-off-by: bufferflies <1045931706@qq.com> --- metrics/grafana/pd.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index eec379c89ac..3a09bc9e618 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -7094,14 +7094,14 @@ "expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (source)", "format": "time_series", "intervalFactor": 2, - "legendFormat": "store-{{store}}", + "legendFormat": "store-{{source}}", "refId": "A" }, { "expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (target)", "format": "time_series", "intervalFactor": 2, - "legendFormat": "store-{{store}}", + "legendFormat": "store-{{target}}", "refId": "B" } ], @@ -7198,14 +7198,14 @@ "expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (source)", "format": "time_series", "intervalFactor": 2, - "legendFormat": "store-{{store}}", + "legendFormat": "store-{{source}}", "refId": "A" }, { "expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (target)", "format": "time_series", "intervalFactor": 2, - "legendFormat": "store-{{store}}", + "legendFormat": "store-{{target}}", "refId": "B" } ], From 2f57a9f050ebb96863500ccdf386fde1ded9d8c1 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 12 Sep 2023 18:36:10 +0800 Subject: [PATCH 6/7] chore(dashboard): update version to v2023.09.11.1 (#7065) (#7079) close tikv/pd#7066 1. security: improve tidb-dashboard login security, encrypt the login password to avoid transport plain text (required for 6.5 version) 2. debug-api: support pagination for ddl history for debug-api 3. keyvisual: use scanRegions instead of fetch all regions (required for 6.5 version) 4. refine execution plan, now it can be showed as table style Signed-off-by: ti-chi-bot Signed-off-by: nolouch Co-authored-by: Sparkle <1284531+baurine@users.noreply.github.com> Co-authored-by: nolouch Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- tests/integrations/client/go.mod | 2 +- tests/integrations/client/go.sum | 4 ++-- tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/go.sum | 4 ++-- tests/integrations/tso/go.mod | 2 +- tests/integrations/tso/go.sum | 4 ++-- 8 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 503ac53f33f..6f6be36d883 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 - github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 + github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 github.com/prometheus/client_golang v1.11.1 github.com/prometheus/common v0.26.0 github.com/sasha-s/go-deadlock v0.2.0 diff --git a/go.sum b/go.sum index e826e53af37..8640a98a6b3 100644 --- a/go.sum +++ b/go.sum @@ -446,8 +446,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 h1:K9lZMYuDuAiR5kOjFESwJ8KfSb4ui5zX6vZGbUp58uk= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index 72eac02d341..70006a085e9 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -119,7 +119,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 2e3d6329886..04eb8e8647f 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -410,8 +410,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 h1:K9lZMYuDuAiR5kOjFESwJ8KfSb4ui5zX6vZGbUp58uk= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 62b4b022ead..98c8a420238 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -119,7 +119,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index a759c68761d..801d9b926cc 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -414,8 +414,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 h1:K9lZMYuDuAiR5kOjFESwJ8KfSb4ui5zX6vZGbUp58uk= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index 84825b5a466..a9788e01ed2 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -117,7 +117,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 6f9c7f8f2cd..891bc42d24f 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -408,8 +408,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 h1:K9lZMYuDuAiR5kOjFESwJ8KfSb4ui5zX6vZGbUp58uk= -github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo= +github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= From b8d9c6e04992d233ee60bc0213cc9251c0b74110 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 13 Sep 2023 12:12:07 +0800 Subject: [PATCH 7/7] mcs: watch scheduling service's primary address (#7072) ref tikv/pd#5839 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/member/participant.go | 33 ++++++++--------- pkg/storage/endpoint/key_path.go | 6 ++++ server/api/server.go | 5 ++- server/server.go | 36 +++++++++++++------ tests/cluster.go | 5 +++ .../mcs/scheduling/server_test.go | 15 ++++++-- 6 files changed, 68 insertions(+), 32 deletions(-) diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 27dced57791..b3034a86807 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -166,15 +166,7 @@ func (m *Participant) setLeader(member participant) { // unsetLeader unsets the member's leader. func (m *Participant) unsetLeader() { - var leader participant - switch m.serviceName { - case utils.TSOServiceName: - leader = &tsopb.Participant{} - case utils.SchedulingServiceName: - leader = &schedulingpb.Participant{} - case utils.ResourceManagerServiceName: - leader = &resource_manager.Participant{} - } + leader := NewParticipantByService(m.serviceName) m.leader.Store(leader) m.lastLeaderUpdatedTime.Store(time.Now()) } @@ -225,15 +217,7 @@ func (m *Participant) PreCheckLeader() error { // getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). func (m *Participant) getPersistentLeader() (participant, int64, error) { - var leader participant - switch m.serviceName { - case utils.TSOServiceName: - leader = &tsopb.Participant{} - case utils.SchedulingServiceName: - leader = &schedulingpb.Participant{} - case utils.ResourceManagerServiceName: - leader = &resource_manager.Participant{} - } + leader := NewParticipantByService(m.serviceName) ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { return nil, 0, err @@ -399,3 +383,16 @@ func (m *Participant) campaignCheck() bool { func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) { m.campaignChecker.Store(checker) } + +// NewParticipantByService creates a new participant by service name. +func NewParticipantByService(serviceName string) (p participant) { + switch serviceName { + case utils.TSOServiceName: + p = &tsopb.Participant{} + case utils.SchedulingServiceName: + p = &schedulingpb.Participant{} + case utils.ResourceManagerServiceName: + p = &resource_manager.Participant{} + } + return p +} diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 85af79203a4..4b67441a5ac 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -325,6 +325,12 @@ func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string { return path.Join(electionPath, utils.PrimaryKey) } +// SchedulingPrimaryPath returns the path of scheduling primary. +// Path: /ms/{cluster_id}/scheduling/primary +func SchedulingPrimaryPath(clusterID uint64) string { + return path.Join(SchedulingSvcRootPath(clusterID), utils.PrimaryKey) +} + // KeyspaceGroupsElectionPath returns the path of keyspace groups election. // default keyspace group: "/ms/{cluster_id}/tso/00000". // non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}". diff --git a/server/api/server.go b/server/api/server.go index 272e76cc60b..1d881022c04 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -20,6 +20,7 @@ import ( "github.com/gorilla/mux" tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" + mcs "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/serverapi" "github.com/tikv/pd/server" @@ -39,7 +40,9 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), serverapi.NewRedirector(svr, serverapi.MicroserviceRedirectRule( - apiPrefix+"/api/v1"+"/admin/reset-ts", tsoapi.APIPathPrefix+"/admin/reset-ts", "tso")), + apiPrefix+"/api/v1"+"/admin/reset-ts", + tsoapi.APIPathPrefix+"/admin/reset-ts", + mcs.TSOServiceName)), negroni.Wrap(r)), ) diff --git a/server/server.go b/server/server.go index b74ca5b57b3..2a076923caf 100644 --- a/server/server.go +++ b/server/server.go @@ -226,10 +226,11 @@ type Server struct { auditBackends []audit.Backend - registry *registry.ServiceRegistry - mode string - servicePrimaryMap sync.Map /* Store as map[string]string */ - tsoPrimaryWatcher *etcdutil.LoopWatcher + registry *registry.ServiceRegistry + mode string + servicePrimaryMap sync.Map /* Store as map[string]string */ + tsoPrimaryWatcher *etcdutil.LoopWatcher + schedulingPrimaryWatcher *etcdutil.LoopWatcher } // HandlerBuilder builds a server HTTP handler. @@ -617,7 +618,7 @@ func (s *Server) startServerLoop(ctx context.Context) { go s.encryptionKeyManagerLoop() if s.IsAPIServiceMode() { s.initTSOPrimaryWatcher() - s.tsoPrimaryWatcher.StartWatchLoop() + s.initSchedulingPrimaryWatcher() } } @@ -1962,8 +1963,20 @@ func (s *Server) initTSOPrimaryWatcher() { serviceName := mcs.TSOServiceName tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID) tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID) + s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey) + s.tsoPrimaryWatcher.StartWatchLoop() +} + +func (s *Server) initSchedulingPrimaryWatcher() { + serviceName := mcs.SchedulingServiceName + primaryKey := endpoint.SchedulingPrimaryPath(s.clusterID) + s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey) + s.schedulingPrimaryWatcher.StartWatchLoop() +} + +func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string) *etcdutil.LoopWatcher { putFn := func(kv *mvccpb.KeyValue) error { - primary := &tsopb.Participant{} // TODO: use Generics + primary := member.NewParticipantByService(serviceName) if err := proto.Unmarshal(kv.Value, primary); err != nil { return err } @@ -1971,7 +1984,7 @@ func (s *Server) initTSOPrimaryWatcher() { if len(listenUrls) > 0 { // listenUrls[0] is the primary service endpoint of the keyspace group s.servicePrimaryMap.Store(serviceName, listenUrls[0]) - log.Info("update tso primary", zap.String("primary", listenUrls[0])) + log.Info("update service primary", zap.String("service-name", serviceName), zap.String("primary", listenUrls[0])) } return nil } @@ -1981,16 +1994,17 @@ func (s *Server) initTSOPrimaryWatcher() { if ok { oldPrimary = v.(string) } - log.Info("delete tso primary", zap.String("old-primary", oldPrimary)) + log.Info("delete service primary", zap.String("service-name", serviceName), zap.String("old-primary", oldPrimary)) s.servicePrimaryMap.Delete(serviceName) return nil } - s.tsoPrimaryWatcher = etcdutil.NewLoopWatcher( + name := fmt.Sprintf("%s-primary-watcher", serviceName) + return etcdutil.NewLoopWatcher( s.serverLoopCtx, &s.serverLoopWg, s.client, - "tso-primary-watcher", - tsoServicePrimaryKey, + name, + primaryKey, putFn, deleteFn, func() error { return nil }, diff --git a/tests/cluster.go b/tests/cluster.go index 607955cc6a9..ce8293531cd 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -433,6 +433,11 @@ func (s *TestServer) GetTSOAllocatorManager() *tso.AllocatorManager { return s.server.GetTSOAllocatorManager() } +// GetServicePrimaryAddr returns the primary address of the service. +func (s *TestServer) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) { + return s.server.GetServicePrimaryAddr(ctx, serviceName) +} + // TestCluster is only for test. type TestCluster struct { config *clusterConfig diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 9b5371deb62..54994bbc34b 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/failpoint" "github.com/stretchr/testify/suite" + mcs "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" "go.uber.org/goleak" @@ -107,11 +108,21 @@ func (suite *serverTestSuite) TestPrimaryChange() { defer tc.Destroy() tc.WaitForPrimaryServing(re) primary := tc.GetPrimaryServer() - addr := primary.GetAddr() + oldPrimaryAddr := primary.GetAddr() re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) + testutil.Eventually(re, func() bool { + watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName) + return ok && oldPrimaryAddr == watchedAddr + }) + // transfer leader primary.Close() tc.WaitForPrimaryServing(re) primary = tc.GetPrimaryServer() - re.NotEqual(addr, primary.GetAddr()) + newPrimaryAddr := primary.GetAddr() + re.NotEqual(oldPrimaryAddr, newPrimaryAddr) re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) + testutil.Eventually(re, func() bool { + watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName) + return ok && newPrimaryAddr == watchedAddr + }) }