diff --git a/pkg/basicserver/basic_server.go b/pkg/basicserver/basic_server.go index afb56c2edd91..28ba3ad08de3 100644 --- a/pkg/basicserver/basic_server.go +++ b/pkg/basicserver/basic_server.go @@ -44,5 +44,5 @@ type Server interface { // IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. IsServing() bool // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. - AddServiceReadyCallback(callbacks ...func(context.Context)) + AddServiceReadyCallback(callbacks ...func(context.Context) error) } diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index a9e53f347fa0..6d1b872575bf 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -101,7 +101,7 @@ func (m *Manager) GetBasicServer() bs.Server { } // Init initializes the resource group manager. -func (m *Manager) Init(ctx context.Context) { +func (m *Manager) Init(ctx context.Context) error { // Todo: If we can modify following configs in the future, we should reload these configs. // Store the controller config into the storage. m.storage.SaveControllerConfig(m.controllerConfig) @@ -156,6 +156,7 @@ func (m *Manager) Init(ctx context.Context) { m.persistLoop(ctx) }() log.Info("resource group manager finishes initialization") + return nil } // AddResourceGroup puts a resource group. diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 40913c2611c5..78685850e86a 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -71,7 +71,7 @@ type Server struct { service *Service // primaryCallbacks will be called after the server becomes leader. - primaryCallbacks []func(context.Context) + primaryCallbacks []func(context.Context) error serviceRegister *discovery.ServiceRegister } @@ -232,7 +232,7 @@ func (s *Server) IsClosed() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 18cc55fbc163..19bd891196fa 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -143,12 +143,11 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf // AllocID allocates a new ID. func (c *Cluster) AllocID() (uint64, error) { - cli := c.apiServerLeader.Load().(pdpb.PDClient) - if cli == nil { - c.checkMembershipCh <- struct{}{} - return 0, errors.New("API server leader is not found") + client, err := c.getAPIServerLeaderClient() + if err != nil { + return 0, err } - resp, err := cli.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) + resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) if err != nil { c.checkMembershipCh <- struct{}{} return 0, err @@ -156,6 +155,15 @@ func (c *Cluster) AllocID() (uint64, error) { return resp.GetId(), nil } +func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { + cli := c.apiServerLeader.Load() + if cli == nil { + c.checkMembershipCh <- struct{}{} + return nil, errors.New("API server leader is not found") + } + return cli.(pdpb.PDClient), nil +} + // SwitchAPIServerLeader switches the API server leader. func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { old := c.apiServerLeader.Load() diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index c6b5a5506975..e0e5bb9b6615 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/mcs/utils" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/configutil" "github.com/tikv/pd/pkg/utils/grpcutil" @@ -239,6 +240,18 @@ func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) { o.schedule.Store(cfg) } +// AdjustScheduleCfg adjusts the schedule config. +func (o *PersistConfig) AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) { + // In case we add new default schedulers. + for _, ps := range sc.DefaultSchedulers { + if slice.NoneOf(scheduleCfg.Schedulers, func(i int) bool { + return scheduleCfg.Schedulers[i].Type == ps.Type + }) { + scheduleCfg.Schedulers = append(scheduleCfg.Schedulers, ps) + } + } +} + // GetReplicationConfig returns replication configurations. func (o *PersistConfig) GetReplicationConfig() *sc.ReplicationConfig { return o.replication.Load().(*sc.ReplicationConfig) diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index c9010db69a3e..e6e204b86312 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -94,6 +94,7 @@ func (cw *Watcher) initializeConfigWatcher() error { zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) return err } + cw.AdjustScheduleCfg(&cfg.Schedule) cw.SetClusterVersion(&cfg.ClusterVersion) cw.SetScheduleConfig(&cfg.Schedule) cw.SetReplicationConfig(&cfg.Replication) diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index cbc73f79dbf6..1e3aea41aa54 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -87,7 +87,8 @@ type Server struct { checkMembershipCh chan struct{} // primaryCallbacks will be called after the server becomes leader. - primaryCallbacks []func(context.Context) + primaryCallbacks []func(context.Context) error + primaryExitCallbacks []func() // for service registry serviceID *discovery.ServiceRegistryEntry @@ -164,6 +165,9 @@ func (s *Server) updateAPIServerMemberLoop() { case <-ticker.C: case <-s.checkMembershipCh: } + if !s.IsServing() { + continue + } members, err := s.GetClient().MemberList(ctx) if err != nil { log.Warn("failed to list members", errs.ZapError(err)) @@ -247,9 +251,16 @@ func (s *Server) campaignLeader() { log.Info("triggering the primary callback functions") for _, cb := range s.primaryCallbacks { - cb(ctx) + if err := cb(ctx); err != nil { + log.Error("failed to trigger the primary callback functions", errs.ZapError(err)) + return + } } - + defer func() { + for _, cb := range s.primaryExitCallbacks { + cb() + } + }() s.participant.EnableLeader() log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) @@ -283,10 +294,6 @@ func (s *Server) Close() { utils.StopHTTPServer(s) utils.StopGRPCServer(s) s.GetListener().Close() - s.GetCoordinator().Stop() - s.ruleWatcher.Close() - s.configWatcher.Close() - s.metaWatcher.Close() s.serverLoopCancel() s.serverLoopWg.Wait() @@ -313,10 +320,15 @@ func (s *Server) IsClosed() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) } +// AddServiceExitCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. +func (s *Server) AddServiceExitCallback(callbacks ...func()) { + s.primaryExitCallbacks = append(s.primaryExitCallbacks, callbacks...) +} + // GetTLSConfig gets the security config. func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { return &s.cfg.Security.TLSConfig @@ -381,20 +393,10 @@ func (s *Server) startServer() (err error) { ListenUrls: []string{s.cfg.AdvertiseListenAddr}, } s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election") - s.basicCluster = core.NewBasicCluster() - err = s.startWatcher() - if err != nil { - return err - } - s.storage = endpoint.NewStorageEndpoint( - kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil) - s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster) - s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh) - if err != nil { - return err - } s.service = &Service{Server: s} + s.AddServiceReadyCallback(s.startCluster) + s.AddServiceExitCallback(s.stopCluster) if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil { return err } @@ -406,7 +408,6 @@ func (s *Server) startServer() (err error) { go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener()) s.checkMembershipCh <- struct{}{} <-serverReadyChan - go s.GetCoordinator().RunUntilStop() // Run callbacks log.Info("triggering the start callback functions") @@ -429,6 +430,29 @@ func (s *Server) startServer() (err error) { return nil } +func (s *Server) startCluster(context.Context) error { + s.basicCluster = core.NewBasicCluster() + err := s.startWatcher() + if err != nil { + return err + } + s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) + s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster) + s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh) + if err != nil { + return err + } + go s.GetCoordinator().RunUntilStop() + return nil +} + +func (s *Server) stopCluster() { + s.GetCoordinator().Stop() + s.ruleWatcher.Close() + s.configWatcher.Close() + s.metaWatcher.Close() +} + func (s *Server) startWatcher() (err error) { s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster) if err != nil { diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 91d5e480eaa4..40958ca463ce 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -236,7 +236,7 @@ func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error { // AddServiceReadyCallback implements basicserver. // It adds callbacks when it's ready for providing tso service. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { // Do nothing here. The primary of each keyspace group assigned to this host // will respond to the requests accordingly. } diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go index ef05e91b9f78..1edf924370f5 100644 --- a/server/api/min_resolved_ts.go +++ b/server/api/min_resolved_ts.go @@ -53,7 +53,7 @@ type minResolvedTS struct { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /min-resolved-ts/{store_id} [get] func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *http.Request) { - c := h.svr.GetRaftCluster() + c := getCluster(r) idStr := mux.Vars(r)["store_id"] storeID, err := strconv.ParseUint(idStr, 10, 64) if err != nil { @@ -84,7 +84,7 @@ func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *h // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /min-resolved-ts [get] func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) { - c := h.svr.GetRaftCluster() + c := getCluster(r) scopeMinResolvedTS := c.GetMinResolvedTS() persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval diff --git a/server/grpc_service.go b/server/grpc_service.go index 0563371cdc3a..973c45a622f7 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1600,7 +1600,6 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB if rc == nil { return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil } - _, err := rc.HandleBatchReportSplit(request) if err != nil { return &pdpb.ReportBatchSplitResponse{ @@ -2089,6 +2088,9 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S return rsp.(*pdpb.SplitAndScatterRegionsResponse), err } rc := s.GetRaftCluster() + if rc == nil { + return &pdpb.SplitAndScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil + } splitFinishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit())) scatterFinishedPercentage, err := scatterRegions(rc, newRegionIDs, request.GetGroup(), int(request.GetRetryLimit()), false) if err != nil { diff --git a/server/handler.go b/server/handler.go index 2e4b88b20e2d..adc1e8ecd318 100644 --- a/server/handler.go +++ b/server/handler.go @@ -961,14 +961,20 @@ func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _ // SetStoreLimitScene sets the limit values for different scenes func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) { - cluster := h.s.GetRaftCluster() - cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType) + rc := h.s.GetRaftCluster() + if rc == nil { + return + } + rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType) } // GetStoreLimitScene returns the limit values for different scenes func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene { - cluster := h.s.GetRaftCluster() - return cluster.GetStoreLimiter().StoreLimitScene(limitType) + rc := h.s.GetRaftCluster() + if rc == nil { + return nil + } + return rc.GetStoreLimiter().StoreLimitScene(limitType) } // GetProgressByID returns the progress details for a given store ID. diff --git a/server/server.go b/server/server.go index a72c1c23f0c3..b74ca5b57b39 100644 --- a/server/server.go +++ b/server/server.go @@ -190,7 +190,7 @@ type Server struct { // startCallbacks will be called after the server is started. startCallbacks []func() // leaderCallbacks will be called after the server becomes leader. - leaderCallbacks []func(context.Context) + leaderCallbacks []func(context.Context) error // closeCallbacks will be called before the server is closed. closeCallbacks []func() @@ -1023,18 +1023,18 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { } old := s.persistOptions.GetReplicationConfig() if cfg.EnablePlacementRules != old.EnablePlacementRules { - raftCluster := s.GetRaftCluster() - if raftCluster == nil { + rc := s.GetRaftCluster() + if rc == nil { return errs.ErrNotBootstrapped.GenWithStackByArgs() } if cfg.EnablePlacementRules { // initialize rule manager. - if err := raftCluster.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { + if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { return err } } else { // NOTE: can be removed after placement rules feature is enabled by default. - for _, s := range raftCluster.GetStores() { + for _, s := range rc.GetStores() { if !s.IsRemoved() && s.IsTiFlash() { return errors.New("cannot disable placement rules with TiFlash nodes") } @@ -1044,8 +1044,12 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { var rule *placement.Rule if cfg.EnablePlacementRules { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } // replication.MaxReplicas won't work when placement rule is enabled and not only have one default rule. - defaultRule := s.GetRaftCluster().GetRuleManager().GetRule("pd", "default") + defaultRule := rc.GetRuleManager().GetRule("pd", "default") CheckInDefaultRule := func() error { // replication config won't work when placement rule is enabled and exceeds one default rule @@ -1071,7 +1075,11 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { if rule != nil { rule.Count = int(cfg.MaxReplicas) rule.LocationLabels = cfg.LocationLabels - if err := s.GetRaftCluster().GetRuleManager().SetRule(rule); err != nil { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if err := rc.GetRuleManager().SetRule(rule); err != nil { log.Error("failed to update rule count", errs.ZapError(err)) return err @@ -1083,7 +1091,11 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { s.persistOptions.SetReplicationConfig(old) if rule != nil { rule.Count = int(old.MaxReplicas) - if e := s.GetRaftCluster().GetRuleManager().SetRule(rule); e != nil { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if e := rc.GetRuleManager().SetRule(rule); e != nil { log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e)) } } @@ -1371,18 +1383,18 @@ func (s *Server) GetServerOption() *config.PersistOptions { // GetMetaRegions gets meta regions from cluster. func (s *Server) GetMetaRegions() []*metapb.Region { - cluster := s.GetRaftCluster() - if cluster != nil { - return cluster.GetMetaRegions() + rc := s.GetRaftCluster() + if rc != nil { + return rc.GetMetaRegions() } return nil } // GetRegions gets regions from cluster. func (s *Server) GetRegions() []*core.RegionInfo { - cluster := s.GetRaftCluster() - if cluster != nil { - return cluster.GetRegions() + rc := s.GetRaftCluster() + if rc != nil { + return rc.GetRegions() } return nil } @@ -1519,9 +1531,9 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro } log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) - cluster := s.GetRaftCluster() - if cluster != nil { - err := cluster.GetReplicationMode().UpdateConfig(cfg) + rc := s.GetRaftCluster() + if rc != nil { + err := rc.GetReplicationMode().UpdateConfig(cfg) if err != nil { log.Warn("failed to update replication mode", errs.ZapError(err)) // revert to old config @@ -1547,7 +1559,7 @@ func (s *Server) IsServing() bool { } // AddServiceReadyCallback adds callbacks when the server becomes the leader if there is embedded etcd, or the primary otherwise. -func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) { s.leaderCallbacks = append(s.leaderCallbacks, callbacks...) } @@ -1992,7 +2004,11 @@ func (s *Server) RecoverAllocID(ctx context.Context, id uint64) error { // GetExternalTS returns external timestamp. func (s *Server) GetExternalTS() uint64 { - return s.GetRaftCluster().GetExternalTS() + rc := s.GetRaftCluster() + if rc == nil { + return 0 + } + return rc.GetExternalTS() } // SetExternalTS returns external timestamp. @@ -2002,14 +2018,18 @@ func (s *Server) SetExternalTS(externalTS, globalTS uint64) error { log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS)) return errors.New(desc) } - currentExternalTS := s.GetRaftCluster().GetExternalTS() + c := s.GetRaftCluster() + if c == nil { + return errs.ErrNotBootstrapped.FastGenByArgs() + } + currentExternalTS := c.GetExternalTS() if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 { desc := "the external timestamp should be larger than current external timestamp" log.Error(desc, zap.Uint64("request", externalTS), zap.Uint64("current", currentExternalTS)) return errors.New(desc) } - s.GetRaftCluster().SetExternalTS(externalTS) - return nil + + return c.SetExternalTS(externalTS) } // IsLocalTSOEnabled returns if the local TSO is enabled. diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index ab612869893b..9b5371deb628 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -67,13 +67,16 @@ func (suite *serverTestSuite) TearDownSuite() { func (suite *serverTestSuite) TestAllocID() { re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`)) tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) + time.Sleep(200 * time.Millisecond) id, err := tc.GetPrimaryServer().GetCluster().AllocID() re.NoError(err) re.NotEqual(uint64(0), id) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) } func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() { @@ -83,15 +86,32 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() { re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) + time.Sleep(200 * time.Millisecond) cluster := tc.GetPrimaryServer().GetCluster() id, err := cluster.AllocID() re.NoError(err) re.NotEqual(uint64(0), id) suite.cluster.ResignLeader() suite.cluster.WaitLeader() - time.Sleep(time.Second) + time.Sleep(200 * time.Millisecond) id1, err := cluster.AllocID() re.NoError(err) re.Greater(id1, id) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) } + +func (suite *serverTestSuite) TestPrimaryChange() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + primary := tc.GetPrimaryServer() + addr := primary.GetAddr() + re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) + primary.Close() + tc.WaitForPrimaryServing(re) + primary = tc.GetPrimaryServer() + re.NotEqual(addr, primary.GetAddr()) + re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5) +} diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 447cfd1ea13f..a4760d760916 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/spf13/cobra" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/versioninfo" @@ -99,7 +100,8 @@ func TestScheduler(t *testing.T) { pdctl.MustPutStore(re, leaderServer.GetServer(), store) } - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b")) + // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. + pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) time.Sleep(3 * time.Second) // scheduler show command diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 623af9b0a822..61d47d7790c2 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -807,6 +807,46 @@ func TestRemovingProgress(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) } +func TestSendApiWhenRestartRaftCluster(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) { + conf.Replication.MaxReplicas = 1 + }) + re.NoError(err) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + re.NoError(err) + leader := cluster.GetServer(cluster.WaitLeader()) + + grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr()) + clusterID := leader.GetClusterID() + req := &pdpb.BootstrapRequest{ + Header: testutil.NewRequestHeader(clusterID), + Store: &metapb.Store{Id: 1, Address: "127.0.0.1:0"}, + Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}}, + } + resp, err := grpcPDClient.Bootstrap(context.Background(), req) + re.NoError(err) + re.Nil(resp.GetHeader().GetError()) + + // Mock restart raft cluster + rc := leader.GetRaftCluster() + re.NotNil(rc) + rc.Stop() + + // Mock client-go will still send request + output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError) + re.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first") + + err = rc.Start(leader.GetServer()) + re.NoError(err) + rc = leader.GetRaftCluster() + re.NotNil(rc) +} + func TestPreparingProgress(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))