
*: optimize heartbeat process with concurrent runner - part 2 #8052

Merged 5 commits on Apr 24, 2024

Changes from 2 commits
156 changes: 156 additions & 0 deletions pkg/core/region.go
@@ -1004,6 +1004,162 @@
return overlaps, nil
}

// CheckAndPutRootTree checks whether the region is valid to put into the root tree; if not, it returns an error.
// Usually used together with CheckAndPutSubTree.
func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
tracer := ctx.Tracer
r.t.Lock()
var ols []*regionItem
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
return nil, err
}
tracer.OnValidateRegionFinished()
_, overlaps, _ := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
tracer.OnSetRegionFinished()
return overlaps, nil
}

// CheckAndPutSubTree checks whether the region is valid to put into the sub tree; if not, it returns an error.
// Usually used together with CheckAndPutRootTree.
func (r *RegionsInfo) CheckAndPutSubTree(region *RegionInfo) error {
// Fetch the latest version of the region from the root tree again.
newRegion := r.GetRegion(region.GetID())
if newRegion == nil {
newRegion = region

}
r.UpdateSubTreeOrderInsensitive(newRegion)
return nil
}
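Taken together, the intended calling pattern is: update the root tree synchronously so validation errors can abort the heartbeat, then update the subtree, possibly from a concurrent task (as the scheduling server does further down in this diff). A minimal sketch of that pairing, assuming a hypothetical caller named handleHeartbeat; the real wiring lives in (*Cluster).processRegionHeartbeat:

// Sketch only: handleHeartbeat is a hypothetical caller for illustration.
func handleHeartbeat(r *RegionsInfo, ctx *MetaProcessContext, region *RegionInfo) error {
	// Phase 1: validate the region and update the root tree under its lock.
	if _, err := r.CheckAndPutRootTree(ctx, region); err != nil {
		return err // stale or invalid region; nothing was written
	}
	// Phase 2: update the subtree. This is safe to run from another goroutine,
	// because UpdateSubTreeOrderInsensitive drops stale updates on its own.
	return r.CheckAndPutSubTree(region)
}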

// UpdateSubTreeOrderInsensitive updates the subtree.
// It can be used to update the subtree concurrently.
// Because it may run concurrently, the region version is checked to preserve ordering:
// 1. if the version is stale, this update is dropped;
// 2. if the version is the same, only some statistics need to be updated,
// and in that situation the order of updates does not matter.
//
// On the other hand, the overlapped regions need to be re-checked, because the region tree and the subtree are not updated atomically.
func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
var origin *RegionInfo
r.st.Lock()
defer r.st.Unlock()
originItem, ok := r.subRegions[region.GetID()]
if ok {
origin = originItem.RegionInfo
}
rangeChanged := true

if origin != nil {
re := region.GetRegionEpoch()
oe := origin.GetRegionEpoch()
isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
if (isTermBehind || re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()) && !region.isRegionRecreated() {
// Region meta is stale, skip.
return
}
rangeChanged = !origin.rangeEqualsTo(region)

if rangeChanged || !origin.peersEqualTo(region) {
// If the range or peers have changed, the sub regionTree needs to be cleaned up.
// TODO: Improve performance by deleting only the different peers.
r.removeRegionFromSubTreeLocked(origin)
} else {
// The region tree and the subtree are not updated atomically, and the region tree is updated first.
// If two threads need to update the region tree:
// t1: thread-A updates the region tree
// t2: thread-B updates the region tree again
// t3: thread-B updates the subtree
// t4: thread-A updates the subtree
// To keep the region tree consistent with the subtree, we need to drop this update.
if tree, ok := r.subRegions[region.GetID()]; ok {
r.updateSubTreeStat(origin, region)
tree.RegionInfo = region
}
return
}
}

if rangeChanged {
overlaps := r.getOverlapRegionFromSubTreeLocked(region)
for _, re := range overlaps {
r.removeRegionFromSubTreeLocked(re)
}
}

item := &regionItem{region}
r.subRegions[region.GetID()] = item
// The origin has been removed, so all peer information needs to be rebuilt.
// Re-insert the peers below.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
store, ok := peersMap[storeID]
if !ok {
store = newRegionTree()
peersMap[storeID] = store
}
store.update(item, false)
}

// Add to leaders and followers.
for _, peer := range region.GetVoters() {
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item)
}
}

setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item)
}
}
// Add to learners.
setPeers(r.learners, region.GetLearners())
// Add to witnesses.
setPeers(r.witnesses, region.GetWitnesses())
// Add to PendingPeers
setPeers(r.pendingPeers, region.GetPendingPeers())
}
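As a worked example of the ordering rule above, the drop condition can be read as a pure predicate. The helper below is an illustrative sketch, not a function in this PR, and it omits the isRegionRecreated escape hatch for brevity:

// isStaleUpdate mirrors the drop condition in UpdateSubTreeOrderInsensitive (hypothetical helper).
func isStaleUpdate(newVer, oldVer, newConf, oldConf, newTerm, oldTerm uint64) bool {
	termBehind := newTerm > 0 && newTerm < oldTerm
	return termBehind || newVer < oldVer || newConf < oldConf
}

// With the subtree holding Version=5, ConfVer=3, Term=8:
// isStaleUpdate(4, 5, 3, 3, 9, 8) == true  // version behind: update dropped
// isStaleUpdate(5, 5, 3, 3, 7, 8) == true  // term behind: update dropped
// isStaleUpdate(5, 5, 3, 3, 8, 8) == false // same version: only statistics refreshed
// isStaleUpdate(6, 5, 3, 3, 9, 8) == false // newer: overlaps re-checked, region re-inserted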

func (r *RegionsInfo) getOverlapRegionFromSubTreeLocked(region *RegionInfo) []*RegionInfo {
it := &regionItem{RegionInfo: region}
overlaps := make([]*RegionInfo, 0)
overlapsMap := make(map[uint64]struct{})
collectFromItemSlice := func(peersMap map[uint64]*regionTree, storeID uint64) {
if tree, ok := peersMap[storeID]; ok {
items := tree.overlaps(it)
for _, item := range items {
if _, ok := overlapsMap[item.GetID()]; !ok {
overlapsMap[item.GetID()] = struct{}{}
overlaps = append(overlaps, item.RegionInfo)
}
}
}
}
for _, peer := range region.GetMeta().GetPeers() {
storeID := peer.GetStoreId()
collectFromItemSlice(r.leaders, storeID)
collectFromItemSlice(r.followers, storeID)
collectFromItemSlice(r.learners, storeID)
collectFromItemSlice(r.witnesses, storeID)
Comment on lines +1172 to +1175

Member:
Is there a possibility that the following situation exists: I detect an overlap in the leaders tree and add its item.RegionInfo; then, before I check the other role trees, something like a leader transfer occurs. Would I fail to get the latest item.RegionInfo because of this? Would there be any side effects?

Contributor (Author):
This check is done while holding the lock, which means any new changes cannot have updated the subtree yet. So I think it's OK; the new updates will be handled afterwards.

}
return overlaps
}

// GetRelevantRegions returns the relevant regions for a given region.
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) {
r.t.RLock()
86 changes: 83 additions & 3 deletions pkg/core/region_test.go
@@ -789,14 +789,15 @@ func randomBytes(n int) []byte {
return bytes
}

func newRegionInfoID(idAllocator id.Allocator) *RegionInfo {
func newRegionInfoIDRandom(idAllocator id.Allocator) *RegionInfo {
var (
peers []*metapb.Peer
leader *metapb.Peer
)
storeNum := 10
for i := 0; i < 3; i++ {
id, _ := idAllocator.Alloc()
p := &metapb.Peer{Id: id, StoreId: id}
p := &metapb.Peer{Id: id, StoreId: uint64(i%storeNum + 1)}
if i == 0 {
leader = p
}
@@ -811,6 +812,8 @@ func newRegionInfoID(idAllocator id.Allocator) *RegionInfo {
Peers: peers,
},
leader,
SetApproximateSize(10),
SetApproximateKeys(10),
)
}

@@ -819,7 +822,7 @@ func BenchmarkAddRegion(b *testing.B) {
idAllocator := mockid.NewIDAllocator()
var items []*RegionInfo
for i := 0; i < 10000000; i++ {
items = append(items, newRegionInfoID(idAllocator))
items = append(items, newRegionInfoIDRandom(idAllocator))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -858,3 +861,80 @@ func BenchmarkRegionFromHeartbeat(b *testing.B) {
RegionFromHeartbeat(regionReq)
}
}

func TestUpdateRegionEquivalence(t *testing.T) {
re := require.New(t)
regionsOld := NewRegionsInfo()
regionsNew := NewRegionsInfo()
storeNums := 5
items := generateTestRegions(1000, storeNums)

updateRegion := func(item *RegionInfo) {
// old way
ctx := ContextTODO()
regionsOld.AtomicCheckAndPutRegion(ctx, item)
// new way
ctx = ContextTODO()
regionsNew.CheckAndPutRootTree(ctx, item)
regionsNew.CheckAndPutSubTree(item)
}
checksEquivalence := func() {
re.Equal(regionsOld.GetRegionCount([]byte(""), []byte("")), regionsNew.GetRegionCount([]byte(""), []byte("")))
re.Equal(regionsOld.GetRegionSizeByRange([]byte(""), []byte("")), regionsNew.GetRegionSizeByRange([]byte(""), []byte("")))
checkRegions(re, regionsOld)
checkRegions(re, regionsNew)

for i := 1; i <= storeNums; i++ {
re.Equal(regionsOld.GetStoreRegionCount(uint64(i)), regionsNew.GetStoreRegionCount(uint64(i)))
re.Equal(regionsOld.GetStoreLeaderCount(uint64(i)), regionsNew.GetStoreLeaderCount(uint64(i)))
re.Equal(regionsOld.GetStorePendingPeerCount(uint64(i)), regionsNew.GetStorePendingPeerCount(uint64(i)))
re.Equal(regionsOld.GetStoreLearnerRegionSize(uint64(i)), regionsNew.GetStoreLearnerRegionSize(uint64(i)))
re.Equal(regionsOld.GetStoreRegionSize(uint64(i)), regionsNew.GetStoreRegionSize(uint64(i)))
re.Equal(regionsOld.GetStoreLeaderRegionSize(uint64(i)), regionsNew.GetStoreLeaderRegionSize(uint64(i)))
re.Equal(regionsOld.GetStoreFollowerRegionSize(uint64(i)), regionsNew.GetStoreFollowerRegionSize(uint64(i)))
}
}

// Add regions.
for _, item := range items {
updateRegion(item)
}
checksEquivalence()

// Merge regions.
itemA, itemB := items[10], items[11]
itemMergedAB := itemA.Clone(WithEndKey(itemB.GetEndKey()), WithIncVersion())
updateRegion(itemMergedAB)
checksEquivalence()

// Split
itemA = itemA.Clone(WithIncVersion(), WithIncVersion())
itemB = itemB.Clone(WithIncVersion(), WithIncVersion())
updateRegion(itemA)
updateRegion(itemB)
checksEquivalence()
}

func generateTestRegions(count int, storeNum int) []*RegionInfo {
var items []*RegionInfo
for i := 0; i < count; i++ {
peer1 := &metapb.Peer{StoreId: uint64(i%storeNum + 1), Id: uint64(i*storeNum + 1)}
peer2 := &metapb.Peer{StoreId: uint64((i+1)%storeNum + 1), Id: uint64(i*storeNum + 2)}
peer3 := &metapb.Peer{StoreId: uint64((i+2)%storeNum + 1), Id: uint64(i*storeNum + 3)}
if i%3 == 0 {
peer2.IsWitness = true
}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer1, peer2, peer3},
StartKey: []byte(fmt.Sprintf("%20d", i*10)),
EndKey: []byte(fmt.Sprintf("%20d", (i+1)*10)),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100},
},
peer1,
SetApproximateKeys(10),
SetApproximateSize(10))
items = append(items, region)
}
return items
}
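A note on the generated keys: %20d pads every key to a fixed width of 20 characters, and because a space byte sorts before any digit, lexicographic byte order matches numeric order, so the generated regions tile the keyspace contiguously without overlapping. A quick self-contained illustration:

package main

import "fmt"

func main() {
	a := fmt.Sprintf("%20d", 90)  // "                  90"
	b := fmt.Sprintf("%20d", 100) // "                 100"
	// Byte-wise comparison agrees with numeric order thanks to the fixed width.
	fmt.Println(a < b) // true
}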
13 changes: 12 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
@@ -632,10 +632,21 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// However, it can't solve the race condition of concurrent heartbeats from the same region.

// Async task in next PR.
if overlaps, err = c.AtomicCheckAndPutRegion(ctx, region); err != nil {
if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil {
tracer.OnSaveCacheFinished()
return err
}
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "UpdateSubTree",
Limit: ctx.Limiter,
},
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
50 changes: 50 additions & 0 deletions pkg/ratelimit/metrics.go
@@ -0,0 +1,50 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ratelimit

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_max_waiting_duration_seconds",
Help: "The max waiting duration of tasks in the runner.",
}, []string{"name"})

RunnerTaskPendingTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_pending_tasks",
Help: "The number of pending tasks in the runner.",
}, []string{"name"})
RunnerTaskFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_failed_tasks_total",
Help: "The number of failed tasks in the runner.",
}, []string{"name"})
)

func init() {
prometheus.MustRegister(RunnerTaskMaxWaitingDuration)
prometheus.MustRegister(RunnerTaskPendingTasks)
prometheus.MustRegister(RunnerTaskFailedTasks)
}
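These collectors would be driven from the runner itself; the call sites below are a hedged sketch of typical usage, where the "heartbeat" label value and the maxWait variable are assumptions for illustration, not part of this diff:

// Hypothetical call sites inside the runner.
RunnerTaskPendingTasks.WithLabelValues("heartbeat").Inc() // a task was enqueued
RunnerTaskPendingTasks.WithLabelValues("heartbeat").Dec() // a task was dequeued
RunnerTaskMaxWaitingDuration.WithLabelValues("heartbeat").Set(maxWait.Seconds()) // maxWait: assumed time.Duration
RunnerTaskFailedTasks.WithLabelValues("heartbeat").Inc() // a task was rejected or failed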