
Commit

resolve more
Signed-off-by: Wenqi Mou <[email protected]>
Tristan1900 committed Nov 22, 2024
1 parent e9dd4d4 commit fddc0ce
Showing 5 changed files with 58 additions and 61 deletions.
8 changes: 1 addition & 7 deletions br/pkg/checkpoint/log_restore.go

@@ -24,7 +24,6 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/br/pkg/glue"
 	"github.com/pingcap/tidb/pkg/domain"
-	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/meta/model"
 	pmodel "github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/util/sqlexec"
@@ -123,13 +122,8 @@ func StartCheckpointLogRestoreRunnerForTest(
 
 func StartCheckpointRunnerForLogRestore(
 	ctx context.Context,
-	g glue.Glue,
-	store kv.Storage,
+	session glue.Session,
 ) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) {
-	session, err := g.CreateSession(store)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
	runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType](
 		newTableCheckpointStorage(session, LogRestoreCheckpointDatabaseName),
 		nil, valueMarshalerForLogRestore)
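The effect of this file's change: StartCheckpointRunnerForLogRestore no longer builds its own session from a glue.Glue and kv.Storage; the caller hands it a ready glue.Session. A minimal sketch of the new calling convention (not code from this commit; it assumes the caller still holds the g and store values that used to be passed in):

package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/checkpoint"
	"github.com/pingcap/tidb/br/pkg/glue"
	"github.com/pingcap/tidb/pkg/kv"
)

func startLogRestoreCheckpoint(ctx context.Context, g glue.Glue, store kv.Storage) (
	*checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType], error) {
	// Session creation now happens at the call site, so tests can inject a
	// stub glue.Session instead of wiring up a real kv.Storage.
	se, err := g.CreateSession(store)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return checkpoint.StartCheckpointRunnerForLogRestore(ctx, se)
}

Pushing session creation to the caller is what lets the manager constructors below skip it entirely when checkpointing is off.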
50 changes: 29 additions & 21 deletions br/pkg/restore/log_client/client.go

@@ -94,20 +94,21 @@ func NewLogRestoreManager(
 	ctx context.Context,
 	fileImporter *LogFileImporter,
 	poolSize uint,
-	createCheckpointSessionFn func() (glue.Session, error),
+	createSessionFn func() (glue.Session, error),
+	useCheckpoint bool,
 ) (*LogRestoreManager, error) {
 	// With compacted restore in the picture, users still only set --concurrency, which controls log file restore speed.
 	log.Info("log restore worker pool", zap.Uint("size", poolSize))
 	l := &LogRestoreManager{
 		fileImporter: fileImporter,
 		workerPool:   tidbutil.NewWorkerPool(poolSize, "log manager worker pool"),
 	}
-	se, err := createCheckpointSessionFn()
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
 
-	if se != nil {
+	if useCheckpoint {
+		se, err := createSessionFn()
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
 		l.checkpointRunner, err = checkpoint.StartCheckpointRunnerForLogRestore(ctx, se)
 		if err != nil {
 			return nil, errors.Trace(err)
@@ -151,7 +152,8 @@ func NewSstRestoreManager(
 	snapFileImporter *snapclient.SnapFileImporter,
 	concurrencyPerStore uint,
 	storeCount uint,
-	createCheckpointSessionFn func() (glue.Session, error),
+	createSessionFn func() (glue.Session, error),
+	useCheckpoint bool,
 ) (*SstRestoreManager, error) {
 	// This poolSize is similar to full restore, as both workflows are comparable.
 	// The poolSize should be greater than concurrencyPerStore multiplied by the number of stores.
@@ -162,11 +164,11 @@ func NewSstRestoreManager(
 	s := &SstRestoreManager{
 		workerPool: tidbutil.NewWorkerPool(poolSize, "log manager worker pool"),
 	}
-	se, err := createCheckpointSessionFn()
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	if se != nil {
+	if useCheckpoint {
+		se, err := createSessionFn()
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
 		checkpointRunner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, se)
 		if err != nil {
 			return nil, errors.Trace(err)
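Both manager constructors drop the old convention of calling the factory unconditionally and treating a nil session as "checkpointing disabled"; the new useCheckpoint flag makes the policy explicit and avoids creating a session that would never be used. A simplified sketch of the new gating (stub code, not from the BR sources):

package example

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/glue"
)

// setUpCheckpoint mirrors the new shape: the policy travels as a flag, and
// the session factory is only invoked when checkpointing is enabled.
func setUpCheckpoint(useCheckpoint bool, createSessionFn func() (glue.Session, error)) error {
	if !useCheckpoint {
		return nil // createSessionFn is never called
	}
	se, err := createSessionFn()
	if err != nil {
		return errors.Trace(err)
	}
	_ = se // a checkpoint runner would be started with se here
	return nil
}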
@@ -233,23 +235,26 @@ func NewLogClient(
 
 // Close a client.
 func (rc *LogClient) Close(ctx context.Context) {
+	defer func() {
+		if rc.logRestoreManager != nil {
+			rc.logRestoreManager.Close(ctx)
+		}
+		if rc.sstRestoreManager != nil {
+			rc.sstRestoreManager.Close(ctx)
+		}
+	}()
+
 	// close the connection; it must succeed when in SQL mode.
 	if rc.unsafeSession != nil {
 		rc.unsafeSession.Close()
 	}
 
 	if rc.LogFileManager != nil {
 		rc.LogFileManager.Close()
 	}
 
 	if rc.rawKVClient != nil {
 		rc.rawKVClient.Close()
 	}
 
-	if rc.logRestoreManager != nil {
-		rc.logRestoreManager.Close(ctx)
-	}
-
-	if rc.sstRestoreManager != nil {
-		rc.sstRestoreManager.Close(ctx)
-	}
 	log.Info("Log client closed")
 }
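The Close refactor moves the two manager shutdowns into a deferred closure, so they always run last and still run if an earlier close panics or a future early return is added. A runnable toy program showing that ordering guarantee:

package main

import "fmt"

func closeAll() {
	// Deferred calls execute when closeAll returns (or panics), after
	// everything else in the function body.
	defer fmt.Println("managers closed (always last)")
	fmt.Println("session closed")
	fmt.Println("file manager closed")
	fmt.Println("rawkv client closed")
}

func main() {
	closeAll()
	// Output:
	// session closed
	// file manager closed
	// rawkv client closed
	// managers closed (always last)
}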

@@ -394,6 +399,7 @@ func (rc *LogClient) InitClients(
 	ctx context.Context,
 	backend *backuppb.StorageBackend,
 	createSessionFn func() (glue.Session, error),
+	useCheckpoint bool,
 	concurrency uint,
 	concurrencyPerStore uint,
 ) error {
@@ -410,6 +416,7 @@ func (rc *LogClient) InitClients(
 		NewLogFileImporter(metaClient, importCli, backend),
 		concurrency,
 		createSessionFn,
+		useCheckpoint,
 	)
 	if err != nil {
 		return errors.Trace(err)
@@ -435,6 +442,7 @@ func (rc *LogClient) InitClients(
 		concurrencyPerStore,
 		uint(len(stores)),
 		createSessionFn,
+		useCheckpoint,
 	)
 	return errors.Trace(err)
 }
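InitClients threads useCheckpoint down to both managers. One practical consequence: with the flag off, createSessionFn is never invoked. The hypothetical helper below (names, import aliases, and the concurrency literals are illustrative, not from the commit) exploits that in a test by passing a factory that fails loudly:

package example

import (
	"context"

	"github.com/pingcap/errors"
	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/tidb/br/pkg/glue"
	logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
)

// initClientsNoCheckpoint is a hypothetical helper, not part of the commit.
func initClientsNoCheckpoint(ctx context.Context, client *logclient.LogClient, backend *backuppb.StorageBackend) error {
	createSessionFn := func() (glue.Session, error) {
		// With useCheckpoint=false this should be unreachable.
		return nil, errors.New("unexpected session creation: checkpoint is disabled")
	}
	// 16 and 4 are placeholder concurrency values.
	return client.InitClients(ctx, backend, createSessionFn, false, 16, 4)
}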
36 changes: 18 additions & 18 deletions br/pkg/restore/log_client/log_file_manager.go

@@ -106,8 +106,8 @@ type LogFileManager struct {
 	storage storage.ExternalStorage
 	helper  streamMetadataHelper
 
-	withMigraionBuilder *WithMigrationsBuilder
-	withMigrations      *WithMigrations
+	withMigrationBuilder *WithMigrationsBuilder
+	withMigrations       *WithMigrations
 
 	metadataDownloadBatchSize uint
 }
@@ -133,12 +133,12 @@ type DDLMetaGroup struct {
 // Generally the config cannot be changed during its lifetime.
 func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFileManager, error) {
 	fm := &LogFileManager{
-		startTS:             init.StartTS,
-		restoreTS:           init.RestoreTS,
-		storage:             init.Storage,
-		helper:              stream.NewMetadataHelper(stream.WithEncryptionManager(init.EncryptionManager)),
-		withMigraionBuilder: init.MigrationsBuilder,
-		withMigrations:      init.Migrations,
+		startTS:              init.StartTS,
+		restoreTS:            init.RestoreTS,
+		storage:              init.Storage,
+		helper:               stream.NewMetadataHelper(stream.WithEncryptionManager(init.EncryptionManager)),
+		withMigrationBuilder: init.MigrationsBuilder,
+		withMigrations:       init.Migrations,
 
 		metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
 	}
@@ -150,12 +150,12 @@ func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFil
 }
 
 func (lm *LogFileManager) BuildMigrations(migs []*backuppb.Migration) {
-	w := lm.withMigraionBuilder.Build(migs)
+	w := lm.withMigrationBuilder.Build(migs)
 	lm.withMigrations = &w
 }
 
 func (lm *LogFileManager) ShiftTS() uint64 {
-	return rc.shiftStartTS
+	return lm.shiftStartTS
 }
 
 func (lm *LogFileManager) loadShiftTS(ctx context.Context) error {
@@ -187,11 +187,11 @@ func (lm *LogFileManager) loadShiftTS(ctx context.Context) error {
 	}
 	if !shiftTS.exists {
 		lm.shiftStartTS = lm.startTS
-		lm.withMigraionBuilder.SetShiftStartTS(lm.shiftStartTS)
+		lm.withMigrationBuilder.SetShiftStartTS(lm.shiftStartTS)
 		return nil
 	}
 	lm.shiftStartTS = shiftTS.value
-	lm.withMigraionBuilder.SetShiftStartTS(lm.shiftStartTS)
+	lm.withMigrationBuilder.SetShiftStartTS(lm.shiftStartTS)
 	return nil
 }
 
@@ -254,7 +254,7 @@ func (lm *LogFileManager) FilterDataFiles(m MetaNameIter) LogIter {
 			if m.meta.MetaVersion > backuppb.MetaVersion_V1 {
 				di.Item.Path = gim.physical.Item.Path
 			}
-			return di.Item.IsMeta || rc.ShouldFilterOut(di.Item)
+			return di.Item.IsMeta || lm.ShouldFilterOutByTs(di.Item)
 		})
 		return iter.Map(fs, func(di FileIndex) *LogDataFileInfo {
 			return &LogDataFileInfo{
@@ -333,7 +333,7 @@ func (lm *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter {
 			if m.meta.MetaVersion > backuppb.MetaVersion_V1 {
 				d.Path = g.Path
 			}
-			if lm.ShouldFilterOut(d) {
+			if lm.ShouldFilterOutByTs(d) {
 				return true
 			}
 			// count the progress
@@ -349,12 +349,12 @@ func (lm *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter {
 	})
 }
 
-// Fetch compactions that may contain file less than the TS.
-func (rc *LogFileManager) GetCompactionIter(ctx context.Context) iter.TryNextor[*backuppb.LogFileSubcompaction] {
-	return rc.withMigrations.Compactions(ctx, rc.storage)
+// GetCompactionIter fetches compactions that may contain files less than the TS.
+func (lm *LogFileManager) GetCompactionIter(ctx context.Context) iter.TryNextor[*backuppb.LogFileSubcompaction] {
+	return lm.withMigrations.Compactions(ctx, lm.storage)
 }
 
-// the kv entry with ts, the ts is decoded from entry.
+// KvEntryWithTS is a kv entry with its ts; the ts is decoded from the entry.
 type KvEntryWithTS struct {
 	E  kv.Entry
 	Ts uint64
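Besides finishing the rc-to-lm receiver rename (the leftover rc references could not have compiled) and correcting the withMigraionBuilder typo, renaming ShouldFilterOut to ShouldFilterOutByTs records what the predicate actually keys on. A sketch of the assumed semantics (inferred from the names, not shown in this diff):

package example

// shouldFilterOutByTs reports whether a file with ts range [minTS, maxTS]
// can be skipped for a restore window [shiftStartTS, restoreTS]: it is
// skippable when the ranges do not overlap. Assumed semantics only.
func shouldFilterOutByTs(minTS, maxTS, shiftStartTS, restoreTS uint64) bool {
	return maxTS < shiftStartTS || minTS > restoreTS
}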
4 changes: 2 additions & 2 deletions br/pkg/task/restore.go

@@ -807,7 +807,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
 	cfg.ConcurrencyPerStore = kvConfigs.ImportGoroutines
 	// using tikv config to set the concurrency-per-store for client.
 	client.SetConcurrencyPerStore(cfg.ConcurrencyPerStore.Value)
-	err := configureRestoreClient(ctx, client, cfg.RestoreConfig)
+	err = configureRestoreClient(ctx, client, cfg.RestoreConfig)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -819,7 +819,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
 	}
 
 	metaReader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
-	if err = client.LoadSchemaIfNeededAndInitClient(ctx, backupMeta, u, metaReader, cfg.LoadStats); err != nil {
+	if err = client.LoadSchemaIfNeededAndInitClient(ctx, backupMeta, u, metaReader, cfg.LoadStats, nil, nil); err != nil {
 		return errors.Trace(err)
 	}
 
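The one-character fix from err := to err = matters because err is already declared in the enclosing function: in the same block, := fails to compile ("no new variables on left side of :="), and in a nested block it silently declares a shadow variable, so a failure never reaches the outer err. A runnable illustration of the shadowing trap:

package main

import (
	"errors"
	"fmt"
)

func main() {
	var err error
	if true {
		err := errors.New("boom") // := declares a NEW err, shadowing the outer one
		_ = err
	}
	fmt.Println(err) // prints <nil>: the failure never reached the outer err
}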
21 changes: 8 additions & 13 deletions br/pkg/task/stream.go

@@ -1222,14 +1222,12 @@ func RunStreamRestore(
 	var logBackupTableHistory *stream.LogBackupTableHistory
 	// TODO need to do more
 	if cfg.ExplicitFilter {
-		log.Info("### building log backup table history")
 		logClient, err := createLogClient(ctx, g, cfg, mgr)
 		if err != nil {
 			return errors.Trace(err)
 		}
-		defer logClient.Close()
-		dataFileCount := 0
-		ddlFiles, err := logClient.LoadDDLFilesAndCountDMLFiles(ctx, &dataFileCount)
+		defer logClient.Close(ctx)
+		ddlFiles, err := logClient.LoadDDLFilesAndCountDMLFiles(ctx)
 		logBackupTableHistory, err = logClient.LoadMetaKVFilesAndBuildTableRenameInfo(ctx, ddlFiles)
 		if err != nil {
 			return errors.Trace(err)
@@ -1549,15 +1547,12 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *
 	client.SetCrypter(&cfg.CipherInfo)
 	client.SetUpstreamClusterID(cfg.upstreamClusterID)
 
-	createCheckpointSessionFn := func() (glue.Session, error) {
-		// always create a new session for checkpoint runner
-		// because session is not thread safe
-		if cfg.UseCheckpoint {
-			return g.CreateSession(mgr.GetStorage())
-		}
-		return nil, nil
+	// always create a new session for checkpoint runner
+	// because session is not thread safe
+	createSessionFn := func() (glue.Session, error) {
+		return g.CreateSession(mgr.GetStorage())
 	}
-	err = client.InitClients(ctx, u, createCheckpointSessionFn, uint(cfg.Concurrency), cfg.ConcurrencyPerStore.Value)
+	err = client.InitClients(ctx, u, createSessionFn, cfg.UseCheckpoint, uint(cfg.Concurrency), cfg.ConcurrencyPerStore.Value)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -1569,7 +1564,7 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *
 
 	migs, err := client.GetMigrations(ctx)
 	if err != nil {
-		return errors.Trace(err)
+		return nil, errors.Trace(err)
 	}
 	client.BuildMigrations(migs)
 
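createLogClient now passes a plain session factory plus cfg.UseCheckpoint instead of a factory that encodes the policy by returning (nil, nil). The moved comment still carries the key constraint: a session is not thread safe, so each checkpoint runner must own its own. A stand-in sketch of that one-session-per-consumer pattern (simplified types, not BR code):

package main

import "fmt"

type session struct{ id int }

func main() {
	next := 0
	newSession := func() *session {
		next++
		return &session{id: next} // every call hands out a distinct session
	}

	logRunnerSe := newSession() // used only by the log checkpoint goroutine
	sstRunnerSe := newSession() // used only by the SST checkpoint goroutine
	fmt.Println(logRunnerSe.id, sstRunnerSe.id) // 1 2: nothing is shared
}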
