diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go index d70b620db3b..2392b7ddac6 100644 --- a/pkg/core/basic_cluster.go +++ b/pkg/core/basic_cluster.go @@ -14,218 +14,43 @@ package core -import ( - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/tikv/pd/pkg/core/storelimit" - "github.com/tikv/pd/pkg/utils/syncutil" -) - // BasicCluster provides basic data member and interface for a tikv cluster. type BasicCluster struct { - Stores struct { - mu syncutil.RWMutex - *StoresInfo - } - + *StoresInfo *RegionsInfo } // NewBasicCluster creates a BasicCluster. func NewBasicCluster() *BasicCluster { return &BasicCluster{ - Stores: struct { - mu syncutil.RWMutex - *StoresInfo - }{StoresInfo: NewStoresInfo()}, - + StoresInfo: NewStoresInfo(), RegionsInfo: NewRegionsInfo(), } } -/* Stores read operations */ - -// GetStores returns all Stores in the cluster. -func (bc *BasicCluster) GetStores() []*StoreInfo { - bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() - return bc.Stores.GetStores() -} - -// GetMetaStores gets a complete set of metapb.Store. -func (bc *BasicCluster) GetMetaStores() []*metapb.Store { - bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() - return bc.Stores.GetMetaStores() -} - -// GetStore searches for a store by ID. -func (bc *BasicCluster) GetStore(storeID uint64) *StoreInfo { - bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() - return bc.Stores.GetStore(storeID) -} - -// GetRegionStores returns all Stores that contains the region's peer. -func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo { - bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() - var Stores []*StoreInfo - for id := range region.GetStoreIDs() { - if store := bc.Stores.GetStore(id); store != nil { - Stores = append(Stores, store) - } - } - return Stores -} - -// GetNonWitnessVoterStores returns all Stores that contains the non-witness's voter peer. -func (bc *BasicCluster) GetNonWitnessVoterStores(region *RegionInfo) []*StoreInfo { - bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() - var Stores []*StoreInfo - for id := range region.GetNonWitnessVoters() { - if store := bc.Stores.GetStore(id); store != nil { - Stores = append(Stores, store) - } - } - return Stores -} - -// GetFollowerStores returns all Stores that contains the region's follower peer. -func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo { - bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() - var Stores []*StoreInfo - for id := range region.GetFollowers() { - if store := bc.Stores.GetStore(id); store != nil { - Stores = append(Stores, store) - } - } - return Stores -} - -// GetLeaderStore returns all Stores that contains the region's leader peer. -func (bc *BasicCluster) GetLeaderStore(region *RegionInfo) *StoreInfo { - bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() - return bc.Stores.GetStore(region.GetLeader().GetStoreId()) -} - -// GetStoreCount returns the total count of storeInfo. -func (bc *BasicCluster) GetStoreCount() int { - bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() - return bc.Stores.GetStoreCount() -} - -/* Stores Write operations */ - -// PauseLeaderTransfer prevents the store from been selected as source or -// target store of TransferLeader. -func (bc *BasicCluster) PauseLeaderTransfer(storeID uint64) error { - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - return bc.Stores.PauseLeaderTransfer(storeID) -} - -// ResumeLeaderTransfer cleans a store's pause state. The store can be selected -// as source or target of TransferLeader again. 
-func (bc *BasicCluster) ResumeLeaderTransfer(storeID uint64) { - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - bc.Stores.ResumeLeaderTransfer(storeID) -} - -// SlowStoreEvicted marks a store as a slow store and prevents transferring -// leader to the store -func (bc *BasicCluster) SlowStoreEvicted(storeID uint64) error { - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - return bc.Stores.SlowStoreEvicted(storeID) -} - -// SlowTrendEvicted marks a store as a slow store by trend and prevents transferring -// leader to the store -func (bc *BasicCluster) SlowTrendEvicted(storeID uint64) error { - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - return bc.Stores.SlowTrendEvicted(storeID) -} - -// SlowTrendRecovered cleans the evicted by slow trend state of a store. -func (bc *BasicCluster) SlowTrendRecovered(storeID uint64) { - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - bc.Stores.SlowTrendRecovered(storeID) -} - -// SlowStoreRecovered cleans the evicted state of a store. -func (bc *BasicCluster) SlowStoreRecovered(storeID uint64) { - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - bc.Stores.SlowStoreRecovered(storeID) -} - -// ResetStoreLimit resets the limit for a specific store. -func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Type, ratePerSec ...float64) { - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - bc.Stores.ResetStoreLimit(storeID, limitType, ratePerSec...) -} - // UpdateStoreStatus updates the information of the store. func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) { - leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize := bc.RegionsInfo.GetStoreStats(storeID) - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize) -} - -// PutStore put a store. -func (bc *BasicCluster) PutStore(store *StoreInfo) { - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - bc.Stores.SetStore(store) -} - -// ResetStores resets the store cache. -func (bc *BasicCluster) ResetStores() { - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - bc.Stores.StoresInfo = NewStoresInfo() -} - -// DeleteStore deletes a store. -func (bc *BasicCluster) DeleteStore(store *StoreInfo) { - bc.Stores.mu.Lock() - defer bc.Stores.mu.Unlock() - bc.Stores.DeleteStore(store) + leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize := bc.GetStoreStats(storeID) + bc.StoresInfo.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize) } /* Regions read operations */ // GetLeaderStoreByRegionID returns the leader store of the given region. 
func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo { - region := bc.RegionsInfo.GetRegion(regionID) + region := bc.GetRegion(regionID) if region == nil || region.GetLeader() == nil { return nil } - bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() - return bc.Stores.GetStore(region.GetLeader().GetStoreId()) + return bc.GetStore(region.GetLeader().GetStoreId()) } func (bc *BasicCluster) getWriteRate( f func(storeID uint64) (bytesRate, keysRate float64), ) (storeIDs []uint64, bytesRates, keysRates []float64) { - bc.Stores.mu.RLock() - count := len(bc.Stores.stores) - storeIDs = make([]uint64, 0, count) - for _, store := range bc.Stores.stores { - storeIDs = append(storeIDs, store.GetID()) - } - bc.Stores.mu.RUnlock() + storeIDs = bc.GetStoreIDs() + count := len(storeIDs) bytesRates = make([]float64, 0, count) keysRates = make([]float64, 0, count) for _, id := range storeIDs { @@ -238,12 +63,12 @@ func (bc *BasicCluster) getWriteRate( // GetStoresLeaderWriteRate get total write rate of each store's leaders. func (bc *BasicCluster) GetStoresLeaderWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) { - return bc.getWriteRate(bc.RegionsInfo.GetStoreLeaderWriteRate) + return bc.getWriteRate(bc.GetStoreLeaderWriteRate) } // GetStoresWriteRate get total write rate of each store's regions. func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) { - return bc.getWriteRate(bc.RegionsInfo.GetStoreWriteRate) + return bc.getWriteRate(bc.GetStoreWriteRate) } // UpdateAllStoreStatus updates the information of all stores. diff --git a/pkg/core/store.go b/pkg/core/store.go index 9b660754496..5baedafdb05 100644 --- a/pkg/core/store.go +++ b/pkg/core/store.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" ) @@ -639,6 +640,7 @@ func MergeLabels(origin []*metapb.StoreLabel, labels []*metapb.StoreLabel) []*me // StoresInfo contains information about all stores. type StoresInfo struct { + syncutil.RWMutex stores map[uint64]*StoreInfo } @@ -649,8 +651,12 @@ func NewStoresInfo() *StoresInfo { } } +/* Stores read operations */ + // GetStore returns a copy of the StoreInfo with the specified storeID. func (s *StoresInfo) GetStore(storeID uint64) *StoreInfo { + s.RLock() + defer s.RUnlock() store, ok := s.stores[storeID] if !ok { return nil @@ -658,13 +664,121 @@ func (s *StoresInfo) GetStore(storeID uint64) *StoreInfo { return store } -// SetStore sets a StoreInfo with storeID. -func (s *StoresInfo) SetStore(store *StoreInfo) { +// GetStores gets a complete set of StoreInfo. +func (s *StoresInfo) GetStores() []*StoreInfo { + s.RLock() + defer s.RUnlock() + stores := make([]*StoreInfo, 0, len(s.stores)) + for _, store := range s.stores { + stores = append(stores, store) + } + return stores +} + +// GetMetaStores gets a complete set of metapb.Store. +func (s *StoresInfo) GetMetaStores() []*metapb.Store { + s.RLock() + defer s.RUnlock() + stores := make([]*metapb.Store, 0, len(s.stores)) + for _, store := range s.stores { + stores = append(stores, store.GetMeta()) + } + return stores +} + +// GetStoreIDs returns a list of store ids. 
+func (s *StoresInfo) GetStoreIDs() []uint64 { + s.RLock() + defer s.RUnlock() + count := len(s.stores) + storeIDs := make([]uint64, 0, count) + for _, store := range s.stores { + storeIDs = append(storeIDs, store.GetID()) + } + return storeIDs +} + +// GetFollowerStores returns all Stores that contains the region's follower peer. +func (s *StoresInfo) GetFollowerStores(region *RegionInfo) []*StoreInfo { + s.RLock() + defer s.RUnlock() + var stores []*StoreInfo + for id := range region.GetFollowers() { + if store, ok := s.stores[id]; ok && store != nil { + stores = append(stores, store) + } + } + return stores +} + +// GetRegionStores returns all Stores that contains the region's peer. +func (s *StoresInfo) GetRegionStores(region *RegionInfo) []*StoreInfo { + s.RLock() + defer s.RUnlock() + var stores []*StoreInfo + for id := range region.GetStoreIDs() { + if store, ok := s.stores[id]; ok && store != nil { + stores = append(stores, store) + } + } + return stores +} + +// GetLeaderStore returns all Stores that contains the region's leader peer. +func (s *StoresInfo) GetLeaderStore(region *RegionInfo) *StoreInfo { + s.RLock() + defer s.RUnlock() + if store, ok := s.stores[region.GetLeader().GetStoreId()]; ok && store != nil { + return store + } + return nil +} + +// GetStoreCount returns the total count of storeInfo. +func (s *StoresInfo) GetStoreCount() int { + s.RLock() + defer s.RUnlock() + return len(s.stores) +} + +// GetNonWitnessVoterStores returns all Stores that contains the non-witness's voter peer. +func (s *StoresInfo) GetNonWitnessVoterStores(region *RegionInfo) []*StoreInfo { + s.RLock() + defer s.RUnlock() + var stores []*StoreInfo + for id := range region.GetNonWitnessVoters() { + if store, ok := s.stores[id]; ok && store != nil { + stores = append(stores, store) + } + } + return stores +} + +/* Stores write operations */ + +// PutStore sets a StoreInfo with storeID. +func (s *StoresInfo) PutStore(store *StoreInfo) { + s.Lock() + defer s.Unlock() + s.putStoreLocked(store) +} + +// putStoreLocked sets a StoreInfo with storeID. +func (s *StoresInfo) putStoreLocked(store *StoreInfo) { s.stores[store.GetID()] = store } +// ResetStores resets the store cache. +func (s *StoresInfo) ResetStores() { + s.Lock() + defer s.Unlock() + s.stores = make(map[uint64]*StoreInfo) +} + // PauseLeaderTransfer pauses a StoreInfo with storeID. func (s *StoresInfo) PauseLeaderTransfer(storeID uint64) error { + s.Lock() + defer s.Unlock() store, ok := s.stores[storeID] if !ok { return errs.ErrStoreNotFound.FastGenByArgs(storeID) @@ -679,6 +793,8 @@ func (s *StoresInfo) PauseLeaderTransfer(storeID uint64) error { // ResumeLeaderTransfer cleans a store's pause state. The store can be selected // as source or target of TransferLeader again. func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) { + s.Lock() + defer s.Unlock() store, ok := s.stores[storeID] if !ok { log.Warn("try to clean a store's pause state, but it is not found. It may be cleanup", @@ -691,6 +807,8 @@ func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) { // SlowStoreEvicted marks a store as a slow store and prevents transferring // leader to the store func (s *StoresInfo) SlowStoreEvicted(storeID uint64) error { + s.Lock() + defer s.Unlock() store, ok := s.stores[storeID] if !ok { return errs.ErrStoreNotFound.FastGenByArgs(storeID) @@ -704,6 +822,8 @@ func (s *StoresInfo) SlowStoreEvicted(storeID uint64) error { // SlowStoreRecovered cleans the evicted state of a store. 
func (s *StoresInfo) SlowStoreRecovered(storeID uint64) { + s.Lock() + defer s.Unlock() store, ok := s.stores[storeID] if !ok { log.Warn("try to clean a store's evicted as a slow store state, but it is not found. It may be cleanup", @@ -716,6 +836,8 @@ func (s *StoresInfo) SlowStoreRecovered(storeID uint64) { // SlowTrendEvicted marks a store as a slow trend and prevents transferring // leader to the store func (s *StoresInfo) SlowTrendEvicted(storeID uint64) error { + s.Lock() + defer s.Unlock() store, ok := s.stores[storeID] if !ok { return errs.ErrStoreNotFound.FastGenByArgs(storeID) @@ -729,6 +851,8 @@ func (s *StoresInfo) SlowTrendEvicted(storeID uint64) error { // SlowTrendRecovered cleans the evicted by trend state of a store. func (s *StoresInfo) SlowTrendRecovered(storeID uint64) { + s.Lock() + defer s.Unlock() store, ok := s.stores[storeID] if !ok { log.Warn("try to clean a store's evicted by trend as a slow store state, but it is not found. It may be cleanup", @@ -740,76 +864,24 @@ func (s *StoresInfo) SlowTrendRecovered(storeID uint64) { // ResetStoreLimit resets the limit for a specific store. func (s *StoresInfo) ResetStoreLimit(storeID uint64, limitType storelimit.Type, ratePerSec ...float64) { + s.Lock() + defer s.Unlock() if store, ok := s.stores[storeID]; ok { s.stores[storeID] = store.Clone(ResetStoreLimit(limitType, ratePerSec...)) } } -// GetStores gets a complete set of StoreInfo. -func (s *StoresInfo) GetStores() []*StoreInfo { - stores := make([]*StoreInfo, 0, len(s.stores)) - for _, store := range s.stores { - stores = append(stores, store) - } - return stores -} - -// GetMetaStores gets a complete set of metapb.Store. -func (s *StoresInfo) GetMetaStores() []*metapb.Store { - stores := make([]*metapb.Store, 0, len(s.stores)) - for _, store := range s.stores { - stores = append(stores, store.GetMeta()) - } - return stores -} - // DeleteStore deletes tombstone record form store func (s *StoresInfo) DeleteStore(store *StoreInfo) { + s.Lock() + defer s.Unlock() delete(s.stores, store.GetID()) } -// GetStoreCount returns the total count of storeInfo. -func (s *StoresInfo) GetStoreCount() int { - return len(s.stores) -} - -// SetLeaderCount sets the leader count to a storeInfo. -func (s *StoresInfo) SetLeaderCount(storeID uint64, leaderCount int) { - if store, ok := s.stores[storeID]; ok { - s.stores[storeID] = store.Clone(SetLeaderCount(leaderCount)) - } -} - -// SetRegionCount sets the region count to a storeInfo. -func (s *StoresInfo) SetRegionCount(storeID uint64, regionCount int) { - if store, ok := s.stores[storeID]; ok { - s.stores[storeID] = store.Clone(SetRegionCount(regionCount)) - } -} - -// SetPendingPeerCount sets the pending count to a storeInfo. -func (s *StoresInfo) SetPendingPeerCount(storeID uint64, pendingPeerCount int) { - if store, ok := s.stores[storeID]; ok { - s.stores[storeID] = store.Clone(SetPendingPeerCount(pendingPeerCount)) - } -} - -// SetLeaderSize sets the leader size to a storeInfo. -func (s *StoresInfo) SetLeaderSize(storeID uint64, leaderSize int64) { - if store, ok := s.stores[storeID]; ok { - s.stores[storeID] = store.Clone(SetLeaderSize(leaderSize)) - } -} - -// SetRegionSize sets the region size to a storeInfo. -func (s *StoresInfo) SetRegionSize(storeID uint64, regionSize int64) { - if store, ok := s.stores[storeID]; ok { - s.stores[storeID] = store.Clone(SetRegionSize(regionSize)) - } -} - // UpdateStoreStatus updates the information of the store. 
func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount int, leaderSize int64, regionSize int64) { + s.Lock() + defer s.Unlock() if store, ok := s.stores[storeID]; ok { newStore := store.ShallowClone(SetLeaderCount(leaderCount), SetRegionCount(regionCount), @@ -818,7 +890,7 @@ func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount, regionCount, SetPendingPeerCount(pendingPeerCount), SetLeaderSize(leaderSize), SetRegionSize(regionSize)) - s.SetStore(newStore) + s.putStoreLocked(newStore) } } diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index d84b3698f69..b37ec7f0fca 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -343,20 +343,20 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (er for { select { case <-ticker.C: - regionsInfo := manager.cluster.GetBasicCluster().RegionsInfo - region := regionsInfo.GetRegionByKey(rawLeftBound) + c := manager.cluster.GetBasicCluster() + region := c.GetRegionByKey(rawLeftBound) if region == nil || !bytes.Equal(region.GetStartKey(), rawLeftBound) { continue } - region = regionsInfo.GetRegionByKey(rawRightBound) + region = c.GetRegionByKey(rawRightBound) if region == nil || !bytes.Equal(region.GetStartKey(), rawRightBound) { continue } - region = regionsInfo.GetRegionByKey(txnLeftBound) + region = c.GetRegionByKey(txnLeftBound) if region == nil || !bytes.Equal(region.GetStartKey(), txnLeftBound) { continue } - region = regionsInfo.GetRegionByKey(txnRightBound) + region = c.GetRegionByKey(txnRightBound) if region == nil || !bytes.Equal(region.GetStartKey(), txnRightBound) { continue } diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index be3277f3fc6..39aa11927ca 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -272,7 +272,7 @@ func deleteAllRegionCache(c *gin.Context) { c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error()) return } - cluster.DropCacheAllRegion() + cluster.ResetRegionCache() c.String(http.StatusOK, "All regions are removed from server cache.") } @@ -297,7 +297,7 @@ func deleteRegionCacheByID(c *gin.Context) { c.String(http.StatusBadRequest, err.Error()) return } - cluster.DropCacheRegion(regionID) + cluster.RemoveRegionIfExist(regionID) c.String(http.StatusOK, "The region is removed from server cache.") } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index d711ab2d4f6..caaafe42c87 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -69,9 +69,9 @@ const ( collectWaitTime = time.Minute // heartbeat relative const - heartbeatTaskRunner = "heartbeat-task-runner" - statisticsTaskRunner = "statistics-task-runner" - logTaskRunner = "log-task-runner" + heartbeatTaskRunner = "heartbeat-task-runner" + miscTaskRunner = "misc-task-runner" + logTaskRunner = "log-task-runner" ) var syncRunner = ratelimit.NewSyncRunner() @@ -100,7 +100,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, checkMembershipCh: checkMembershipCh, heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: 
ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) @@ -521,7 +521,7 @@ func (c *Cluster) collectMetrics() { // collect hot cache metrics c.hotStat.CollectMetrics() // collect the lock metrics - c.RegionsInfo.CollectWaitLockMetrics() + c.CollectWaitLockMetrics() } func resetMetrics() { @@ -688,16 +688,6 @@ func (c *Cluster) SetPrepared() { c.coordinator.GetPrepareChecker().SetPrepared() } -// DropCacheAllRegion removes all cached regions. -func (c *Cluster) DropCacheAllRegion() { - c.ResetRegionCache() -} - -// DropCacheRegion removes a region from the cache. -func (c *Cluster) DropCacheRegion(id uint64) { - c.RemoveRegionIfExist(id) -} - // IsSchedulingHalted returns whether the scheduling is halted. // Currently, the microservice scheduling is halted when: // - The `HaltScheduling` persist option is set to true. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 3f9710c48fd..5d3aba2d2e8 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -138,11 +138,6 @@ func (mc *Cluster) GetStoresLoads() map[uint64][]float64 { return mc.HotStat.GetStoresLoads() } -// GetStore gets a store with a given store ID. -func (mc *Cluster) GetStore(storeID uint64) *core.StoreInfo { - return mc.Stores.GetStore(storeID) -} - // IsRegionHot checks if the region is hot. func (mc *Cluster) IsRegionHot(region *core.RegionInfo) bool { return mc.HotCache.IsRegionHot(region, mc.GetHotRegionCacheHitsThreshold()) @@ -561,11 +556,6 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo( return items } -// DropCacheAllRegion removes all regions from the cache. -func (mc *Cluster) DropCacheAllRegion() { - mc.ResetRegionCache() -} - // UpdateStoreLeaderWeight updates store leader weight. 
func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) { store := mc.GetStore(storeID) @@ -752,7 +742,7 @@ func (mc *Cluster) UpdateStoreStatus(id uint64) { pendingPeerCount := mc.GetStorePendingPeerCount(id) leaderSize := mc.GetStoreLeaderRegionSize(id) regionSize := mc.GetStoreRegionSize(id) - store := mc.Stores.GetStore(id) + store := mc.GetStore(id) stats := &pdpb.StoreStats{} stats.Capacity = defaultStoreCapacity stats.Available = stats.Capacity - uint64(store.GetRegionSize()*units.MiB) diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go index e69b956134b..e1cc702fd36 100644 --- a/pkg/schedule/checker/rule_checker_test.go +++ b/pkg/schedule/checker/rule_checker_test.go @@ -1980,7 +1980,7 @@ func makeStores() placement.StoreSet { if zone == 1 && host == 1 { labels["type"] = "read" } - stores.SetStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now), core.SetStoreState(metapb.StoreState_Up))) + stores.PutStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now), core.SetStoreState(metapb.StoreState_Up))) } } } diff --git a/pkg/schedule/placement/fit_test.go b/pkg/schedule/placement/fit_test.go index aa5c66059f7..cc49d25640c 100644 --- a/pkg/schedule/placement/fit_test.go +++ b/pkg/schedule/placement/fit_test.go @@ -47,7 +47,7 @@ func makeStores() StoreSet { if id == 1111 || id == 2111 || id == 3111 { labels["disk"] = "ssd" } - stores.SetStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now))) + stores.PutStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now))) } } } diff --git a/pkg/schedule/scatter/region_scatterer_test.go b/pkg/schedule/scatter/region_scatterer_test.go index b0027e0e415..89e55e5c9c7 100644 --- a/pkg/schedule/scatter/region_scatterer_test.go +++ b/pkg/schedule/scatter/region_scatterer_test.go @@ -216,7 +216,7 @@ func scatterSpecial(re *require.Assertions, numOrdinaryStores, numSpecialStores, leaderStoreID := region.GetLeader().GetStoreId() for _, peer := range region.GetPeers() { storeID := peer.GetStoreId() - store := tc.Stores.GetStore(storeID) + store := tc.GetStore(storeID) if store.GetLabelValue("engine") == "tiflash" { countSpecialPeers[storeID]++ } else { diff --git a/pkg/schedule/schedulers/balance_test.go b/pkg/schedule/schedulers/balance_test.go index 234acfd6d26..26214ed5456 100644 --- a/pkg/schedule/schedulers/balance_test.go +++ b/pkg/schedule/schedulers/balance_test.go @@ -697,7 +697,7 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestReSortStores() { suite.tc.AddLeaderStore(4, 100) suite.tc.AddLeaderStore(5, 100) suite.tc.AddLeaderStore(6, 0) - stores := suite.tc.Stores.GetStores() + stores := suite.tc.GetStores() sort.Slice(stores, func(i, j int) bool { return stores[i].GetID() < stores[j].GetID() }) diff --git a/pkg/storage/leveldb_backend.go b/pkg/storage/leveldb_backend.go old mode 100644 new mode 100755 diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 4525ec6091c..460489ecd10 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -100,7 +100,7 @@ func TestLoadStores(t *testing.T) { n := 10 stores := mustSaveStores(re, storage, n) - re.NoError(storage.LoadStores(cache.SetStore)) + re.NoError(storage.LoadStores(cache.PutStore)) re.Equal(n, cache.GetStoreCount()) for _, store := range cache.GetMetaStores() { @@ -117,7 +117,7 @@ func TestStoreWeight(t *testing.T) { mustSaveStores(re, storage, n) re.NoError(storage.SaveStoreWeight(1, 
2.0, 3.0)) re.NoError(storage.SaveStoreWeight(2, 0.2, 0.3)) - re.NoError(storage.LoadStores(cache.SetStore)) + re.NoError(storage.LoadStores(cache.PutStore)) leaderWeights := []float64{1.0, 2.0, 0.2} regionWeights := []float64{1.0, 3.0, 0.3} for i := 0; i < n; i++ { diff --git a/pkg/unsaferecovery/unsafe_recovery_controller.go b/pkg/unsaferecovery/unsafe_recovery_controller.go index d2f6125c3f3..89cd6e6393c 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller.go @@ -107,7 +107,7 @@ const ( type cluster interface { core.StoreSetInformer - DropCacheAllRegion() + ResetRegionCache() AllocID() (uint64, error) BuryStore(storeID uint64, forceBury bool) error GetSchedulerConfig() sc.SchedulerConfigProvider @@ -544,7 +544,7 @@ func (u *Controller) changeStage(stage stage) { case Finished: if u.step > 1 { // == 1 means no operation has done, no need to invalid cache - u.cluster.DropCacheAllRegion() + u.cluster.ResetRegionCache() } output.Info = "Unsafe recovery Finished" output.Details = u.getAffectedTableDigest() diff --git a/server/api/admin.go b/server/api/admin.go index ab5ba882287..dd81985b514 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -60,7 +60,7 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - rc.DropCacheRegion(regionID) + rc.RemoveRegionIfExist(regionID) if h.svr.IsServiceIndependent(utils.SchedulingServiceName) { err = h.DeleteRegionCacheInSchedulingServer(regionID) } @@ -100,7 +100,7 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques return } // Remove region from cache. - rc.DropCacheRegion(regionID) + rc.RemoveRegionIfExist(regionID) if h.svr.IsServiceIndependent(utils.SchedulingServiceName) { err = h.DeleteRegionCacheInSchedulingServer(regionID) } @@ -116,7 +116,7 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Request) { var err error rc := getCluster(r) - rc.DropCacheAllRegion() + rc.ResetRegionCache() if h.svr.IsServiceIndependent(utils.SchedulingServiceName) { err = h.DeleteRegionCacheInSchedulingServer() } diff --git a/server/api/stats.go b/server/api/stats.go index 915d33ddfdf..5aa8fcb72a6 100644 --- a/server/api/stats.go +++ b/server/api/stats.go @@ -47,7 +47,7 @@ func (h *statsHandler) GetRegionStatus(w http.ResponseWriter, r *http.Request) { startKey, endKey := r.URL.Query().Get("start_key"), r.URL.Query().Get("end_key") var stats *statistics.RegionStats if r.URL.Query().Has("count") { - stats = rc.GetRegionCount([]byte(startKey), []byte(endKey)) + stats = rc.GetRegionStatsCount([]byte(startKey), []byte(endKey)) } else { stats = rc.GetRegionStatsByRange([]byte(startKey), []byte(endKey)) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 057814b718b..70d6b46b980 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -107,9 +107,9 @@ const ( minSnapshotDurationSec = 5 // heartbeat relative const - heartbeatTaskRunner = "heartbeat-async" - statisticsTaskRunner = "statistics-async" - logTaskRunner = "log-async" + heartbeatTaskRunner = "heartbeat-async" + miscTaskRunner = "misc-async" + logTaskRunner = "log-async" ) // Server is the interface for cluster. 
@@ -143,6 +143,8 @@ type RaftCluster struct { ctx context.Context cancel context.CancelFunc + *core.BasicCluster // cached cluster info + etcdClient *clientv3.Client httpClient *http.Client @@ -159,7 +161,6 @@ type RaftCluster struct { // This below fields are all read-only, we cannot update itself after the raft cluster starts. clusterID uint64 id id.Allocator - core *core.BasicCluster // cached cluster info opt *config.PersistOptions limiter *StoreLimiter *schedulingController @@ -201,10 +202,10 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.Ba regionSyncer: regionSyncer, httpClient: httpClient, etcdClient: etcdClient, - core: basicCluster, + BasicCluster: basicCluster, storage: storage, heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } } @@ -251,10 +252,10 @@ func (c *RaftCluster) LoadClusterStatus() (*Status, error) { } func (c *RaftCluster) isInitialized() bool { - if c.core.GetTotalRegionCount() > 1 { + if c.GetTotalRegionCount() > 1 { return true } - region := c.core.GetRegionByKey(nil) + region := c.GetRegionByKey(nil) return region != nil && len(region.GetVoters()) >= int(c.opt.GetReplicationConfig().MaxReplicas) && len(region.GetPendingPeers()) == 0 @@ -295,7 +296,7 @@ func (c *RaftCluster) InitCluster( return err } } - c.schedulingController = newSchedulingController(c.ctx, c.core, c.opt, c.ruleManager) + c.schedulingController = newSchedulingController(c.ctx, c.BasicCluster, c.opt, c.ruleManager) return nil } @@ -644,9 +645,9 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { return nil, nil } - c.core.ResetStores() + c.ResetStores() start := time.Now() - if err := c.storage.LoadStores(c.core.PutStore); err != nil { + if err := c.storage.LoadStores(c.PutStore); err != nil { return nil, err } log.Info("load stores", @@ -657,11 +658,11 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { start = time.Now() // used to load region from kv storage to cache storage. - if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil { + if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.CheckAndPutRegion); err != nil { return nil, err } log.Info("load regions", - zap.Int("count", c.core.GetTotalRegionCount()), + zap.Int("count", c.GetTotalRegionCount()), zap.Duration("cost", time.Since(start)), ) @@ -729,7 +730,7 @@ func (c *RaftCluster) runUpdateStoreStats() { case <-ticker.C: // Update related stores. 
start := time.Now() - c.core.UpdateAllStoreStatus() + c.UpdateAllStoreStatus() updateStoreStatsGauge.Set(time.Since(start).Seconds()) } } @@ -868,8 +869,6 @@ func (c *RaftCluster) GetUnsafeRecoveryController() *unsaferecovery.Controller { func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest, resp *pdpb.StoreHeartbeatResponse) error { stats := heartbeat.GetStats() storeID := stats.GetStoreId() - c.Lock() - defer c.Unlock() store := c.GetStore(storeID) if store == nil { return errors.Errorf("store %v not found", storeID) @@ -917,10 +916,10 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest newStore = newStore.Clone(core.SetLastPersistTime(nowTime)) } } - if store := c.core.GetStore(storeID); store != nil { + if store := c.GetStore(storeID); store != nil { statistics.UpdateStoreHeartbeatMetrics(store) } - c.core.PutStore(newStore) + c.PutStore(newStore) var ( regions map[uint64]*core.RegionInfo interval uint64 @@ -989,7 +988,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest // processReportBuckets update the bucket information. func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error { - region := c.core.GetRegion(buckets.GetRegionId()) + region := c.GetRegion(buckets.GetRegionId()) if region == nil { regionCacheMissCounter.Inc() return errors.Errorf("region %v not found", buckets.GetRegionId()) @@ -1022,7 +1021,7 @@ var syncRunner = ratelimit.NewSyncRunner() // processRegionHeartbeat updates the region information. func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error { tracer := ctx.Tracer - origin, _, err := c.core.PreCheckPutRegion(region) + origin, _, err := c.PreCheckPutRegion(region) tracer.OnPreCheckFinished() if err != nil { return err @@ -1082,7 +1081,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // check its validation again here. // // However, it can't solve the race condition of concurrent heartbeats from the same region. - if overlaps, err = c.core.CheckAndPutRootTree(ctx, region); err != nil { + if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil { tracer.OnSaveCacheFinished() return err } @@ -1173,158 +1172,7 @@ func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error { // GetBasicCluster returns the basic cluster. func (c *RaftCluster) GetBasicCluster() *core.BasicCluster { - return c.core -} - -// GetRegionByKey gets regionInfo by region key from cluster. -func (c *RaftCluster) GetRegionByKey(regionKey []byte) *core.RegionInfo { - return c.core.GetRegionByKey(regionKey) -} - -// GetPrevRegionByKey gets previous region and leader peer by the region key from cluster. -func (c *RaftCluster) GetPrevRegionByKey(regionKey []byte) *core.RegionInfo { - return c.core.GetPrevRegionByKey(regionKey) -} - -// ScanRegions scans region with start key, until the region contains endKey, or -// total number greater than limit. -func (c *RaftCluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo { - return c.core.ScanRegions(startKey, endKey, limit) -} - -// GetRegion searches for a region by ID. -func (c *RaftCluster) GetRegion(regionID uint64) *core.RegionInfo { - return c.core.GetRegion(regionID) -} - -// GetMetaRegions gets regions from cluster. -func (c *RaftCluster) GetMetaRegions() []*metapb.Region { - return c.core.GetMetaRegions() -} - -// GetRegions returns all regions' information in detail. 
-func (c *RaftCluster) GetRegions() []*core.RegionInfo { - return c.core.GetRegions() -} - -// ValidRegion is used to decide if the region is valid. -func (c *RaftCluster) ValidRegion(region *metapb.Region) error { - return c.core.ValidRegion(region) -} - -// GetTotalRegionCount returns total count of regions -func (c *RaftCluster) GetTotalRegionCount() int { - return c.core.GetTotalRegionCount() -} - -// GetStoreRegions returns all regions' information with a given storeID. -func (c *RaftCluster) GetStoreRegions(storeID uint64) []*core.RegionInfo { - return c.core.GetStoreRegions(storeID) -} - -// GetStoreRegions returns all regions' information with a given storeID. -func (c *RaftCluster) GetStoreRegionsByType(storeID uint64) []*core.RegionInfo { - return c.core.GetStoreRegions(storeID) -} - -// RandLeaderRegions returns some random regions that has leader on the store. -func (c *RaftCluster) RandLeaderRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo { - return c.core.RandLeaderRegions(storeID, ranges) -} - -// RandFollowerRegions returns some random regions that has a follower on the store. -func (c *RaftCluster) RandFollowerRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo { - return c.core.RandFollowerRegions(storeID, ranges) -} - -// RandPendingRegions returns some random regions that has a pending peer on the store. -func (c *RaftCluster) RandPendingRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo { - return c.core.RandPendingRegions(storeID, ranges) -} - -// RandLearnerRegions returns some random regions that has a learner peer on the store. -func (c *RaftCluster) RandLearnerRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo { - return c.core.RandLearnerRegions(storeID, ranges) -} - -// RandWitnessRegions returns some random regions that has a witness peer on the store. -func (c *RaftCluster) RandWitnessRegions(storeID uint64, ranges []core.KeyRange) []*core.RegionInfo { - return c.core.RandWitnessRegions(storeID, ranges) -} - -// GetLeaderStore returns all stores that contains the region's leader peer. -func (c *RaftCluster) GetLeaderStore(region *core.RegionInfo) *core.StoreInfo { - return c.core.GetLeaderStore(region) -} - -// GetNonWitnessVoterStores returns all stores that contains the region's non-witness voter peer. -func (c *RaftCluster) GetNonWitnessVoterStores(region *core.RegionInfo) []*core.StoreInfo { - return c.core.GetNonWitnessVoterStores(region) -} - -// GetFollowerStores returns all stores that contains the region's follower peer. -func (c *RaftCluster) GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo { - return c.core.GetFollowerStores(region) -} - -// GetRegionStores returns all stores that contains the region's peer. -func (c *RaftCluster) GetRegionStores(region *core.RegionInfo) []*core.StoreInfo { - return c.core.GetRegionStores(region) -} - -// GetStoreCount returns the count of stores. -func (c *RaftCluster) GetStoreCount() int { - return c.core.GetStoreCount() -} - -// GetStoreRegionCount returns the number of regions for a given store. -func (c *RaftCluster) GetStoreRegionCount(storeID uint64) int { - return c.core.GetStoreRegionCount(storeID) -} - -// GetAverageRegionSize returns the average region approximate size. -func (c *RaftCluster) GetAverageRegionSize() int64 { - return c.core.GetAverageRegionSize() -} - -// DropCacheRegion removes a region from the cache. 
-func (c *RaftCluster) DropCacheRegion(id uint64) { - c.core.RemoveRegionIfExist(id) -} - -// DropCacheAllRegion removes all regions from the cache. -func (c *RaftCluster) DropCacheAllRegion() { - c.core.ResetRegionCache() -} - -// GetMetaStores gets stores from cluster. -func (c *RaftCluster) GetMetaStores() []*metapb.Store { - return c.core.GetMetaStores() -} - -// GetStores returns all stores in the cluster. -func (c *RaftCluster) GetStores() []*core.StoreInfo { - return c.core.GetStores() -} - -// GetLeaderStoreByRegionID returns the leader store of the given region. -func (c *RaftCluster) GetLeaderStoreByRegionID(regionID uint64) *core.StoreInfo { - return c.core.GetLeaderStoreByRegionID(regionID) -} - -// GetStore gets store from cluster. -func (c *RaftCluster) GetStore(storeID uint64) *core.StoreInfo { - return c.core.GetStore(storeID) -} - -// GetAdjacentRegions returns regions' information that are adjacent with the specific region ID. -func (c *RaftCluster) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo) { - return c.core.GetAdjacentRegions(region) -} - -// GetRangeHoles returns all range holes, i.e the key ranges without any region info. -func (c *RaftCluster) GetRangeHoles() [][]string { - return c.core.GetRangeHoles() + return c.BasicCluster } // UpdateStoreLabels updates a store's location labels @@ -1360,8 +1208,8 @@ func (c *RaftCluster) DeleteStoreLabel(storeID uint64, labelKey string) error { return c.putStoreImpl(newStore, true) } -// PutStore puts a store. -func (c *RaftCluster) PutStore(store *metapb.Store) error { +// PutMetaStore puts a store. +func (c *RaftCluster) PutMetaStore(store *metapb.Store) error { if err := c.putStoreImpl(store, false); err != nil { return err } @@ -1374,9 +1222,6 @@ func (c *RaftCluster) PutStore(store *metapb.Store) error { // If 'force' is true, the store's labels will overwrite those labels which already existed in the store. // If 'force' is false, the store's labels will merge into those labels which already existed in the store. func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error { - c.Lock() - defer c.Unlock() - if store.GetId() == 0 { return errors.Errorf("invalid put store %v", store) } @@ -1418,7 +1263,7 @@ func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error { if err := c.checkStoreLabels(s); err != nil { return err } - return c.putStoreLocked(s) + return c.setStore(s) } func (c *RaftCluster) checkStoreVersion(store *metapb.Store) error { @@ -1463,9 +1308,6 @@ func (c *RaftCluster) checkStoreLabels(s *core.StoreInfo) error { // RemoveStore marks a store as offline in cluster. // State transition: Up -> Offline. 
func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) error { - c.Lock() - defer c.Unlock() - store := c.GetStore(storeID) if store == nil { return errs.ErrStoreNotFound.FastGenByArgs(storeID) @@ -1490,9 +1332,9 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro zap.Uint64("store-id", storeID), zap.String("store-address", newStore.GetAddress()), zap.Bool("physically-destroyed", newStore.IsPhysicallyDestroyed())) - err := c.putStoreLocked(newStore) + err := c.setStore(newStore) if err == nil { - regionSize := float64(c.core.GetStoreRegionSize(storeID)) + regionSize := float64(c.GetStoreRegionSize(storeID)) c.resetProgress(storeID, store.GetAddress()) c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration())) // record the current store limit in memory @@ -1555,9 +1397,6 @@ func (c *RaftCluster) getUpStores() []uint64 { // BuryStore marks a store as tombstone in cluster. // If forceBury is false, the store should be offlined and emptied before calling this func. func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { - c.Lock() - defer c.Unlock() - store := c.GetStore(storeID) if store == nil { return errs.ErrStoreNotFound.FastGenByArgs(storeID) @@ -1582,8 +1421,8 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { zap.String("store-address", newStore.GetAddress()), zap.String("state", store.GetState().String()), zap.Bool("physically-destroyed", store.IsPhysicallyDestroyed())) - err := c.putStoreLocked(newStore) - c.onStoreVersionChangeLocked() + err := c.setStore(newStore) + c.OnStoreVersionChange() if err == nil { // clean up the residual information. delete(c.prevStoreLimit, storeID) @@ -1599,40 +1438,6 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { return err } -// PauseLeaderTransfer prevents the store from been selected as source or -// target store of TransferLeader. -func (c *RaftCluster) PauseLeaderTransfer(storeID uint64) error { - return c.core.PauseLeaderTransfer(storeID) -} - -// ResumeLeaderTransfer cleans a store's pause state. The store can be selected -// as source or target of TransferLeader again. -func (c *RaftCluster) ResumeLeaderTransfer(storeID uint64) { - c.core.ResumeLeaderTransfer(storeID) -} - -// SlowStoreEvicted marks a store as a slow store and prevents transferring -// leader to the store -func (c *RaftCluster) SlowStoreEvicted(storeID uint64) error { - return c.core.SlowStoreEvicted(storeID) -} - -// SlowTrendEvicted marks a store as a slow store by trend and prevents transferring -// leader to the store -func (c *RaftCluster) SlowTrendEvicted(storeID uint64) error { - return c.core.SlowTrendEvicted(storeID) -} - -// SlowTrendRecovered cleans the evicted by slow trend state of a store. -func (c *RaftCluster) SlowTrendRecovered(storeID uint64) { - c.core.SlowTrendRecovered(storeID) -} - -// SlowStoreRecovered cleans the evicted state of a store. -func (c *RaftCluster) SlowStoreRecovered(storeID uint64) { - c.core.SlowStoreRecovered(storeID) -} - // NeedAwakenAllRegionsInStore checks whether we should do AwakenRegions operation. 
func (c *RaftCluster) NeedAwakenAllRegionsInStore(storeID uint64) (needAwaken bool, slowStoreIDs []uint64) { store := c.GetStore(storeID) @@ -1664,9 +1469,6 @@ func (c *RaftCluster) NeedAwakenAllRegionsInStore(storeID uint64) (needAwaken bo // UpStore up a store from offline func (c *RaftCluster) UpStore(storeID uint64) error { - c.Lock() - defer c.Unlock() - store := c.GetStore(storeID) if store == nil { return errs.ErrStoreNotFound.FastGenByArgs(storeID) @@ -1697,7 +1499,7 @@ func (c *RaftCluster) UpStore(storeID uint64) error { log.Warn("store has been up", zap.Uint64("store-id", storeID), zap.String("store-address", newStore.GetAddress())) - err := c.putStoreLocked(newStore) + err := c.setStore(newStore) if err == nil { if exist { // persist the store limit @@ -1711,9 +1513,6 @@ func (c *RaftCluster) UpStore(storeID uint64) error { // ReadyToServe change store's node state to Serving. func (c *RaftCluster) ReadyToServe(storeID uint64) error { - c.Lock() - defer c.Unlock() - store := c.GetStore(storeID) if store == nil { return errs.ErrStoreNotFound.FastGenByArgs(storeID) @@ -1735,7 +1534,7 @@ func (c *RaftCluster) ReadyToServe(storeID uint64) error { log.Info("store has changed to serving", zap.Uint64("store-id", storeID), zap.String("store-address", newStore.GetAddress())) - err := c.putStoreLocked(newStore) + err := c.setStore(newStore) if err == nil { c.resetProgress(storeID, store.GetAddress()) } @@ -1758,16 +1557,16 @@ func (c *RaftCluster) SetStoreWeight(storeID uint64, leaderWeight, regionWeight core.SetRegionWeight(regionWeight), ) - return c.putStoreLocked(newStore) + return c.setStore(newStore) } -func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error { +func (c *RaftCluster) setStore(store *core.StoreInfo) error { if c.storage != nil { if err := c.storage.SaveStoreMeta(store.GetMeta()); err != nil { return err } } - c.core.PutStore(store) + c.PutStore(store) if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.updateStoreStatistics(store.GetID(), store.IsSlow()) } @@ -1833,11 +1632,11 @@ func (c *RaftCluster) checkStores() { offlineStore := store.GetMeta() id := offlineStore.GetId() - regionSize := c.core.GetStoreRegionSize(id) + regionSize := c.GetStoreRegionSize(id) if c.IsPrepared() { c.updateProgress(id, store.GetAddress(), removingAction, float64(regionSize), float64(regionSize), false /* dec */) } - regionCount := c.core.GetStoreRegionCount(id) + regionCount := c.GetStoreRegionCount(id) // If the store is empty, it can be buried. 
if regionCount == 0 { if err := c.BuryStore(id, false); err != nil { @@ -1865,7 +1664,7 @@ func (c *RaftCluster) checkStores() { func (c *RaftCluster) getThreshold(stores []*core.StoreInfo, store *core.StoreInfo) float64 { start := time.Now() if !c.opt.IsPlacementRulesEnabled() { - regionSize := c.core.GetRegionSizeByRange([]byte(""), []byte("")) * int64(c.opt.GetMaxReplicas()) + regionSize := c.GetRegionSizeByRange([]byte(""), []byte("")) * int64(c.opt.GetMaxReplicas()) weight := getStoreTopoWeight(store, stores, c.opt.GetLocationLabels(), c.opt.GetMaxReplicas()) return float64(regionSize) * weight * 0.9 } @@ -1905,7 +1704,7 @@ func (c *RaftCluster) calculateRange(stores []*core.StoreInfo, store *core.Store matchStores = append(matchStores, s) } } - regionSize := c.core.GetRegionSizeByRange(startKey, endKey) * int64(rule.Count) + regionSize := c.GetRegionSizeByRange(startKey, endKey) * int64(rule.Count) weight := getStoreTopoWeight(store, matchStores, rule.LocationLabels, rule.Count) storeSize += float64(regionSize) * weight log.Debug("calculate range result", @@ -2071,13 +1870,10 @@ func encodePreparingProgressKey(storeID uint64) string { // RemoveTombStoneRecords removes the tombStone Records. func (c *RaftCluster) RemoveTombStoneRecords() error { - c.Lock() - defer c.Unlock() - var failedStores []uint64 for _, store := range c.GetStores() { if store.IsRemoved() { - if c.core.GetStoreRegionCount(store.GetID()) > 0 { + if c.GetStoreRegionCount(store.GetID()) > 0 { log.Warn("skip removing tombstone", zap.Stringer("store", store.GetMeta())) failedStores = append(failedStores, store.GetID()) continue @@ -2115,7 +1911,7 @@ func (c *RaftCluster) deleteStore(store *core.StoreInfo) error { return err } } - c.core.DeleteStore(store) + c.DeleteStore(store) return nil } @@ -2156,12 +1952,6 @@ func (c *RaftCluster) resetProgressIndicator() { // OnStoreVersionChange changes the version of the cluster when needed. func (c *RaftCluster) OnStoreVersionChange() { - c.RLock() - defer c.RUnlock() - c.onStoreVersionChangeLocked() -} - -func (c *RaftCluster) onStoreVersionChangeLocked() { var minVersion *semver.Version stores := c.GetStores() for _, s := range stores { @@ -2219,13 +2009,13 @@ func (c *RaftCluster) PutMetaCluster(meta *metapb.Cluster) error { // GetRegionStatsByRange returns region statistics from cluster. func (c *RaftCluster) GetRegionStatsByRange(startKey, endKey []byte) *statistics.RegionStats { - return statistics.GetRegionStats(c.core.ScanRegions(startKey, endKey, -1)) + return statistics.GetRegionStats(c.ScanRegions(startKey, endKey, -1)) } -// GetRegionCount returns the number of regions in the range. -func (c *RaftCluster) GetRegionCount(startKey, endKey []byte) *statistics.RegionStats { +// GetRegionStatsCount returns the number of regions in the range. 
+func (c *RaftCluster) GetRegionStatsCount(startKey, endKey []byte) *statistics.RegionStats { stats := &statistics.RegionStats{} - stats.Count = c.core.GetRegionCount(startKey, endKey) + stats.Count = c.GetRegionCount(startKey, endKey) return stats } @@ -2237,7 +2027,7 @@ func (c *RaftCluster) putRegion(region *core.RegionInfo) error { return err } } - c.core.PutRegion(region) + c.PutRegion(region) return nil } @@ -2292,7 +2082,7 @@ func (c *RaftCluster) AddStoreLimit(store *metapb.Store) { func (c *RaftCluster) RemoveStoreLimit(storeID uint64) { cfg := c.opt.GetScheduleConfig().Clone() for _, limitType := range storelimit.TypeNameValue { - c.core.ResetStoreLimit(storeID, limitType) + c.ResetStoreLimit(storeID, limitType) } delete(cfg.StoreLimit, storeID) c.opt.SetScheduleConfig(cfg) @@ -2312,16 +2102,13 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) { // SetMinResolvedTS sets up a store with min resolved ts. func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error { - c.Lock() - defer c.Unlock() - store := c.GetStore(storeID) if store == nil { return errs.ErrStoreNotFound.FastGenByArgs(storeID) } newStore := store.Clone(core.SetMinResolvedTS(minResolvedTS)) - c.core.PutStore(newStore) + c.PutStore(newStore) return nil } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 0f08153c8ae..ee7c477476b 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -93,7 +93,7 @@ func TestStoreHeartbeat(t *testing.T) { } re.Error(cluster.HandleStoreHeartbeat(req, resp)) - re.NoError(cluster.putStoreLocked(store)) + re.NoError(cluster.setStore(store)) re.Equal(i+1, cluster.GetStoreCount()) re.Equal(int64(0), store.GetLastHeartbeatTS().UnixNano()) @@ -215,7 +215,7 @@ func TestFilterUnhealthyStore(t *testing.T) { Available: 50, RegionCount: 1, } - re.NoError(cluster.putStoreLocked(store)) + re.NoError(cluster.setStore(store)) re.NoError(cluster.HandleStoreHeartbeat(req, resp)) re.NotNil(cluster.hotStat.GetRollingStoreStats(store.GetID())) } @@ -228,7 +228,7 @@ func TestFilterUnhealthyStore(t *testing.T) { RegionCount: 1, } newStore := store.Clone(core.SetStoreState(metapb.StoreState_Tombstone)) - re.NoError(cluster.putStoreLocked(newStore)) + re.NoError(cluster.setStore(newStore)) re.NoError(cluster.HandleStoreHeartbeat(req, resp)) re.Nil(cluster.hotStat.GetRollingStoreStats(store.GetID())) } @@ -253,7 +253,7 @@ func TestSetOfflineStore(t *testing.T) { // Put 6 stores. for _, store := range newTestStores(6, "2.0.0") { - re.NoError(cluster.PutStore(store.GetMeta())) + re.NoError(cluster.PutMetaStore(store.GetMeta())) } // store 1: up -> offline @@ -295,7 +295,7 @@ func TestSetOfflineStore(t *testing.T) { // test clean up tombstone store toCleanStore := cluster.GetStore(1).Clone().GetMeta() toCleanStore.LastHeartbeat = time.Now().Add(-40 * 24 * time.Hour).UnixNano() - cluster.PutStore(toCleanStore) + cluster.PutMetaStore(toCleanStore) cluster.checkStores() re.Nil(cluster.GetStore(1)) } @@ -312,7 +312,7 @@ func TestSetOfflineWithReplica(t *testing.T) { // Put 4 stores. for _, store := range newTestStores(4, "2.0.0") { - re.NoError(cluster.PutStore(store.GetMeta())) + re.NoError(cluster.PutMetaStore(store.GetMeta())) } re.NoError(cluster.RemoveStore(2, false)) @@ -351,7 +351,7 @@ func TestSetOfflineStoreWithEvictLeader(t *testing.T) { // Put 3 stores. 
for _, store := range newTestStores(3, "2.0.0") { - re.NoError(cluster.PutStore(store.GetMeta())) + re.NoError(cluster.PutMetaStore(store.GetMeta())) } _, err = addEvictLeaderScheduler(cluster, 1) @@ -378,7 +378,7 @@ func TestForceBuryStore(t *testing.T) { stores := newTestStores(2, "5.3.0") stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now())) for _, store := range stores { - re.NoError(cluster.PutStore(store.GetMeta())) + re.NoError(cluster.PutMetaStore(store.GetMeta())) } re.NoError(cluster.BuryStore(uint64(1), true)) re.Error(cluster.BuryStore(uint64(2), true)) @@ -396,7 +396,7 @@ func TestReuseAddress(t *testing.T) { cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) // Put 4 stores. for _, store := range newTestStores(4, "2.0.0") { - re.NoError(cluster.PutStore(store.GetMeta())) + re.NoError(cluster.PutMetaStore(store.GetMeta())) } // store 1: up // store 2: offline @@ -420,9 +420,9 @@ func TestReuseAddress(t *testing.T) { if storeInfo.IsPhysicallyDestroyed() || storeInfo.IsRemoved() { // try to start a new store with the same address with store which is physically destroyed or tombstone should be success - re.NoError(cluster.PutStore(newStore)) + re.NoError(cluster.PutMetaStore(newStore)) } else { - re.Error(cluster.PutStore(newStore)) + re.Error(cluster.PutMetaStore(newStore)) } } } @@ -450,7 +450,7 @@ func TestUpStore(t *testing.T) { // Put 5 stores. for _, store := range newTestStores(5, "5.0.0") { - re.NoError(cluster.PutStore(store.GetMeta())) + re.NoError(cluster.PutMetaStore(store.GetMeta())) } // set store 1 offline @@ -490,7 +490,7 @@ func TestRemovingProcess(t *testing.T) { // Put 5 stores. stores := newTestStores(5, "5.0.0") for _, store := range stores { - re.NoError(cluster.PutStore(store.GetMeta())) + re.NoError(cluster.PutMetaStore(store.GetMeta())) } regions := newTestRegions(100, 5, 1) var regionInStore1 []*core.RegionInfo @@ -518,7 +518,7 @@ func TestRemovingProcess(t *testing.T) { if i >= 5 { break } - cluster.DropCacheRegion(region.GetID()) + cluster.RemoveRegionIfExist(region.GetID()) i++ } cluster.checkStores() @@ -553,13 +553,13 @@ func TestDeleteStoreUpdatesClusterVersion(t *testing.T) { // Put 3 new 4.0.9 stores. for _, store := range newTestStores(3, "4.0.9") { - re.NoError(cluster.PutStore(store.GetMeta())) + re.NoError(cluster.PutMetaStore(store.GetMeta())) } re.Equal("4.0.9", cluster.GetClusterVersion()) // Upgrade 2 stores to 5.0.0. 
for _, store := range newTestStores(2, "5.0.0") { - re.NoError(cluster.PutStore(store.GetMeta())) + re.NoError(cluster.PutMetaStore(store.GetMeta())) } re.Equal("4.0.9", cluster.GetClusterVersion()) @@ -582,14 +582,14 @@ func TestStoreClusterVersion(t *testing.T) { s1.Version = "5.0.1" s2.Version = "5.0.3" s3.Version = "5.0.5" - re.NoError(cluster.PutStore(s2)) + re.NoError(cluster.PutMetaStore(s2)) re.Equal(s2.Version, cluster.GetClusterVersion()) - re.NoError(cluster.PutStore(s1)) + re.NoError(cluster.PutMetaStore(s1)) // the cluster version should be 5.0.1(the min one) re.Equal(s1.Version, cluster.GetClusterVersion()) - re.NoError(cluster.PutStore(s3)) + re.NoError(cluster.PutMetaStore(s3)) // the cluster version should be 5.0.1(the min one) re.Equal(s1.Version, cluster.GetClusterVersion()) } @@ -679,7 +679,7 @@ func TestBucketHeartbeat(t *testing.T) { n, np := uint64(2), uint64(2) regions := newTestRegions(n, n, np) for _, store := range stores { - re.NoError(cluster.putStoreLocked(store)) + re.NoError(cluster.setStore(store)) } re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), regions[0])) @@ -729,31 +729,31 @@ func TestRegionHeartbeat(t *testing.T) { regions := newTestRegions(n, n, np) for _, store := range stores { - re.NoError(cluster.putStoreLocked(store)) + re.NoError(cluster.setStore(store)) } for i, region := range regions { // region does not exist. re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) - checkRegions(re, cluster.core, regions[:i+1]) + checkRegions(re, cluster.BasicCluster, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is the same, not updated. re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) - checkRegions(re, cluster.core, regions[:i+1]) + checkRegions(re, cluster.BasicCluster, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) origin := region // region is updated. region = origin.Clone(core.WithIncVersion()) regions[i] = region re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) - checkRegions(re, cluster.core, regions[:i+1]) + checkRegions(re, cluster.BasicCluster, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is stale (Version). stale := origin.Clone(core.WithIncConfVer()) re.Error(cluster.processRegionHeartbeat(core.ContextTODO(), stale)) - checkRegions(re, cluster.core, regions[:i+1]) + checkRegions(re, cluster.BasicCluster, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is updated @@ -763,13 +763,13 @@ func TestRegionHeartbeat(t *testing.T) { ) regions[i] = region re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) - checkRegions(re, cluster.core, regions[:i+1]) + checkRegions(re, cluster.BasicCluster, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is stale (ConfVer). stale = origin.Clone(core.WithIncConfVer()) re.Error(cluster.processRegionHeartbeat(core.ContextTODO(), stale)) - checkRegions(re, cluster.core, regions[:i+1]) + checkRegions(re, cluster.BasicCluster, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // Add a down peer. @@ -781,38 +781,38 @@ func TestRegionHeartbeat(t *testing.T) { })) regions[i] = region re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region)) - checkRegions(re, cluster.core, regions[:i+1]) + checkRegions(re, cluster.BasicCluster, regions[:i+1]) // Add a pending peer. 
region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]}))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
// Clear down peers.
region = region.Clone(core.WithDownPeers(nil))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
// Clear pending peers.
region = region.Clone(core.WithPendingPeers(nil))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
// Remove peers.
origin = region
region = origin.Clone(core.SetPeers(region.GetPeers()[:1]))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])
// Add peers.
region = origin
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])
// Change one peer to witness
@@ -822,47 +822,47 @@ func TestRegionHeartbeat(t *testing.T) {
)
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
// Change leader.
region = region.Clone(core.WithLeader(region.GetPeers()[1]))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
// Change ApproximateSize.
region = region.Clone(core.SetApproximateSize(144))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
// Change ApproximateKeys.
region = region.Clone(core.SetApproximateKeys(144000))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
// Change bytes written.
region = region.Clone(core.SetWrittenBytes(24000))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
// Change bytes read.
region = region.Clone(core.SetReadBytes(1080000))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
// Flashback
region = region.Clone(core.WithFlashback(true, 1))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
region = region.Clone(core.WithFlashback(false, 0))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(core.ContextTODO(), region))
- checkRegions(re, cluster.core, regions[:i+1])
+ checkRegions(re, cluster.BasicCluster, regions[:i+1])
}
regionCounts := make(map[uint64]int)
@@ -894,10 +894,10 @@ func TestRegionHeartbeat(t *testing.T) {
time.Sleep(50 * time.Millisecond)
for _, store := range cluster.GetStores() {
- re.Equal(cluster.core.GetStoreLeaderCount(store.GetID()), store.GetLeaderCount())
- re.Equal(cluster.core.GetStoreRegionCount(store.GetID()), store.GetRegionCount())
- re.Equal(cluster.core.GetStoreLeaderRegionSize(store.GetID()), store.GetLeaderSize())
- re.Equal(cluster.core.GetStoreRegionSize(store.GetID()), store.GetRegionSize())
+ re.Equal(cluster.GetStoreLeaderCount(store.GetID()), store.GetLeaderCount())
+ re.Equal(cluster.GetStoreRegionCount(store.GetID()), store.GetRegionCount())
+ re.Equal(cluster.GetStoreLeaderRegionSize(store.GetID()), store.GetLeaderSize())
+ re.Equal(cluster.GetStoreRegionSize(store.GetID()), store.GetRegionSize())
}
// Test with storage.
@@ -1133,7 +1133,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) {
State: metapb.StoreState_Up,
Labels: labels,
}
- re.NoError(cluster.putStoreLocked(core.NewStoreInfo(store)))
+ re.NoError(cluster.setStore(core.NewStoreInfo(store)))
}
peers := make([]*metapb.Peer, 0, 4)
@@ -1296,7 +1296,7 @@ func TestOfflineAndMerge(t *testing.T) {
// Put 4 stores.
for _, store := range newTestStores(4, "5.0.0") {
- re.NoError(cluster.PutStore(store.GetMeta()))
+ re.NoError(cluster.PutMetaStore(store.GetMeta()))
}
peers := []*metapb.Peer{
@@ -1351,7 +1351,7 @@ func TestStoreConfigUpdate(t *testing.T) {
tc := newTestCluster(ctx, opt)
stores := newTestStores(5, "2.0.0")
for _, s := range stores {
- re.NoError(tc.putStoreLocked(s))
+ re.NoError(tc.setStore(s))
}
re.Len(tc.getUpStores(), 5)
// Case1: big region.
@@ -1436,7 +1436,7 @@ func TestSyncConfigContext(t *testing.T) {
}))
stores := newTestStores(1, "2.0.0")
for _, s := range stores {
- re.NoError(tc.putStoreLocked(s))
+ re.NoError(tc.setStore(s))
}
// trip schema header
now := time.Now()
@@ -1458,7 +1458,7 @@ func TestStoreConfigSync(t *testing.T) {
tc := newTestCluster(ctx, opt)
stores := newTestStores(5, "2.0.0")
for _, s := range stores {
- re.NoError(tc.putStoreLocked(s))
+ re.NoError(tc.setStore(s))
}
re.Len(tc.getUpStores(), 5)
@@ -1503,7 +1503,7 @@ func TestUpdateStorePendingPeerCount(t *testing.T) {
tc.RaftCluster.coordinator = schedule.NewCoordinator(ctx, tc.RaftCluster, nil)
stores := newTestStores(5, "2.0.0")
for _, s := range stores {
- re.NoError(tc.putStoreLocked(s))
+ re.NoError(tc.setStore(s))
}
tc.RaftCluster.wg.Add(1)
go tc.RaftCluster.runUpdateStoreStats()
@@ -1678,7 +1678,7 @@ func TestCalculateStoreSize1(t *testing.T) {
},
}...)
s := store.Clone(core.SetStoreLabels(labels))
- re.NoError(cluster.PutStore(s.GetMeta()))
+ re.NoError(cluster.PutMetaStore(s.GetMeta()))
}
cluster.ruleManager.SetRule(
@@ -1762,7 +1762,7 @@ func TestCalculateStoreSize2(t *testing.T) {
}
labels = append(labels, []*metapb.StoreLabel{{Key: "rack", Value: "r1"}, {Key: "host", Value: "h1"}}...)
s := store.Clone(core.SetStoreLabels(labels))
- re.NoError(cluster.PutStore(s.GetMeta()))
+ re.NoError(cluster.PutMetaStore(s.GetMeta()))
}
cluster.ruleManager.SetRule(
@@ -1812,7 +1812,7 @@ func TestStores(t *testing.T) {
id := store.GetID()
re.Nil(cache.GetStore(id))
re.Error(cache.PauseLeaderTransfer(id))
- cache.SetStore(store)
+ cache.PutStore(store)
re.Equal(store, cache.GetStore(id))
re.Equal(i+1, cache.GetStoreCount())
re.NoError(cache.PauseLeaderTransfer(id))
@@ -1843,7 +1843,7 @@ func Test(t *testing.T) {
_, opts, err := newTestScheduleConfig()
re.NoError(err)
tc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opts, storage.NewStorageWithMemoryBackend())
- cache := tc.core
+ cache := tc.BasicCluster
for i := uint64(0); i < n; i++ {
region := regions[i]
@@ -1961,7 +1961,7 @@ func TestAwakenStore(t *testing.T) {
stores := newTestStores(n, "6.5.0")
re.True(stores[0].NeedAwakenStore())
for _, store := range stores {
- re.NoError(cluster.PutStore(store.GetMeta()))
+ re.NoError(cluster.PutMetaStore(store.GetMeta()))
}
for i := uint64(1); i <= n; i++ {
re.False(cluster.slowStat.ExistsSlowStores())
@@ -1971,7 +1971,7 @@ func TestAwakenStore(t *testing.T) {
now := time.Now()
store4 := stores[0].Clone(core.SetLastHeartbeatTS(now), core.SetLastAwakenTime(now.Add(-11*time.Minute)))
- re.NoError(cluster.putStoreLocked(store4))
+ re.NoError(cluster.setStore(store4))
store1 := cluster.GetStore(1)
re.True(store1.NeedAwakenStore())
@@ -2013,7 +2013,7 @@ func TestUpdateAndDeleteLabel(t *testing.T) {
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
stores := newTestStores(1, "6.5.1")
for _, store := range stores {
- re.NoError(cluster.PutStore(store.GetMeta()))
+ re.NoError(cluster.PutMetaStore(store.GetMeta()))
}
re.Empty(cluster.GetStore(1).GetLabels())
// Update label.
@@ -2105,7 +2105,7 @@ func TestUpdateAndDeleteLabel(t *testing.T) {
newStore := typeutil.DeepClone(cluster.GetStore(1).GetMeta(), core.StoreFactory)
newStore.Labels = nil
// Store rebooting will call PutStore.
- err = cluster.PutStore(newStore)
+ err = cluster.PutMetaStore(newStore)
re.NoError(err)
// Check the label after rebooting.
re.Equal([]*metapb.StoreLabel{{Key: "mode", Value: "readonly"}}, cluster.GetStore(1).GetLabels())
@@ -2142,7 +2142,7 @@ func newTestRaftCluster(
s storage.Storage,
) *RaftCluster {
opt.GetScheduleConfig().EnableHeartbeatConcurrentRunner = false
- rc := &RaftCluster{serverCtx: ctx, core: core.NewBasicCluster(), storage: s}
+ rc := &RaftCluster{serverCtx: ctx, BasicCluster: core.NewBasicCluster(), storage: s}
rc.InitCluster(id, opt, nil, nil)
rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt)
if opt.IsPlacementRulesEnabled() {
@@ -2151,7 +2151,7 @@ func newTestRaftCluster(
panic(err)
}
}
- rc.schedulingController = newSchedulingController(rc.ctx, rc.core, rc.opt, rc.ruleManager)
+ rc.schedulingController = newSchedulingController(rc.ctx, rc.BasicCluster, rc.opt, rc.ruleManager)
return rc
}
@@ -2324,7 +2324,7 @@ func (c *testCluster) addRegionStore(storeID uint64, regionCount int, regionSize
c.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
c.Lock()
defer c.Unlock()
- return c.putStoreLocked(newStore)
+ return c.setStore(newStore)
}
func (c *testCluster) addLeaderRegion(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) error {
@@ -2347,7 +2347,7 @@ func (c *testCluster) updateLeaderCount(storeID uint64, leaderCount int) error {
)
c.Lock()
defer c.Unlock()
- return c.putStoreLocked(newStore)
+ return c.setStore(newStore)
}
func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error {
@@ -2363,7 +2363,7 @@ func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error {
c.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
c.Lock()
defer c.Unlock()
- return c.putStoreLocked(newStore)
+ return c.setStore(newStore)
}
func (c *testCluster) setStoreDown(storeID uint64) error {
@@ -2374,7 +2374,7 @@ func (c *testCluster) setStoreDown(storeID uint64) error {
)
c.Lock()
defer c.Unlock()
- return c.putStoreLocked(newStore)
+ return c.setStore(newStore)
}
func (c *testCluster) setStoreOffline(storeID uint64) error {
@@ -2382,7 +2382,7 @@ func (c *testCluster) setStoreOffline(storeID uint64) error {
newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, false))
c.Lock()
defer c.Unlock()
- return c.putStoreLocked(newStore)
+ return c.setStore(newStore)
}
func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) error {
@@ -2966,7 +2966,7 @@ func TestShouldRun(t *testing.T) {
nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
- re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt())
+ re.Equal(7, tc.GetClusterNotFromStorageRegionsCnt())
}
func TestShouldRunWithNonLeaderRegions(t *testing.T) {
@@ -3009,7 +3009,7 @@ func TestShouldRunWithNonLeaderRegions(t *testing.T) {
nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat))
re.Error(tc.processRegionHeartbeat(core.ContextTODO(), newRegion))
- re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt())
+ re.Equal(9, tc.GetClusterNotFromStorageRegionsCnt())
// Now, after server is prepared, there exist some regions with no leader.
re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId())
diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go
index 20d5a6bceae..ca846eaa885 100644
--- a/server/cluster/scheduling_controller.go
+++ b/server/cluster/scheduling_controller.go
@@ -195,7 +195,7 @@ func (sc *schedulingController) collectSchedulingMetrics() {
// collect hot cache metrics
sc.hotStat.CollectMetrics()
// collect the lock metrics
- sc.RegionsInfo.CollectWaitLockMetrics()
+ sc.CollectWaitLockMetrics()
}
func (sc *schedulingController) removeStoreStatistics(storeID uint64) {
diff --git a/server/grpc_service.go b/server/grpc_service.go
index 2b3ee232686..acfc87fcf71 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -826,7 +826,7 @@ func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest
}, nil
}
- if err := rc.PutStore(store); err != nil {
+ if err := rc.PutMetaStore(store); err != nil {
return &pdpb.PutStoreResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
diff --git a/server/server.go b/server/server.go
index af9f48f8c9b..1d38a5ee495 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1555,8 +1555,6 @@ func (s *Server) UpdateGRPCServiceRateLimiter(serviceLabel string, opts ...ratel
// GetClusterStatus gets cluster status.
func (s *Server) GetClusterStatus() (*cluster.Status, error) {
- s.cluster.Lock()
- defer s.cluster.Unlock()
return s.cluster.LoadClusterStatus()
}
diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go
index cf2c6dd2508..365ab1ca493 100644
--- a/tests/integrations/mcs/scheduling/api_test.go
+++ b/tests/integrations/mcs/scheduling/api_test.go
@@ -498,19 +498,19 @@ func (suite *apiTestSuite) checkAdminRegionCacheForward(cluster *tests.TestClust
apiServer := cluster.GetLeaderServer().GetServer()
schedulingServer := cluster.GetSchedulingPrimaryServer()
re.Equal(3, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{}))
- re.Equal(3, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count)
+ re.Equal(3, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}))
addr := cluster.GetLeaderServer().GetAddr()
urlPrefix := fmt.Sprintf("%s/pd/api/v1/admin/cache/region", addr)
err := testutil.CheckDelete(tests.TestDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re))
re.NoError(err)
re.Equal(2, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{}))
- re.Equal(2, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count)
+ re.Equal(2, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}))
err = testutil.CheckDelete(tests.TestDialClient, urlPrefix+"s", testutil.StatusOK(re))
re.NoError(err)
re.Equal(0, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{}))
- re.Equal(0, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count)
+ re.Equal(0, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}))
}
func (suite *apiTestSuite) TestFollowerForward() {
diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go
index d7883379731..54622d5c515 100644
--- a/tests/integrations/mcs/scheduling/config_test.go
+++ b/tests/integrations/mcs/scheduling/config_test.go
@@ -175,7 +175,7 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() {
})
assertEvictLeaderStoreIDs(re, storage, []uint64{1})
// Update the scheduler by adding a store.
- err = suite.pdLeaderServer.GetServer().GetRaftCluster().PutStore(
+ err = suite.pdLeaderServer.GetServer().GetRaftCluster().PutMetaStore(
&metapb.Store{
Id: 2,
Address: "mock://2",
diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go
index abc1efd9021..11782590ab9 100644
--- a/tests/integrations/mcs/scheduling/meta_test.go
+++ b/tests/integrations/mcs/scheduling/meta_test.go
@@ -79,7 +79,7 @@ func (suite *metaTestSuite) TestStoreWatch() {
)
re.NoError(err)
for i := uint64(1); i <= 4; i++ {
- suite.pdLeaderServer.GetServer().GetRaftCluster().PutStore(
+ suite.pdLeaderServer.GetServer().GetRaftCluster().PutMetaStore(
&metapb.Store{Id: i, Address: fmt.Sprintf("mock-%d", i), State: metapb.StoreState_Up, NodeState: metapb.NodeState_Serving, LastHeartbeat: time.Now().UnixNano()},
)
}
@@ -102,7 +102,7 @@ func (suite *metaTestSuite) TestStoreWatch() {
})
// test synchronized store labels
- suite.pdLeaderServer.GetServer().GetRaftCluster().PutStore(
+ suite.pdLeaderServer.GetServer().GetRaftCluster().PutMetaStore(
&metapb.Store{Id: 5, Address: "mock-5", State: metapb.StoreState_Up, NodeState: metapb.NodeState_Serving, LastHeartbeat: time.Now().UnixNano(), Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}}},
)
testutil.Eventually(re, func() bool {
diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go
index 38c1cc6a41b..82da47d18f3 100644
--- a/tests/integrations/mcs/scheduling/server_test.go
+++ b/tests/integrations/mcs/scheduling/server_test.go
@@ -310,7 +310,7 @@ func (suite *serverTestSuite) TestSchedulerSync() {
checkEvictLeaderSchedulerExist(re, schedulersController, true)
checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1})
// Add a store_id to the evict-leader-scheduler through the API server.
- err = suite.pdLeader.GetServer().GetRaftCluster().PutStore(
+ err = suite.pdLeader.GetServer().GetRaftCluster().PutMetaStore(
&metapb.Store{
Id: 2,
Address: "mock://2",
diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go
index 2ff0b5d4b86..23ebceaefd6 100644
--- a/tests/server/api/region_test.go
+++ b/tests/server/api/region_test.go
@@ -407,7 +407,7 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster)
func checkRegionCount(re *require.Assertions, cluster *tests.TestCluster, count uint64) {
leader := cluster.GetLeaderServer()
tu.Eventually(re, func() bool {
- return leader.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count == int(count)
+ return leader.GetRaftCluster().GetRegionCount([]byte{}, []byte{}) == int(count)
})
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
tu.Eventually(re, func() bool {
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index 61a4561c55a..07bcf3ee2a1 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -601,7 +601,7 @@ func TestRaftClusterMultipleRestart(t *testing.T) {
store := newMetaStore(storeID, "127.0.0.1:4", "2.1.0", metapb.StoreState_Offline, getTestDeployPath(storeID))
rc := leaderServer.GetRaftCluster()
re.NotNil(rc)
- err = rc.PutStore(store)
+ err = rc.PutMetaStore(store)
re.NoError(err)
re.NotNil(tc)
rc.Stop()