From 25629f2dbd1d293b1a99dde0272621a153ca346a Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 22 Oct 2024 11:59:36 +0800 Subject: [PATCH 1/4] ms/tso: move `startGlobalAllocatorLoop` outside `NewAllocatorManager` (#8725) ref tikv/pd#4399 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/tso/allocator_manager.go | 18 ++++++++++++++---- pkg/tso/global_allocator.go | 8 ++------ pkg/tso/keyspace_group_manager.go | 3 ++- server/server.go | 2 +- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 56ee8313d57..a02d4884e17 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -202,7 +202,6 @@ func NewAllocatorManager( rootPath string, storage endpoint.TSOStorage, cfg Config, - startGlobalLeaderLoop bool, ) *AllocatorManager { ctx, cancel := context.WithCancel(ctx) am := &AllocatorManager{ @@ -224,7 +223,7 @@ func NewAllocatorManager( am.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn) // Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully. - am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership(), startGlobalLeaderLoop) + am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership()) am.svcLoopWG.Add(1) go am.tsoAllocatorLoop() @@ -234,11 +233,11 @@ func NewAllocatorManager( // SetUpGlobalAllocator is used to set up the global allocator, which will initialize the allocator and put it into // an allocator daemon. An TSO Allocator should only be set once, and may be initialized and reset multiple times // depending on the election. -func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership, startGlobalLeaderLoop bool) { +func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership) { am.mu.Lock() defer am.mu.Unlock() - allocator := NewGlobalTSOAllocator(ctx, am, startGlobalLeaderLoop) + allocator := NewGlobalTSOAllocator(ctx, am) // Create a new allocatorGroup ctx, cancel := context.WithCancel(ctx) am.mu.allocatorGroups[GlobalDCLocation] = &allocatorGroup{ @@ -1389,3 +1388,14 @@ func (am *AllocatorManager) GetLeaderAddr() string { } return leaderAddrs[0] } + +func (am *AllocatorManager) startGlobalAllocatorLoop() { + globalTSOAllocator, ok := am.mu.allocatorGroups[GlobalDCLocation].allocator.(*GlobalTSOAllocator) + if !ok { + // it should never happen + log.Error("failed to start global allocator loop, global allocator not found") + return + } + globalTSOAllocator.wg.Add(1) + go globalTSOAllocator.primaryElectionLoop() +} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 53a6f65a25d..5c7c905089c 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -97,7 +97,6 @@ type GlobalTSOAllocator struct { func NewGlobalTSOAllocator( ctx context.Context, am *AllocatorManager, - startGlobalLeaderLoop bool, ) Allocator { ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ @@ -109,11 +108,6 @@ func NewGlobalTSOAllocator( tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), GlobalDCLocation), } - if startGlobalLeaderLoop { - gta.wg.Add(1) - go gta.primaryElectionLoop() - } - return gta } @@ -537,6 +531,8 @@ func (gta *GlobalTSOAllocator) Reset() { gta.timestampOracle.ResetTimestamp() } +// primaryElectionLoop is used to maintain the TSO primary election and TSO's +// 
running allocator. It is only used in API mode. func (gta *GlobalTSOAllocator) primaryElectionLoop() { defer logutil.LogPanic() defer gta.wg.Done() diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 7c1d3426ce9..c19d790efc5 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -790,7 +790,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro storage = kgm.tsoSvcStorage } // Initialize all kinds of maps. - am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg) + am.startGlobalAllocatorLoop() log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID), zap.String("timestamp-path", am.GetTimestampPath(""))) diff --git a/server/server.go b/server/server.go index c79f51d8153..9691633bae2 100644 --- a/server/server.go +++ b/server/server.go @@ -475,7 +475,7 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize) s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} - s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false) + s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s) // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. if !s.cfg.EnableLocalTSO { if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil { From 0402e15a5f2c727f99c7923cb1db944612c39c28 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 22 Oct 2024 14:58:56 +0800 Subject: [PATCH 2/4] *: move tso to independent thread (#8720) ref tikv/pd#8477 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 4 +- client/pd_service_discovery.go | 3 + client/tso_service_discovery.go | 3 + server/cluster/cluster.go | 94 ++++++++++++++++++---- server/server.go | 26 +----- tests/integrations/tso/client_test.go | 4 +- tests/integrations/tso/consistency_test.go | 1 + tests/integrations/tso/server_test.go | 1 + tests/server/cluster/cluster_test.go | 6 +- tests/server/tso/allocator_test.go | 3 + tests/server/tso/global_tso_test.go | 3 + tests/server/tso/tso_test.go | 4 +- tools/pd-backup/tests/backup_test.go | 2 + 13 files changed, 105 insertions(+), 49 deletions(-) diff --git a/client/client.go b/client/client.go index f8c8d32cee8..9ced7284153 100644 --- a/client/client.go +++ b/client/client.go @@ -206,9 +206,7 @@ func (k *serviceModeKeeper) close() { k.tsoSvcDiscovery.Close() fallthrough case pdpb.ServiceMode_PD_SVC_MODE: - if k.tsoClient != nil { - k.tsoClient.close() - } + k.tsoClient.close() case pdpb.ServiceMode_UNKNOWN_SVC_MODE: } } diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index f42ae7fea4a..83bc8e612a3 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -634,6 +634,9 @@ func (c *pdServiceDiscovery) checkFollowerHealth(ctx context.Context) { // Close releases all resources. 
func (c *pdServiceDiscovery) Close() { + if c == nil { + return + } c.closeOnce.Do(func() { log.Info("[pd] close pd service discovery client") c.clientConns.Range(func(key, cc any) bool { diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 617b709ca76..0380ddb4c28 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -226,6 +226,9 @@ func (c *tsoServiceDiscovery) retry( // Close releases all resources func (c *tsoServiceDiscovery) Close() { + if c == nil { + return + } log.Info("closing tso service discovery") c.cancel() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 4cce39fa093..3869308d9dc 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -17,6 +17,7 @@ package cluster import ( "context" "encoding/json" + errorspkg "errors" "fmt" "io" "math" @@ -43,6 +44,7 @@ import ( "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/utils/constant" + "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" "github.com/tikv/pd/pkg/ratelimit" @@ -56,6 +58,7 @@ import ( "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/syncer" + "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/unsaferecovery" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/keypath" @@ -88,12 +91,13 @@ const ( // nodeStateCheckJobInterval is the interval to run node state check job. nodeStateCheckJobInterval = 10 * time.Second // metricsCollectionJobInterval is the interval to run metrics collection job. - metricsCollectionJobInterval = 10 * time.Second - updateStoreStatsInterval = 9 * time.Millisecond - clientTimeout = 3 * time.Second - defaultChangedRegionsLimit = 10000 - gcTombstoneInterval = 30 * 24 * time.Hour - serviceCheckInterval = 10 * time.Second + metricsCollectionJobInterval = 10 * time.Second + updateStoreStatsInterval = 9 * time.Millisecond + clientTimeout = 3 * time.Second + defaultChangedRegionsLimit = 10000 + gcTombstoneInterval = 30 * 24 * time.Hour + schedulingServiceCheckInterval = 10 * time.Second + tsoServiceCheckInterval = 100 * time.Millisecond // persistLimitRetryTimes is used to reduce the probability of the persistent error // since the once the store is added or removed, we shouldn't return an error even if the store limit is failed to persist. persistLimitRetryTimes = 5 @@ -144,6 +148,7 @@ type RaftCluster struct { cancel context.CancelFunc *core.BasicCluster // cached cluster info + member *member.EmbeddedEtcdMember etcdClient *clientv3.Client httpClient *http.Client @@ -174,6 +179,7 @@ type RaftCluster struct { keyspaceGroupManager *keyspace.GroupManager independentServices sync.Map hbstreams *hbstream.HeartbeatStreams + tsoAllocator *tso.AllocatorManager // heartbeatRunner is used to process the subtree update task asynchronously. heartbeatRunner ratelimit.Runner @@ -194,16 +200,18 @@ type Status struct { } // NewRaftCluster create a new cluster. 
-func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, - httpClient *http.Client) *RaftCluster { +func NewRaftCluster(ctx context.Context, clusterID uint64, member *member.EmbeddedEtcdMember, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, + httpClient *http.Client, tsoAllocator *tso.AllocatorManager) *RaftCluster { return &RaftCluster{ serverCtx: ctx, clusterID: clusterID, + member: member, regionSyncer: regionSyncer, httpClient: httpClient, etcdClient: etcdClient, BasicCluster: basicCluster, storage: storage, + tsoAllocator: tsoAllocator, heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), @@ -314,11 +322,13 @@ func (c *RaftCluster) Start(s Server) error { if err != nil { return err } + c.checkTSOService() cluster, err := c.LoadClusterInfo() if err != nil { return err } if cluster == nil { + log.Warn("cluster is not bootstrapped") return nil } @@ -351,7 +361,7 @@ func (c *RaftCluster) Start(s Server) error { return err } } - c.checkServices() + c.checkSchedulingService() c.wg.Add(9) go c.runServiceCheckJob() go c.runMetricsCollectionJob() @@ -370,7 +380,7 @@ func (c *RaftCluster) Start(s Server) error { return nil } -func (c *RaftCluster) checkServices() { +func (c *RaftCluster) checkSchedulingService() { if c.isAPIServiceMode { servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.SchedulingServiceName) if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { @@ -390,25 +400,76 @@ func (c *RaftCluster) checkServices() { } } +// checkTSOService checks the TSO service. +func (c *RaftCluster) checkTSOService() { + if c.isAPIServiceMode { + return + } + + if err := c.startTSOJobs(); err != nil { + // If there is an error, need to wait for the next check. 
+ log.Error("failed to start TSO jobs", errs.ZapError(err)) + return + } +} + func (c *RaftCluster) runServiceCheckJob() { defer logutil.LogPanic() defer c.wg.Done() - ticker := time.NewTicker(serviceCheckInterval) + schedulingTicker := time.NewTicker(schedulingServiceCheckInterval) failpoint.Inject("highFrequencyClusterJobs", func() { - ticker.Reset(time.Millisecond) + schedulingTicker.Reset(time.Millisecond) }) - defer ticker.Stop() + defer schedulingTicker.Stop() for { select { case <-c.ctx.Done(): log.Info("service check job is stopped") return - case <-ticker.C: - c.checkServices() + case <-schedulingTicker.C: + c.checkSchedulingService() + } + } +} + +func (c *RaftCluster) startTSOJobs() error { + allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err != nil { + log.Error("failed to get global TSO allocator", errs.ZapError(err)) + return err + } + if !allocator.IsInitialize() { + log.Info("initializing the global TSO allocator") + if err := allocator.Initialize(0); err != nil { + log.Error("failed to initialize the global TSO allocator", errs.ZapError(err)) + return err } } + return nil +} + +func (c *RaftCluster) stopTSOJobs() error { + allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err != nil { + log.Error("failed to get global TSO allocator", errs.ZapError(err)) + return err + } + if allocator.IsInitialize() { + c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true) + failpoint.Inject("updateAfterResetTSO", func() { + allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { + log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) + } + if allocator.IsInitialize() { + log.Panic("the allocator should be uninitialized after reset") + } + }) + } + + return nil } // startGCTuner @@ -757,6 +818,9 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(constant.SchedulingServiceName) { c.stopSchedulingJobs() } + if err := c.stopTSOJobs(); err != nil { + log.Error("failed to stop tso jobs", errs.ZapError(err)) + } c.heartbeatRunner.Stop() c.miscRunner.Stop() c.logRunner.Stop() diff --git a/server/server.go b/server/server.go index 9691633bae2..26f8ebb614c 100644 --- a/server/server.go +++ b/server/server.go @@ -17,7 +17,6 @@ package server import ( "bytes" "context" - errorspkg "errors" "fmt" "math/rand" "net/http" @@ -490,7 +489,7 @@ func (s *Server) startServer(ctx context.Context) error { s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg) s.basicCluster = core.NewBasicCluster() - s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient) + s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager) keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{ Client: s.client, RootPath: s.rootPath, @@ -1715,29 +1714,6 @@ func (s *Server) campaignLeader() { s.member.KeepLeader(ctx) log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name())) - if !s.IsAPIServiceMode() { - allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) - if err != nil { - log.Error("failed to get the global TSO allocator", errs.ZapError(err)) - return - } - log.Info("initializing the global TSO allocator") - if err := allocator.Initialize(0); 
err != nil { - log.Error("failed to initialize the global TSO allocator", errs.ZapError(err)) - return - } - defer func() { - s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation, false) - failpoint.Inject("updateAfterResetTSO", func() { - if err = allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { - log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) - } - if allocator.IsInitialize() { - log.Panic("the allocator should be uninitialized after reset") - } - }) - }() - } if err := s.reloadConfigFromKV(); err != nil { log.Error("failed to reload configuration", errs.ZapError(err)) return diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index a669e093200..d1a649cbfa6 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -339,12 +339,12 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() { return err == nil }) // Resign leader to trigger the TSO resetting. - re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)")) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/updateAfterResetTSO", "return(true)")) oldLeaderName := suite.cluster.WaitLeader() re.NotEmpty(oldLeaderName) err := suite.cluster.GetServer(oldLeaderName).ResignLeader() re.NoError(err) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/updateAfterResetTSO")) newLeaderName := suite.cluster.WaitLeader() re.NotEmpty(newLeaderName) re.NotEqual(oldLeaderName, newLeaderName) diff --git a/tests/integrations/tso/consistency_test.go b/tests/integrations/tso/consistency_test.go index 9ebe6dec8af..f82f58ee6c8 100644 --- a/tests/integrations/tso/consistency_test.go +++ b/tests/integrations/tso/consistency_test.go @@ -81,6 +81,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() { leaderName := suite.cluster.WaitLeader() re.NotEmpty(leaderName) suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + suite.pdLeaderServer.BootstrapCluster() backendEndpoints := suite.pdLeaderServer.GetAddr() if suite.legacy { suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints) diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go index 5e14611ab65..651a1df96b4 100644 --- a/tests/integrations/tso/server_test.go +++ b/tests/integrations/tso/server_test.go @@ -79,6 +79,7 @@ func (suite *tsoServerTestSuite) SetupSuite() { leaderName := suite.cluster.WaitLeader() re.NotEmpty(leaderName) suite.pdLeaderServer = suite.cluster.GetServer(leaderName) + suite.pdLeaderServer.BootstrapCluster() backendEndpoints := suite.pdLeaderServer.GetAddr() if suite.legacy { suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index a9be92d19e9..e1a56982f2d 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -912,7 +912,7 @@ func TestLoadClusterInfo(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() - rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), 
svr.GetTSOAllocatorManager()) // Cluster is not bootstrapped. rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) @@ -952,7 +952,7 @@ func TestLoadClusterInfo(t *testing.T) { } re.NoError(testStorage.Flush()) - raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager()) raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) raftCluster, err = raftCluster.LoadClusterInfo() re.NoError(err) @@ -1666,7 +1666,7 @@ func TestTransferLeaderBack(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() - rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager()) rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) storage := rc.GetStorage() meta := &metapb.Cluster{Id: 123} diff --git a/tests/server/tso/allocator_test.go b/tests/server/tso/allocator_test.go index 692aec490eb..257cd3b6a34 100644 --- a/tests/server/tso/allocator_test.go +++ b/tests/server/tso/allocator_test.go @@ -127,6 +127,9 @@ func TestPriorityAndDifferentLocalTSO(t *testing.T) { re.NoError(cluster.RunInitialServers()) cluster.WaitAllLeaders(re, dcLocationConfig) + leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + leaderServer.BootstrapCluster() // Wait for all nodes becoming healthy. time.Sleep(time.Second * 5) diff --git a/tests/server/tso/global_tso_test.go b/tests/server/tso/global_tso_test.go index d8f64afe871..c340c44d3d2 100644 --- a/tests/server/tso/global_tso_test.go +++ b/tests/server/tso/global_tso_test.go @@ -99,6 +99,7 @@ func TestDelaySyncTimestamp(t *testing.T) { var leaderServer, nextLeaderServer *tests.TestServer leaderServer = cluster.GetLeaderServer() re.NotNil(leaderServer) + leaderServer.BootstrapCluster() for _, s := range cluster.GetServers() { if s.GetConfig().Name != cluster.GetLeader() { nextLeaderServer = s @@ -146,6 +147,8 @@ func TestLogicalOverflow(t *testing.T) { re.NotEmpty(cluster.WaitLeader()) leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + leaderServer.BootstrapCluster() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go index 5be37e293cf..fc2f5999840 100644 --- a/tests/server/tso/tso_test.go +++ b/tests/server/tso/tso_test.go @@ -114,6 +114,8 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) { re.NoError(cluster.RunInitialServers()) cluster.WaitAllLeaders(re, dcLocationConfig) + leaderServer := cluster.GetLeaderServer() + leaderServer.BootstrapCluster() requestLocalTSOs(re, cluster, dcLocationConfig) // Reboot the cluster. @@ -125,7 +127,7 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) { re.NotEmpty(cluster.WaitLeader()) // Re-request the global TSOs. 
- leaderServer := cluster.GetLeaderServer() + leaderServer = cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() req := &pdpb.TsoRequest{ diff --git a/tools/pd-backup/tests/backup_test.go b/tools/pd-backup/tests/backup_test.go index 2a55a790849..05d2b7b92ed 100644 --- a/tools/pd-backup/tests/backup_test.go +++ b/tools/pd-backup/tests/backup_test.go @@ -37,6 +37,8 @@ func TestBackup(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) re.NotEmpty(cluster.WaitLeader()) + leaderServer := cluster.GetLeaderServer() + leaderServer.BootstrapCluster() pdAddr := cluster.GetConfig().GetClientURL() urls := strings.Split(pdAddr, ",") defer cluster.Destroy() From b155a7b2e29449cd8216233debbe21c9c5c3c14f Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Tue, 22 Oct 2024 15:31:38 +0800 Subject: [PATCH 3/4] realclustertest: apply new style for some test cases (#8732) ref tikv/pd#8683 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- .../integrations/realcluster/real_cluster.go | 58 ++- .../realcluster/reboot_pd_test.go | 105 +++--- .../realcluster/scheduler_test.go | 341 ++++++++++-------- tests/integrations/realcluster/ts_test.go | 65 ++-- tests/integrations/realcluster/util.go | 5 - 5 files changed, 314 insertions(+), 260 deletions(-) diff --git a/tests/integrations/realcluster/real_cluster.go b/tests/integrations/realcluster/real_cluster.go index 21284f285b0..1843b78a528 100644 --- a/tests/integrations/realcluster/real_cluster.go +++ b/tests/integrations/realcluster/real_cluster.go @@ -19,6 +19,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "testing" "time" @@ -37,17 +38,9 @@ type realClusterSuite struct { var ( playgroundLogDir = filepath.Join("tmp", "real_cluster", "playground") - tiupBin string + tiupBin = os.Getenv("HOME") + "/.tiup/bin/tiup" ) -func init() { - var err error - tiupBin, err = exec.LookPath("tiup") - if err != nil { - panic(err) - } -} - // SetupSuite will run before the tests in the suite are run. 
func (s *realClusterSuite) SetupSuite() { t := s.T() @@ -78,7 +71,9 @@ func (s *realClusterSuite) TearDownSuite() { func (s *realClusterSuite) startRealCluster(t *testing.T) { log.Info("start to deploy a real cluster") - s.deploy(t) + tag := s.tag() + deployTiupPlayground(t, tag) + waitTiupReady(t, tag) s.clusterCnt++ } @@ -94,33 +89,26 @@ func (s *realClusterSuite) tag() string { return fmt.Sprintf("pd_real_cluster_test_%s_%d", s.suiteName, s.clusterCnt) } -// func restartTiUP() { -// log.Info("start to restart TiUP") -// cmd := exec.Command("make", "deploy") -// cmd.Stdout = os.Stdout -// cmd.Stderr = os.Stderr -// err := cmd.Run() -// if err != nil { -// panic(err) -// } -// log.Info("TiUP restart success") -// } - -func (s *realClusterSuite) deploy(t *testing.T) { +func (s *realClusterSuite) restart() { tag := s.tag() - deployTiupPlayground(t, tag) - waitTiupReady(t, tag) + log.Info("start to restart", zap.String("tag", tag)) + s.stopRealCluster(s.T()) + s.startRealCluster(s.T()) + log.Info("TiUP restart success") } func destroy(t *testing.T, tag string) { - cmdStr := fmt.Sprintf("ps -ef | grep 'tiup playground' | grep %s | awk '{print $2}' | head -n 1", tag) + cmdStr := fmt.Sprintf("ps -ef | grep %s | awk '{print $2}'", tag) cmd := exec.Command("sh", "-c", cmdStr) bytes, err := cmd.Output() require.NoError(t, err) - pid := string(bytes) - // nolint:errcheck - runCommand("sh", "-c", "kill -9 "+pid) - log.Info("destroy success", zap.String("pid", pid)) + pids := string(bytes) + pidArr := strings.Split(pids, "\n") + for _, pid := range pidArr { + // nolint:errcheck + runCommand("sh", "-c", "kill -9 "+pid) + } + log.Info("destroy success", zap.String("tag", tag)) } func deployTiupPlayground(t *testing.T, tag string) { @@ -146,11 +134,11 @@ func deployTiupPlayground(t *testing.T, tag string) { go func() { runCommand("sh", "-c", tiupBin+` playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 \ - --without-monitor --tag `+tag+` --pd.binpath ./bin/pd-server \ - --kv.binpath ./third_bin/tikv-server \ - --db.binpath ./third_bin/tidb-server --tiflash.binpath ./third_bin/tiflash \ - --pd.config ./tests/integrations/realcluster/pd.toml \ - > `+filepath.Join(playgroundLogDir, tag+".log")+` 2>&1 & `) + --without-monitor --tag `+tag+` --pd.binpath ./bin/pd-server \ + --kv.binpath ./third_bin/tikv-server \ + --db.binpath ./third_bin/tidb-server --tiflash.binpath ./third_bin/tiflash \ + --pd.config ./tests/integrations/realcluster/pd.toml \ + > `+filepath.Join(playgroundLogDir, tag+".log")+` 2>&1 & `) }() // Avoid to change the dir before execute `tiup playground`. 
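For reference, `destroy` above now terminates every process whose command line matches the per-suite tag rather than only the first PID, so leaked `tiup playground` children (tikv/tidb/tiflash) are cleaned up between suites as well. A minimal standalone sketch of that tag-based cleanup follows; the `cleanupPlayground` helper, its package, and the use of `require` here are illustrative assumptions, not part of this patch:

package example

import (
	"fmt"
	"os/exec"
	"strings"
	"testing"

	"github.com/stretchr/testify/require"
)

// cleanupPlayground kills every process whose command line contains the given
// playground tag, mirroring the ps|grep|awk pipeline used by destroy above.
// This is only a sketch: signal choice and error handling may differ in the
// real harness.
func cleanupPlayground(t *testing.T, suiteName string, clusterCnt int) {
	// The tag format matches realClusterSuite.tag() in real_cluster.go.
	tag := fmt.Sprintf("pd_real_cluster_test_%s_%d", suiteName, clusterCnt)
	out, err := exec.Command("sh", "-c",
		fmt.Sprintf("ps -ef | grep %s | awk '{print $2}'", tag)).Output()
	require.NoError(t, err)
	for _, pid := range strings.Split(strings.TrimSpace(string(out)), "\n") {
		if pid == "" {
			continue
		}
		// Best-effort SIGKILL: some matched PIDs (e.g. the grep itself) may
		// already have exited, so errors are ignored here.
		_ = exec.Command("kill", "-9", pid).Run()
	}
}
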
diff --git a/tests/integrations/realcluster/reboot_pd_test.go b/tests/integrations/realcluster/reboot_pd_test.go index 50b4bee2055..e3f37ac0605 100644 --- a/tests/integrations/realcluster/reboot_pd_test.go +++ b/tests/integrations/realcluster/reboot_pd_test.go @@ -14,45 +14,68 @@ package realcluster +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/http" +) + +type rebootPDSuite struct { + realClusterSuite +} + +func TestRebootPD(t *testing.T) { + suite.Run(t, &rebootPDSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "reboot_pd", + }, + }) +} + // https://github.com/tikv/pd/issues/6467 -// func TestReloadLabel(t *testing.T) { -// re := require.New(t) -// ctx := context.Background() - -// resp, err := pdHTTPCli.GetStores(ctx) -// re.NoError(err) -// re.NotEmpty(resp.Stores) -// firstStore := resp.Stores[0] -// // TiFlash labels will be ["engine": "tiflash"] -// // So we need to merge the labels -// storeLabels := map[string]string{ -// "zone": "zone1", -// } -// for _, label := range firstStore.Store.Labels { -// storeLabels[label.Key] = label.Value -// } -// re.NoError(pdHTTPCli.SetStoreLabels(ctx, firstStore.Store.ID, storeLabels)) -// defer func() { -// re.NoError(pdHTTPCli.DeleteStoreLabel(ctx, firstStore.Store.ID, "zone")) -// }() - -// checkLabelsAreEqual := func() { -// resp, err := pdHTTPCli.GetStore(ctx, uint64(firstStore.Store.ID)) -// re.NoError(err) - -// labelsMap := make(map[string]string) -// for _, label := range resp.Store.Labels { -// re.NotNil(label) -// labelsMap[label.Key] = label.Value -// } - -// for key, value := range storeLabels { -// re.Equal(value, labelsMap[key]) -// } -// } -// // Check the label is set -// checkLabelsAreEqual() -// // Restart TiUP to reload the label -// restartTiUP() -// checkLabelsAreEqual() -// } +func (s *rebootPDSuite) TestReloadLabel() { + re := require.New(s.T()) + ctx := context.Background() + + pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) + resp, err := pdHTTPCli.GetStores(ctx) + re.NoError(err) + re.NotEmpty(resp.Stores) + firstStore := resp.Stores[0] + // TiFlash labels will be ["engine": "tiflash"] + // So we need to merge the labels + storeLabels := map[string]string{ + "zone": "zone1", + } + for _, label := range firstStore.Store.Labels { + storeLabels[label.Key] = label.Value + } + re.NoError(pdHTTPCli.SetStoreLabels(ctx, firstStore.Store.ID, storeLabels)) + defer func() { + re.NoError(pdHTTPCli.DeleteStoreLabel(ctx, firstStore.Store.ID, "zone")) + }() + + checkLabelsAreEqual := func() { + resp, err := pdHTTPCli.GetStore(ctx, uint64(firstStore.Store.ID)) + re.NoError(err) + + labelsMap := make(map[string]string) + for _, label := range resp.Store.Labels { + re.NotNil(label) + labelsMap[label.Key] = label.Value + } + + for key, value := range storeLabels { + re.Equal(value, labelsMap[key]) + } + } + // Check the label is set + checkLabelsAreEqual() + // Restart to reload the label + s.restart() + pdHTTPCli = http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) + checkLabelsAreEqual() +} diff --git a/tests/integrations/realcluster/scheduler_test.go b/tests/integrations/realcluster/scheduler_test.go index 7e5087627fb..69da846b491 100644 --- a/tests/integrations/realcluster/scheduler_test.go +++ b/tests/integrations/realcluster/scheduler_test.go @@ -14,161 +14,190 @@ package realcluster +import ( + "context" + "fmt" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/require" + 
"github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/testutil" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/types" +) + +type schedulerSuite struct { + realClusterSuite +} + +func TestScheduler(t *testing.T) { + suite.Run(t, &schedulerSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "scheduler", + }, + }) +} + // https://github.com/tikv/pd/issues/6988#issuecomment-1694924611 // https://github.com/tikv/pd/issues/6897 -// func TestTransferLeader(t *testing.T) { -// re := require.New(t) -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() - -// resp, err := pdHTTPCli.GetLeader(ctx) -// re.NoError(err) -// oldLeader := resp.Name - -// var newLeader string -// for i := 0; i < 2; i++ { -// if resp.Name != fmt.Sprintf("pd-%d", i) { -// newLeader = fmt.Sprintf("pd-%d", i) -// } -// } - -// // record scheduler -// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), 1)) -// defer func() { -// re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) -// }() -// res, err := pdHTTPCli.GetSchedulers(ctx) -// re.NoError(err) -// oldSchedulersLen := len(res) - -// re.NoError(pdHTTPCli.TransferLeader(ctx, newLeader)) -// // wait for transfer leader to new leader -// time.Sleep(1 * time.Second) -// resp, err = pdHTTPCli.GetLeader(ctx) -// re.NoError(err) -// re.Equal(newLeader, resp.Name) - -// res, err = pdHTTPCli.GetSchedulers(ctx) -// re.NoError(err) -// re.Len(res, oldSchedulersLen) - -// // transfer leader to old leader -// re.NoError(pdHTTPCli.TransferLeader(ctx, oldLeader)) -// // wait for transfer leader -// time.Sleep(1 * time.Second) -// resp, err = pdHTTPCli.GetLeader(ctx) -// re.NoError(err) -// re.Equal(oldLeader, resp.Name) - -// res, err = pdHTTPCli.GetSchedulers(ctx) -// re.NoError(err) -// re.Len(res, oldSchedulersLen) -// } - -// func TestRegionLabelDenyScheduler(t *testing.T) { -// re := require.New(t) -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() - -// regions, err := pdHTTPCli.GetRegions(ctx) -// re.NoError(err) -// re.NotEmpty(regions.Regions) -// region1 := regions.Regions[0] - -// err = pdHTTPCli.DeleteScheduler(ctx, types.BalanceLeaderScheduler.String()) -// if err == nil { -// defer func() { -// pdHTTPCli.CreateScheduler(ctx, types.BalanceLeaderScheduler.String(), 0) -// }() -// } - -// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) -// defer func() { -// pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) -// }() - -// // wait leader transfer -// testutil.Eventually(re, func() bool { -// regions, err := pdHTTPCli.GetRegions(ctx) -// re.NoError(err) -// for _, region := range regions.Regions { -// if region.Leader.StoreID != region1.Leader.StoreID { -// return false -// } -// } -// return true -// }, testutil.WithWaitFor(time.Minute)) - -// // disable schedule for region1 -// labelRule := &pd.LabelRule{ -// ID: "rule1", -// Labels: []pd.RegionLabel{{Key: "schedule", Value: "deny"}}, -// RuleType: "key-range", -// Data: labeler.MakeKeyRanges(region1.StartKey, region1.EndKey), -// } -// re.NoError(pdHTTPCli.SetRegionLabelRule(ctx, labelRule)) -// defer func() { -// pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) -// }() -// labelRules, err := pdHTTPCli.GetAllRegionLabelRules(ctx) -// re.NoError(err) -// re.Len(labelRules, 2) -// sort.Slice(labelRules, func(i, j 
int) bool { -// return labelRules[i].ID < labelRules[j].ID -// }) -// re.Equal(labelRule.ID, labelRules[1].ID) -// re.Equal(labelRule.Labels, labelRules[1].Labels) -// re.Equal(labelRule.RuleType, labelRules[1].RuleType) - -// // enable evict leader scheduler, and check it works -// re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String())) -// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), uint64(region1.Leader.StoreID))) -// defer func() { -// pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String()) -// }() -// testutil.Eventually(re, func() bool { -// regions, err := pdHTTPCli.GetRegions(ctx) -// re.NoError(err) -// for _, region := range regions.Regions { -// if region.Leader.StoreID == region1.Leader.StoreID { -// return false -// } -// } -// return true -// }, testutil.WithWaitFor(time.Minute)) - -// re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) -// re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) -// defer func() { -// pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) -// }() -// testutil.Eventually(re, func() bool { -// regions, err := pdHTTPCli.GetRegions(ctx) -// re.NoError(err) -// for _, region := range regions.Regions { -// if region.ID == region1.ID { -// continue -// } -// if region.Leader.StoreID != region1.Leader.StoreID { -// return false -// } -// } -// return true -// }, testutil.WithWaitFor(time.Minute)) - -// pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) -// labelRules, err = pdHTTPCli.GetAllRegionLabelRules(ctx) -// re.NoError(err) -// re.Len(labelRules, 1) - -// testutil.Eventually(re, func() bool { -// regions, err := pdHTTPCli.GetRegions(ctx) -// re.NoError(err) -// for _, region := range regions.Regions { -// if region.Leader.StoreID != region1.Leader.StoreID { -// return false -// } -// } -// return true -// }, testutil.WithWaitFor(time.Minute)) -// } +func (s *schedulerSuite) TestTransferLeader() { + re := require.New(s.T()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) + resp, err := pdHTTPCli.GetLeader(ctx) + re.NoError(err) + oldLeader := resp.Name + + var newLeader string + for i := 0; i < 2; i++ { + if resp.Name != fmt.Sprintf("pd-%d", i) { + newLeader = fmt.Sprintf("pd-%d", i) + } + } + + // record scheduler + re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), 1)) + defer func() { + re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) + }() + res, err := pdHTTPCli.GetSchedulers(ctx) + re.NoError(err) + oldSchedulersLen := len(res) + + re.NoError(pdHTTPCli.TransferLeader(ctx, newLeader)) + // wait for transfer leader to new leader + time.Sleep(1 * time.Second) + resp, err = pdHTTPCli.GetLeader(ctx) + re.NoError(err) + re.Equal(newLeader, resp.Name) + + res, err = pdHTTPCli.GetSchedulers(ctx) + re.NoError(err) + re.Len(res, oldSchedulersLen) + + // transfer leader to old leader + re.NoError(pdHTTPCli.TransferLeader(ctx, oldLeader)) + // wait for transfer leader + time.Sleep(1 * time.Second) + resp, err = pdHTTPCli.GetLeader(ctx) + re.NoError(err) + re.Equal(oldLeader, resp.Name) + + res, err = pdHTTPCli.GetSchedulers(ctx) + re.NoError(err) + re.Len(res, oldSchedulersLen) +} + +func (s *schedulerSuite) TestRegionLabelDenyScheduler() { + re := require.New(s.T()) + ctx, cancel 
:= context.WithCancel(context.Background()) + defer cancel() + + pdHTTPCli := http.NewClient("pd-real-cluster-test", getPDEndpoints(s.T())) + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + re.NotEmpty(regions.Regions) + region1 := regions.Regions[0] + + err = pdHTTPCli.DeleteScheduler(ctx, types.BalanceLeaderScheduler.String()) + if err == nil { + defer func() { + pdHTTPCli.CreateScheduler(ctx, types.BalanceLeaderScheduler.String(), 0) + }() + } + + re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) + defer func() { + pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) + }() + + // wait leader transfer + testutil.Eventually(re, func() bool { + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + for _, region := range regions.Regions { + if region.Leader.StoreID != region1.Leader.StoreID { + return false + } + } + return true + }, testutil.WithWaitFor(time.Minute)) + + // disable schedule for region1 + labelRule := &http.LabelRule{ + ID: "rule1", + Labels: []http.RegionLabel{{Key: "schedule", Value: "deny"}}, + RuleType: "key-range", + Data: labeler.MakeKeyRanges(region1.StartKey, region1.EndKey), + } + re.NoError(pdHTTPCli.SetRegionLabelRule(ctx, labelRule)) + defer func() { + pdHTTPCli.PatchRegionLabelRules(ctx, &http.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) + }() + labelRules, err := pdHTTPCli.GetAllRegionLabelRules(ctx) + re.NoError(err) + re.Len(labelRules, 2) + sort.Slice(labelRules, func(i, j int) bool { + return labelRules[i].ID < labelRules[j].ID + }) + re.Equal(labelRule.ID, labelRules[1].ID) + re.Equal(labelRule.Labels, labelRules[1].Labels) + re.Equal(labelRule.RuleType, labelRules[1].RuleType) + + // enable evict leader scheduler, and check it works + re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String())) + re.NoError(pdHTTPCli.CreateScheduler(ctx, types.EvictLeaderScheduler.String(), uint64(region1.Leader.StoreID))) + defer func() { + pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String()) + }() + testutil.Eventually(re, func() bool { + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + for _, region := range regions.Regions { + if region.Leader.StoreID == region1.Leader.StoreID { + return false + } + } + return true + }, testutil.WithWaitFor(time.Minute)) + + re.NoError(pdHTTPCli.DeleteScheduler(ctx, types.EvictLeaderScheduler.String())) + re.NoError(pdHTTPCli.CreateScheduler(ctx, types.GrantLeaderScheduler.String(), uint64(region1.Leader.StoreID))) + defer func() { + pdHTTPCli.DeleteScheduler(ctx, types.GrantLeaderScheduler.String()) + }() + testutil.Eventually(re, func() bool { + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + for _, region := range regions.Regions { + if region.ID == region1.ID { + continue + } + if region.Leader.StoreID != region1.Leader.StoreID { + return false + } + } + return true + }, testutil.WithWaitFor(time.Minute)) + + pdHTTPCli.PatchRegionLabelRules(ctx, &http.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) + labelRules, err = pdHTTPCli.GetAllRegionLabelRules(ctx) + re.NoError(err) + re.Len(labelRules, 1) + + testutil.Eventually(re, func() bool { + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + for _, region := range regions.Regions { + if region.Leader.StoreID != region1.Leader.StoreID { + return false + } + } + return true + }, testutil.WithWaitFor(time.Minute)) +} diff --git a/tests/integrations/realcluster/ts_test.go 
b/tests/integrations/realcluster/ts_test.go index 156e3d63e71..f19124d04a4 100644 --- a/tests/integrations/realcluster/ts_test.go +++ b/tests/integrations/realcluster/ts_test.go @@ -14,26 +14,45 @@ package realcluster -// func TestTS(t *testing.T) { -// re := require.New(t) - -// db := OpenTestDB(t) -// db.MustExec("use test") -// db.MustExec("drop table if exists t") -// db.MustExec("create table t(a int, index i(a))") -// db.MustExec("insert t values (1), (2), (3)") -// var rows int -// err := db.inner.Raw("select count(*) from t").Row().Scan(&rows) -// re.NoError(err) -// re.Equal(3, rows) - -// re.NoError(err) -// re.Equal(3, rows) - -// var ts uint64 -// err = db.inner.Begin().Raw("select @@tidb_current_ts").Scan(&ts).Rollback().Error -// re.NoError(err) -// re.NotEqual(0, GetTimeFromTS(ts)) - -// db.MustClose() -// } +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type tsSuite struct { + realClusterSuite +} + +func TestTS(t *testing.T) { + suite.Run(t, &tsSuite{ + realClusterSuite: realClusterSuite{ + suiteName: "ts", + }, + }) +} + +func (s *tsSuite) TestTS() { + re := require.New(s.T()) + + db := OpenTestDB(s.T()) + db.MustExec("use test") + db.MustExec("drop table if exists t") + db.MustExec("create table t(a int, index i(a))") + db.MustExec("insert t values (1), (2), (3)") + var rows int + err := db.inner.Raw("select count(*) from t").Row().Scan(&rows) + re.NoError(err) + re.Equal(3, rows) + + re.NoError(err) + re.Equal(3, rows) + + var ts uint64 + err = db.inner.Begin().Raw("select @@tidb_current_ts").Scan(&ts).Rollback().Error + re.NoError(err) + re.NotEqual(0, GetTimeFromTS(ts)) + + db.MustClose() +} diff --git a/tests/integrations/realcluster/util.go b/tests/integrations/realcluster/util.go index 013c41da7f3..789ceaa29c2 100644 --- a/tests/integrations/realcluster/util.go +++ b/tests/integrations/realcluster/util.go @@ -22,11 +22,6 @@ import ( const physicalShiftBits = 18 -var ( -// pdAddrs = []string{"http://127.0.0.1:2379"} -// pdHTTPCli = http.NewClient("pd-real-cluster-test", pdAddrs) -) - // GetTimeFromTS extracts time.Time from a timestamp. func GetTimeFromTS(ts uint64) time.Time { ms := ExtractPhysical(ts) From d82e41d09f95b95642059a9a8a073ff2b368cae8 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 23 Oct 2024 09:31:29 +0800 Subject: [PATCH 4/4] statistics: rename `RegionStats` in hot statistics (#8740) ref tikv/pd#4399 Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/cluster.go | 25 ++++++++--------- pkg/mock/mockcluster/mockcluster.go | 16 ++++------- pkg/schedule/coordinator.go | 29 ++++---------------- pkg/schedule/schedulers/hot_region.go | 4 +-- pkg/schedule/schedulers/hot_region_test.go | 32 +++++++++++----------- pkg/statistics/hot_cache.go | 7 +++-- pkg/statistics/hot_peer_cache.go | 6 ++-- pkg/statistics/hot_peer_cache_test.go | 2 +- pkg/statistics/region_stat_informer.go | 8 ++---- server/cluster/cluster_test.go | 22 +++++++-------- server/cluster/scheduling_controller.go | 24 +++++++--------- 11 files changed, 71 insertions(+), 104 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 5885a9cdb84..66cf1e97518 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -184,21 +184,18 @@ func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *sta return c.hotStat.GetHotPeerStat(rw, regionID, storeID) } -// RegionReadStats returns hot region's read stats. 
+// GetHotPeerStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. // The result only includes peers that are hot enough. -// RegionStats is a thread-safe method -func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { - // As read stats are reported by store heartbeat, the threshold needs to be adjusted. - threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() * - (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) - return c.hotStat.RegionStats(utils.Read, threshold) -} - -// RegionWriteStats returns hot region's write stats. -// The result only includes peers that are hot enough. -func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { - // RegionStats is a thread-safe method - return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold()) +// GetHotPeerStats is a thread-safe method. +func (c *Cluster) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat { + threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() + if rw == utils.Read { + // As read stats are reported by store heartbeat, the threshold needs to be adjusted. + threshold = c.persistConfig.GetHotRegionCacheHitsThreshold() * + (utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval) + } + return c.hotStat.GetHotPeerStats(rw, threshold) } // BucketsStats returns hot region's buckets stats. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index bbd4fbb6811..8d7317f547b 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -148,13 +148,6 @@ func (mc *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *st return mc.HotCache.GetHotPeerStat(rw, regionID, storeID) } -// RegionReadStats returns hot region's read stats. -// The result only includes peers that are hot enough. -func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { - // We directly use threshold for read stats for mockCluster - return mc.HotCache.RegionStats(utils.Read, mc.GetHotRegionCacheHitsThreshold()) -} - // BucketsStats returns hot region's buckets stats. func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buckets.BucketStat { task := buckets.NewCollectBucketStatsTask(degree, regions...) @@ -164,10 +157,11 @@ func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buc return task.WaitRet(mc.ctx) } -// RegionWriteStats returns hot region's write stats. +// GetHotPeerStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. // The result only includes peers that are hot enough. -func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { - return mc.HotCache.RegionStats(utils.Write, mc.GetHotRegionCacheHitsThreshold()) +func (mc *Cluster) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat { + return mc.HotCache.GetHotPeerStats(rw, mc.GetHotRegionCacheHitsThreshold()) } // HotRegionsFromStore picks hot regions in specify store. @@ -185,7 +179,7 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind utils.RWType) []*core. // hotRegionsFromStore picks hot region in specify store. 
func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind utils.RWType, minHotDegree int) []*statistics.HotPeerStat { - if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 { + if stats, ok := w.GetHotPeerStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 { return stats } return nil diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 2736c687fdb..344621c8b5b 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -432,16 +432,8 @@ func (c *Coordinator) GetHotRegionsByType(typ utils.RWType) *statistics.StoreHot isTraceFlow := c.cluster.GetSchedulerConfig().IsTraceRegionFlow() storeLoads := c.cluster.GetStoresLoads() stores := c.cluster.GetStores() - var infos *statistics.StoreHotPeersInfos - switch typ { - case utils.Write: - regionStats := c.cluster.RegionWriteStats() - infos = statistics.GetHotStatus(stores, storeLoads, regionStats, utils.Write, isTraceFlow) - case utils.Read: - regionStats := c.cluster.RegionReadStats() - infos = statistics.GetHotStatus(stores, storeLoads, regionStats, utils.Read, isTraceFlow) - default: - } + hotPeerStats := c.cluster.GetHotPeerStats(typ) + infos := statistics.GetHotStatus(stores, storeLoads, hotPeerStats, typ, isTraceFlow) // update params `IsLearner` and `LastUpdateTime` s := []statistics.StoreHotPeersStat{infos.AsLeader, infos.AsPeer} for i, stores := range s { @@ -505,20 +497,9 @@ func (c *Coordinator) CollectHotSpotMetrics() { } func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, typ utils.RWType) { - var ( - kind string - regionStats map[uint64][]*statistics.HotPeerStat - ) - - switch typ { - case utils.Read: - regionStats = cluster.RegionReadStats() - kind = utils.Read.String() - case utils.Write: - regionStats = cluster.RegionWriteStats() - kind = utils.Write.String() - } - status := statistics.CollectHotPeerInfos(stores, regionStats) // only returns TotalBytesRate,TotalKeysRate,TotalQueryRate,Count + kind := typ.String() + hotPeerStats := cluster.GetHotPeerStats(typ) + status := statistics.CollectHotPeerInfos(stores, hotPeerStats) // only returns TotalBytesRate,TotalKeysRate,TotalQueryRate,Count for _, s := range stores { // TODO: pre-allocate gauge metrics diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index eedbcfe4625..97a558c3fe4 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -130,7 +130,7 @@ func (s *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche // update read statistics // avoid to update read statistics frequently if time.Since(s.updateReadTime) >= statisticsInterval { - regionRead := cluster.RegionReadStats() + regionRead := cluster.GetHotPeerStats(utils.Read) prepare(regionRead, utils.Read, constant.LeaderKind) prepare(regionRead, utils.Read, constant.RegionKind) s.updateReadTime = time.Now() @@ -139,7 +139,7 @@ func (s *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche // update write statistics // avoid to update write statistics frequently if time.Since(s.updateWriteTime) >= statisticsInterval { - regionWrite := cluster.RegionWriteStats() + regionWrite := cluster.GetHotPeerStats(utils.Write) prepare(regionWrite, utils.Write, constant.LeaderKind) prepare(regionWrite, utils.Write, constant.RegionKind) s.updateWriteTime = time.Now() diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index a4b3225312d..195effaecab 100644 --- 
a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -1297,7 +1297,7 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) { r := tc.HotRegionsFromStore(2, utils.Read) re.Len(r, 3) // check hot items - stats := tc.HotCache.RegionStats(utils.Read, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats, 3) for _, ss := range stats { for _, s := range ss { @@ -1623,7 +1623,7 @@ func TestHotCacheUpdateCache(t *testing.T) { // lower than hot read flow rate, but higher than write flow rate {11, []uint64{1, 2, 3}, 7 * units.KiB, 0, 0}, }) - stats := tc.RegionStats(utils.Read, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 3) re.Len(stats[2], 3) re.Len(stats[3], 3) @@ -1632,7 +1632,7 @@ func TestHotCacheUpdateCache(t *testing.T) { {3, []uint64{2, 1, 3}, 20 * units.KiB, 0, 0}, {11, []uint64{1, 2, 3}, 7 * units.KiB, 0, 0}, }) - stats = tc.RegionStats(utils.Read, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 3) re.Len(stats[2], 3) re.Len(stats[3], 3) @@ -1642,7 +1642,7 @@ func TestHotCacheUpdateCache(t *testing.T) { {5, []uint64{1, 2, 3}, 20 * units.KiB, 0, 0}, {6, []uint64{1, 2, 3}, 0.8 * units.KiB, 0, 0}, }) - stats = tc.RegionStats(utils.Write, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 2) re.Len(stats[2], 2) re.Len(stats[3], 2) @@ -1650,7 +1650,7 @@ func TestHotCacheUpdateCache(t *testing.T) { addRegionInfo(tc, utils.Write, []testRegionInfo{ {5, []uint64{1, 2, 5}, 20 * units.KiB, 0, 0}, }) - stats = tc.RegionStats(utils.Write, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 2) re.Len(stats[2], 2) @@ -1665,7 +1665,7 @@ func TestHotCacheUpdateCache(t *testing.T) { // lower than hot read flow rate, but higher than write flow rate {31, []uint64{4, 5, 6}, 7 * units.KiB, 0, 0}, }) - stats = tc.RegionStats(utils.Read, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[4], 2) re.Len(stats[5], 1) re.Empty(stats[6]) @@ -1684,13 +1684,13 @@ func TestHotCacheKeyThresholds(t *testing.T) { {1, []uint64{1, 2, 3}, 0, 1, 0}, {2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0}, }) - stats := tc.RegionStats(utils.Read, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 1) addRegionInfo(tc, utils.Write, []testRegionInfo{ {3, []uint64{4, 5, 6}, 0, 1, 0}, {4, []uint64{4, 5, 6}, 0, 1 * units.KiB, 0}, }) - stats = tc.RegionStats(utils.Write, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[4], 1) re.Len(stats[5], 1) re.Len(stats[6], 1) @@ -1716,7 +1716,7 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // read addRegionInfo(tc, utils.Read, regions) - stats := tc.RegionStats(utils.Read, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Greater(len(stats[1]), 500) // for AntiCount @@ -1724,12 +1724,12 @@ func TestHotCacheKeyThresholds(t *testing.T) { addRegionInfo(tc, utils.Read, regions) addRegionInfo(tc, utils.Read, regions) addRegionInfo(tc, utils.Read, regions) - stats = tc.RegionStats(utils.Read, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 500) } { // write addRegionInfo(tc, utils.Write, regions) - stats := tc.RegionStats(utils.Write, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Greater(len(stats[1]), 500) re.Greater(len(stats[2]), 500) re.Greater(len(stats[3]), 500) @@ -1739,7 +1739,7 @@ func TestHotCacheKeyThresholds(t *testing.T) { addRegionInfo(tc, utils.Write, regions) addRegionInfo(tc, utils.Write, regions) addRegionInfo(tc, 
utils.Write, regions) - stats = tc.RegionStats(utils.Write, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 500) re.Len(stats[2], 500) re.Len(stats[3], 500) @@ -1766,7 +1766,7 @@ func TestHotCacheByteAndKey(t *testing.T) { } { // read addRegionInfo(tc, utils.Read, regions) - stats := tc.RegionStats(utils.Read, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 500) addRegionInfo(tc, utils.Read, []testRegionInfo{ @@ -1775,12 +1775,12 @@ func TestHotCacheByteAndKey(t *testing.T) { {10003, []uint64{1, 2, 3}, 10 * units.KiB, 500 * units.KiB, 0}, {10004, []uint64{1, 2, 3}, 500 * units.KiB, 500 * units.KiB, 0}, }) - stats = tc.RegionStats(utils.Read, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Read, 0) re.Len(stats[1], 503) } { // write addRegionInfo(tc, utils.Write, regions) - stats := tc.RegionStats(utils.Write, 0) + stats := tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 500) re.Len(stats[2], 500) re.Len(stats[3], 500) @@ -1790,7 +1790,7 @@ func TestHotCacheByteAndKey(t *testing.T) { {10003, []uint64{1, 2, 3}, 10 * units.KiB, 500 * units.KiB, 0}, {10004, []uint64{1, 2, 3}, 500 * units.KiB, 500 * units.KiB, 0}, }) - stats = tc.RegionStats(utils.Write, 0) + stats = tc.HotCache.GetHotPeerStats(utils.Write, 0) re.Len(stats[1], 503) re.Len(stats[2], 503) re.Len(stats[3], 503) diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index 86f7d7d6b08..3c0e45e2199 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -76,11 +76,12 @@ func (w *HotCache) CheckReadAsync(task func(cache *HotPeerCache)) bool { } } -// RegionStats returns hot items according to kind -func (w *HotCache) RegionStats(kind utils.RWType, minHotDegree int) map[uint64][]*HotPeerStat { +// RegionStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. +func (w *HotCache) GetHotPeerStats(kind utils.RWType, minHotDegree int) map[uint64][]*HotPeerStat { ret := make(chan map[uint64][]*HotPeerStat, 1) collectRegionStatsTask := func(cache *HotPeerCache) { - ret <- cache.RegionStats(minHotDegree) + ret <- cache.GetHotPeerStats(minHotDegree) } var succ bool switch kind { diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 8d1f64ca540..89f767577bd 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -84,9 +84,9 @@ func NewHotPeerCache(ctx context.Context, kind utils.RWType) *HotPeerCache { } } -// TODO: rename RegionStats as PeerStats -// RegionStats returns hot items -func (f *HotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { +// GetHotPeerStats returns the read or write statistics for hot regions. +// It returns a map where the keys are store IDs and the values are slices of HotPeerStat. 
+func (f *HotPeerCache) GetHotPeerStats(minHotDegree int) map[uint64][]*HotPeerStat {
 	res := make(map[uint64][]*HotPeerStat)
 	defaultAntiCount := f.kind.DefaultAntiCount()
 	for storeID, peers := range f.peersOfStore {
diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go
index ce4e352bc3d..38a185fa483 100644
--- a/pkg/statistics/hot_peer_cache_test.go
+++ b/pkg/statistics/hot_peer_cache_test.go
@@ -39,7 +39,7 @@ func TestStoreTimeUnsync(t *testing.T) {
 		region := buildRegion(utils.Write, 3, interval)
 		checkAndUpdate(re, cache, region, 3)
 		{
-			stats := cache.RegionStats(0)
+			stats := cache.GetHotPeerStats(0)
 			re.Len(stats, 3)
 			for _, s := range stats {
 				re.Len(s, 1)
diff --git a/pkg/statistics/region_stat_informer.go b/pkg/statistics/region_stat_informer.go
index 4fec5b4aacf..c91ba7be317 100644
--- a/pkg/statistics/region_stat_informer.go
+++ b/pkg/statistics/region_stat_informer.go
@@ -23,10 +23,8 @@ import (
 type RegionStatInformer interface {
 	GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *HotPeerStat
 	IsRegionHot(region *core.RegionInfo) bool
-	// RegionWriteStats return the storeID -> write stat of peers on this store.
+	// GetHotPeerStats returns the read or write statistics for hot regions.
+	// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
 	// The result only includes peers that are hot enough.
-	RegionWriteStats() map[uint64][]*HotPeerStat
-	// RegionReadStats return the storeID -> read stat of peers on this store.
-	// The result only includes peers that are hot enough.
-	RegionReadStats() map[uint64][]*HotPeerStat
+	GetHotPeerStats(rw utils.RWType) map[uint64][]*HotPeerStat
 }
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index a37f9718bdb..c83f485ad3d 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -161,7 +161,7 @@ func TestStoreHeartbeat(t *testing.T) {
 	re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp))
 	re.Equal("v1", cluster.GetStore(1).GetStoreLimit().Version())
 	time.Sleep(20 * time.Millisecond)
-	storeStats := cluster.hotStat.RegionStats(utils.Read, 3)
+	storeStats := cluster.hotStat.GetHotPeerStats(utils.Read, 3)
 	re.Len(storeStats[1], 1)
 	re.Equal(uint64(1), storeStats[1][0].RegionID)
 	interval := float64(hotHeartBeat.Interval.EndTimestamp - hotHeartBeat.Interval.StartTimestamp)
@@ -169,15 +169,15 @@ func TestStoreHeartbeat(t *testing.T) {
 	re.Equal(float64(hotHeartBeat.PeerStats[0].ReadBytes)/interval, storeStats[1][0].Loads[utils.ByteDim])
 	re.Equal(float64(hotHeartBeat.PeerStats[0].ReadKeys)/interval, storeStats[1][0].Loads[utils.KeyDim])
 	re.Equal(float64(hotHeartBeat.PeerStats[0].QueryStats.Get)/interval, storeStats[1][0].Loads[utils.QueryDim])
-	// After cold heartbeat, we won't find region 1 peer in regionStats
+	// After cold heartbeat, we won't find region 1 peer in HotPeerStats
 	re.NoError(cluster.HandleStoreHeartbeat(coldReq, coldResp))
 	time.Sleep(20 * time.Millisecond)
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 1)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 1)
 	re.Empty(storeStats[1])
 	// After hot heartbeat, we can find region 1 peer again
 	re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp))
 	time.Sleep(20 * time.Millisecond)
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 3)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 3)
 	re.Len(storeStats[1], 1)
 	re.Equal(uint64(1), storeStats[1][0].RegionID)
 	// after several cold heartbeats, and one hot heartbeat, we also can't find region 1 peer
@@ -185,19 +185,19 @@ func TestStoreHeartbeat(t *testing.T) {
 	re.NoError(cluster.HandleStoreHeartbeat(coldReq, coldResp))
 	re.NoError(cluster.HandleStoreHeartbeat(coldReq, coldResp))
 	time.Sleep(20 * time.Millisecond)
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 0)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 0)
 	re.Empty(storeStats[1])
 	re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp))
 	time.Sleep(20 * time.Millisecond)
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 1)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 1)
 	re.Empty(storeStats[1])
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 3)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 3)
 	re.Empty(storeStats[1])
 	// after 2 hot heartbeats, wo can find region 1 peer again
 	re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp))
 	re.NoError(cluster.HandleStoreHeartbeat(hotReq, hotResp))
 	time.Sleep(20 * time.Millisecond)
-	storeStats = cluster.hotStat.RegionStats(utils.Read, 3)
+	storeStats = cluster.hotStat.GetHotPeerStats(utils.Read, 3)
 	re.Len(storeStats[1], 1)
 	re.Equal(uint64(1), storeStats[1][0].RegionID)
 }
@@ -642,7 +642,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) {
 	re.NoError(err)
 	// wait HotStat to update items
 	time.Sleep(time.Second)
-	stats := cluster.hotStat.RegionStats(utils.Write, 0)
+	stats := cluster.hotStat.GetHotPeerStats(utils.Write, 0)
 	re.Len(stats[1], 1)
 	re.Len(stats[2], 1)
 	re.Len(stats[3], 1)
@@ -655,7 +655,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) {
 	re.NoError(err)
 	// wait HotStat to update items
 	time.Sleep(time.Second)
-	stats = cluster.hotStat.RegionStats(utils.Write, 0)
+	stats = cluster.hotStat.GetHotPeerStats(utils.Write, 0)
 	re.Len(stats[1], 1)
 	re.Empty(stats[2])
 	re.Len(stats[3], 1)
@@ -2593,7 +2593,7 @@ func TestCollectMetrics(t *testing.T) {
 		rc.collectSchedulingMetrics()
 	}
 	stores := co.GetCluster().GetStores()
-	regionStats := co.GetCluster().RegionWriteStats()
+	regionStats := co.GetCluster().GetHotPeerStats(utils.Write)
 	status1 := statistics.CollectHotPeerInfos(stores, regionStats)
 	status2 := statistics.GetHotStatus(stores, co.GetCluster().GetStoresLoads(), regionStats, utils.Write, co.GetCluster().GetSchedulerConfig().IsTraceRegionFlow())
 	for _, s := range status2.AsLeader {
diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go
index 8578b3480d8..49808f4d29d 100644
--- a/server/cluster/scheduling_controller.go
+++ b/server/cluster/scheduling_controller.go
@@ -270,21 +270,17 @@ func (sc *schedulingController) GetHotPeerStat(rw utils.RWType, regionID, storeI
 	return sc.hotStat.GetHotPeerStat(rw, regionID, storeID)
 }
 
-// RegionReadStats returns hot region's read stats.
+// GetHotPeerStats returns the read or write statistics for hot regions.
+// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
 // The result only includes peers that are hot enough.
-// RegionStats is a thread-safe method
-func (sc *schedulingController) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
-	// As read stats are reported by store heartbeat, the threshold needs to be adjusted.
-	threshold := sc.opt.GetHotRegionCacheHitsThreshold() *
-		(utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval)
-	return sc.hotStat.RegionStats(utils.Read, threshold)
-}
-
-// RegionWriteStats returns hot region's write stats.
-// The result only includes peers that are hot enough.
-func (sc *schedulingController) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
-	// RegionStats is a thread-safe method
-	return sc.hotStat.RegionStats(utils.Write, sc.opt.GetHotRegionCacheHitsThreshold())
+func (sc *schedulingController) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat {
+	// GetHotPeerStats is a thread-safe method
+	threshold := sc.opt.GetHotRegionCacheHitsThreshold()
+	if rw == utils.Read {
+		threshold = sc.opt.GetHotRegionCacheHitsThreshold() *
+			(utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval)
+	}
+	return sc.hotStat.GetHotPeerStats(rw, threshold)
 }
 
 // BucketsStats returns hot region's buckets stats.