From b23924f7ec951a4020e09d73f5a25ab8fa6593b4 Mon Sep 17 00:00:00 2001 From: husharp Date: Mon, 18 Sep 2023 17:19:46 +0800 Subject: [PATCH] change to healthy region and move to region tree Signed-off-by: husharp --- pkg/core/region.go | 53 ++++++++-------------------- pkg/core/region_tree.go | 21 +++++++++++ pkg/schedule/prepare_checker.go | 4 +-- pkg/storage/endpoint/meta.go | 21 +++++------ pkg/storage/storage.go | 10 +++--- pkg/storage/storage_test.go | 35 +++++------------- pkg/syncer/client.go | 11 ++---- server/cluster/cluster.go | 6 +--- server/cluster/cluster_test.go | 1 - tests/server/cluster/cluster_test.go | 4 +-- 10 files changed, 65 insertions(+), 101 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 4b1de26f0454..07574d995a24 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -813,22 +813,19 @@ type RegionsInfo struct { learners map[uint64]*regionTree // storeID -> sub regionTree witnesses map[uint64]*regionTree // storeID -> sub regionTree pendingPeers map[uint64]*regionTree // storeID -> sub regionTree - // count the stale meta regions - staleRegionCnt int64 } // NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers func NewRegionsInfo() *RegionsInfo { return &RegionsInfo{ - tree: newRegionTree(), - regions: make(map[uint64]*regionItem), - subRegions: make(map[uint64]*regionItem), - leaders: make(map[uint64]*regionTree), - followers: make(map[uint64]*regionTree), - learners: make(map[uint64]*regionTree), - witnesses: make(map[uint64]*regionTree), - pendingPeers: make(map[uint64]*regionTree), - staleRegionCnt: 0, + tree: newRegionTree(), + regions: make(map[uint64]*regionItem), + subRegions: make(map[uint64]*regionItem), + leaders: make(map[uint64]*regionTree), + followers: make(map[uint64]*regionTree), + learners: make(map[uint64]*regionTree), + witnesses: make(map[uint64]*regionTree), + pendingPeers: make(map[uint64]*regionTree), } } @@ -882,29 +879,6 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*reg return origin, overlaps, err } -// GetStaleRegionCnt returns the stale region count. -func (r *RegionsInfo) GetStaleRegionCnt() int64 { - return atomic.LoadInt64(&r.staleRegionCnt) -} - -// AtomicAddStaleRegionCnt atomically adds the stale region count. -func (r *RegionsInfo) AtomicAddStaleRegionCnt() { - atomic.AddInt64(&r.staleRegionCnt, 1) -} - -// AtomicBatchAddStaleRegionCnt atomically batch adds the stale region count. -func (r *RegionsInfo) AtomicBatchAddStaleRegionCnt(num int64) { - atomic.AddInt64(&r.staleRegionCnt, num) -} - -// AtomicSubStaleRegionCnt atomically subtracts the stale region count. -func (r *RegionsInfo) AtomicSubStaleRegionCnt() { - if atomic.LoadInt64(&r.staleRegionCnt) == 0 { - return - } - atomic.AddInt64(&r.staleRegionCnt, -1) -} - // AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put. func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) { r.t.Lock() @@ -918,10 +892,6 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo r.t.Unlock() return nil, err } - // If origin is stale, need to sub the stale region count. - if origin != nil && origin.IsSourceStale() && region.IsSourceFresh() { - r.AtomicSubStaleRegionCnt() - } origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...) 
 	r.t.Unlock()
 	r.UpdateSubTree(region, origin, overlaps, rangeChanged)
@@ -1333,6 +1303,13 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
 	return
 }
 
+// GetClusterHealthyRegionsCnt gets the healthy region count of the cluster
+func (r *RegionsInfo) GetClusterHealthyRegionsCnt() int64 {
+	r.t.RLock()
+	defer r.t.RUnlock()
+	return r.tree.healthyRegionsCnt
+}
+
 // GetMetaRegions gets a set of metapb.Region from regionMap
 func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
 	r.t.RLock()
diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go
index db1c8c28fc71..3c7991b3234c 100644
--- a/pkg/core/region_tree.go
+++ b/pkg/core/region_tree.go
@@ -61,6 +61,8 @@ type regionTree struct {
 	totalSize           int64
 	totalWriteBytesRate float64
 	totalWriteKeysRate  float64
+	// count the healthy meta regions
+	healthyRegionsCnt int64
 }
 
 func newRegionTree() *regionTree {
@@ -69,6 +71,7 @@ func newRegionTree() *regionTree {
 		totalSize:           0,
 		totalWriteBytesRate: 0,
 		totalWriteKeysRate:  0,
+		healthyRegionsCnt:   0,
 	}
 }
 
@@ -112,6 +115,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
 	regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
 	t.totalWriteBytesRate += regionWriteBytesRate
 	t.totalWriteKeysRate += regionWriteKeysRate
+	if region.IsSourceFresh() {
+		t.healthyRegionsCnt++
+	}
 
 	if !withOverlaps {
 		overlaps = t.overlaps(item)
@@ -133,6 +139,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
 		regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
 		t.totalWriteBytesRate -= regionWriteBytesRate
 		t.totalWriteKeysRate -= regionWriteKeysRate
+		if old.IsSourceFresh() {
+			t.healthyRegionsCnt--
+		}
 	}
 
 	return result
@@ -149,6 +158,15 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
 	regionWriteBytesRate, regionWriteKeysRate = origin.GetWriteRate()
 	t.totalWriteBytesRate -= regionWriteBytesRate
 	t.totalWriteKeysRate -= regionWriteKeysRate
+
+	// If the origin is stale and the new region is fresh, increment the healthy region count.
+	if origin.IsSourceStale() && region.IsSourceFresh() {
+		t.healthyRegionsCnt++
+	}
+	// If the origin is fresh and the new region is stale, decrement the healthy region count.
+	if origin.IsSourceFresh() && region.IsSourceStale() {
+		t.healthyRegionsCnt--
+	}
 }
 
 // remove removes a region if the region is in the tree.
@@ -168,6 +186,9 @@ func (t *regionTree) remove(region *RegionInfo) {
 	regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
 	t.totalWriteBytesRate -= regionWriteBytesRate
 	t.totalWriteKeysRate -= regionWriteKeysRate
+	if region.IsSourceFresh() {
+		t.healthyRegionsCnt--
+	}
 	t.tree.Delete(item)
 }
 
diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go
index bb5aed590c7f..6cae93de0cb6 100644
--- a/pkg/schedule/prepare_checker.go
+++ b/pkg/schedule/prepare_checker.go
@@ -49,8 +49,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
 		checker.prepared = true
 		return true
 	}
-	if float64(c.GetStaleRegionCnt()) < float64(c.GetTotalRegionCount())*(1-collectFactor) {
-		log.Info("stale meta region number is satisfied, finish prepare checker", zap.Int64("stale-region", c.GetStaleRegionCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
+	if float64(c.GetClusterHealthyRegionsCnt()) > float64(c.GetTotalRegionCount())*collectFactor {
+		log.Info("healthy meta region number is satisfied, finish prepare checker", zap.Int64("healthy-region", c.GetClusterHealthyRegionsCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
 		checker.prepared = true
 		return true
 	}
diff --git a/pkg/storage/endpoint/meta.go b/pkg/storage/endpoint/meta.go
index f2ddc1003226..b8463f90ac58 100644
--- a/pkg/storage/endpoint/meta.go
+++ b/pkg/storage/endpoint/meta.go
@@ -43,7 +43,7 @@ type MetaStorage interface {
 // RegionStorage defines the storage operations on the Region meta info.
 type RegionStorage interface {
 	LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error)
-	LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error)
+	LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error
 	SaveRegion(region *metapb.Region) error
 	DeleteRegion(region *metapb.Region) error
 	Flush() error
@@ -166,7 +166,7 @@ func (se *StorageEndpoint) LoadRegion(regionID uint64, region *metapb.Region) (o
 }
 
 // LoadRegions loads all regions from storage to RegionsInfo.
-func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error) {
+func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
 	nextID := uint64(0)
 	endKey := RegionPath(math.MaxUint64)
 
@@ -174,7 +174,6 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.
 	// the message packet to exceed the grpc message size limit (4MB). Here we use
 	// a variable rangeLimit to work around.
 	rangeLimit := MaxKVRangeLimit
-	regionsNum := int64(0)
 	for {
 		failpoint.Inject("slowLoadRegion", func() {
 			rangeLimit = 1
 		})
@@ -186,36 +185,34 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.
if rangeLimit /= 2; rangeLimit >= MinKVRangeLimit { continue } - return 0, err + return err } select { case <-ctx.Done(): - return 0, ctx.Err() + return ctx.Err() default: } for _, r := range res { region := &metapb.Region{} if err := region.Unmarshal([]byte(r)); err != nil { - return 0, errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByArgs() + return errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByArgs() } if err = encryption.DecryptRegion(region, se.encryptionKeyManager); err != nil { - return 0, err + return err } nextID = region.GetId() + 1 - overlaps := f(core.NewRegionInfo(region, nil)) - regionsNum += 1 + overlaps := f(core.NewRegionInfo(region, nil, core.SetSource(core.FromStorage))) for _, item := range overlaps { if err := se.DeleteRegion(item.GetMeta()); err != nil { - return 0, err + return err } - regionsNum -= 1 } } if len(res) < rangeLimit { - return regionsNum, nil + return nil } } } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 382d28fd7cdf..aba01dfa8063 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -123,7 +123,7 @@ func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.Regi // TryLoadRegionsOnce loads all regions from storage to RegionsInfo. // If the underlying storage is the local region storage, it will only load once. -func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.RegionInfo) []*core.RegionInfo) (regionsNum int64, err error) { +func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.RegionInfo) []*core.RegionInfo) error { ps, ok := s.(*coreStorage) if !ok { return s.LoadRegions(ctx, f) @@ -136,12 +136,12 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi ps.mu.Lock() defer ps.mu.Unlock() if !ps.regionLoaded { - if regionsNum, err = ps.regionStorage.LoadRegions(ctx, f); err != nil { - return 0, err + if err := ps.regionStorage.LoadRegions(ctx, f); err != nil { + return err } ps.regionLoaded = true } - return + return nil } // LoadRegion loads one region from storage. @@ -153,7 +153,7 @@ func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bo } // LoadRegions loads all regions from storage to RegionsInfo. 
-func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error) { +func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error { if atomic.LoadInt32(&ps.useRegionStorage) > 0 { return ps.regionStorage.LoadRegions(ctx, f) } diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index b5af92aa86ce..768df059f280 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -216,9 +216,7 @@ func TestLoadRegions(t *testing.T) { n := 10 regions := mustSaveRegions(re, storage, n) - regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion) - re.NoError(err) - re.Equal(int64(n), regionsNum) + re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)) re.Equal(n, cache.GetTotalRegionCount()) for _, region := range cache.GetMetaRegions() { @@ -255,9 +253,7 @@ func TestLoadRegionsToCache(t *testing.T) { n := 10 regions := mustSaveRegions(re, storage, n) - regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion) - re.NoError(err) - re.Equal(int64(n), regionsNum) + re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)) re.Equal(n, cache.GetTotalRegionCount()) for _, region := range cache.GetMetaRegions() { @@ -266,9 +262,7 @@ func TestLoadRegionsToCache(t *testing.T) { n = 20 mustSaveRegions(re, storage, n) - regionsNum, err = TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion) - re.NoError(err) - re.Equal(int64(n), regionsNum) + re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion)) re.Equal(n, cache.GetTotalRegionCount()) } @@ -280,9 +274,7 @@ func TestLoadRegionsExceedRangeLimit(t *testing.T) { n := 1000 regions := mustSaveRegions(re, storage, n) - regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion) - re.NoError(err) - re.Equal(int64(n), regionsNum) + re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)) re.Equal(n, cache.GetTotalRegionCount()) for _, region := range cache.GetMetaRegions() { re.Equal(regions[region.GetId()], region) @@ -300,12 +292,8 @@ func TestTrySwitchRegionStorage(t *testing.T) { TrySwitchRegionStorage(storage, false) regions10 := mustSaveRegions(re, storage, 10) - regionsNum, err := defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion) - re.NoError(err) - re.Equal(int64(10), regionsNum) - regionsNum, err = localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion) - re.NoError(err) - re.Equal(int64(0), regionsNum) + re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion)) + re.NoError(localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion)) re.Empty(localCache.GetMetaRegions()) re.Len(defaultCache.GetMetaRegions(), 10) for _, region := range defaultCache.GetMetaRegions() { @@ -314,12 +302,8 @@ func TestTrySwitchRegionStorage(t *testing.T) { TrySwitchRegionStorage(storage, true) regions20 := mustSaveRegions(re, storage, 20) - regionsNum, err = defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion) - re.NoError(err) - re.Equal(int64(10), regionsNum) - regionsNum, err = localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion) - re.NoError(err) - re.Equal(int64(20), regionsNum) + re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion)) + 
re.NoError(localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion)) re.Len(defaultCache.GetMetaRegions(), 10) re.Len(localCache.GetMetaRegions(), 20) for _, region := range defaultCache.GetMetaRegions() { @@ -443,8 +427,7 @@ func benchmarkLoadRegions(b *testing.B, n int, ratio int) { }() b.ResetTimer() - regionsNum, err := lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion) - re.Equal(int64(n), regionsNum) + err = lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion) re.NoError(err) } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index f847a523adfb..08a17755a4f4 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -92,12 +92,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { // used to load region from kv storage to cache storage. bc := s.server.GetBasicCluster() regionStorage := s.server.GetStorage() - log.Info("region syncer start load regions") start := time.Now() - loadRegionsNum, err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion) - // FromStorage means this region's meta info might be stale. - bc.AtomicBatchAddStaleRegionCnt(loadRegionsNum) - log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)), zap.Int64("regions-by-load", loadRegionsNum)) + err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion) + log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start))) if err != nil { log.Warn("failed to load regions", errs.ZapError(err)) } @@ -196,10 +193,6 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) continue } - // FromSync means this region's meta info might be stale. - if origin == nil || origin.IsSourceFresh() { - bc.RegionsInfo.AtomicAddStaleRegionCnt() - } _, saveKV, _, _ := regionGuide(region, origin) overlaps := bc.PutRegion(region) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 33a4b6c6c571..f2c505183d66 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -587,16 +587,12 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { start = time.Now() // used to load region from kv storage to cache storage. - loadRegionsNum, err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion) - if err != nil { + if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil { return nil, err } - // FromStorage means this region's meta info might be stale. 
- c.core.AtomicBatchAddStaleRegionCnt(loadRegionsNum) log.Info("load regions", zap.Int("count", c.core.GetTotalRegionCount()), zap.Duration("cost", time.Since(start)), - zap.Int64("regions-by-load", loadRegionsNum), ) if !c.isAPIServiceMode { for _, store := range c.GetStores() { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 5cb273139415..10848014740b 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2375,7 +2375,6 @@ func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) er peer, _ := c.AllocPeer(id) region.Peers = append(region.Peers, peer) } - c.core.AtomicAddStaleRegionCnt() return c.putRegion(core.NewRegionInfo(region, nil)) } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 1711125f8784..f22a754b8bf4 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -882,9 +882,7 @@ func TestLoadClusterInfo(t *testing.T) { for _, region := range regions { re.NoError(testStorage.SaveRegion(region)) } - regionsNum, err := storage.TryLoadRegionsOnce(ctx, testStorage, raftCluster.GetBasicCluster().PutRegion) - re.NoError(err) - re.Equal(int64(0), regionsNum) + re.NoError(storage.TryLoadRegionsOnce(ctx, testStorage, raftCluster.GetBasicCluster().PutRegion)) re.Equal(n, raftCluster.GetTotalRegionCount()) }
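
Note on the behavioral change above (illustration, not part of the patch): the prepare checker now waits until the count of regions with fresh meta, maintained inside the region tree and read through GetClusterHealthyRegionsCnt, exceeds collectFactor of the total region count, instead of waiting for the stale count to fall below (1-collectFactor) of the total. The two conditions agree whenever every region is counted as either fresh or stale, since healthy > total*collectFactor is the same inequality as stale < total*(1-collectFactor) when healthy + stale = total. The Go sketch below only mirrors that condition so it can be read in isolation; the concrete value of collectFactor and the isPrepared/main harness are assumptions, while GetClusterHealthyRegionsCnt and GetTotalRegionCount are the accessors used in the diff.

package main

import "fmt"

// collectFactor lives in pkg/schedule in the real code; 0.9 here is an assumption
// made only to keep this sketch runnable.
const collectFactor = 0.9

// isPrepared mirrors the condition added to prepareChecker.check in this patch:
// scheduling is considered prepared once the healthy (fresh-meta) region count
// exceeds collectFactor of the total region count, replacing the old check that
// the stale region count had dropped below (1-collectFactor) of the total.
func isPrepared(healthyRegionCnt, totalRegionCnt int64) bool {
	return float64(healthyRegionCnt) > float64(totalRegionCnt)*collectFactor
}

func main() {
	fmt.Println(isPrepared(95, 100)) // true: most region meta has been refreshed by heartbeat/sync
	fmt.Println(isPrepared(50, 100)) // false: most region meta still comes from storage
}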