From 67b4081ddb9efc6f2483e8dc071da338bc5adf76 Mon Sep 17 00:00:00 2001
From: husharp
Date: Wed, 13 Sep 2023 13:17:06 +0800
Subject: [PATCH] address comment

Signed-off-by: husharp
---
 pkg/core/region.go                   | 45 +++++++++++++++++-----------
 pkg/core/region_tree.go              | 21 -------------
 pkg/schedule/prepare_checker.go      |  2 +-
 pkg/storage/endpoint/meta.go         | 19 +++++++-----
 pkg/storage/storage.go               | 10 +++----
 pkg/storage/storage_test.go          | 40 +++++++++++++++++--------
 pkg/syncer/client.go                 | 10 ++++---
 server/cluster/cluster.go            |  6 +++-
 tests/server/cluster/cluster_test.go |  4 ++-
 9 files changed, 86 insertions(+), 71 deletions(-)

diff --git a/pkg/core/region.go b/pkg/core/region.go
index cf02c72d024..4b1de26f045 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -813,19 +813,22 @@ type RegionsInfo struct {
 	learners     map[uint64]*regionTree // storeID -> sub regionTree
 	witnesses    map[uint64]*regionTree // storeID -> sub regionTree
 	pendingPeers map[uint64]*regionTree // storeID -> sub regionTree
+	// staleRegionCnt counts the regions whose meta info might be stale
+	staleRegionCnt int64
 }
 
 // NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers
 func NewRegionsInfo() *RegionsInfo {
 	return &RegionsInfo{
-		tree:         newRegionTree(),
-		regions:      make(map[uint64]*regionItem),
-		subRegions:   make(map[uint64]*regionItem),
-		leaders:      make(map[uint64]*regionTree),
-		followers:    make(map[uint64]*regionTree),
-		learners:     make(map[uint64]*regionTree),
-		witnesses:    make(map[uint64]*regionTree),
-		pendingPeers: make(map[uint64]*regionTree),
+		tree:           newRegionTree(),
+		regions:        make(map[uint64]*regionItem),
+		subRegions:     make(map[uint64]*regionItem),
+		leaders:        make(map[uint64]*regionTree),
+		followers:      make(map[uint64]*regionTree),
+		learners:       make(map[uint64]*regionTree),
+		witnesses:      make(map[uint64]*regionTree),
+		pendingPeers:   make(map[uint64]*regionTree),
+		staleRegionCnt: 0,
 	}
 }
 
@@ -862,8 +865,6 @@ func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
 	origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
 	r.t.Unlock()
 	r.UpdateSubTree(region, origin, overlaps, rangeChanged)
-	// FromStorage means this region's meta info might be stale.
-	r.AtomicAddStaleRegionCnt()
 	return overlaps
 }
 
@@ -883,17 +884,25 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*reg
 
 // GetStaleRegionCnt returns the stale region count.
 func (r *RegionsInfo) GetStaleRegionCnt() int64 {
-	r.t.RLock()
-	defer r.t.RUnlock()
-	if r.tree.length() == 0 {
-		return 0
-	}
-	return r.tree.staleRegionCnt
+	return atomic.LoadInt64(&r.staleRegionCnt)
 }
 
 // AtomicAddStaleRegionCnt atomically adds the stale region count.
 func (r *RegionsInfo) AtomicAddStaleRegionCnt() {
-	r.tree.AtomicAddStaleRegionCnt()
+	atomic.AddInt64(&r.staleRegionCnt, 1)
+}
+
+// AtomicBatchAddStaleRegionCnt atomically adds num to the stale region count.
+func (r *RegionsInfo) AtomicBatchAddStaleRegionCnt(num int64) {
+	atomic.AddInt64(&r.staleRegionCnt, num)
+}
+
+// AtomicSubStaleRegionCnt atomically decrements the stale region count, unless it is already zero.
+func (r *RegionsInfo) AtomicSubStaleRegionCnt() {
+	if atomic.LoadInt64(&r.staleRegionCnt) == 0 {
+		return
+	}
+	atomic.AddInt64(&r.staleRegionCnt, -1)
 }
 
 // AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
@@ -911,7 +920,7 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo
 	}
 	// If origin is stale, need to sub the stale region count.
 	if origin != nil && origin.IsSourceStale() && region.IsSourceFresh() {
-		r.tree.AtomicSubStaleRegionCnt()
+		r.AtomicSubStaleRegionCnt()
 	}
 	origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
 	r.t.Unlock()
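Note: the counter now lives on RegionsInfo and is manipulated only through the helpers above. A minimal usage sketch, assuming the pd module path github.com/tikv/pd/pkg/core (the import path is not shown in this patch):

    package main

    import (
        "fmt"

        "github.com/tikv/pd/pkg/core" // assumed import path
    )

    func main() {
        r := core.NewRegionsInfo()
        r.AtomicBatchAddStaleRegionCnt(3) // e.g. three regions just loaded from storage
        r.AtomicSubStaleRegionCnt()       // one of them refreshed by a heartbeat
        fmt.Println(r.GetStaleRegionCnt()) // prints 2
    }

One caveat: AtomicSubStaleRegionCnt performs a load followed by a separate add, so two concurrent callers that both observe a count of 1 could drive the counter to -1. That appears acceptable here since the count only feeds the prepare checker's heuristic; if it ever mattered, a CAS loop would avoid it. A sketch of that alternative, not part of this patch (it would live in package core and import "sync/atomic"):

    // atomicSubStaleRegionCnt is a hypothetical variant that can never drive
    // the counter below zero, at the cost of a retry under contention.
    func (r *RegionsInfo) atomicSubStaleRegionCnt() {
        for {
            cur := atomic.LoadInt64(&r.staleRegionCnt)
            if cur == 0 {
                return
            }
            if atomic.CompareAndSwapInt64(&r.staleRegionCnt, cur, cur-1) {
                return
            }
        }
    }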
diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go
index 11ecc32825b..db1c8c28fc7 100644
--- a/pkg/core/region_tree.go
+++ b/pkg/core/region_tree.go
@@ -16,7 +16,6 @@ package core
 import (
 	"bytes"
 	"math/rand"
-	"sync/atomic"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
@@ -62,8 +61,6 @@ type regionTree struct {
 	totalSize           int64
 	totalWriteBytesRate float64
 	totalWriteKeysRate  float64
-	// count the stale meta regions
-	staleRegionCnt int64
 }
 
 func newRegionTree() *regionTree {
@@ -72,7 +69,6 @@ func newRegionTree() *regionTree {
 		totalSize:           0,
 		totalWriteBytesRate: 0,
 		totalWriteKeysRate:  0,
-		staleRegionCnt:      0,
 	}
 }
 
@@ -346,20 +342,3 @@ func (t *regionTree) TotalWriteRate() (bytesRate, keysRate float64) {
 	}
 	return t.totalWriteBytesRate, t.totalWriteKeysRate
 }
-
-func (t *regionTree) AtomicAddStaleRegionCnt() {
-	if t.length() == 0 {
-		return
-	}
-	atomic.AddInt64(&t.staleRegionCnt, 1)
-}
-
-func (t *regionTree) AtomicSubStaleRegionCnt() {
-	if t.length() == 0 {
-		return
-	}
-	if t.staleRegionCnt == 0 {
-		return
-	}
-	atomic.AddInt64(&t.staleRegionCnt, -1)
-}
diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go
index 3ca8432e546..bb5aed590c7 100644
--- a/pkg/schedule/prepare_checker.go
+++ b/pkg/schedule/prepare_checker.go
@@ -50,7 +50,7 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
 		return true
 	}
 	if float64(c.GetStaleRegionCnt()) < float64(c.GetTotalRegionCount())*(1-collectFactor) {
-		log.Info("stale meta region number is satisfied, skip prepare checker", zap.Int64("stale-region", c.GetStaleRegionCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
+		log.Info("stale meta region count is low enough, prepare checker is finished", zap.Int64("stale-region", c.GetStaleRegionCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
 		checker.prepared = true
 		return true
 	}
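Note: the finish condition compares the stale count against the same collectFactor that prepare_checker.go already uses for its heartbeat-based check. A standalone sketch of the predicate, assuming collectFactor keeps its existing value of 0.9 (this patch does not change it):

    // prepared reports whether few enough regions still carry stale meta:
    // with 10,000 total regions, the checker finishes once fewer than
    // 1,000 of them remain stale.
    func prepared(staleRegionCnt int64, totalRegionCount int) bool {
        const collectFactor = 0.9
        return float64(staleRegionCnt) < float64(totalRegionCount)*(1-collectFactor)
    }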
diff --git a/pkg/storage/endpoint/meta.go b/pkg/storage/endpoint/meta.go
index 4ba9eb42c5c..f2ddc100322 100644
--- a/pkg/storage/endpoint/meta.go
+++ b/pkg/storage/endpoint/meta.go
@@ -43,7 +43,7 @@ type MetaStorage interface {
 // RegionStorage defines the storage operations on the Region meta info.
 type RegionStorage interface {
 	LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error)
-	LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error
+	LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error)
 	SaveRegion(region *metapb.Region) error
 	DeleteRegion(region *metapb.Region) error
 	Flush() error
@@ -166,7 +166,7 @@ func (se *StorageEndpoint) LoadRegion(regionID uint64, region *metapb.Region) (o
 }
 
 // LoadRegions loads all regions from storage to RegionsInfo.
-func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
+func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error) {
 	nextID := uint64(0)
 	endKey := RegionPath(math.MaxUint64)
 
@@ -179,6 +179,7 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core
 	// the message packet to exceed the grpc message size limit (4MB). Here we use
 	// a variable rangeLimit to work around.
 	rangeLimit := MaxKVRangeLimit
+	regionsNum := int64(0)
 	for {
 		failpoint.Inject("slowLoadRegion", func() {
 			rangeLimit = 1
@@ -185,34 +186,36 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core
 		})
 			if rangeLimit /= 2; rangeLimit >= MinKVRangeLimit {
 				continue
 			}
-			return err
+			return 0, err
 		}
 		select {
 		case <-ctx.Done():
-			return ctx.Err()
+			return 0, ctx.Err()
 		default:
 		}
 
 		for _, r := range res {
 			region := &metapb.Region{}
 			if err := region.Unmarshal([]byte(r)); err != nil {
-				return errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByArgs()
+				return 0, errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByArgs()
 			}
 			if err = encryption.DecryptRegion(region, se.encryptionKeyManager); err != nil {
-				return err
+				return 0, err
 			}
 			nextID = region.GetId() + 1
 			overlaps := f(core.NewRegionInfo(region, nil))
+			regionsNum++
 			for _, item := range overlaps {
 				if err := se.DeleteRegion(item.GetMeta()); err != nil {
-					return err
+					return 0, err
 				}
+				regionsNum--
 			}
 		}
 
 		if len(res) < rangeLimit {
-			return nil
+			return regionsNum, nil
 		}
 	}
 }
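Note: LoadRegions now reports how many regions it handed to the callback, net of the overlapping regions the callback evicted (each eviction is also deleted from storage), so the return value is the number of regions actually resident in the cache afterwards. A self-contained toy model of that bookkeeping, with made-up data:

    package main

    import "fmt"

    func main() {
        // evictions maps a loaded region ID to how many overlapping regions
        // it displaced from the cache (assumed data for illustration).
        evictions := map[int]int{3: 1, 5: 2}
        regionsNum := int64(0)
        for id := 1; id <= 5; id++ {
            regionsNum++                       // one region passed to the callback
            regionsNum -= int64(evictions[id]) // minus the overlaps it evicted
        }
        fmt.Println(regionsNum) // 5 loaded - 3 evicted = 2
    }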
diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go
index aba01dfa806..382d28fd7cd 100644
--- a/pkg/storage/storage.go
+++ b/pkg/storage/storage.go
@@ -123,7 +123,7 @@ func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.Regi
 
 // TryLoadRegionsOnce loads all regions from storage to RegionsInfo.
 // If the underlying storage is the local region storage, it will only load once.
-func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.RegionInfo) []*core.RegionInfo) error {
+func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.RegionInfo) []*core.RegionInfo) (regionsNum int64, err error) {
 	ps, ok := s.(*coreStorage)
 	if !ok {
 		return s.LoadRegions(ctx, f)
@@ -136,12 +136,12 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi
 	ps.mu.Lock()
 	defer ps.mu.Unlock()
 	if !ps.regionLoaded {
-		if err := ps.regionStorage.LoadRegions(ctx, f); err != nil {
-			return err
+		if regionsNum, err = ps.regionStorage.LoadRegions(ctx, f); err != nil {
+			return 0, err
 		}
 		ps.regionLoaded = true
 	}
-	return nil
+	return
 }
 
 // LoadRegion loads one region from storage.
@@ -153,7 +153,7 @@ func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bo
 }
 
 // LoadRegions loads all regions from storage to RegionsInfo.
-func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
+func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error) {
 	if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
 		return ps.regionStorage.LoadRegions(ctx, f)
 	}
diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go
index 77f94183403..b5af92aa86c 100644
--- a/pkg/storage/storage_test.go
+++ b/pkg/storage/storage_test.go
@@ -216,7 +216,9 @@ func TestLoadRegions(t *testing.T) {
 
 	n := 10
 	regions := mustSaveRegions(re, storage, n)
-	re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion))
+	regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)
+	re.NoError(err)
+	re.Equal(int64(n), regionsNum)
 
 	re.Equal(n, cache.GetTotalRegionCount())
 	for _, region := range cache.GetMetaRegions() {
@@ -253,7 +255,9 @@ func TestLoadRegionsToCache(t *testing.T) {
 
 	n := 10
 	regions := mustSaveRegions(re, storage, n)
-	re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion))
+	regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)
+	re.NoError(err)
+	re.Equal(int64(n), regionsNum)
 
 	re.Equal(n, cache.GetTotalRegionCount())
 	for _, region := range cache.GetMetaRegions() {
@@ -262,7 +266,9 @@ func TestLoadRegionsToCache(t *testing.T) {
 
 	n = 20
 	mustSaveRegions(re, storage, n)
-	re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion))
+	regionsNum, err = TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion)
+	re.NoError(err)
+	re.Equal(int64(n), regionsNum)
 	re.Equal(n, cache.GetTotalRegionCount())
 }
 
@@ -274,7 +280,9 @@ func TestLoadRegionsExceedRangeLimit(t *testing.T) {
 
 	n := 1000
 	regions := mustSaveRegions(re, storage, n)
-	re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion))
+	regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)
+	re.NoError(err)
+	re.Equal(int64(n), regionsNum)
 	re.Equal(n, cache.GetTotalRegionCount())
 	for _, region := range cache.GetMetaRegions() {
 		re.Equal(regions[region.GetId()], region)
@@ -292,8 +300,12 @@ func TestTrySwitchRegionStorage(t *testing.T) {
 
 	TrySwitchRegionStorage(storage, false)
 	regions10 := mustSaveRegions(re, storage, 10)
-	re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion))
-	re.NoError(localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion))
+	regionsNum, err := defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion)
+	re.NoError(err)
+	re.Equal(int64(10), regionsNum)
+	regionsNum, err = localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion)
+	re.NoError(err)
+	re.Equal(int64(0), regionsNum)
 	re.Empty(localCache.GetMetaRegions())
 	re.Len(defaultCache.GetMetaRegions(), 10)
 	for _, region := range defaultCache.GetMetaRegions() {
@@ -302,8 +314,12 @@ func TestTrySwitchRegionStorage(t *testing.T) {
 
 	TrySwitchRegionStorage(storage, true)
 	regions20 := mustSaveRegions(re, storage, 20)
-	re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion))
-	re.NoError(localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion))
+	regionsNum, err = defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion)
+	re.NoError(err)
+	re.Equal(int64(10), regionsNum)
+	regionsNum, err = localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion)
+	re.NoError(err)
+	re.Equal(int64(20), regionsNum)
 	re.Len(defaultCache.GetMetaRegions(), 10)
 	re.Len(localCache.GetMetaRegions(), 20)
 	for _, region := range defaultCache.GetMetaRegions() {
@@ -407,6 +423,7 @@ func saveRegions(lb *levelDBBackend, n int, ratio int) error {
 }
 
 func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
+	re := require.New(b)
 	ctx := context.Background()
 	dir := b.TempDir()
 	lb, err := newLevelDBBackend(ctx, dir, nil)
@@ -426,10 +443,9 @@ func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
 	}()
 
 	b.ResetTimer()
-	err = lb.LoadRegions(ctx, cluster.CheckAndPutRegion)
-	if err != nil {
-		b.Fatal(err)
-	}
+	regionsNum, err := lb.LoadRegions(ctx, cluster.CheckAndPutRegion)
+	re.NoError(err)
+	re.Equal(int64(n), regionsNum)
 }
 
 var volumes = []struct {
diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go
index cb123453e0e..f847a523adf 100644
--- a/pkg/syncer/client.go
+++ b/pkg/syncer/client.go
@@ -92,10 +92,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 			// used to load region from kv storage to cache storage.
 			bc := s.server.GetBasicCluster()
 			regionStorage := s.server.GetStorage()
-			log.Info("region syncer start load region")
+			log.Info("region syncer starts loading regions")
 			start := time.Now()
-			err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
-			log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start)))
+			loadRegionsNum, err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
+			// FromStorage means this region's meta info might be stale.
+			bc.AtomicBatchAddStaleRegionCnt(loadRegionsNum)
+			log.Info("region syncer finished loading regions", zap.Duration("time-cost", time.Since(start)), zap.Int64("regions-by-load", loadRegionsNum))
 			if err != nil {
 				log.Warn("failed to load regions", errs.ZapError(err))
 			}
@@ -195,7 +197,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 				continue
 			}
 			// FromSync means this region's meta info might be stale.
-			if origin == nil || (origin != nil && origin.IsSourceFresh()) {
+			if origin == nil || origin.IsSourceFresh() {
 				bc.RegionsInfo.AtomicAddStaleRegionCnt()
 			}
 			_, saveKV, _, _ := regionGuide(region, origin)
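Note: besides dropping the redundant nil check, the simplified condition encodes the double-count guard: a synced region is counted stale only on its first appearance (origin == nil) or when it overwrites fresh data; if origin is already stale it was counted when it first arrived, so counting again would inflate the total. A minimal restatement of the predicate, using the types from pkg/core:

    // shouldCountStale reports whether an incoming region with a stale
    // source (FromSync/FromStorage) adds one to the stale counter.
    func shouldCountStale(origin *core.RegionInfo) bool {
        return origin == nil || origin.IsSourceFresh()
    }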
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 18b5b53248f..33a4b6c6c57 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -587,12 +587,16 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
 
 	start = time.Now()
 	// used to load region from kv storage to cache storage.
-	if err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
+	loadRegionsNum, err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion)
+	if err != nil {
 		return nil, err
 	}
+	// FromStorage means this region's meta info might be stale.
+	c.core.AtomicBatchAddStaleRegionCnt(loadRegionsNum)
 	log.Info("load regions",
 		zap.Int("count", c.core.GetTotalRegionCount()),
 		zap.Duration("cost", time.Since(start)),
+		zap.Int64("regions-by-load", loadRegionsNum),
 	)
 	if !c.isAPIServiceMode {
 		for _, store := range c.GetStores() {
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index f22a754b8bf..1711125f878 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -882,7 +882,9 @@ func TestLoadClusterInfo(t *testing.T) {
 	for _, region := range regions {
 		re.NoError(testStorage.SaveRegion(region))
 	}
-	re.NoError(storage.TryLoadRegionsOnce(ctx, testStorage, raftCluster.GetBasicCluster().PutRegion))
+	regionsNum, err := storage.TryLoadRegionsOnce(ctx, testStorage, raftCluster.GetBasicCluster().PutRegion)
+	re.NoError(err)
+	re.Equal(int64(0), regionsNum)
 	re.Equal(n, raftCluster.GetTotalRegionCount())
 }
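Note: both call sites (the region syncer and RaftCluster.LoadClusterInfo) now follow the same pattern: bulk-load, then credit the stale counter once for the whole batch. A condensed sketch of that pattern; the function name is hypothetical, while Storage, BasicCluster, and the methods used are the ones from this patch:

    // loadAndMarkStale restores regions from storage and counts the whole
    // batch as stale in one atomic add. Later heartbeats decrement the
    // counter region by region, until the prepare checker sees the total
    // drop below its threshold.
    func loadAndMarkStale(ctx context.Context, s storage.Storage, bc *core.BasicCluster) error {
        loadRegionsNum, err := storage.TryLoadRegionsOnce(ctx, s, bc.CheckAndPutRegion)
        if err != nil {
            return err
        }
        bc.AtomicBatchAddStaleRegionCnt(loadRegionsNum)
        return nil
    }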