change to healthy region and move to region tree
Signed-off-by: husharp <[email protected]>
HuSharp committed Sep 18, 2023
1 parent 67b4081 commit b23924f
Showing 10 changed files with 65 additions and 101 deletions.
53 changes: 15 additions & 38 deletions pkg/core/region.go
@@ -813,22 +813,19 @@ type RegionsInfo struct {
learners map[uint64]*regionTree // storeID -> sub regionTree
witnesses map[uint64]*regionTree // storeID -> sub regionTree
pendingPeers map[uint64]*regionTree // storeID -> sub regionTree
// count the stale meta regions
staleRegionCnt int64
}

// NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers
func NewRegionsInfo() *RegionsInfo {
return &RegionsInfo{
tree: newRegionTree(),
regions: make(map[uint64]*regionItem),
subRegions: make(map[uint64]*regionItem),
leaders: make(map[uint64]*regionTree),
followers: make(map[uint64]*regionTree),
learners: make(map[uint64]*regionTree),
witnesses: make(map[uint64]*regionTree),
pendingPeers: make(map[uint64]*regionTree),
staleRegionCnt: 0,
tree: newRegionTree(),
regions: make(map[uint64]*regionItem),
subRegions: make(map[uint64]*regionItem),
leaders: make(map[uint64]*regionTree),
followers: make(map[uint64]*regionTree),
learners: make(map[uint64]*regionTree),
witnesses: make(map[uint64]*regionTree),
pendingPeers: make(map[uint64]*regionTree),
}
}

@@ -882,29 +879,6 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*reg
return origin, overlaps, err
}

// GetStaleRegionCnt returns the stale region count.
func (r *RegionsInfo) GetStaleRegionCnt() int64 {
return atomic.LoadInt64(&r.staleRegionCnt)
}

// AtomicAddStaleRegionCnt atomically adds the stale region count.
func (r *RegionsInfo) AtomicAddStaleRegionCnt() {
atomic.AddInt64(&r.staleRegionCnt, 1)
}

// AtomicBatchAddStaleRegionCnt atomically batch adds the stale region count.
func (r *RegionsInfo) AtomicBatchAddStaleRegionCnt(num int64) {
atomic.AddInt64(&r.staleRegionCnt, num)
}

// AtomicSubStaleRegionCnt atomically subtracts the stale region count.
func (r *RegionsInfo) AtomicSubStaleRegionCnt() {
if atomic.LoadInt64(&r.staleRegionCnt) == 0 {
return
}
atomic.AddInt64(&r.staleRegionCnt, -1)
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) {
r.t.Lock()
@@ -918,10 +892,6 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo
r.t.Unlock()
return nil, err
}
// If origin is stale, need to sub the stale region count.
if origin != nil && origin.IsSourceStale() && region.IsSourceFresh() {
r.AtomicSubStaleRegionCnt()
}
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
r.UpdateSubTree(region, origin, overlaps, rangeChanged)
@@ -1333,6 +1303,13 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
return
}

// GetClusterHealthyRegionsCnt get healthy region count of cluster
func (r *RegionsInfo) GetClusterHealthyRegionsCnt() int64 {
r.st.RLock()
defer r.st.RUnlock()
return r.tree.healthyRegionsCnt
}

// GetMetaRegions gets a set of metapb.Region from regionMap
func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
r.t.RLock()
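Note: with the atomic staleRegionCnt field and its Get/Add/Sub helpers removed, the only remaining entry point is the new read-only accessor, which reads a counter owned by the region tree behind a RegionsInfo read lock. Below is a minimal sketch of that ownership change; apart from healthyRegionsCnt and the accessor name, the types and lock layout are illustrative, not PD's.

```go
package main

import (
	"fmt"
	"sync"
)

// Toy sketch, not PD's real types: the counter is a plain int64 owned by the
// tree and is only touched while the RegionsInfo lock is held, so the removed
// Atomic*StaleRegionCnt helpers are no longer needed.
type regionTree struct {
	healthyRegionsCnt int64
}

type regionsInfo struct {
	mu   sync.RWMutex // stands in for the lock RegionsInfo already takes
	tree *regionTree
}

// GetClusterHealthyRegionsCnt mirrors the accessor added in this commit:
// a plain read under the same lock that guards tree mutations.
func (r *regionsInfo) GetClusterHealthyRegionsCnt() int64 {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.tree.healthyRegionsCnt
}

// putFresh stands in for any mutation that inserts a fresh-source region;
// it adjusts the counter while already holding the write lock.
func (r *regionsInfo) putFresh() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.tree.healthyRegionsCnt++
}

func main() {
	r := &regionsInfo{tree: &regionTree{}}
	r.putFresh()
	fmt.Println(r.GetClusterHealthyRegionsCnt()) // 1
}
```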
21 changes: 21 additions & 0 deletions pkg/core/region_tree.go
@@ -61,6 +61,8 @@ type regionTree struct {
totalSize int64
totalWriteBytesRate float64
totalWriteKeysRate float64
// count the healthy meta regions
healthyRegionsCnt int64
}

func newRegionTree() *regionTree {
@@ -69,6 +71,7 @@ func newRegionTree() *regionTree {
totalSize: 0,
totalWriteBytesRate: 0,
totalWriteKeysRate: 0,
healthyRegionsCnt: 0,
}
}

@@ -112,6 +115,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
t.totalWriteKeysRate += regionWriteKeysRate
if region.IsSourceFresh() {
t.healthyRegionsCnt++
}

if !withOverlaps {
overlaps = t.overlaps(item)
@@ -133,6 +139,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if old.IsSourceFresh() {
t.healthyRegionsCnt--
}
}

return result
@@ -149,6 +158,15 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate = origin.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate

// If origin is stale, need to add the healthy region count.
if origin.IsSourceStale() && region.IsSourceFresh() {
t.healthyRegionsCnt++
}
// If origin is healthy, need to sub the healthy region count.
if origin.IsSourceFresh() && region.IsSourceStale() {
t.healthyRegionsCnt--
}
}

// remove removes a region if the region is in the tree.
@@ -168,6 +186,9 @@ func (t *regionTree) remove(region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if region.IsSourceFresh() {
t.healthyRegionsCnt--
}
t.tree.Delete(item)
}

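Taken together, the hunks above keep healthyRegionsCnt in step with the tree's contents: it goes up when a fresh-source region is inserted, down when one is removed (directly or as an overlap), and updateStat adjusts it when an in-place update flips a region between stale and fresh. Here is a self-contained sketch of those maintenance points, using a map in place of the real B-tree and a plain bool in place of IsSourceFresh/IsSourceStale; everything in it is illustrative rather than PD's actual implementation.

```go
package main

import "fmt"

// region stands in for *RegionInfo; fresh stands in for IsSourceFresh().
type region struct {
	id    uint64
	fresh bool
}

// tree stands in for regionTree, with a map instead of a B-tree.
type tree struct {
	items             map[uint64]*region
	healthyRegionsCnt int64
}

// update inserts a region, displacing any previous entry with the same ID
// (the analogue of removing overlaps in the real tree).
func (t *tree) update(r *region) {
	if r.fresh {
		t.healthyRegionsCnt++
	}
	if old, ok := t.items[r.id]; ok && old.fresh {
		t.healthyRegionsCnt-- // the displaced entry no longer counts
	}
	t.items[r.id] = r
}

// updateStat mirrors the in-place path: only a freshness transition matters.
func (t *tree) updateStat(origin, r *region) {
	if !origin.fresh && r.fresh {
		t.healthyRegionsCnt++
	}
	if origin.fresh && !r.fresh {
		t.healthyRegionsCnt--
	}
	t.items[r.id] = r
}

// remove deletes a region and gives back its contribution to the counter.
func (t *tree) remove(r *region) {
	if _, ok := t.items[r.id]; !ok {
		return
	}
	if r.fresh {
		t.healthyRegionsCnt--
	}
	delete(t.items, r.id)
}

func main() {
	t := &tree{items: map[uint64]*region{}}
	t.update(&region{id: 1, fresh: false}) // e.g. loaded from storage
	t.update(&region{id: 2, fresh: true})  // e.g. reported by heartbeat
	t.updateStat(t.items[1], &region{id: 1, fresh: true})
	t.remove(t.items[2])
	fmt.Println(t.healthyRegionsCnt) // 1: region 1 is now fresh, region 2 removed
}
```

Centralizing the adjustments in the tree is what lets the syncer and the bootstrap path in the later hunks drop their own counter bookkeeping.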
4 changes: 2 additions & 2 deletions pkg/schedule/prepare_checker.go
@@ -49,8 +49,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
checker.prepared = true
return true
}
if float64(c.GetStaleRegionCnt()) < float64(c.GetTotalRegionCount())*(1-collectFactor) {
log.Info("stale meta region number is satisfied, finish prepare checker", zap.Int64("stale-region", c.GetStaleRegionCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
if float64(c.GetClusterHealthyRegionsCnt()) > float64(c.GetTotalRegionCount())*collectFactor {
log.Info("healthy meta region number is satisfied, finish prepare checker", zap.Int64("stale-region", c.GetClusterHealthyRegionsCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
checker.prepared = true
return true
}
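The prepare checker now counts healthy regions up instead of stale regions down. Assuming every region is either stale or healthy, healthy > total*collectFactor is equivalent to the old stale < total*(1-collectFactor); only where the number comes from changes. A small sketch of the new condition with an illustrative collectFactor (the real constant is defined in prepare_checker.go and may differ):

```go
package main

import "fmt"

// Illustrative value only; the real collectFactor lives in
// pkg/schedule/prepare_checker.go.
const collectFactor = 0.8

// prepared mirrors the new condition: enough regions have fresh meta.
func prepared(healthy, total int64) bool {
	return float64(healthy) > float64(total)*collectFactor
}

func main() {
	total := int64(100)
	for _, healthy := range []int64{50, 80, 81} {
		fmt.Printf("healthy=%d/%d prepared=%v\n", healthy, total, prepared(healthy, total))
	}
	// healthy=50/100 prepared=false
	// healthy=80/100 prepared=false (strictly greater than the threshold is required)
	// healthy=81/100 prepared=true
}
```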
21 changes: 9 additions & 12 deletions pkg/storage/endpoint/meta.go
@@ -43,7 +43,7 @@ type MetaStorage interface {
// RegionStorage defines the storage operations on the Region meta info.
type RegionStorage interface {
LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error)
LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error)
LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error
SaveRegion(region *metapb.Region) error
DeleteRegion(region *metapb.Region) error
Flush() error
@@ -166,15 +166,14 @@ func (se *StorageEndpoint) LoadRegion(regionID uint64, region *metapb.Region) (o
}

// LoadRegions loads all regions from storage to RegionsInfo.
func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error) {
func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
nextID := uint64(0)
endKey := RegionPath(math.MaxUint64)

// Since the region key may be very long, using a larger rangeLimit will cause
// the message packet to exceed the grpc message size limit (4MB). Here we use
// a variable rangeLimit to work around.
rangeLimit := MaxKVRangeLimit
regionsNum := int64(0)
for {
failpoint.Inject("slowLoadRegion", func() {
rangeLimit = 1
@@ -186,36 +185,34 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.
if rangeLimit /= 2; rangeLimit >= MinKVRangeLimit {
continue
}
return 0, err
return err
}
select {
case <-ctx.Done():
return 0, ctx.Err()
return ctx.Err()
default:
}

for _, r := range res {
region := &metapb.Region{}
if err := region.Unmarshal([]byte(r)); err != nil {
return 0, errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByArgs()
return errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByArgs()
}
if err = encryption.DecryptRegion(region, se.encryptionKeyManager); err != nil {
return 0, err
return err
}

nextID = region.GetId() + 1
overlaps := f(core.NewRegionInfo(region, nil))
regionsNum += 1
overlaps := f(core.NewRegionInfo(region, nil, core.SetSource(core.FromStorage)))
for _, item := range overlaps {
if err := se.DeleteRegion(item.GetMeta()); err != nil {
return 0, err
return err
}
regionsNum -= 1
}
}

if len(res) < rangeLimit {
return regionsNum, nil
return nil
}
}
}
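LoadRegions no longer returns a region count; instead each region it builds is tagged with core.SetSource(core.FromStorage), so downstream code (the region tree and prepare checker above) can distinguish storage-loaded meta from meta refreshed later. A sketch of the functional-option shape this implies; SetSource, FromStorage, IsSourceStale and IsSourceFresh appear in the diff, while the remaining names and the exact freshness classification are assumptions.

```go
package main

import "fmt"

// RegionSource says where a region's meta came from. FromStorage appears in
// the diff; the other values and their exact names are assumptions.
type RegionSource int

const (
	FromStorage   RegionSource = iota // persisted meta, possibly stale
	FromSync                          // streamed from the leader
	FromHeartbeat                     // reported directly by the store
)

// RegionInfo is a stand-in for core.RegionInfo with just the source field.
type RegionInfo struct {
	id     uint64
	source RegionSource
}

// RegionCreateOption mimics core's functional options.
type RegionCreateOption func(*RegionInfo)

// SetSource records where the meta was obtained, as used in LoadRegions.
func SetSource(s RegionSource) RegionCreateOption {
	return func(r *RegionInfo) { r.source = s }
}

// NewRegionInfo applies the options to a freshly built RegionInfo.
func NewRegionInfo(id uint64, opts ...RegionCreateOption) *RegionInfo {
	r := &RegionInfo{id: id}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// IsSourceStale/IsSourceFresh sketch the predicates used by the region tree;
// the exact classification (e.g. how FromSync is treated) is an assumption.
func (r *RegionInfo) IsSourceStale() bool { return r.source == FromStorage }
func (r *RegionInfo) IsSourceFresh() bool { return r.source == FromHeartbeat }

func main() {
	loaded := NewRegionInfo(1, SetSource(FromStorage))
	fmt.Println(loaded.IsSourceStale(), loaded.IsSourceFresh()) // true false
}
```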
10 changes: 5 additions & 5 deletions pkg/storage/storage.go
@@ -123,7 +123,7 @@ func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.Regi

// TryLoadRegionsOnce loads all regions from storage to RegionsInfo.
// If the underlying storage is the local region storage, it will only load once.
func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.RegionInfo) []*core.RegionInfo) (regionsNum int64, err error) {
func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.RegionInfo) []*core.RegionInfo) error {
ps, ok := s.(*coreStorage)
if !ok {
return s.LoadRegions(ctx, f)
@@ -136,12 +136,12 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.regionLoaded {
if regionsNum, err = ps.regionStorage.LoadRegions(ctx, f); err != nil {
return 0, err
if err := ps.regionStorage.LoadRegions(ctx, f); err != nil {
return err
}
ps.regionLoaded = true
}
return
return nil
}

// LoadRegion loads one region from storage.
@@ -153,7 +153,7 @@ func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bo
}

// LoadRegions loads all regions from storage to RegionsInfo.
func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error) {
func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
return ps.regionStorage.LoadRegions(ctx, f)
}
35 changes: 9 additions & 26 deletions pkg/storage/storage_test.go
@@ -216,9 +216,7 @@ func TestLoadRegions(t *testing.T) {

n := 10
regions := mustSaveRegions(re, storage, n)
regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(n), regionsNum)
re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion))

re.Equal(n, cache.GetTotalRegionCount())
for _, region := range cache.GetMetaRegions() {
@@ -255,9 +253,7 @@ func TestLoadRegionsToCache(t *testing.T) {

n := 10
regions := mustSaveRegions(re, storage, n)
regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(n), regionsNum)
re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion))

re.Equal(n, cache.GetTotalRegionCount())
for _, region := range cache.GetMetaRegions() {
@@ -266,9 +262,7 @@ func TestLoadRegionsToCache(t *testing.T) {

n = 20
mustSaveRegions(re, storage, n)
regionsNum, err = TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(n), regionsNum)
re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion))
re.Equal(n, cache.GetTotalRegionCount())
}

@@ -280,9 +274,7 @@ func TestLoadRegionsExceedRangeLimit(t *testing.T) {

n := 1000
regions := mustSaveRegions(re, storage, n)
regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(n), regionsNum)
re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion))
re.Equal(n, cache.GetTotalRegionCount())
for _, region := range cache.GetMetaRegions() {
re.Equal(regions[region.GetId()], region)
@@ -300,12 +292,8 @@ func TestTrySwitchRegionStorage(t *testing.T) {

TrySwitchRegionStorage(storage, false)
regions10 := mustSaveRegions(re, storage, 10)
regionsNum, err := defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(10), regionsNum)
regionsNum, err = localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(0), regionsNum)
re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion))
re.NoError(localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion))
re.Empty(localCache.GetMetaRegions())
re.Len(defaultCache.GetMetaRegions(), 10)
for _, region := range defaultCache.GetMetaRegions() {
@@ -314,12 +302,8 @@ func TestTrySwitchRegionStorage(t *testing.T) {

TrySwitchRegionStorage(storage, true)
regions20 := mustSaveRegions(re, storage, 20)
regionsNum, err = defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(10), regionsNum)
regionsNum, err = localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(20), regionsNum)
re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion))
re.NoError(localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion))
re.Len(defaultCache.GetMetaRegions(), 10)
re.Len(localCache.GetMetaRegions(), 20)
for _, region := range defaultCache.GetMetaRegions() {
@@ -443,8 +427,7 @@ func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
}()

b.ResetTimer()
regionsNum, err := lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion)
re.Equal(int64(n), regionsNum)
err = lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion)
re.NoError(err)
}

11 changes: 2 additions & 9 deletions pkg/syncer/client.go
@@ -92,12 +92,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
// used to load region from kv storage to cache storage.
bc := s.server.GetBasicCluster()
regionStorage := s.server.GetStorage()
log.Info("region syncer start load regions")
start := time.Now()
loadRegionsNum, err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
// FromStorage means this region's meta info might be stale.
bc.AtomicBatchAddStaleRegionCnt(loadRegionsNum)
log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)), zap.Int64("regions-by-load", loadRegionsNum))
err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)))
if err != nil {
log.Warn("failed to load regions", errs.ZapError(err))
}
@@ -196,10 +193,6 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
// FromSync means this region's meta info might be stale.
if origin == nil || origin.IsSourceFresh() {
bc.RegionsInfo.AtomicAddStaleRegionCnt()
}
_, saveKV, _, _ := regionGuide(region, origin)
overlaps := bc.PutRegion(region)

6 changes: 1 addition & 5 deletions server/cluster/cluster.go
@@ -587,16 +587,12 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
start = time.Now()

// used to load region from kv storage to cache storage.
loadRegionsNum, err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion)
if err != nil {
if err = storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
return nil, err
}
// FromStorage means this region's meta info might be stale.
c.core.AtomicBatchAddStaleRegionCnt(loadRegionsNum)
log.Info("load regions",
zap.Int("count", c.core.GetTotalRegionCount()),
zap.Duration("cost", time.Since(start)),
zap.Int64("regions-by-load", loadRegionsNum),
)
if !c.isAPIServiceMode {
for _, store := range c.GetStores() {
1 change: 0 additions & 1 deletion server/cluster/cluster_test.go
@@ -2375,7 +2375,6 @@ func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) er
peer, _ := c.AllocPeer(id)
region.Peers = append(region.Peers, peer)
}
c.core.AtomicAddStaleRegionCnt()
return c.putRegion(core.NewRegionInfo(region, nil))
}
