Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize heartbeat - async process statistics #7898

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Cluster interface {
GetLabelStats() *statistics.LabelStatistics
GetCoordinator() *schedule.Coordinator
GetRuleManager() *placement.RuleManager
GetBasicCluster() *core.BasicCluster
}

// HandleStatsAsync handles the flow asynchronously.
Expand Down Expand Up @@ -55,8 +56,10 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
}

// Collect collects the cluster information.
func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats bool) {
func Collect(c Cluster, region *core.RegionInfo, hasRegionStats bool) {
if hasRegionStats {
c.GetRegionStats().Observe(region, stores)
// get region again from root tree. make sure the observed region is the latest.
region = c.GetBasicCluster().GetRegion(region.GetID())
c.GetRegionStats().Observe(region, c.GetBasicCluster().GetRegionStores(region))
}
}
245 changes: 225 additions & 20 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
Expand All @@ -35,6 +36,8 @@
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/utils/ctxutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -711,20 +714,51 @@

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
type RegionGuideFunc func(ctx context.Context, region, origin *RegionInfo) (saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Log output is controlled by enableLog:
// false means do not print the log.
func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
noLog := func(msg string, fields ...zap.Field) {}
debug, info := noLog, noLog
d, i := noLog, noLog
debug, info := d, i
if enableLog {
debug = log.Debug
info = log.Info
d = log.Debug
i = log.Info
debug, info = d, i
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
return func(ctx context.Context, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
taskRunner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner)
limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter)
// print log asynchronously
if ok {
debug = func(msg string, fields ...zap.Field) {
taskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "Log",
Limit: limiter,
},

Check warning on line 743 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L738-L743

Added lines #L738 - L743 were not covered by tests
func(ctx context.Context) {
d(msg, fields...)
},

Check warning on line 746 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L745-L746

Added lines #L745 - L746 were not covered by tests
)
}
info = func(msg string, fields ...zap.Field) {
taskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "Log",
Limit: limiter,
},
func(ctx context.Context) {
i(msg, fields...)
},
)
}
}
if origin == nil {
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
Expand Down Expand Up @@ -789,7 +823,7 @@
}
if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
debug("down-peers changed", zap.Uint64("region-id", region.GetID()), zap.Reflect("before", origin.GetDownPeers()), zap.Reflect("after", region.GetDownPeers()))

Check warning on line 826 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L826

Added line #L826 was not covered by tests
}
saveCache, needSync = true, true
return
Expand Down Expand Up @@ -912,7 +946,7 @@
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
err := check(region, origin, ols)
err := check(region, origin, convertItemsToRegions(ols))
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
// return the state region to delete.
Expand All @@ -933,48 +967,102 @@
}

// PreCheckPutRegion checks if the region is valid to put.
func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) (*RegionInfo, []*regionItem, error) {
origin, overlaps := r.GetRelevantRegions(region, trace)
func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo, error) {
origin, overlaps := r.GetRelevantRegions(region)
err := check(region, origin, overlaps)
return origin, overlaps, err
}

// convertItemsToRegions unwraps tree items into the RegionInfo pointers they
// embed, keeping the input order. The returned slice is never nil, even when
// items is nil or empty.
func convertItemsToRegions(items []*regionItem) []*RegionInfo {
	out := make([]*RegionInfo, len(items))
	for idx := range items {
		out[idx] = items[idx].RegionInfo
	}
	return out
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) ([]*RegionInfo, error) {
func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx context.Context, region *RegionInfo) ([]*RegionInfo, error) {
tracer, ok := ctx.Value("tracer").(RegionHeartbeatProcessTracer)
if !ok {
tracer = NewNoopHeartbeatProcessTracer()
}
r.t.Lock()
var ols []*regionItem
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
trace.OnCheckOverlapsFinished()
err := check(region, origin, ols)
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
if err != nil {
r.t.Unlock()
trace.OnValidateRegionFinished()
tracer.OnValidateRegionFinished()

Check warning on line 1000 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1000

Added line #L1000 was not covered by tests
return nil, err
}
trace.OnValidateRegionFinished()
tracer.OnValidateRegionFinished()
origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
trace.OnSetRegionFinished()
tracer.OnSetRegionFinished()
r.UpdateSubTree(region, origin, overlaps, rangeChanged)
trace.OnUpdateSubTreeFinished()
tracer.OnUpdateSubTreeFinished()
return overlaps, nil
}

// CheckAndPutSupTree checks whether the region is valid to put into the root tree and, if it is, puts it there.
// It returns an error when the region fails the check. Usually used together with CheckAndPutSubTree.
func (r *RegionsInfo) CheckAndPutSupTree(ctx context.Context, region *RegionInfo) ([]*RegionInfo, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (r *RegionsInfo) CheckAndPutSupTree(ctx context.Context, region *RegionInfo) ([]*RegionInfo, error) {
func (r *RegionsInfo) CheckAndPutSubTree(ctx context.Context, region *RegionInfo) ([]*RegionInfo, error) {

tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(RegionHeartbeatProcessTracer)
if !ok {
tracer = NewNoopHeartbeatProcessTracer()
}
r.t.Lock()
var ols []*regionItem
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
return nil, err
}
tracer.OnValidateRegionFinished()
_, overlaps, _ := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
tracer.OnSetRegionFinished()
return overlaps, nil
}

// CheckAndPutSubTree re-reads the region from the root tree and updates the subtrees with it; it currently always returns nil.
// Usually used together with CheckAndPutSupTree.
func (r *RegionsInfo) CheckAndPutSubTree(ctx context.Context, region *RegionInfo) error {
// new region get from root tree again
var newRegion *RegionInfo
newRegion = r.GetRegion(region.GetID())
if newRegion == nil {
newRegion = region

Check warning on line 1046 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L1046

Added line #L1046 was not covered by tests
}
r.UpdateSubTreeOrderInsensitive(newRegion)
return nil
}

// GetRelevantRegions returns the relevant regions for a given region.
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo, trace RegionHeartbeatProcessTracer) (origin *RegionInfo, overlaps []*regionItem) {
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) {
r.t.RLock()
defer r.t.RUnlock()
origin = r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
overlaps = r.tree.overlaps(&regionItem{RegionInfo: region})
for _, item := range r.tree.overlaps(&regionItem{RegionInfo: region}) {
overlaps = append(overlaps, item.RegionInfo)
}
}
return
}

func check(region, origin *RegionInfo, overlaps []*regionItem) error {
func check(region, origin *RegionInfo, overlaps []*RegionInfo) error {
for _, item := range overlaps {
// PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation.
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() {
Expand Down Expand Up @@ -1043,7 +1131,6 @@
item = &regionItem{RegionInfo: region}
r.regions[region.GetID()] = item
}

var overlaps []*RegionInfo
if rangeChanged {
overlaps = r.tree.update(item, withOverlaps, ol...)
Expand Down Expand Up @@ -1129,6 +1216,99 @@
setPeers(r.pendingPeers, region.GetPendingPeers())
}

// UpdateSubTreeOrderInsensitive updates the subtree with the given region.
// It can be used to update the subtree concurrently. Because callers may run
// concurrently, the region term/epoch is compared against the entry already
// stored in the subtree to keep the result independent of call order:
// 1. If the incoming region is stale, this update is dropped.
// 2. If the range and peers are unchanged, only the statistic info is refreshed;
// the order of such updates is not important.
//
// On the other hand, overlapping regions need to be re-checked here, because the
// region tree and the subtree are not updated atomically.
//
// The subtree lock (r.st) is held for the whole call.
func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
var origin *RegionInfo
r.st.Lock()
defer r.st.Unlock()
// Look up the version of this region currently stored in the subtree, if any.
originItem, ok := r.subRegions[region.GetID()]
if ok {
origin = originItem.RegionInfo
}
// With no origin the region is new to the subtree, so treat it as a range change.
rangeChanged := true

if origin != nil {
re := region.GetRegionEpoch()
oe := origin.GetRegionEpoch()
// A zero term on the incoming region is never treated as behind.
isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
if (isTermBehind || re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()) && !region.isRegionRecreated() {
// Region meta is stale, skip.
return
}
rangeChanged = !origin.rangeEqualsTo(region)

if rangeChanged || !origin.peersEqualTo(region) {
// If the range or peers have changed, the sub regionTree needs to be cleaned up.
// TODO: Improve performance by deleting only the different peers.
r.removeRegionFromSubTreeLocked(origin)
} else {
// The region tree and the subtree are not updated atomically, and the region tree is updated first.
// Two threads updating the same region may therefore interleave like this:
// t1: thread-A updates the region tree
// t2: thread-B updates the region tree again
// t3: thread-B updates the subtree
// t4: thread-A updates the subtree
// Range and peers are unchanged in this branch, so the existing item is kept and only
// its statistics and RegionInfo are refreshed in place.
// NOTE(review): the original comment said this update should be dropped to keep the
// region tree consistent with the subtree, but the code below applies it — confirm intent.
if tree, ok := r.subRegions[region.GetID()]; ok {
r.updateSubTreeStat(origin, region)
tree.RegionInfo = region
}
return
}
}

if rangeChanged {
// Evict every region in the per-store subtrees that overlaps the new range.
overlaps := r.getOverlapRegionFromSubTreeLocked(region)
for _, re := range overlaps {
r.removeRegionFromSubTreeLocked(re)
}
}

item := &regionItem{region}
r.subRegions[region.GetID()] = item
// It has been removed and all information needs to be updated again.
// Set peers then.
// setPeer inserts item into the tree of the given store, creating the tree on first use.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
store, ok := peersMap[storeID]
if !ok {
store = newRegionTree()
peersMap[storeID] = store
}
store.update(item, false)
}

// Add to leaders and followers.
for _, peer := range region.GetVoters() {
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item)
}
}

// setPeers applies setPeer for every peer in the list.
setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item)
}
}
// Add to learners.
setPeers(r.learners, region.GetLearners())
// Add to witnesses.
setPeers(r.witnesses, region.GetWitnesses())
// Add to PendingPeers
setPeers(r.pendingPeers, region.GetPendingPeers())
}

func (r *RegionsInfo) updateSubTreeStat(origin *RegionInfo, region *RegionInfo) {
updatePeerStat := func(peersMap map[uint64]*regionTree, storeID uint64) {
if tree, ok := peersMap[storeID]; ok {
Expand Down Expand Up @@ -1214,6 +1394,31 @@
delete(r.subRegions, region.GetMeta().GetId())
}

// getOverlapRegionFromSubTreeLocked collects the regions in the per-store
// subtrees whose range overlaps the given region. For every store that hosts a
// peer of region it queries the leader, follower, learner and witness trees of
// that store, in that order, and returns each overlapping region once, in
// first-seen order. The returned slice is never nil.
func (r *RegionsInfo) getOverlapRegionFromSubTreeLocked(region *RegionInfo) []*RegionInfo {
	probe := &regionItem{RegionInfo: region}
	result := make([]*RegionInfo, 0)
	seen := make(map[uint64]struct{})
	// Role maps are visited in a fixed order for every store.
	roleMaps := [...]map[uint64]*regionTree{r.leaders, r.followers, r.learners, r.witnesses}

	for _, peer := range region.GetMeta().GetPeers() {
		storeID := peer.GetStoreId()
		for _, peersMap := range roleMaps {
			tree, found := peersMap[storeID]
			if !found {
				continue
			}
			for _, hit := range tree.overlaps(probe) {
				if _, dup := seen[hit.GetID()]; dup {
					continue
				}
				seen[hit.GetID()] = struct{}{}
				result = append(result, hit.RegionInfo)
			}
		}
	}
	return result
}

// RemoveRegionIfExist removes RegionInfo from regionTree and regionMap if exists.
func (r *RegionsInfo) RemoveRegionIfExist(id uint64) {
if region := r.GetRegion(id); region != nil {
Expand Down
7 changes: 4 additions & 3 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package core

import (
"context"
"crypto/rand"
"fmt"
"math"
Expand Down Expand Up @@ -363,7 +364,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, needSync := RegionGuide(regionA, regionB)
_, _, needSync := RegionGuide(context.TODO(), regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
Expand Down Expand Up @@ -459,9 +460,9 @@ func TestSetRegionConcurrence(t *testing.T) {
regions := NewRegionsInfo()
region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
go func() {
regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer())
regions.AtomicCheckAndPutRegion(context.TODO(), region)
}()
regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer())
regions.AtomicCheckAndPutRegion(context.TODO(), region)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree"))
}

Expand Down
Loading
Loading