*: optimize heartbeat process with concurrent runner - part 2 (#8052)
ref #7897

Optimize heartbeat process
- Split the statistics updates on the subtree

Signed-off-by: nolouch <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
nolouch and ti-chi-bot[bot] committed Apr 24, 2024
1 parent 1e65f9d commit 141186e
Showing 7 changed files with 354 additions and 41 deletions.
173 changes: 173 additions & 0 deletions pkg/core/region.go
@@ -856,6 +856,24 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
}

// The following constants are the stage names of the region heartbeat process.
const (
	HandleStatsAsync        = "HandleStatsAsync"
	ObserveRegionStatsAsync = "ObserveRegionStatsAsync"
	UpdateSubTree           = "UpdateSubTree"
	HandleOverlaps          = "HandleOverlaps"
	CollectRegionStatsAsync = "CollectRegionStatsAsync"
	SaveRegionToKV          = "SaveRegionToKV"
)

// ExtraTaskOpts returns the task options for the given heartbeat stage.
func ExtraTaskOpts(ctx *MetaProcessContext, name string) ratelimit.TaskOpts {
	return ratelimit.TaskOpts{
		TaskName: name,
		Limit:    ctx.Limiter,
	}
}
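To make the stage options concrete, here is a small, self-contained sketch of the idea: named heartbeat stages submitted to a concurrency-limited runner. It is a toy illustration only; the runner, taskOpts, and semaphore-based limiter below are assumptions made for this example, not PD's actual ratelimit package.

package main

import (
	"context"
	"fmt"
	"sync"
)

// taskOpts mirrors the shape of ratelimit.TaskOpts used above: a stage name plus a limiter.
type taskOpts struct {
	TaskName string
	Limit    chan struct{} // hypothetical limiter: a plain semaphore channel
}

type runner struct{ wg sync.WaitGroup }

// RunTask executes f asynchronously, bounded by the semaphore in opts.Limit.
func (r *runner) RunTask(ctx context.Context, opts taskOpts, f func(context.Context)) {
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		opts.Limit <- struct{}{}        // acquire a slot
		defer func() { <-opts.Limit }() // release it
		fmt.Println("running stage:", opts.TaskName)
		f(ctx)
	}()
}

func main() {
	limit := make(chan struct{}, 4) // at most 4 heartbeat stages in flight
	r := &runner{}
	for _, stage := range []string{"HandleStatsAsync", "UpdateSubTree", "HandleOverlaps"} {
		r.RunTask(context.Background(), taskOpts{TaskName: stage, Limit: limit}, func(context.Context) {})
	}
	r.wg.Wait()
}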

// RWLockStats is a read-write lock with statistics.
type RWLockStats struct {
syncutil.RWMutex
@@ -1004,6 +1022,161 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *R
return overlaps, nil
}

// CheckAndPutRootTree checks whether the region is valid to put into the root tree; if not, it returns an error.
// Usually used together with CheckAndPutSubTree.
func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
	tracer := ctx.Tracer
	r.t.Lock()
	var ols []*regionItem
	origin := r.getRegionLocked(region.GetID())
	if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
		ols = r.tree.overlaps(&regionItem{RegionInfo: region})
	}
	tracer.OnCheckOverlapsFinished()
	err := check(region, origin, convertItemsToRegions(ols))
	if err != nil {
		r.t.Unlock()
		tracer.OnValidateRegionFinished()
		return nil, err
	}
	tracer.OnValidateRegionFinished()
	_, overlaps, _ := r.setRegionLocked(region, true, ols...)
	r.t.Unlock()
	tracer.OnSetRegionFinished()
	return overlaps, nil
}

// CheckAndPutSubTree updates the subtree with the region, preferring the latest
// version already stored in the root tree. Usually used together with CheckAndPutRootTree.
func (r *RegionsInfo) CheckAndPutSubTree(region *RegionInfo) {
	// Fetch the region from the root tree again so that the subtree uses the latest version.
	newRegion := r.GetRegion(region.GetID())
	if newRegion == nil {
		newRegion = region
	}
	r.UpdateSubTreeOrderInsensitive(newRegion)
}
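To see why CheckAndPutSubTree re-reads the region from the root tree before touching the subtree, here is a hedged, self-contained toy model (maps and a mutex stand in for PD's trees; the names store and putSub are invented for this sketch): the root tree is the source of truth, so a subtree task that runs late still converges to whatever the root tree currently holds.

package main

import (
	"fmt"
	"sync"
)

type region struct{ id, version uint64 }

// store is a stand-in for RegionsInfo: a root tree plus a separately updated subtree.
type store struct {
	mu   sync.Mutex
	root map[uint64]region // root tree: the source of truth
	sub  map[uint64]region // subtree: updated by a possibly delayed task
}

func (s *store) putRoot(r region) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.root[r.id] = r
}

// putSub mirrors CheckAndPutSubTree: prefer the freshest copy from the root tree.
func (s *store) putSub(r region) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if latest, ok := s.root[r.id]; ok {
		r = latest
	}
	s.sub[r.id] = r
}

func main() {
	s := &store{root: map[uint64]region{}, sub: map[uint64]region{}}
	s.putRoot(region{id: 1, version: 2}) // a newer heartbeat already reached the root tree
	s.putSub(region{id: 1, version: 1})  // a stale subtree task still converges to version 2
	fmt.Println(s.sub[1].version)        // prints 2
}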

// UpdateSubTreeOrderInsensitive updates the subtree.
// It can be called concurrently, so the region version is checked to preserve ordering:
//  1. if the version is stale, drop this update.
//  2. if the version is the same, only some statistics need to be updated;
//     in this situation, the order of updates does not matter.
//
// On the other hand, the overlapping regions need to be re-checked, because the region tree
// and the subtree are not updated atomically.
func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
	var origin *RegionInfo
	r.st.Lock()
	defer r.st.Unlock()
	originItem, ok := r.subRegions[region.GetID()]
	if ok {
		origin = originItem.RegionInfo
	}
	rangeChanged := true

	if origin != nil {
		re := region.GetRegionEpoch()
		oe := origin.GetRegionEpoch()
		isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
		if (isTermBehind || re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()) && !region.isRegionRecreated() {
			// Region meta is stale, skip.
			return
		}
		rangeChanged = !origin.rangeEqualsTo(region)

		if rangeChanged || !origin.peersEqualTo(region) {
			// If the range or peers have changed, the sub regionTree needs to be cleaned up.
			// TODO: Improve performance by deleting only the different peers.
			r.removeRegionFromSubTreeLocked(origin)
		} else {
			// The region tree and the subtree are not updated atomically, and the region tree is updated first.
			// If two threads need to update the region tree:
			// t1: thread-A updates the region tree
			// t2: thread-B updates the region tree again
			// t3: thread-B updates the subtree
			// t4: thread-A updates the subtree
			// To keep the region tree consistent with the subtree, we need to drop this update.
			if tree, ok := r.subRegions[region.GetID()]; ok {
				r.updateSubTreeStat(origin, region)
				tree.RegionInfo = region
			}
			return
		}
	}

	if rangeChanged {
		overlaps := r.getOverlapRegionFromSubTreeLocked(region)
		for _, re := range overlaps {
			r.removeRegionFromSubTreeLocked(re)
		}
	}

	item := &regionItem{region}
	r.subRegions[region.GetID()] = item
	// The region has been removed from the subtree, so all information needs to be updated again.
	// Set the peers next.
	setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
		store, ok := peersMap[storeID]
		if !ok {
			store = newRegionTree()
			peersMap[storeID] = store
		}
		store.update(item, false)
	}

	// Add to leaders and followers.
	for _, peer := range region.GetVoters() {
		storeID := peer.GetStoreId()
		if peer.GetId() == region.leader.GetId() {
			// Add the leader peer to leaders.
			setPeer(r.leaders, storeID, item)
		} else {
			// Add the follower peer to followers.
			setPeer(r.followers, storeID, item)
		}
	}

	setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
		for _, peer := range peers {
			storeID := peer.GetStoreId()
			setPeer(peersMap, storeID, item)
		}
	}
	// Add to learners.
	setPeers(r.learners, region.GetLearners())
	// Add to witnesses.
	setPeers(r.witnesses, region.GetWitnesses())
	// Add to pending peers.
	setPeers(r.pendingPeers, region.GetPendingPeers())
}
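The staleness guard at the top of this function is what makes the update order-insensitive. The following standalone sketch (a simplification with invented types epoch and regionMeta; the real check also consults isRegionRecreated) shows how comparing version, conf-ver, and term lets a late-arriving, older heartbeat be dropped safely regardless of arrival order.

package main

import "fmt"

type epoch struct{ version, confVer uint64 }

type regionMeta struct {
	epoch epoch
	term  uint64
}

// isStale reports whether `incoming` is older than `current` on any dimension,
// mirroring the guard at the top of UpdateSubTreeOrderInsensitive.
func isStale(incoming, current regionMeta) bool {
	termBehind := incoming.term > 0 && incoming.term < current.term
	return termBehind ||
		incoming.epoch.version < current.epoch.version ||
		incoming.epoch.confVer < current.epoch.confVer
}

func main() {
	current := regionMeta{epoch: epoch{version: 5, confVer: 3}, term: 7}
	late := regionMeta{epoch: epoch{version: 4, confVer: 3}, term: 7} // arrived out of order
	fmt.Println(isStale(late, current))                               // true: drop it
}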

func (r *RegionsInfo) getOverlapRegionFromSubTreeLocked(region *RegionInfo) []*RegionInfo {
	it := &regionItem{RegionInfo: region}
	overlaps := make([]*RegionInfo, 0)
	overlapsMap := make(map[uint64]struct{})
	collectFromItemSlice := func(peersMap map[uint64]*regionTree, storeID uint64) {
		if tree, ok := peersMap[storeID]; ok {
			items := tree.overlaps(it)
			for _, item := range items {
				if _, ok := overlapsMap[item.GetID()]; !ok {
					overlapsMap[item.GetID()] = struct{}{}
					overlaps = append(overlaps, item.RegionInfo)
				}
			}
		}
	}
	for _, peer := range region.GetMeta().GetPeers() {
		storeID := peer.GetStoreId()
		collectFromItemSlice(r.leaders, storeID)
		collectFromItemSlice(r.followers, storeID)
		collectFromItemSlice(r.learners, storeID)
		collectFromItemSlice(r.witnesses, storeID)
	}
	return overlaps
}

// GetRelevantRegions returns the relevant regions for a given region.
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) {
r.t.RLock()
86 changes: 83 additions & 3 deletions pkg/core/region_test.go
@@ -789,14 +789,15 @@ func randomBytes(n int) []byte {
return bytes
}

func newRegionInfoID(idAllocator id.Allocator) *RegionInfo {
func newRegionInfoIDRandom(idAllocator id.Allocator) *RegionInfo {
var (
peers []*metapb.Peer
leader *metapb.Peer
)
storeNum := 10
for i := 0; i < 3; i++ {
id, _ := idAllocator.Alloc()
p := &metapb.Peer{Id: id, StoreId: id}
p := &metapb.Peer{Id: id, StoreId: uint64(i%storeNum + 1)}
if i == 0 {
leader = p
}
@@ -811,6 +812,8 @@ func newRegionInfoID(idAllocator id.Allocator) *RegionInfo {
Peers: peers,
},
leader,
SetApproximateSize(10),
SetApproximateKeys(10),
)
}

@@ -819,7 +822,7 @@ func BenchmarkAddRegion(b *testing.B) {
idAllocator := mockid.NewIDAllocator()
var items []*RegionInfo
for i := 0; i < 10000000; i++ {
items = append(items, newRegionInfoID(idAllocator))
items = append(items, newRegionInfoIDRandom(idAllocator))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -858,3 +861,80 @@ func BenchmarkRegionFromHeartbeat(b *testing.B) {
RegionFromHeartbeat(regionReq)
}
}

func TestUpdateRegionEquivalence(t *testing.T) {
re := require.New(t)
regionsOld := NewRegionsInfo()
regionsNew := NewRegionsInfo()
storeNums := 5
items := generateTestRegions(1000, storeNums)

updateRegion := func(item *RegionInfo) {
// old way
ctx := ContextTODO()
regionsOld.AtomicCheckAndPutRegion(ctx, item)
// new way
ctx = ContextTODO()
regionsNew.CheckAndPutRootTree(ctx, item)
regionsNew.CheckAndPutSubTree(item)
}
checksEquivalence := func() {
re.Equal(regionsOld.GetRegionCount([]byte(""), []byte("")), regionsNew.GetRegionCount([]byte(""), []byte("")))
re.Equal(regionsOld.GetRegionSizeByRange([]byte(""), []byte("")), regionsNew.GetRegionSizeByRange([]byte(""), []byte("")))
checkRegions(re, regionsOld)
checkRegions(re, regionsNew)

for i := 1; i <= storeNums; i++ {
re.Equal(regionsOld.GetStoreRegionCount(uint64(i)), regionsNew.GetStoreRegionCount(uint64(i)))
re.Equal(regionsOld.GetStoreLeaderCount(uint64(i)), regionsNew.GetStoreLeaderCount(uint64(i)))
re.Equal(regionsOld.GetStorePendingPeerCount(uint64(i)), regionsNew.GetStorePendingPeerCount(uint64(i)))
re.Equal(regionsOld.GetStoreLearnerRegionSize(uint64(i)), regionsNew.GetStoreLearnerRegionSize(uint64(i)))
re.Equal(regionsOld.GetStoreRegionSize(uint64(i)), regionsNew.GetStoreRegionSize(uint64(i)))
re.Equal(regionsOld.GetStoreLeaderRegionSize(uint64(i)), regionsNew.GetStoreLeaderRegionSize(uint64(i)))
re.Equal(regionsOld.GetStoreFollowerRegionSize(uint64(i)), regionsNew.GetStoreFollowerRegionSize(uint64(i)))
}
}

// Add regions.
for _, item := range items {
updateRegion(item)
}
checksEquivalence()

// Merge regions.
itemA, itemB := items[10], items[11]
itemMergedAB := itemA.Clone(WithEndKey(itemB.GetEndKey()), WithIncVersion())
updateRegion(itemMergedAB)
checksEquivalence()

// Split
itemA = itemA.Clone(WithIncVersion(), WithIncVersion())
itemB = itemB.Clone(WithIncVersion(), WithIncVersion())
updateRegion(itemA)
updateRegion(itemB)
checksEquivalence()
}
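A natural companion to this equivalence test (not part of this commit; a hedged sketch that assumes the sync package is imported in this file and that GetRegionCount returns an int) would exercise the new two-step path concurrently, since order-insensitivity is exactly the property the subtree update is designed for:

func TestUpdateSubTreeConcurrently(t *testing.T) {
	re := require.New(t)
	regions := NewRegionsInfo()
	items := generateTestRegions(100, 5)
	var wg sync.WaitGroup
	errs := make(chan error, len(items))
	for _, item := range items {
		item := item
		wg.Add(1)
		go func() {
			defer wg.Done()
			// New way: root tree first, then the subtree, as in processRegionHeartbeat.
			if _, err := regions.CheckAndPutRootTree(ContextTODO(), item); err != nil {
				errs <- err
				return
			}
			regions.CheckAndPutSubTree(item)
		}()
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		re.NoError(err)
	}
	re.Equal(len(items), regions.GetRegionCount([]byte(""), []byte("")))
}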

func generateTestRegions(count int, storeNum int) []*RegionInfo {
var items []*RegionInfo
for i := 0; i < count; i++ {
peer1 := &metapb.Peer{StoreId: uint64(i%storeNum + 1), Id: uint64(i*storeNum + 1)}
peer2 := &metapb.Peer{StoreId: uint64((i+1)%storeNum + 1), Id: uint64(i*storeNum + 2)}
peer3 := &metapb.Peer{StoreId: uint64((i+2)%storeNum + 1), Id: uint64(i*storeNum + 3)}
if i%3 == 0 {
peer2.IsWitness = true
}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer1, peer2, peer3},
StartKey: []byte(fmt.Sprintf("%20d", i*10)),
EndKey: []byte(fmt.Sprintf("%20d", (i+1)*10)),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100},
},
peer1,
SetApproximateKeys(10),
SetApproximateSize(10))
items = append(items, region)
}
return items
}
28 changes: 12 additions & 16 deletions pkg/mcs/scheduling/server/cluster.go
@@ -590,10 +590,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c

ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "HandleStatsAsync",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.HandleStatsAsync),
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
@@ -610,10 +607,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "ObserveRegionStatsAsync",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync),
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
@@ -632,16 +626,21 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// However, it can't solve the race condition of concurrent heartbeats from the same region.

// Async task in next PR.
if overlaps, err = c.AtomicCheckAndPutRegion(ctx, region); err != nil {
if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil {
tracer.OnSaveCacheFinished()
return err
}
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "HandleOverlaps",
Limit: ctx.Limiter,
core.ExtraTaskOpts(ctx, core.UpdateSubTree),
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.HandleOverlaps),
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
@@ -651,10 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "CollectRegionStatsAsync",
Limit: ctx.Limiter,
},
core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync),
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
},
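Pulling the cluster.go changes together, here is a hedged, self-contained toy of the heartbeat flow after this commit (invented types and names; not the real Cluster): the root-tree write stays synchronous in the handler, while the subtree, overlaps, and statistics stages are handed to the shared runner.

package main

import (
	"context"
	"fmt"
	"sync"
)

type runner struct{ wg sync.WaitGroup }

func (r *runner) run(name string, f func()) {
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		fmt.Println("async stage:", name)
		f()
	}()
}

// cluster stands in for the scheduling server's Cluster; root maps region id to version.
type cluster struct {
	mu     sync.Mutex
	root   map[uint64]uint64
	runner *runner
}

func (c *cluster) processHeartbeat(_ context.Context, id, version uint64) error {
	// 1. Synchronous: validate and write the root tree (CheckAndPutRootTree in the real code).
	c.mu.Lock()
	c.root[id] = version
	c.mu.Unlock()
	// 2. Asynchronous: the remaining stages go through the runner, as in processRegionHeartbeat.
	c.runner.run("UpdateSubTree", func() { /* CheckAndPutSubTree */ })
	c.runner.run("HandleOverlaps", func() { /* remove overlapped regions */ })
	c.runner.run("CollectRegionStatsAsync", func() { /* refresh statistics */ })
	return nil
}

func main() {
	c := &cluster{root: map[uint64]uint64{}, runner: &runner{}}
	_ = c.processHeartbeat(context.Background(), 1, 1)
	c.runner.wg.Wait()
}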