Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Sep 13, 2023
1 parent c67ed32 commit 67b4081
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 71 deletions.
45 changes: 27 additions & 18 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,19 +813,22 @@ type RegionsInfo struct {
learners map[uint64]*regionTree // storeID -> sub regionTree
witnesses map[uint64]*regionTree // storeID -> sub regionTree
pendingPeers map[uint64]*regionTree // storeID -> sub regionTree
// count the stale meta regions
staleRegionCnt int64
}

// NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers
func NewRegionsInfo() *RegionsInfo {
return &RegionsInfo{
tree: newRegionTree(),
regions: make(map[uint64]*regionItem),
subRegions: make(map[uint64]*regionItem),
leaders: make(map[uint64]*regionTree),
followers: make(map[uint64]*regionTree),
learners: make(map[uint64]*regionTree),
witnesses: make(map[uint64]*regionTree),
pendingPeers: make(map[uint64]*regionTree),
tree: newRegionTree(),
regions: make(map[uint64]*regionItem),
subRegions: make(map[uint64]*regionItem),
leaders: make(map[uint64]*regionTree),
followers: make(map[uint64]*regionTree),
learners: make(map[uint64]*regionTree),
witnesses: make(map[uint64]*regionTree),
pendingPeers: make(map[uint64]*regionTree),
staleRegionCnt: 0,
}
}

Expand Down Expand Up @@ -862,8 +865,6 @@ func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
r.UpdateSubTree(region, origin, overlaps, rangeChanged)
// FromStorage means this region's meta info might be stale.
r.AtomicAddStaleRegionCnt()
return overlaps
}

Expand All @@ -883,17 +884,25 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*reg

// GetStaleRegionCnt returns the stale region count.
func (r *RegionsInfo) GetStaleRegionCnt() int64 {
r.t.RLock()
defer r.t.RUnlock()
if r.tree.length() == 0 {
return 0
}
return r.tree.staleRegionCnt
return atomic.LoadInt64(&r.staleRegionCnt)
}

// AtomicAddStaleRegionCnt atomically adds the stale region count.
func (r *RegionsInfo) AtomicAddStaleRegionCnt() {
r.tree.AtomicAddStaleRegionCnt()
atomic.AddInt64(&r.staleRegionCnt, 1)
}

// AtomicBatchAddStaleRegionCnt atomically batch adds the stale region count.
func (r *RegionsInfo) AtomicBatchAddStaleRegionCnt(num int64) {
atomic.AddInt64(&r.staleRegionCnt, num)
}

// AtomicSubStaleRegionCnt atomically subtracts the stale region count.
func (r *RegionsInfo) AtomicSubStaleRegionCnt() {
if atomic.LoadInt64(&r.staleRegionCnt) == 0 {
return

Check warning on line 903 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L902-L903

Added lines #L902 - L903 were not covered by tests
}
atomic.AddInt64(&r.staleRegionCnt, -1)

Check warning on line 905 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L905

Added line #L905 was not covered by tests
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
Expand All @@ -911,7 +920,7 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo
}
// If origin is stale, need to sub the stale region count.
if origin != nil && origin.IsSourceStale() && region.IsSourceFresh() {
r.tree.AtomicSubStaleRegionCnt()
r.AtomicSubStaleRegionCnt()

Check warning on line 923 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L923

Added line #L923 was not covered by tests
}
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
Expand Down
21 changes: 0 additions & 21 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package core
import (
"bytes"
"math/rand"
"sync/atomic"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -62,8 +61,6 @@ type regionTree struct {
totalSize int64
totalWriteBytesRate float64
totalWriteKeysRate float64
// count the stale meta regions
staleRegionCnt int64
}

func newRegionTree() *regionTree {
Expand All @@ -72,7 +69,6 @@ func newRegionTree() *regionTree {
totalSize: 0,
totalWriteBytesRate: 0,
totalWriteKeysRate: 0,
staleRegionCnt: 0,
}
}

Expand Down Expand Up @@ -346,20 +342,3 @@ func (t *regionTree) TotalWriteRate() (bytesRate, keysRate float64) {
}
return t.totalWriteBytesRate, t.totalWriteKeysRate
}

func (t *regionTree) AtomicAddStaleRegionCnt() {
if t.length() == 0 {
return
}
atomic.AddInt64(&t.staleRegionCnt, 1)
}

func (t *regionTree) AtomicSubStaleRegionCnt() {
if t.length() == 0 {
return
}
if t.staleRegionCnt == 0 {
return
}
atomic.AddInt64(&t.staleRegionCnt, -1)
}
2 changes: 1 addition & 1 deletion pkg/schedule/prepare_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
return true
}
if float64(c.GetStaleRegionCnt()) < float64(c.GetTotalRegionCount())*(1-collectFactor) {
log.Info("stale meta region number is satisfied, skip prepare checker", zap.Int64("stale-region", c.GetStaleRegionCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
log.Info("stale meta region number is satisfied, finish prepare checker", zap.Int64("stale-region", c.GetStaleRegionCnt()), zap.Int("total-region", c.GetTotalRegionCount()))
checker.prepared = true
return true
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/storage/endpoint/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type MetaStorage interface {
// RegionStorage defines the storage operations on the Region meta info.
type RegionStorage interface {
LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error)
LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error
LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error)
SaveRegion(region *metapb.Region) error
DeleteRegion(region *metapb.Region) error
Flush() error
Expand Down Expand Up @@ -166,14 +166,15 @@ func (se *StorageEndpoint) LoadRegion(regionID uint64, region *metapb.Region) (o
}

// LoadRegions loads all regions from storage to RegionsInfo.
func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error) {
nextID := uint64(0)
endKey := RegionPath(math.MaxUint64)

// Since the region key may be very long, using a larger rangeLimit will cause
// the message packet to exceed the grpc message size limit (4MB). Here we use
// a variable rangeLimit to work around.
rangeLimit := MaxKVRangeLimit
regionsNum := int64(0)
for {
failpoint.Inject("slowLoadRegion", func() {
rangeLimit = 1
Expand All @@ -185,34 +186,36 @@ func (se *StorageEndpoint) LoadRegions(ctx context.Context, f func(region *core.
if rangeLimit /= 2; rangeLimit >= MinKVRangeLimit {
continue
}
return err
return 0, err

Check warning on line 189 in pkg/storage/endpoint/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/endpoint/meta.go#L189

Added line #L189 was not covered by tests
}
select {
case <-ctx.Done():
return ctx.Err()
return 0, ctx.Err()
default:
}

for _, r := range res {
region := &metapb.Region{}
if err := region.Unmarshal([]byte(r)); err != nil {
return errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByArgs()
return 0, errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByArgs()

Check warning on line 200 in pkg/storage/endpoint/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/endpoint/meta.go#L200

Added line #L200 was not covered by tests
}
if err = encryption.DecryptRegion(region, se.encryptionKeyManager); err != nil {
return err
return 0, err

Check warning on line 203 in pkg/storage/endpoint/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/endpoint/meta.go#L203

Added line #L203 was not covered by tests
}

nextID = region.GetId() + 1
overlaps := f(core.NewRegionInfo(region, nil))
regionsNum += 1
for _, item := range overlaps {
if err := se.DeleteRegion(item.GetMeta()); err != nil {
return err
return 0, err

Check warning on line 211 in pkg/storage/endpoint/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/endpoint/meta.go#L211

Added line #L211 was not covered by tests
}
regionsNum -= 1

Check warning on line 213 in pkg/storage/endpoint/meta.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/endpoint/meta.go#L213

Added line #L213 was not covered by tests
}
}

if len(res) < rangeLimit {
return nil
return regionsNum, nil
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.Regi

// TryLoadRegionsOnce loads all regions from storage to RegionsInfo.
// If the underlying storage is the local region storage, it will only load once.
func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.RegionInfo) []*core.RegionInfo) error {
func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.RegionInfo) []*core.RegionInfo) (regionsNum int64, err error) {
ps, ok := s.(*coreStorage)
if !ok {
return s.LoadRegions(ctx, f)
Expand All @@ -136,12 +136,12 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.regionLoaded {
if err := ps.regionStorage.LoadRegions(ctx, f); err != nil {
return err
if regionsNum, err = ps.regionStorage.LoadRegions(ctx, f); err != nil {
return 0, err
}
ps.regionLoaded = true
}
return nil
return
}

// LoadRegion loads one region from storage.
Expand All @@ -153,7 +153,7 @@ func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bo
}

// LoadRegions loads all regions from storage to RegionsInfo.
func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) (int64, error) {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
return ps.regionStorage.LoadRegions(ctx, f)
}
Expand Down
40 changes: 28 additions & 12 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ func TestLoadRegions(t *testing.T) {

n := 10
regions := mustSaveRegions(re, storage, n)
re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion))
regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(n), regionsNum)

re.Equal(n, cache.GetTotalRegionCount())
for _, region := range cache.GetMetaRegions() {
Expand Down Expand Up @@ -253,7 +255,9 @@ func TestLoadRegionsToCache(t *testing.T) {

n := 10
regions := mustSaveRegions(re, storage, n)
re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion))
regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(n), regionsNum)

re.Equal(n, cache.GetTotalRegionCount())
for _, region := range cache.GetMetaRegions() {
Expand All @@ -262,7 +266,9 @@ func TestLoadRegionsToCache(t *testing.T) {

n = 20
mustSaveRegions(re, storage, n)
re.NoError(TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion))
regionsNum, err = TryLoadRegionsOnce(context.Background(), storage, cache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(n), regionsNum)
re.Equal(n, cache.GetTotalRegionCount())
}

Expand All @@ -274,7 +280,9 @@ func TestLoadRegionsExceedRangeLimit(t *testing.T) {

n := 1000
regions := mustSaveRegions(re, storage, n)
re.NoError(storage.LoadRegions(context.Background(), cache.CheckAndPutRegion))
regionsNum, err := storage.LoadRegions(context.Background(), cache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(n), regionsNum)
re.Equal(n, cache.GetTotalRegionCount())
for _, region := range cache.GetMetaRegions() {
re.Equal(regions[region.GetId()], region)
Expand All @@ -292,8 +300,12 @@ func TestTrySwitchRegionStorage(t *testing.T) {

TrySwitchRegionStorage(storage, false)
regions10 := mustSaveRegions(re, storage, 10)
re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion))
re.NoError(localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion))
regionsNum, err := defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(10), regionsNum)
regionsNum, err = localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(0), regionsNum)
re.Empty(localCache.GetMetaRegions())
re.Len(defaultCache.GetMetaRegions(), 10)
for _, region := range defaultCache.GetMetaRegions() {
Expand All @@ -302,8 +314,12 @@ func TestTrySwitchRegionStorage(t *testing.T) {

TrySwitchRegionStorage(storage, true)
regions20 := mustSaveRegions(re, storage, 20)
re.NoError(defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion))
re.NoError(localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion))
regionsNum, err = defaultStorage.LoadRegions(context.Background(), defaultCache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(10), regionsNum)
regionsNum, err = localStorage.LoadRegions(context.Background(), localCache.CheckAndPutRegion)
re.NoError(err)
re.Equal(int64(20), regionsNum)
re.Len(defaultCache.GetMetaRegions(), 10)
re.Len(localCache.GetMetaRegions(), 20)
for _, region := range defaultCache.GetMetaRegions() {
Expand Down Expand Up @@ -407,6 +423,7 @@ func saveRegions(lb *levelDBBackend, n int, ratio int) error {
}

func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
re := require.New(b)
ctx := context.Background()
dir := b.TempDir()
lb, err := newLevelDBBackend(ctx, dir, nil)
Expand All @@ -426,10 +443,9 @@ func benchmarkLoadRegions(b *testing.B, n int, ratio int) {
}()

b.ResetTimer()
err = lb.LoadRegions(ctx, cluster.CheckAndPutRegion)
if err != nil {
b.Fatal(err)
}
regionsNum, err := lb.LoadRegions(context.Background(), cluster.CheckAndPutRegion)
re.Equal(int64(n), regionsNum)
re.NoError(err)
}

var volumes = []struct {
Expand Down
10 changes: 6 additions & 4 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
// used to load region from kv storage to cache storage.
bc := s.server.GetBasicCluster()
regionStorage := s.server.GetStorage()
log.Info("region syncer start load region")
log.Info("region syncer start load regions")
start := time.Now()
err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start)))
loadRegionsNum, err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
// FromStorage means this region's meta info might be stale.
bc.AtomicBatchAddStaleRegionCnt(loadRegionsNum)
log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)), zap.Int64("regions-by-load", loadRegionsNum))
if err != nil {
log.Warn("failed to load regions", errs.ZapError(err))
}
Expand Down Expand Up @@ -195,7 +197,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
continue
}
// FromSync means this region's meta info might be stale.
if origin == nil || (origin != nil && origin.IsSourceFresh()) {
if origin == nil || origin.IsSourceFresh() {
bc.RegionsInfo.AtomicAddStaleRegionCnt()
}
_, saveKV, _, _ := regionGuide(region, origin)
Expand Down
6 changes: 5 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,12 +587,16 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
start = time.Now()

// used to load region from kv storage to cache storage.
if err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
loadRegionsNum, err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion)
if err != nil {
return nil, err
}
// FromStorage means this region's meta info might be stale.
c.core.AtomicBatchAddStaleRegionCnt(loadRegionsNum)
log.Info("load regions",
zap.Int("count", c.core.GetTotalRegionCount()),
zap.Duration("cost", time.Since(start)),
zap.Int64("regions-by-load", loadRegionsNum),
)
if !c.isAPIServiceMode {
for _, store := range c.GetStores() {
Expand Down
4 changes: 3 additions & 1 deletion tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,9 @@ func TestLoadClusterInfo(t *testing.T) {
for _, region := range regions {
re.NoError(testStorage.SaveRegion(region))
}
re.NoError(storage.TryLoadRegionsOnce(ctx, testStorage, raftCluster.GetBasicCluster().PutRegion))
regionsNum, err := storage.TryLoadRegionsOnce(ctx, testStorage, raftCluster.GetBasicCluster().PutRegion)
re.NoError(err)
re.Equal(int64(0), regionsNum)
re.Equal(n, raftCluster.GetTotalRegionCount())
}

Expand Down

0 comments on commit 67b4081

Please sign in to comment.