diff --git a/br/pkg/checkpoint/log_restore.go b/br/pkg/checkpoint/log_restore.go
index 2afcffe36d8d5..e78e35fe6cc85 100644
--- a/br/pkg/checkpoint/log_restore.go
+++ b/br/pkg/checkpoint/log_restore.go
@@ -24,7 +24,6 @@ import (
     "github.com/pingcap/log"
     "github.com/pingcap/tidb/br/pkg/glue"
     "github.com/pingcap/tidb/pkg/domain"
-    "github.com/pingcap/tidb/pkg/kv"
     "github.com/pingcap/tidb/pkg/meta/model"
     pmodel "github.com/pingcap/tidb/pkg/parser/model"
     "github.com/pingcap/tidb/pkg/util/sqlexec"
@@ -123,13 +122,8 @@ func StartCheckpointLogRestoreRunnerForTest(
 func StartCheckpointRunnerForLogRestore(
     ctx context.Context,
-    g glue.Glue,
-    store kv.Storage,
+    session glue.Session,
 ) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) {
-    session, err := g.CreateSession(store)
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
     runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType](
         newTableCheckpointStorage(session, LogRestoreCheckpointDatabaseName),
         nil, valueMarshalerForLogRestore)
diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go
index 99924723d035b..b1c96d90c2784 100644
--- a/br/pkg/restore/log_client/client.go
+++ b/br/pkg/restore/log_client/client.go
@@ -94,7 +94,8 @@ func NewLogRestoreManager(
     ctx context.Context,
     fileImporter *LogFileImporter,
     poolSize uint,
-    createCheckpointSessionFn func() (glue.Session, error),
+    createSessionFn func() (glue.Session, error),
+    useCheckpoint bool,
 ) (*LogRestoreManager, error) {
     // for compacted reason, user only set --concurrency for log file restore speed.
     log.Info("log restore worker pool", zap.Uint("size", poolSize))
@@ -102,12 +103,12 @@ func NewLogRestoreManager(
         fileImporter: fileImporter,
         workerPool:   tidbutil.NewWorkerPool(poolSize, "log manager worker pool"),
     }
-    se, err := createCheckpointSessionFn()
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
-    if se != nil {
+    if useCheckpoint {
+        se, err := createSessionFn()
+        if err != nil {
+            return nil, errors.Trace(err)
+        }
         l.checkpointRunner, err = checkpoint.StartCheckpointRunnerForLogRestore(ctx, se)
         if err != nil {
             return nil, errors.Trace(err)
@@ -151,7 +152,8 @@ func NewSstRestoreManager(
     snapFileImporter *snapclient.SnapFileImporter,
     concurrencyPerStore uint,
     storeCount uint,
-    createCheckpointSessionFn func() (glue.Session, error),
+    createSessionFn func() (glue.Session, error),
+    useCheckpoint bool,
 ) (*SstRestoreManager, error) {
     // This poolSize is similar to full restore, as both workflows are comparable.
     // The poolSize should be greater than concurrencyPerStore multiplied by the number of stores.
@@ -162,11 +164,11 @@ func NewSstRestoreManager(
     s := &SstRestoreManager{
         workerPool: tidbutil.NewWorkerPool(poolSize, "log manager worker pool"),
     }
-    se, err := createCheckpointSessionFn()
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
-    if se != nil {
+    if useCheckpoint {
+        se, err := createSessionFn()
+        if err != nil {
+            return nil, errors.Trace(err)
+        }
         checkpointRunner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, se)
         if err != nil {
             return nil, errors.Trace(err)
@@ -233,23 +235,26 @@ func NewLogClient(
 // Close a client.
 func (rc *LogClient) Close(ctx context.Context) {
-    defer func() {
-        if rc.logRestoreManager != nil {
-            rc.logRestoreManager.Close(ctx)
-        }
-        if rc.sstRestoreManager != nil {
-            rc.sstRestoreManager.Close(ctx)
-        }
-    }()
-    // close the connection, and it must be succeeded when in SQL mode.
     if rc.unsafeSession != nil {
         rc.unsafeSession.Close()
     }
+    if rc.LogFileManager != nil {
+        rc.LogFileManager.Close()
+    }
+
     if rc.rawKVClient != nil {
         rc.rawKVClient.Close()
     }
+
+    if rc.logRestoreManager != nil {
+        rc.logRestoreManager.Close(ctx)
+    }
+
+    if rc.sstRestoreManager != nil {
+        rc.sstRestoreManager.Close(ctx)
+    }
     log.Info("Log client closed")
 }
@@ -394,6 +399,7 @@ func (rc *LogClient) InitClients(
     ctx context.Context,
     backend *backuppb.StorageBackend,
     createSessionFn func() (glue.Session, error),
+    useCheckpoint bool,
     concurrency uint,
     concurrencyPerStore uint,
 ) error {
@@ -410,6 +416,7 @@ func (rc *LogClient) InitClients(
         NewLogFileImporter(metaClient, importCli, backend),
         concurrency,
         createSessionFn,
+        useCheckpoint,
     )
     if err != nil {
         return errors.Trace(err)
     }
@@ -435,6 +442,7 @@ func (rc *LogClient) InitClients(
         concurrencyPerStore,
         uint(len(stores)),
         createSessionFn,
+        useCheckpoint,
     )
     return errors.Trace(err)
 }
diff --git a/br/pkg/restore/log_client/log_file_manager.go b/br/pkg/restore/log_client/log_file_manager.go
index 86b58512b7482..bd33ffc9643fe 100644
--- a/br/pkg/restore/log_client/log_file_manager.go
+++ b/br/pkg/restore/log_client/log_file_manager.go
@@ -106,8 +106,8 @@ type LogFileManager struct {
     storage storage.ExternalStorage
     helper  streamMetadataHelper

-    withMigraionBuilder *WithMigrationsBuilder
-    withMigrations      *WithMigrations
+    withMigrationBuilder *WithMigrationsBuilder
+    withMigrations       *WithMigrations

     metadataDownloadBatchSize uint
 }
@@ -133,12 +133,12 @@ type DDLMetaGroup struct {
 // Generally the config cannot be changed during its lifetime.
 func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFileManager, error) {
     fm := &LogFileManager{
-        startTS:             init.StartTS,
-        restoreTS:           init.RestoreTS,
-        storage:             init.Storage,
-        helper:              stream.NewMetadataHelper(stream.WithEncryptionManager(init.EncryptionManager)),
-        withMigraionBuilder: init.MigrationsBuilder,
-        withMigrations:      init.Migrations,
+        startTS:              init.StartTS,
+        restoreTS:            init.RestoreTS,
+        storage:              init.Storage,
+        helper:               stream.NewMetadataHelper(stream.WithEncryptionManager(init.EncryptionManager)),
+        withMigrationBuilder: init.MigrationsBuilder,
+        withMigrations:       init.Migrations,

         metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
     }
@@ -150,12 +150,12 @@ func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFil
 }

 func (lm *LogFileManager) BuildMigrations(migs []*backuppb.Migration) {
-    w := lm.withMigraionBuilder.Build(migs)
+    w := lm.withMigrationBuilder.Build(migs)
     lm.withMigrations = &w
 }

 func (lm *LogFileManager) ShiftTS() uint64 {
-    return rc.shiftStartTS
+    return lm.shiftStartTS
 }

 func (lm *LogFileManager) loadShiftTS(ctx context.Context) error {
@@ -187,11 +187,11 @@ func (lm *LogFileManager) loadShiftTS(ctx context.Context) error {
     }
     if !shiftTS.exists {
         lm.shiftStartTS = lm.startTS
-        lm.withMigraionBuilder.SetShiftStartTS(lm.shiftStartTS)
+        lm.withMigrationBuilder.SetShiftStartTS(lm.shiftStartTS)
         return nil
     }
     lm.shiftStartTS = shiftTS.value
-    lm.withMigraionBuilder.SetShiftStartTS(lm.shiftStartTS)
+    lm.withMigrationBuilder.SetShiftStartTS(lm.shiftStartTS)
     return nil
 }
@@ -254,7 +254,7 @@ func (lm *LogFileManager) FilterDataFiles(m MetaNameIter) LogIter {
             if m.meta.MetaVersion > backuppb.MetaVersion_V1 {
                 di.Item.Path = gim.physical.Item.Path
             }
-            return di.Item.IsMeta || rc.ShouldFilterOut(di.Item)
+            return di.Item.IsMeta || lm.ShouldFilterOutByTs(di.Item)
         })
         return iter.Map(fs, func(di FileIndex) *LogDataFileInfo {
             return &LogDataFileInfo{
@@ -333,7 +333,7 @@ func (lm *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter {
             if m.meta.MetaVersion > backuppb.MetaVersion_V1 {
                 d.Path = g.Path
             }
-            if lm.ShouldFilterOut(d) {
+            if lm.ShouldFilterOutByTs(d) {
                 return true
             }
             // count the progress
@@ -349,12 +349,12 @@ func (lm *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter {
     })
 }

-// Fetch compactions that may contain file less than the TS.
-func (rc *LogFileManager) GetCompactionIter(ctx context.Context) iter.TryNextor[*backuppb.LogFileSubcompaction] {
-    return rc.withMigrations.Compactions(ctx, rc.storage)
+// GetCompactionIter fetches compactions that may contain file less than the TS.
+func (lm *LogFileManager) GetCompactionIter(ctx context.Context) iter.TryNextor[*backuppb.LogFileSubcompaction] {
+    return lm.withMigrations.Compactions(ctx, lm.storage)
 }

-// the kv entry with ts, the ts is decoded from entry.
+// KvEntryWithTS is kv entry with ts, the ts is decoded from entry.
 type KvEntryWithTS struct {
     E  kv.Entry
     Ts uint64
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index 57c61663e31c7..8cf278a00c339 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -807,7 +807,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
     cfg.ConcurrencyPerStore = kvConfigs.ImportGoroutines
     // using tikv config to set the concurrency-per-store for client.
     client.SetConcurrencyPerStore(cfg.ConcurrencyPerStore.Value)
-    err := configureRestoreClient(ctx, client, cfg.RestoreConfig)
+    err = configureRestoreClient(ctx, client, cfg.RestoreConfig)
     if err != nil {
         return errors.Trace(err)
     }
@@ -819,7 +819,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
     }

     metaReader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
-    if err = client.LoadSchemaIfNeededAndInitClient(ctx, backupMeta, u, metaReader, cfg.LoadStats); err != nil {
+    if err = client.LoadSchemaIfNeededAndInitClient(ctx, backupMeta, u, metaReader, cfg.LoadStats, nil, nil); err != nil {
         return errors.Trace(err)
     }
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 7809bceaaf0f9..9b7d00a68fb5f 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -1222,14 +1222,12 @@ func RunStreamRestore(
     var logBackupTableHistory *stream.LogBackupTableHistory
     // TODO need to do more
     if cfg.ExplicitFilter {
-        log.Info("### building log backup table history")
         logClient, err := createLogClient(ctx, g, cfg, mgr)
         if err != nil {
             return errors.Trace(err)
         }
-        defer logClient.Close()
-        dataFileCount := 0
-        ddlFiles, err := logClient.LoadDDLFilesAndCountDMLFiles(ctx, &dataFileCount)
+        defer logClient.Close(ctx)
+        ddlFiles, err := logClient.LoadDDLFilesAndCountDMLFiles(ctx)
         logBackupTableHistory, err = logClient.LoadMetaKVFilesAndBuildTableRenameInfo(ctx, ddlFiles)
         if err != nil {
             return errors.Trace(err)
@@ -1549,15 +1547,12 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *
     client.SetCrypter(&cfg.CipherInfo)
     client.SetUpstreamClusterID(cfg.upstreamClusterID)

-    createCheckpointSessionFn := func() (glue.Session, error) {
-        // always create a new session for checkpoint runner
-        // because session is not thread safe
-        if cfg.UseCheckpoint {
-            return g.CreateSession(mgr.GetStorage())
-        }
-        return nil, nil
+    // always create a new session for checkpoint runner
+    // because session is not thread safe
+    createSessionFn := func() (glue.Session, error) {
+        return g.CreateSession(mgr.GetStorage())
     }
-    err = client.InitClients(ctx, u, createCheckpointSessionFn, uint(cfg.Concurrency), cfg.ConcurrencyPerStore.Value)
+    err = client.InitClients(ctx, u, createSessionFn, cfg.UseCheckpoint, uint(cfg.Concurrency), cfg.ConcurrencyPerStore.Value)
     if err != nil {
         return nil, errors.Trace(err)
     }
@@ -1569,7 +1564,7 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *
     migs, err := client.GetMigrations(ctx)
     if err != nil {
-        return errors.Trace(err)
+        return nil, errors.Trace(err)
     }
     client.BuildMigrations(migs)
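
Note on the recurring change above: NewLogRestoreManager, NewSstRestoreManager, and InitClients no longer take a createCheckpointSessionFn closure that returns a nil session to signal "checkpoint disabled"; they take a plain createSessionFn plus an explicit useCheckpoint flag, and only create a session when a checkpoint runner will actually be started. The following is a minimal, self-contained Go sketch of that calling convention; the Session, Manager, newManager, and startCheckpointRunner names are illustrative stand-ins for this note, not the actual BR/glue API.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Session is a stand-in for glue.Session.
type Session interface {
	Close()
}

type fakeSession struct{}

func (fakeSession) Close() {}

// checkpointRunner is a stand-in for the checkpoint runner started per restore.
type checkpointRunner struct {
	se Session
}

// startCheckpointRunner expects a live session, mirroring how the patched
// constructors only build one when checkpointing is enabled.
func startCheckpointRunner(_ context.Context, se Session) (*checkpointRunner, error) {
	if se == nil {
		return nil, errors.New("checkpoint runner needs a live session")
	}
	return &checkpointRunner{se: se}, nil
}

// Manager mirrors the shape of LogRestoreManager/SstRestoreManager after the change.
type Manager struct {
	runner *checkpointRunner
}

// newManager only calls the session factory when useCheckpoint is true, so the
// factory never has to return (nil, nil) to mean "disabled".
func newManager(ctx context.Context, createSessionFn func() (Session, error), useCheckpoint bool) (*Manager, error) {
	m := &Manager{}
	if useCheckpoint {
		se, err := createSessionFn()
		if err != nil {
			return nil, err
		}
		runner, err := startCheckpointRunner(ctx, se)
		if err != nil {
			return nil, err
		}
		m.runner = runner
	}
	return m, nil
}

func main() {
	ctx := context.Background()
	createSessionFn := func() (Session, error) { return fakeSession{}, nil }

	withCkpt, _ := newManager(ctx, createSessionFn, true)
	withoutCkpt, _ := newManager(ctx, createSessionFn, false)
	fmt.Println(withCkpt.runner != nil, withoutCkpt.runner == nil) // true true
}

The explicit flag keeps the "checkpoint disabled" decision at the call site instead of encoding it as a nil session returned from the factory, which is what made the old se != nil check easy to misread.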