*: optimize heartbeat process with concurrent runner - part 2 #8052

Merged
merged 5 commits on Apr 24, 2024

Changes from all commits
173 changes: 173 additions & 0 deletions pkg/core/region.go
@@ -856,6 +856,24 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
}

// The following constants are the stage names of the region heartbeat process.
const (
HandleStatsAsync = "HandleStatsAsync"
ObserveRegionStatsAsync = "ObserveRegionStatsAsync"
UpdateSubTree = "UpdateSubTree"
HandleOverlaps = "HandleOverlaps"
CollectRegionStatsAsync = "CollectRegionStatsAsync"
SaveRegionToKV = "SaveRegionToKV"
)

// ExtraTaskOpts returns the task options for the given region heartbeat stage.
func ExtraTaskOpts(ctx *MetaProcessContext, name string) ratelimit.TaskOpts {
return ratelimit.TaskOpts{
TaskName: name,
Limit: ctx.Limiter,
}
}
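
As context for reviewers: a minimal sketch of how these stage names and ExtraTaskOpts are meant to be consumed on the heartbeat path (it mirrors the pkg/mcs/scheduling/server/cluster.go hunk later in this diff; the wrapper function and its handleStats parameter are invented here purely for illustration):

// Illustrative only: hand one heartbeat stage to the concurrent runner.
// ctx is the heartbeat's *MetaProcessContext; handleStats stands in for the
// real work of the stage (cluster.HandleStatsAsync in the actual caller).
func runHandleStatsStage(ctx *MetaProcessContext, handleStats func()) {
	ctx.TaskRunner.RunTask(
		ctx,
		// ExtraTaskOpts fills in the stage name and the per-context limiter.
		ExtraTaskOpts(ctx, HandleStatsAsync),
		func(_ context.Context) {
			handleStats()
		},
	)
}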

// RWLockStats is a read-write lock with statistics.
type RWLockStats struct {
syncutil.RWMutex
@@ -1004,6 +1022,161 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *R
return overlaps, nil
}

// CheckAndPutRootTree checks whether the region is valid to be put into the root tree, and returns an error if it is not.
// Usually used together with CheckAndPutSubTree.
func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
tracer := ctx.Tracer
r.t.Lock()
var ols []*regionItem
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
return nil, err
}
tracer.OnValidateRegionFinished()
_, overlaps, _ := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
tracer.OnSetRegionFinished()
return overlaps, nil
}

// CheckAndPutSubTree updates the subtree with the latest version of the region fetched from the root tree.
// Usually used together with CheckAndPutRootTree.
func (r *RegionsInfo) CheckAndPutSubTree(region *RegionInfo) {
// Fetch the latest version of the region from the root tree again.
newRegion := r.GetRegion(region.GetID())
if newRegion == nil {
newRegion = region
}
r.UpdateSubTreeOrderInsensitive(newRegion)
}
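
A minimal usage sketch of the pair (the helper below is invented for illustration; the real caller in pkg/mcs/scheduling/server/cluster.go defers CheckAndPutSubTree to the concurrent task runner instead of calling it inline):

// Illustrative only: update the root tree synchronously, then the subtree.
// CheckAndPutSubTree re-reads the latest region from the root tree, so it can
// also run later as a concurrent task, as the heartbeat path does.
func checkAndPutBothTrees(r *RegionsInfo, ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
	overlaps, err := r.CheckAndPutRootTree(ctx, region)
	if err != nil {
		return nil, err
	}
	r.CheckAndPutSubTree(region)
	return overlaps, nil
}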

// UpdateSubTreeOrderInsensitive updates the subtree.
// It can be called concurrently, so the region epoch is checked to keep the updates ordered:
// 1. if the incoming version is stale, the update is dropped;
// 2. if the version is the same, only the statistics need to be updated, and in that case the order of updates does not matter.
//
// On the other hand, the overlapping regions need to be re-checked, because the region tree and the subtree are not updated atomically.
func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
var origin *RegionInfo
r.st.Lock()
defer r.st.Unlock()
originItem, ok := r.subRegions[region.GetID()]
if ok {
origin = originItem.RegionInfo
}
rangeChanged := true

if origin != nil {
re := region.GetRegionEpoch()
oe := origin.GetRegionEpoch()
isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
if (isTermBehind || re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()) && !region.isRegionRecreated() {
// Region meta is stale, skip.
return
}
rangeChanged = !origin.rangeEqualsTo(region)

if rangeChanged || !origin.peersEqualTo(region) {
// If the range or peers have changed, the sub regionTree needs to be cleaned up.
// TODO: Improve performance by deleting only the different peers.
r.removeRegionFromSubTreeLocked(origin)
} else {
// The region tree and the subtree are not updated atomically, and the region tree is updated first.
// Suppose two threads both need to update the trees:
// t1: thread-A updates the region tree
// t2: thread-B updates the region tree again
// t3: thread-B updates the subtree
// t4: thread-A updates the subtree
// To keep the subtree consistent with the region tree, thread-A's stale update must be dropped here.
if tree, ok := r.subRegions[region.GetID()]; ok {
r.updateSubTreeStat(origin, region)
tree.RegionInfo = region
}
return
}
}

if rangeChanged {
overlaps := r.getOverlapRegionFromSubTreeLocked(region)
for _, re := range overlaps {
r.removeRegionFromSubTreeLocked(re)
}
}

item := &regionItem{region}
r.subRegions[region.GetID()] = item
// The region has been removed from the subtree above, so all of its information needs to be re-inserted.
// Set the peers next.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
store, ok := peersMap[storeID]
if !ok {
store = newRegionTree()
peersMap[storeID] = store
}
store.update(item, false)
}

// Add to leaders and followers.
for _, peer := range region.GetVoters() {
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item)
}
}

setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item)
}
}
// Add to learners.
setPeers(r.learners, region.GetLearners())
// Add to witnesses.
setPeers(r.witnesses, region.GetWitnesses())
// Add to pending peers.
setPeers(r.pendingPeers, region.GetPendingPeers())
}

func (r *RegionsInfo) getOverlapRegionFromSubTreeLocked(region *RegionInfo) []*RegionInfo {
it := &regionItem{RegionInfo: region}
overlaps := make([]*RegionInfo, 0)
overlapsMap := make(map[uint64]struct{})
collectFromItemSlice := func(peersMap map[uint64]*regionTree, storeID uint64) {
if tree, ok := peersMap[storeID]; ok {
items := tree.overlaps(it)
for _, item := range items {
if _, ok := overlapsMap[item.GetID()]; !ok {
overlapsMap[item.GetID()] = struct{}{}
overlaps = append(overlaps, item.RegionInfo)
}
}
}
}
for _, peer := range region.GetMeta().GetPeers() {
storeID := peer.GetStoreId()
collectFromItemSlice(r.leaders, storeID)
collectFromItemSlice(r.followers, storeID)
collectFromItemSlice(r.learners, storeID)
collectFromItemSlice(r.witnesses, storeID)
Comment on lines +1172 to +1175

Member:
Is there a possibility that the following situation exists: I detect an overlap in the leaders tree and add item.RegionInfo, and then, before I check the other role trees, something like a leader transfer occurs. Will I be unable to get the latest item.RegionInfo because of this? Will there be any side effects?

Contributor Author:
This check is performed while holding the lock, which means the new changes have not updated the subtree yet. So I think it is fine; the newer updates will be handled afterwards.

}
return overlaps
}

// GetRelevantRegions returns the relevant regions for a given region.
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) {
r.t.RLock()
86 changes: 83 additions & 3 deletions pkg/core/region_test.go
@@ -789,14 +789,15 @@ func randomBytes(n int) []byte {
return bytes
}

-func newRegionInfoID(idAllocator id.Allocator) *RegionInfo {
+func newRegionInfoIDRandom(idAllocator id.Allocator) *RegionInfo {
var (
peers []*metapb.Peer
leader *metapb.Peer
)
+storeNum := 10
for i := 0; i < 3; i++ {
id, _ := idAllocator.Alloc()
-p := &metapb.Peer{Id: id, StoreId: id}
+p := &metapb.Peer{Id: id, StoreId: uint64(i%storeNum + 1)}
if i == 0 {
leader = p
}
@@ -811,6 +812,8 @@ func newRegionInfoID(idAllocator id.Allocator) *RegionInfo {
Peers: peers,
},
leader,
+SetApproximateSize(10),
+SetApproximateKeys(10),
)
}

@@ -819,7 +822,7 @@ func BenchmarkAddRegion(b *testing.B) {
idAllocator := mockid.NewIDAllocator()
var items []*RegionInfo
for i := 0; i < 10000000; i++ {
-items = append(items, newRegionInfoID(idAllocator))
+items = append(items, newRegionInfoIDRandom(idAllocator))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -858,3 +861,80 @@ func BenchmarkRegionFromHeartbeat(b *testing.B) {
RegionFromHeartbeat(regionReq)
}
}

func TestUpdateRegionEquivalence(t *testing.T) {
re := require.New(t)
regionsOld := NewRegionsInfo()
regionsNew := NewRegionsInfo()
storeNums := 5
items := generateTestRegions(1000, storeNums)

updateRegion := func(item *RegionInfo) {
// old way
ctx := ContextTODO()
regionsOld.AtomicCheckAndPutRegion(ctx, item)
// new way
ctx = ContextTODO()
regionsNew.CheckAndPutRootTree(ctx, item)
regionsNew.CheckAndPutSubTree(item)
}
checksEquivalence := func() {
re.Equal(regionsOld.GetRegionCount([]byte(""), []byte("")), regionsNew.GetRegionCount([]byte(""), []byte("")))
re.Equal(regionsOld.GetRegionSizeByRange([]byte(""), []byte("")), regionsNew.GetRegionSizeByRange([]byte(""), []byte("")))
checkRegions(re, regionsOld)
checkRegions(re, regionsNew)

for i := 1; i <= storeNums; i++ {
re.Equal(regionsOld.GetStoreRegionCount(uint64(i)), regionsNew.GetStoreRegionCount(uint64(i)))
re.Equal(regionsOld.GetStoreLeaderCount(uint64(i)), regionsNew.GetStoreLeaderCount(uint64(i)))
re.Equal(regionsOld.GetStorePendingPeerCount(uint64(i)), regionsNew.GetStorePendingPeerCount(uint64(i)))
re.Equal(regionsOld.GetStoreLearnerRegionSize(uint64(i)), regionsNew.GetStoreLearnerRegionSize(uint64(i)))
re.Equal(regionsOld.GetStoreRegionSize(uint64(i)), regionsNew.GetStoreRegionSize(uint64(i)))
re.Equal(regionsOld.GetStoreLeaderRegionSize(uint64(i)), regionsNew.GetStoreLeaderRegionSize(uint64(i)))
re.Equal(regionsOld.GetStoreFollowerRegionSize(uint64(i)), regionsNew.GetStoreFollowerRegionSize(uint64(i)))
}
}

// Add regions.
for _, item := range items {
updateRegion(item)
}
checksEquivalence()

// Merge regions.
itemA, itemB := items[10], items[11]
itemMergedAB := itemA.Clone(WithEndKey(itemB.GetEndKey()), WithIncVersion())
updateRegion(itemMergedAB)
checksEquivalence()

// Split
itemA = itemA.Clone(WithIncVersion(), WithIncVersion())
itemB = itemB.Clone(WithIncVersion(), WithIncVersion())
updateRegion(itemA)
updateRegion(itemB)
checksEquivalence()
}

func generateTestRegions(count int, storeNum int) []*RegionInfo {
var items []*RegionInfo
for i := 0; i < count; i++ {
peer1 := &metapb.Peer{StoreId: uint64(i%storeNum + 1), Id: uint64(i*storeNum + 1)}
peer2 := &metapb.Peer{StoreId: uint64((i+1)%storeNum + 1), Id: uint64(i*storeNum + 2)}
peer3 := &metapb.Peer{StoreId: uint64((i+2)%storeNum + 1), Id: uint64(i*storeNum + 3)}
if i%3 == 0 {
peer2.IsWitness = true
}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer1, peer2, peer3},
StartKey: []byte(fmt.Sprintf("%20d", i*10)),
EndKey: []byte(fmt.Sprintf("%20d", (i+1)*10)),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100},
},
peer1,
SetApproximateKeys(10),
SetApproximateSize(10))
items = append(items, region)
}
return items
}
28 changes: 12 additions & 16 deletions pkg/mcs/scheduling/server/cluster.go
@@ -590,10 +590,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c

ctx.TaskRunner.RunTask(
ctx,
-ratelimit.TaskOpts{
-TaskName: "HandleStatsAsync",
-Limit: ctx.Limiter,
-},
+core.ExtraTaskOpts(ctx, core.HandleStatsAsync),
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
@@ -610,10 +607,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
-ratelimit.TaskOpts{
-TaskName: "ObserveRegionStatsAsync",
-Limit: ctx.Limiter,
-},
+core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync),
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
@@ -632,16 +626,21 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// However, it can't solve the race condition of concurrent heartbeats from the same region.

// Async task in next PR.
-if overlaps, err = c.AtomicCheckAndPutRegion(ctx, region); err != nil {
+if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil {
tracer.OnSaveCacheFinished()
return err
}
ctx.TaskRunner.RunTask(
ctx,
-ratelimit.TaskOpts{
-TaskName: "HandleOverlaps",
-Limit: ctx.Limiter,
+core.ExtraTaskOpts(ctx, core.UpdateSubTree),
+func(_ context.Context) {
+c.CheckAndPutSubTree(region)
},
+)
+tracer.OnUpdateSubTreeFinished()
+ctx.TaskRunner.RunTask(
+ctx,
+core.ExtraTaskOpts(ctx, core.HandleOverlaps),
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
@@ -651,10 +650,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
-ratelimit.TaskOpts{
-TaskName: "CollectRegionStatsAsync",
-Limit: ctx.Limiter,
-},
+core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync),
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
},
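
Taken together, the save-cache portion of processRegionHeartbeat now reads roughly as follows (a condensed paraphrase of the hunks above, not the verbatim function; tracing and error handling are abbreviated):

// Condensed sketch: the root tree is updated synchronously, then the subtree
// update, overlap handling, and stats collection run as named stages on the
// concurrent runner.
if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil {
	tracer.OnSaveCacheFinished()
	return err
}
ctx.TaskRunner.RunTask(ctx, core.ExtraTaskOpts(ctx, core.UpdateSubTree), func(_ context.Context) {
	c.CheckAndPutSubTree(region)
})
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(ctx, core.ExtraTaskOpts(ctx, core.HandleOverlaps), func(_ context.Context) {
	cluster.HandleOverlaps(c, overlaps)
})
ctx.TaskRunner.RunTask(ctx, core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync), func(_ context.Context) {
	cluster.Collect(c, region, hasRegionStats)
})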