Skip to content

Commit

Permalink
*: optimize heartbeat process with async runner
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Apr 10, 2024
1 parent b9240a0 commit de81774
Show file tree
Hide file tree
Showing 13 changed files with 522 additions and 134 deletions.
7 changes: 5 additions & 2 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Cluster interface {
GetLabelStats() *statistics.LabelStatistics
GetCoordinator() *schedule.Coordinator
GetRuleManager() *placement.RuleManager
GetBasicCluster() *core.BasicCluster
}

// HandleStatsAsync handles the flow asynchronously.
Expand Down Expand Up @@ -55,8 +56,10 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
}

// Collect collects the cluster information.
func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats bool) {
func Collect(c Cluster, region *core.RegionInfo, hasRegionStats bool) {
if hasRegionStats {
c.GetRegionStats().Observe(region, stores)
// get region again from root tree. make sure the observed region is the latest.
region = c.GetBasicCluster().GetRegion(region.GetID())
c.GetRegionStats().Observe(region, c.GetBasicCluster().GetRegionStores(region))
}
}
245 changes: 225 additions & 20 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
Expand All @@ -35,6 +36,8 @@ import (
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/utils/ctxutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -711,20 +714,51 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
type RegionGuideFunc func(ctx context.Context, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
noLog := func(msg string, fields ...zap.Field) {}
debug, info := noLog, noLog
d, i := noLog, noLog
debug, info := d, i
if enableLog {
debug = log.Debug
info = log.Info
d = log.Debug
i = log.Info
debug, info = d, i
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
return func(ctx context.Context, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
taskRunner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner)
limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter)
// print log asynchronously
if ok {
debug = func(msg string, fields ...zap.Field) {
taskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "Log",
Limit: limiter,
},
func(ctx context.Context) {
d(msg, fields...)
},
)
}
info = func(msg string, fields ...zap.Field) {
taskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "Log",
Limit: limiter,
},
func(ctx context.Context) {
i(msg, fields...)
},
)
}
}
if origin == nil {
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
Expand Down Expand Up @@ -789,7 +823,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
debug("down-peers changed", zap.Uint64("region-id", region.GetID()), zap.Reflect("before", origin.GetDownPeers()), zap.Reflect("after", region.GetDownPeers()))
}
saveCache, needSync = true, true
return
Expand Down Expand Up @@ -912,7 +946,7 @@ func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
err := check(region, origin, ols)
err := check(region, origin, convertItemsToRegions(ols))
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
// return the state region to delete.
Expand All @@ -933,48 +967,102 @@ func (r *RegionsInfo) PutRegion(region *RegionInfo) []*RegionInfo {
}

// PreCheckPutRegion checks if the region is valid to put.
func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) (*RegionInfo, []*regionItem, error) {
origin, overlaps := r.GetRelevantRegions(region, trace)
func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo, error) {
origin, overlaps := r.GetRelevantRegions(region)
err := check(region, origin, overlaps)
return origin, overlaps, err
}

func convertItemsToRegions(items []*regionItem) []*RegionInfo {
regions := make([]*RegionInfo, 0, len(items))
for _, item := range items {
regions = append(regions, item.RegionInfo)
}
return regions
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) ([]*RegionInfo, error) {
func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx context.Context, region *RegionInfo) ([]*RegionInfo, error) {
tracer, ok := ctx.Value("tracer").(RegionHeartbeatProcessTracer)
if !ok {
tracer = NewNoopHeartbeatProcessTracer()
}
r.t.Lock()
var ols []*regionItem
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
trace.OnCheckOverlapsFinished()
err := check(region, origin, ols)
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
if err != nil {
r.t.Unlock()
trace.OnValidateRegionFinished()
tracer.OnValidateRegionFinished()
return nil, err
}
trace.OnValidateRegionFinished()
tracer.OnValidateRegionFinished()
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
trace.OnSetRegionFinished()
tracer.OnSetRegionFinished()
r.UpdateSubTree(region, origin, overlaps, rangeChanged)
trace.OnUpdateSubTreeFinished()
tracer.OnUpdateSubTreeFinished()
return overlaps, nil
}

// CheckAndPutSuperTree checks if the region is valid to put to the root, if valid then return error.
// Usually used with CheckAndPutSubTree together.
func (r *RegionsInfo) CheckAndPutSuperTree(ctx context.Context, region *RegionInfo) ([]*RegionInfo, error) {
tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(RegionHeartbeatProcessTracer)
if !ok {
tracer = NewNoopHeartbeatProcessTracer()
}
r.t.Lock()
var ols []*regionItem
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
return nil, err
}
tracer.OnValidateRegionFinished()
_, overlaps, _ := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
tracer.OnSetRegionFinished()
return overlaps, nil
}

// CheckAndPutSubTree checks if the region is valid to put to the sub tree, if valid then return error.
// Usually used with CheckAndPutSuperTree together.
func (r *RegionsInfo) CheckAndPutSubTree(ctx context.Context, region *RegionInfo) error {
// new region get from root tree again
var newRegion *RegionInfo
newRegion = r.GetRegion(region.GetID())
if newRegion == nil {
newRegion = region
}
r.UpdateSubTreeOrderInsensitive(newRegion)
return nil
}

// GetRelevantRegions returns the relevant regions for a given region.
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo, trace RegionHeartbeatProcessTracer) (origin *RegionInfo, overlaps []*regionItem) {
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) {
r.t.RLock()
defer r.t.RUnlock()
origin = r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
overlaps = r.tree.overlaps(&regionItem{RegionInfo: region})
for _, item := range r.tree.overlaps(&regionItem{RegionInfo: region}) {
overlaps = append(overlaps, item.RegionInfo)
}
}
return
}

func check(region, origin *RegionInfo, overlaps []*regionItem) error {
func check(region, origin *RegionInfo, overlaps []*RegionInfo) error {
for _, item := range overlaps {
// PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation.
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() {
Expand Down Expand Up @@ -1043,7 +1131,6 @@ func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol
item = &regionItem{RegionInfo: region}
r.regions[region.GetID()] = item
}

var overlaps []*RegionInfo
if rangeChanged {
overlaps = r.tree.update(item, withOverlaps, ol...)
Expand Down Expand Up @@ -1129,6 +1216,99 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
setPeers(r.pendingPeers, region.GetPendingPeers())
}

// UpdateSubTreeOrderInsensitive updates the subtree.
// It's can used to update the subtree concurrently.
// because it can use concurrently, check region version to make sure the order.
// 1. if the version is stale, drop this update.
// 2. if the version is same, then only some statistic info need to be updated. the order of update is not important.
//
// in another hand, the overlap regions need re-check, because the region tree and the subtree update is not atomic.
func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
var origin *RegionInfo
r.st.Lock()
defer r.st.Unlock()
originItem, ok := r.subRegions[region.GetID()]
if ok {
origin = originItem.RegionInfo
}
rangeChanged := true

if origin != nil {
re := region.GetRegionEpoch()
oe := origin.GetRegionEpoch()
isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
if (isTermBehind || re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()) && !region.isRegionRecreated() {
// Region meta is stale, skip.
return
}
rangeChanged = !origin.rangeEqualsTo(region)

if rangeChanged || !origin.peersEqualTo(region) {
// If the range or peers have changed, the sub regionTree needs to be cleaned up.
// TODO: Improve performance by deleting only the different peers.
r.removeRegionFromSubTreeLocked(origin)
} else {
// The region tree and the subtree update is not atomic and the region tree is updated first.
// If there are two thread needs to update region tree,
// t1: thread-A update region tree
// t2: thread-B: update region tree again
// t3: thread-B: update subtree
// t4: thread-A: update region subtree
// to keep region tree consistent with subtree, we need to drop this update.
if tree, ok := r.subRegions[region.GetID()]; ok {
r.updateSubTreeStat(origin, region)
tree.RegionInfo = region
}
return
}
}

if rangeChanged {
overlaps := r.getOverlapRegionFromSubTreeLocked(region)
for _, re := range overlaps {
r.removeRegionFromSubTreeLocked(re)
}
}

item := &regionItem{region}
r.subRegions[region.GetID()] = item
// It has been removed and all information needs to be updated again.
// Set peers then.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
store, ok := peersMap[storeID]
if !ok {
store = newRegionTree()
peersMap[storeID] = store
}
store.update(item, false)
}

// Add to leaders and followers.
for _, peer := range region.GetVoters() {
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item)
}
}

setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item)
}
}
// Add to learners.
setPeers(r.learners, region.GetLearners())
// Add to witnesses.
setPeers(r.witnesses, region.GetWitnesses())
// Add to PendingPeers
setPeers(r.pendingPeers, region.GetPendingPeers())
}

func (r *RegionsInfo) updateSubTreeStat(origin *RegionInfo, region *RegionInfo) {
updatePeerStat := func(peersMap map[uint64]*regionTree, storeID uint64) {
if tree, ok := peersMap[storeID]; ok {
Expand Down Expand Up @@ -1214,6 +1394,31 @@ func (r *RegionsInfo) removeRegionFromSubTreeLocked(region *RegionInfo) {
delete(r.subRegions, region.GetMeta().GetId())
}

func (r *RegionsInfo) getOverlapRegionFromSubTreeLocked(region *RegionInfo) []*RegionInfo {
it := &regionItem{RegionInfo: region}
overlaps := make([]*RegionInfo, 0)
overlapsMap := make(map[uint64]struct{})
collectFromItemSlice := func(peersMap map[uint64]*regionTree, storeID uint64) {
if tree, ok := peersMap[storeID]; ok {
items := tree.overlaps(it)
for _, item := range items {
if _, ok := overlapsMap[item.GetID()]; !ok {
overlapsMap[item.GetID()] = struct{}{}
overlaps = append(overlaps, item.RegionInfo)
}
}
}
}
for _, peer := range region.GetMeta().GetPeers() {
storeID := peer.GetStoreId()
collectFromItemSlice(r.leaders, storeID)
collectFromItemSlice(r.followers, storeID)
collectFromItemSlice(r.learners, storeID)
collectFromItemSlice(r.witnesses, storeID)
}
return overlaps
}

// RemoveRegionIfExist removes RegionInfo from regionTree and regionMap if exists.
func (r *RegionsInfo) RemoveRegionIfExist(id uint64) {
if region := r.GetRegion(id); region != nil {
Expand Down
7 changes: 4 additions & 3 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package core

import (
"context"
"crypto/rand"
"fmt"
"math"
Expand Down Expand Up @@ -363,7 +364,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, needSync := RegionGuide(regionA, regionB)
_, _, needSync := RegionGuide(context.TODO(), regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
Expand Down Expand Up @@ -459,9 +460,9 @@ func TestSetRegionConcurrence(t *testing.T) {
regions := NewRegionsInfo()
region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
go func() {
regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer())
regions.AtomicCheckAndPutRegion(context.TODO(), region)
}()
regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer())
regions.AtomicCheckAndPutRegion(context.TODO(), region)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree"))
}

Expand Down
Loading

0 comments on commit de81774

Please sign in to comment.