Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: revert backup progress #53914

Merged
merged 9 commits into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
// MaxResolveLocksbackupOffSleep is the maximum sleep time for resolving locks.
// 10 minutes for every round.
MaxResolveLocksbackupOffSleepMs = 600000

IncompleteRangesUpdateInterval = time.Second * 15
)

// ClientMgr manages connections needed by backup.
Expand All @@ -69,6 +71,13 @@ type Checksum struct {
// ProgressUnit represents the unit of progress.
type ProgressUnit string

const (
// UnitRange represents the progress updated counter when a range finished.
UnitRange ProgressUnit = "range"
// UnitRegion represents the progress updated counter when a region finished.
UnitRegion ProgressUnit = "region"
)

type MainBackupLoop struct {
BackupSender

Expand All @@ -82,7 +91,7 @@ type MainBackupLoop struct {
ReplicaReadLabel map[string]string
StateNotifier chan BackupRetryPolicy

ProgressCallBack func()
ProgressCallBack func(ProgressUnit)
GetBackupClientCallBack func(ctx context.Context, storeID uint64, reset bool) (backuppb.BackupClient, error)
}

Expand Down Expand Up @@ -176,6 +185,9 @@ func (bc *Client) RunLoop(ctx context.Context, loop *MainBackupLoop) error {
round := uint64(0)
// reset grpc connection every round except key_locked error.
reset := true
// update incompleteRanges to advance the progress and the request.
incompleteRangesUpdateTicker := time.NewTicker(IncompleteRangesUpdateInterval)
defer incompleteRangesUpdateTicker.Stop()
mainLoop:
for {
round += 1
Expand Down Expand Up @@ -211,8 +223,7 @@ mainLoop:
mainCancel()
return ctx.Err()
default:
iter := loop.GlobalProgressTree.Iter()
inCompleteRanges = iter.GetIncompleteRanges()
inCompleteRanges = loop.GlobalProgressTree.GetIncompleteRanges()
if len(inCompleteRanges) == 0 {
// all range backuped
logutil.CL(ctx).Info("This round finished all backup ranges", zap.Uint64("round", round))
Expand Down Expand Up @@ -259,13 +270,21 @@ mainLoop:
}
// infinite loop to collect region backup response to global channel
loop.CollectStoreBackupsAsync(handleCtx, round, storeBackupResultChMap, globalBackupResultCh)
incompleteRangesUpdateTicker.Reset(IncompleteRangesUpdateInterval)
handleLoop:
for {
select {
case <-ctx.Done():
handleCancel()
mainCancel()
return ctx.Err()
case <-incompleteRangesUpdateTicker.C:
startUpdate := time.Now()
inCompleteRanges = loop.GlobalProgressTree.GetIncompleteRanges()
Leavrth marked this conversation as resolved.
Show resolved Hide resolved
loop.BackupReq.SubRanges = getBackupRanges(inCompleteRanges)
elapsed := time.Since(startUpdate)
log.Info("update the incomplete ranges", zap.Duration("take", elapsed))
incompleteRangesUpdateTicker.Reset(max(5*elapsed, IncompleteRangesUpdateInterval))
case storeBackupInfo := <-loop.StateNotifier:
if storeBackupInfo.All {
logutil.CL(mainCtx).Info("cluster state changed. restart store backups", zap.Uint64("round", round))
Expand Down Expand Up @@ -352,7 +371,7 @@ mainLoop:
if lock != nil {
allTxnLocks = append(allTxnLocks, lock)
}
loop.ProgressCallBack()
loop.ProgressCallBack(UnitRegion)
}
}
}
Expand Down Expand Up @@ -562,7 +581,7 @@ func (bc *Client) StartCheckpointRunner(
backupTS uint64,
ranges []rtree.Range,
safePointID string,
progressCallBack func(),
progressCallBack func(ProgressUnit),
) (err error) {
if bc.checkpointMeta == nil {
bc.checkpointMeta = &checkpoint.CheckpointMetadataForBackup{
Expand Down Expand Up @@ -603,31 +622,28 @@ func (bc *Client) getProgressRange(r rtree.Range) *rtree.ProgressRange {
if bc.checkpointMeta != nil && len(bc.checkpointMeta.CheckpointDataMap) > 0 {
rangeTree, exists := bc.checkpointMeta.CheckpointDataMap[groupKey]
if exists {
incomplete := rangeTree.GetIncompleteRange(r.StartKey, r.EndKey)
delete(bc.checkpointMeta.CheckpointDataMap, groupKey)
return &rtree.ProgressRange{
Res: rangeTree,
Incomplete: incomplete,
Origin: r,
GroupKey: groupKey,
Res: rangeTree,
Origin: r,
GroupKey: groupKey,
Complete: false,
}
}
}

// the origin range are not recorded in checkpoint
// return the default progress range
return &rtree.ProgressRange{
Res: rtree.NewRangeTree(),
Incomplete: []rtree.Range{
r,
},
Res: rtree.NewRangeTree(),
Origin: r,
GroupKey: groupKey,
Complete: false,
}
}

// LoadCheckpointRange loads the checkpoint(finished) sub-ranges of the current range, and calculate its incompleted sub-ranges.
func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack func()) (map[string]rtree.RangeTree, error) {
func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack func(ProgressUnit)) (map[string]rtree.RangeTree, error) {
rangeDataMap := make(map[string]rtree.RangeTree)

pastDureTime, err := checkpoint.WalkCheckpointFileForBackup(ctx, bc.storage, bc.cipher, func(groupKey string, rg checkpoint.BackupValueType) {
Expand All @@ -637,7 +653,7 @@ func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack fun
rangeDataMap[groupKey] = rangeTree
}
rangeTree.Put(rg.StartKey, rg.EndKey, rg.Files)
progressCallBack()
progressCallBack(UnitRegion)
})

// we should adjust start-time of the summary to `pastDureTime` earlier
Expand Down Expand Up @@ -1082,7 +1098,7 @@ func (bc *Client) BackupRanges(
concurrency uint,
replicaReadLabel map[string]string,
metaWriter *metautil.MetaWriter,
progressCallBack func(),
progressCallBack func(ProgressUnit),
) error {
log.Info("Backup Ranges Started", rtree.ZapRanges(ranges))
init := time.Now()
Expand All @@ -1101,6 +1117,7 @@ func (bc *Client) BackupRanges(
if err != nil {
return errors.Trace(err)
}
globalProgressTree.SetCallBack(func() { progressCallBack(UnitRange) })

stateNotifier := make(chan BackupRetryPolicy)
ObserveStoreChangesAsync(ctx, stateNotifier, bc.mgr.GetPDClient())
Expand Down
14 changes: 7 additions & 7 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func TestOnBackupResponse(t *testing.T) {
require.NoError(t, err)
require.Nil(t, lock)

incomplete := tree.Iter().GetIncompleteRanges()
incomplete := tree.GetIncompleteRanges()
require.Len(t, incomplete, 1)
require.Equal(t, []byte("b"), incomplete[0].StartKey)
require.Equal(t, []byte("c"), incomplete[0].EndKey)
Expand All @@ -398,7 +398,7 @@ func TestOnBackupResponse(t *testing.T) {
lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
require.NoError(t, err)
require.Nil(t, lock)
incomplete = tree.Iter().GetIncompleteRanges()
incomplete = tree.GetIncompleteRanges()
require.Len(t, incomplete, 0)

// case #5: failed case, key is locked
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestMainBackupLoop(t *testing.T) {
GlobalProgressTree: &tree,
ReplicaReadLabel: nil,
StateNotifier: ch,
ProgressCallBack: func() {},
ProgressCallBack: func(backup.ProgressUnit) {},
GetBackupClientCallBack: mockGetBackupClientCallBack,
}

Expand Down Expand Up @@ -548,7 +548,7 @@ func TestMainBackupLoop(t *testing.T) {
GlobalProgressTree: &tree,
ReplicaReadLabel: nil,
StateNotifier: ch,
ProgressCallBack: func() {},
ProgressCallBack: func(backup.ProgressUnit) {},
GetBackupClientCallBack: mockGetBackupClientCallBack,
}

Expand Down Expand Up @@ -600,7 +600,7 @@ func TestMainBackupLoop(t *testing.T) {
GlobalProgressTree: &tree,
ReplicaReadLabel: nil,
StateNotifier: ch,
ProgressCallBack: func() {},
ProgressCallBack: func(backup.ProgressUnit) {},
GetBackupClientCallBack: mockGetBackupClientCallBack,
}
go func() {
Expand Down Expand Up @@ -655,7 +655,7 @@ func TestMainBackupLoop(t *testing.T) {
GlobalProgressTree: &tree,
ReplicaReadLabel: nil,
StateNotifier: ch,
ProgressCallBack: func() {},
ProgressCallBack: func(backup.ProgressUnit) {},
GetBackupClientCallBack: mockGetBackupClientCallBack,
}
go func() {
Expand Down Expand Up @@ -712,7 +712,7 @@ func TestMainBackupLoop(t *testing.T) {
GlobalProgressTree: &tree,
ReplicaReadLabel: nil,
StateNotifier: ch,
ProgressCallBack: func() {},
ProgressCallBack: func(backup.ProgressUnit) {},
GetBackupClientCallBack: mockGetBackupClientCallBack,
}
go func() {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/rtree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 5,
shard_count = 6,
deps = [
":rtree",
"//pkg/kv",
Expand Down
73 changes: 28 additions & 45 deletions br/pkg/rtree/rtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,11 @@ func (rangeTree *RangeTree) GetIncompleteRange(
}

type ProgressRange struct {
Res RangeTree
Incomplete []Range
Origin Range
GroupKey string
Res RangeTree
Origin Range
GroupKey string
// only for statistic
Complete bool
}

// Less impls btree.Item.
Expand All @@ -332,15 +333,24 @@ func (pr *ProgressRange) Less(than *ProgressRange) bool {
// All the progress ranges it sorted do not overlap.
type ProgressRangeTree struct {
*btree.BTreeG[*ProgressRange]

completeCallBack func()
}

// NewProgressRangeTree returns an empty range tree.
func NewProgressRangeTree() ProgressRangeTree {
return ProgressRangeTree{
BTreeG: btree.NewG[*ProgressRange](32, (*ProgressRange).Less),

completeCallBack: func() {},
}
}

// SetCallBack set the complete call back to update the progress.
func (rangeTree *ProgressRangeTree) SetCallBack(callback func()) {
rangeTree.completeCallBack = callback
}

// find is a helper function to find an item that contains the range.
func (rangeTree *ProgressRangeTree) find(pr *ProgressRange) *ProgressRange {
var ret *ProgressRange
Expand Down Expand Up @@ -392,50 +402,23 @@ func (rangeTree *ProgressRangeTree) FindContained(startKey, endKey []byte) (*Pro
return ret, nil
}

type incompleteRangesFetcherItem struct {
pr *ProgressRange
complete bool
}

type IncompleteRangesFetcher struct {
items []*incompleteRangesFetcherItem
left int
}

func (rangeTree *ProgressRangeTree) Iter() *IncompleteRangesFetcher {
items := make([]*incompleteRangesFetcherItem, 0, rangeTree.Len())
rangeTree.Ascend(func(item *ProgressRange) bool {
items = append(items, &incompleteRangesFetcherItem{
pr: item,
complete: false,
})
return true
})
return &IncompleteRangesFetcher{
items: items,
left: len(items),
}
}

func (iter *IncompleteRangesFetcher) GetIncompleteRanges() []Range {
func (rangeTree *ProgressRangeTree) GetIncompleteRanges() []Range {
// about 64 MB memory if there are 1 million ranges
incompleteRanges := make([]Range, 0, len(iter.items))
for _, item := range iter.items {
if item.complete {
continue
}

incomplete := item.pr.Res.GetIncompleteRange(item.pr.Origin.StartKey, item.pr.Origin.EndKey)
incompleteRanges := make([]Range, 0, rangeTree.Len())
rangeTree.Ascend(func(item *ProgressRange) bool {
// NOTE: maybe there is a late response whose range overlaps with an existing item, which
// may cause the complete range tree to become incomplete. Therefore, `item.Complete` is
// only for statistic.
incomplete := item.Res.GetIncompleteRange(item.Origin.StartKey, item.Origin.EndKey)
if len(incomplete) == 0 {
item.complete = true
iter.left -= 1
continue
if !item.Complete {
item.Complete = true
rangeTree.completeCallBack()
}
return true
}
incompleteRanges = append(incompleteRanges, incomplete...)
}
return true
})
return incompleteRanges
}

func (iter *IncompleteRangesFetcher) Len() int {
return iter.left
}
39 changes: 35 additions & 4 deletions br/pkg/rtree/rtree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ func TestProgressRangeTree(t *testing.T) {
require.NoError(t, prTree.Insert(buildProgressRange("cc", "dd")))
require.NoError(t, prTree.Insert(buildProgressRange("ee", "ff")))

prIter := prTree.Iter()
ranges := prIter.GetIncompleteRanges()
ranges := prTree.GetIncompleteRanges()
require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("cc")}, ranges[0])
require.Equal(t, rtree.Range{StartKey: []byte("cc"), EndKey: []byte("dd")}, ranges[1])
require.Equal(t, rtree.Range{StartKey: []byte("ee"), EndKey: []byte("ff")}, ranges[2])
Expand All @@ -253,7 +252,7 @@ func TestProgressRangeTree(t *testing.T) {
require.NoError(t, err)
pr.Res.Put([]byte("cc"), []byte("dd"), nil)

ranges = prIter.GetIncompleteRanges()
ranges = prTree.GetIncompleteRanges()
require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("aaa")}, ranges[0])
require.Equal(t, rtree.Range{StartKey: []byte("b"), EndKey: []byte("cc")}, ranges[1])
require.Equal(t, rtree.Range{StartKey: []byte("ee"), EndKey: []byte("ff")}, ranges[2])
Expand All @@ -270,6 +269,38 @@ func TestProgressRangeTree(t *testing.T) {
require.NoError(t, err)
pr.Res.Put([]byte("ee"), []byte("ff"), nil)

ranges = prIter.GetIncompleteRanges()
ranges = prTree.GetIncompleteRanges()
require.Equal(t, 0, len(ranges))
}

func TestProgreeRangeTreeCallBack(t *testing.T) {
prTree := rtree.NewProgressRangeTree()

require.NoError(t, prTree.Insert(buildProgressRange("a", "b")))
require.NoError(t, prTree.Insert(buildProgressRange("c", "d")))
require.NoError(t, prTree.Insert(buildProgressRange("e", "f")))

completeCount := 0
prTree.SetCallBack(func() { completeCount += 1 })

pr, err := prTree.FindContained([]byte("a"), []byte("b"))
require.NoError(t, err)
pr.Res.Put([]byte("a"), []byte("b"), nil)
ranges := prTree.GetIncompleteRanges()
require.Equal(t, completeCount, 1)
require.Equal(t, rtree.Range{StartKey: []byte("c"), EndKey: []byte("d")}, ranges[0])
require.Equal(t, rtree.Range{StartKey: []byte("e"), EndKey: []byte("f")}, ranges[1])

pr.Res.Put([]byte("a"), []byte("aa"), nil)
ranges = prTree.GetIncompleteRanges()
require.Equal(t, completeCount, 1)
require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("b")}, ranges[0])
require.Equal(t, rtree.Range{StartKey: []byte("c"), EndKey: []byte("d")}, ranges[1])
require.Equal(t, rtree.Range{StartKey: []byte("e"), EndKey: []byte("f")}, ranges[2])

pr.Res.Put([]byte("a"), []byte("b"), nil)
ranges = prTree.GetIncompleteRanges()
require.Equal(t, completeCount, 1)
require.Equal(t, rtree.Range{StartKey: []byte("c"), EndKey: []byte("d")}, ranges[0])
require.Equal(t, rtree.Range{StartKey: []byte("e"), EndKey: []byte("f")}, ranges[1])
}
Loading
Loading