br: copy full backup to pitr storage #57716

Draft: wants to merge 38 commits into base: master.
Commits (38, all by YuJuncen):

a9861df  initial commit (Nov 25, 2024)
f70831f  added basic implementation (Nov 13, 2024)
addb8f7  initial commit (Nov 26, 2024)
e00d5c1  added basic test toolkit (Nov 27, 2024)
f91c7ca  added test cases (Nov 27, 2024)
651b9b5  added force-flush to speed up testing (Dec 2, 2024)
ec98b3e  added checksum table (Dec 4, 2024)
2dfc800  make checksum work (Dec 4, 2024)
c16d21b  adapt the new proto (Dec 6, 2024)
d02c216  make it compile (but not work) (Dec 6, 2024)
afad018  fix migration not uploaded (Dec 6, 2024)
2dfdb30  try filter out full backup (Dec 6, 2024)
9fc057e  persist progress when exit (Dec 9, 2024)
a512330  initial try to hook at importer (Dec 9, 2024)
a734f24  fix incorrect state (Dec 9, 2024)
eac139e  shouldn't put finished when failed to commit (Dec 9, 2024)
9a0659d  added more test cases (Dec 10, 2024)
ed34ea5  make names more readable (Dec 10, 2024)
6d464c3  reject restore if there is a task for full restore (Dec 10, 2024)
6dd3946  added more test cases (Dec 12, 2024)
547e192  try not allow user when local encryption enabled, add progress for SSTs (Dec 12, 2024)
9e0dfd7  make a more friendly error message (Dec 13, 2024)
8f00ef5  added number of SSTs (Dec 13, 2024)
d14081a  make progress bar work (Dec 13, 2024)
6702ce4  added a test case for encryption (Dec 16, 2024)
aa48140  support printing extra full backups (Dec 16, 2024)
f8a135f  return error when no BASE migration found (Dec 16, 2024)
ad89115  Merge branch 'master' of https://github.com/pingcap/tidb into add-ext… (Dec 16, 2024)
cdde94f  added some unit test for encryption (Dec 16, 2024)
40b8640  use different key for testing (Dec 17, 2024)
0360a8e  Merge branch 'master' of https://github.com/pingcap/tidb into add-ext… (Dec 18, 2024)
a2b8b8e  make bazel_prepare (Dec 18, 2024)
3e3c3da  added force-flush command (Dec 19, 2024)
2c923d9  Merge branch 'master' of https://github.com/pingcap/tidb into add-ext… (Dec 20, 2024)
7b95659  added two new ops (Dec 20, 2024)
b8ef140  ignore err migration not found (Dec 20, 2024)
c1d7800  fix ignoring error (Dec 20, 2024)
5b262a4  fixed BRIE via SQL (Dec 23, 2024)
42 changes: 42 additions & 0 deletions br/cmd/br/operator.go
@@ -35,6 +35,8 @@ func newOperatorCommand() *cobra.Command {
cmd.AddCommand(newBase64ifyCommand())
cmd.AddCommand(newListMigrationsCommand())
cmd.AddCommand(newMigrateToCommand())
cmd.AddCommand(newForceFlushCommand())
cmd.AddCommand(newChecksumCommand())
return cmd
}

@@ -109,3 +111,43 @@ func newMigrateToCommand() *cobra.Command {
operator.DefineFlagsForMigrateToConfig(cmd.Flags())
return cmd
}

func newChecksumCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "checksum-as",
Short: "calculate the checksum with rewrite rules",
Long: "Calculate the checksum of the current cluster (specified by `-u`) " +
"with applying the rewrite rules generated from a backup (specified by `-s`). " +
"This can be used when you have the checksum of upstream elsewhere.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.ChecksumWithRewriteRulesConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.RunChecksumTable(ctx, tidbGlue, cfg)
},
}
task.DefineFilterFlags(cmd, []string{"!*.*"}, false)
operator.DefineFlagsForChecksumTableConfig(cmd.Flags())
return cmd
}
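Given the flags named in the help text (`-u` for the target cluster, `-s` for the backup storage) and the filter flags registered via task.DefineFilterFlags (defaulting to `!*.*`, i.e. nothing selected), a plausible invocation is `br operator checksum-as -u <pd-address> -s <backup-storage-uri> -f 'db.*'`; the exact flag set comes from operator.DefineFlagsForChecksumTableConfig, which is outside this diff.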

func newForceFlushCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "force-flush",
Short: "force a log backup task to flush",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.ForceFlushConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.RunForceFlush(ctx, &cfg)
},
}
operator.DefineFlagsForForceFlushConfig(cmd.Flags())
return cmd
}
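force-flush was added earlier in this series (commit 651b9b5, "added force-flush to speed up testing"): rather than waiting for a log backup task's next periodic flush, an operator can ask it to flush immediately, which the test toolkit relies on to make checkpoints advance quickly. Its flag set lives in operator.DefineFlagsForForceFlushConfig, also outside this diff.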
3 changes: 3 additions & 0 deletions br/pkg/checkpoint/restore.go
@@ -19,6 +19,7 @@ import (
"encoding/json"
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/pdutil"
@@ -137,6 +138,8 @@ type CheckpointMetadataForSnapshotRestore struct {
UpstreamClusterID uint64 `json:"upstream-cluster-id"`
RestoredTS uint64 `json:"restored-ts"`
SchedulersConfig *pdutil.ClusterConfig `json:"schedulers-config"`

RestoreUUID uuid.UUID `json:"restore-uuid"`
}

func LoadCheckpointMetadataForSnapshotRestore(
5 changes: 5 additions & 0 deletions br/pkg/checkpoint/storage.go
@@ -32,6 +32,11 @@ import (
"go.uber.org/zap"
)

type hookedOnFlush struct {
checkpointStorage
cb func(ctx context.Context) error
}

type checkpointStorage interface {
flushCheckpointData(ctx context.Context, data []byte) error
flushCheckpointChecksum(ctx context.Context, data []byte) error
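The method bodies of hookedOnFlush are collapsed out of this view, so the following is only a sketch of the presumable wiring, assuming the callback fires after a successful flush:

// Sketch only (assumed, not part of the visible diff): delegate to the
// embedded checkpointStorage, then run the hook once the flush succeeds.
func (h hookedOnFlush) flushCheckpointData(ctx context.Context, data []byte) error {
	if err := h.checkpointStorage.flushCheckpointData(ctx, data); err != nil {
		return err
	}
	return h.cb(ctx)
}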
1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
@@ -40,6 +40,7 @@ var (
ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified"))
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))
ErrInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Common:ErrInvalidRange"))
ErrMigrationNotFound = errors.Normalize("no migration found", errors.RFCCodeText("BR:Common:ErrMigrationNotFound"))
ErrMigrationVersionNotSupported = errors.Normalize("the migration version isn't supported", errors.RFCCodeText("BR:Common:ErrMigrationVersionNotSupported"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
20 changes: 20 additions & 0 deletions br/pkg/glue/glue.go
@@ -4,6 +4,7 @@ package glue

import (
"context"
"sync/atomic"

"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
@@ -82,3 +83,22 @@ type Progress interface {
// called.
Close()
}

type CounterProgress struct {
Counter atomic.Int64
}

func (c *CounterProgress) Inc() {
c.Counter.Add(1)
}

func (c *CounterProgress) IncBy(cnt int64) {
c.Counter.Add(cnt)
}

func (c *CounterProgress) GetCurrent() int64 {
return c.Counter.Load()
}

func (c *CounterProgress) Close() {
}
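CounterProgress looks like a no-frills glue.Progress implementation for call sites with no attached progress bar (e.g. the SST progress counting added in this series). A hypothetical use, not taken from the diff:

// Hypothetical usage sketch: count progress units without a UI.
func countWithoutUI() int64 {
	var p glue.CounterProgress // wraps atomic.Int64; safe for concurrent use
	p.Inc()
	p.IncBy(9)
	p.Close()             // no-op
	return p.GetCurrent() // 10
}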
4 changes: 4 additions & 0 deletions br/pkg/gluetidb/glue.go
@@ -55,6 +55,10 @@ type Glue struct {
startDomainMu *sync.Mutex
}

func WrapSession(se sessiontypes.Session) glue.Session {
return &tidbSession{se: se}
}

type tidbSession struct {
se sessiontypes.Session
}
6 changes: 5 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
"log_file_map.go",
"log_split_strategy.go",
"migration.go",
"ssts.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/restore/log_client",
visibility = ["//visibility:public"],
@@ -43,11 +44,13 @@ go_library(
"//pkg/kv",
"//pkg/meta",
"//pkg/meta/model",
"//pkg/tablecodec",
"//pkg/util",
"//pkg/util/codec",
"//pkg/util/redact",
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_docker_go_units//:go-units",
"@com_github_fatih_color//:color",
"@com_github_gogo_protobuf//proto",
"@com_github_opentracing_opentracing_go//:opentracing-go",
@@ -90,7 +93,7 @@ go_test(
],
embed = [":log_client"],
flaky = True,
shard_count = 45,
shard_count = 50,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
@@ -119,6 +122,7 @@ go_test(
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_docker_go_units//:go-units",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
103 changes: 91 additions & 12 deletions br/pkg/restore/log_client/client.go
@@ -25,13 +25,16 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/docker/go-units"
"github.com/fatih/color"
"github.com/gogo/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backup "github.com/pingcap/kvproto/pkg/brpb"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/log"
@@ -40,6 +43,7 @@ import (
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/encryption"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
@@ -178,6 +182,7 @@ func NewSstRestoreManager(

type LogClient struct {
*LogFileManager

logRestoreManager *LogRestoreManager
sstRestoreManager *SstRestoreManager

@@ -209,6 +214,16 @@ type LogClient struct {

// checkpoint information for log restore
useCheckpoint bool

logFilesStat logFilesStatistic
restoreStat restoreStatistic
}

type restoreStatistic struct {
restoreSSTKVSize uint64
restoreSSTKVCount uint64
restoreSSTPhySize uint64
restoreSSTTakes uint64
}

// NewRestoreClient returns a new RestoreClient.
@@ -252,11 +267,12 @@ func (rc *LogClient) Close(ctx context.Context) {

func (rc *LogClient) RestoreCompactedSstFiles(
ctx context.Context,
compactionsIter iter.TryNextor[*backuppb.LogFileSubcompaction],
compactionsIter iter.TryNextor[SSTs],
rules map[int64]*restoreutils.RewriteRules,
importModeSwitcher *restore.ImportModeSwitcher,
onProgress func(int64),
) error {
begin := time.Now()
backupFileSets := make([]restore.BackupFileSet, 0, 8)
// Collect all items from the iterator in advance to avoid blocking during restoration.
// This approach ensures that we have all necessary data ready for processing,
@@ -267,14 +283,36 @@
return r.Err
}
i := r.Item
rewriteRules, ok := rules[i.Meta.TableId]

tid := i.TableID()
if r, ok := i.(RewrittenSST); ok && r.RewrittenTo() > 0 {
tid = r.RewrittenTo()
}
rewriteRules, ok := rules[tid]
if !ok {
log.Warn("[Compacted SST Restore] Skipping excluded table during restore.", zap.Int64("table_id", i.Meta.TableId))
log.Warn("[Compacted SST Restore] Skipping excluded table during restore.", zap.Int64("table_id", i.TableID()))
continue
}

if r, ok := i.(RewrittenSST); ok {
rewritten := r.RewrittenTo()
if rewritten > 0 && rewritten != i.TableID() {
rewriteRules = rewriteRules.Clone()
if !rewriteRules.RewriteSourceTableID(rewritten, i.TableID()) {
return errors.Annotatef(
berrors.ErrUnknown,
"table rewritten from a table id (%d) to (%d) which doesn't exist in the stream",
rewritten,
i.TableID(),
)
}
log.Info("Rewritten rewrite rules.", zap.Stringer("rules", rewriteRules), zap.Int64("table_id", i.TableID()), zap.Int64("rewritten_to", rewritten))
}
}

set := restore.BackupFileSet{
TableID: i.Meta.TableId,
SSTFiles: i.SstOutputs,
TableID: i.TableID(),
SSTFiles: i.GetSSTs(),
RewriteRules: rewriteRules,
}
backupFileSets = append(backupFileSets, set)
@@ -311,7 +349,30 @@
if err != nil {
return errors.Trace(err)
}
return rc.sstRestoreManager.restorer.WaitUntilFinish()
err = rc.sstRestoreManager.restorer.WaitUntilFinish()

for _, files := range backupFileSets {
for _, f := range files.SSTFiles {
log.Info("Collected file.", zap.Uint64("total_kv", f.TotalKvs), zap.Uint64("total_bytes", f.TotalBytes), zap.Uint64("size", f.Size_))
atomic.AddUint64(&rc.restoreStat.restoreSSTKVCount, f.TotalKvs)
atomic.AddUint64(&rc.restoreStat.restoreSSTKVSize, f.TotalBytes)
atomic.AddUint64(&rc.restoreStat.restoreSSTPhySize, f.Size_)
}
}
atomic.AddUint64(&rc.restoreStat.restoreSSTTakes, uint64(time.Since(begin)))
return err
}

func (rc *LogClient) RestoreSSTStatisticFields(pushTo *[]zapcore.Field) {
takes := time.Duration(rc.restoreStat.restoreSSTTakes)
fields := []zapcore.Field{
zap.Uint64("restore-sst-kv-count", rc.restoreStat.restoreSSTKVCount),
zap.Uint64("restore-sst-kv-size", rc.restoreStat.restoreSSTKVSize),
zap.Uint64("restore-sst-physical-size (after compression)", rc.restoreStat.restoreSSTPhySize),
zap.Duration("restore-sst-total-take", takes),
zap.String("average-speed (sst)", units.HumanSize(float64(rc.restoreStat.restoreSSTKVSize)/takes.Seconds())+"/s"),
}
*pushTo = append(*pushTo, fields...)
}
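A note on the bookkeeping above: restoreSSTTakes stores the elapsed time as a uint64 of nanoseconds (time.Since(begin) cast in RestoreCompactedSstFiles) so it can be accumulated with atomic.AddUint64 alongside the other counters; RestoreSSTStatisticFields converts it back via time.Duration before deriving the average speed.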

func (rc *LogClient) SetRawKVBatchClient(
@@ -516,13 +577,29 @@ func (rc *LogClient) InitCheckpointMetadataForLogRestore(
return gcRatio, nil
}

func (rc *LogClient) GetMigrations(ctx context.Context) ([]*backuppb.Migration, error) {
ext := stream.MigerationExtension(rc.storage)
type LockedMigrations struct {
Migs []*backup.Migration
ReadLock storage.RemoteLock
}

func (rc *LogClient) GetMigrations(ctx context.Context) (*LockedMigrations, error) {
ext := stream.MigrationExtension(rc.storage)
migs, err := ext.Load(ctx)
if err != nil {
return nil, errors.Trace(err)
}
return migs.ListAll(), nil

ms := migs.ListAll()
readLock, err := ext.GetReadLock(ctx, "restore stream")
if err != nil {
return nil, err
}

lms := &LockedMigrations{
Migs: ms,
ReadLock: readLock,
}
return lms, nil
}
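Presumably the read lock taken here keeps a concurrent migration operation (for example `br operator migrate-to`, registered in operator.go above) from compacting or truncating the migration set while a restore is still reading it; returning the lock inside LockedMigrations lets the caller hold it for the duration of the restore and release it afterwards. RemoteLock itself is defined in the storage package outside this diff, so this reading is inferred from the names.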

func (rc *LogClient) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint,
@@ -544,6 +621,8 @@
if err != nil {
return err
}
rc.logFilesStat = logFilesStatistic{}
rc.LogFileManager.Stats = &rc.logFilesStat
return nil
}

@@ -1509,15 +1588,15 @@ func (rc *LogClient) UpdateSchemaVersion(ctx context.Context) error {
// It uses a region splitter to handle the splitting logic based on the provided rules and checkpoint sets.
func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper(
ctx context.Context,
compactedIter iter.TryNextor[*backuppb.LogFileSubcompaction],
compactedIter iter.TryNextor[SSTs],
rules map[int64]*restoreutils.RewriteRules,
checkpointSets map[string]struct{},
updateStatsFn func(uint64, uint64),
splitSize uint64,
splitKeys int64,
) (iter.TryNextor[*backuppb.LogFileSubcompaction], error) {
) (iter.TryNextor[SSTs], error) {
client := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, 3)
wrapper := restore.PipelineRestorerWrapper[*backuppb.LogFileSubcompaction]{
wrapper := restore.PipelineRestorerWrapper[SSTs]{
PipelineRegionsSplitter: split.NewPipelineRegionsSplitter(client, splitSize, splitKeys),
}
strategy := NewCompactedFileSplitStrategy(rules, checkpointSets, updateStatsFn)
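The diff repeatedly changes the iterator element type from *backuppb.LogFileSubcompaction to a new SSTs abstraction and type-asserts to RewrittenSST; both live in the new br/pkg/restore/log_client/ssts.go, which is listed in the BUILD file but omitted from this view. From the call sites alone, their shape is roughly the following; treat it as an inferred sketch, not the actual definitions (the package name logclient is likewise assumed from the import path).

package logclient

import backuppb "github.com/pingcap/kvproto/pkg/brpb"

// SSTs abstracts a batch of SST files bound to one table, whether they
// come from a log subcompaction or a copied-in full backup. (Inferred
// from call sites; the real definition is in ssts.go, not shown here.)
type SSTs interface {
	// TableID returns the table ID the SSTs' keys belong to.
	TableID() int64
	// GetSSTs returns the SST files in this batch.
	GetSSTs() []*backuppb.File
}

// RewrittenSST is implemented by SSTs whose keys were already rewritten
// to another table ID, so restore must remap its rewrite rules (see
// RestoreCompactedSstFiles above).
type RewrittenSST interface {
	// RewrittenTo returns the table ID the keys were rewritten to;
	// values <= 0 mean no rewrite applies.
	RewrittenTo() int64
}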