From 6940958a016a02eb2a6e48fa3f99b13aa86a900b Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Wed, 22 Jan 2025 13:05:57 -0500 Subject: [PATCH] fix sql, some rename Signed-off-by: Wenqi Mou --- br/pkg/restore/log_client/client.go | 7 ++----- br/pkg/task/restore.go | 1 - br/pkg/task/stream.go | 27 ++++++++++++++------------- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index dd800a4f959c9..d5c5d33340d61 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -827,6 +827,7 @@ func (rc *LogClient) RestoreKVFiles( } }() + log.Info("starting to restore kv files") if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("Client.RestoreKVFiles", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -843,10 +844,6 @@ func (rc *LogClient) RestoreKVFiles( // because the tableID of files is the same. rule, ok := rules[files[0].TableId] if !ok { - // TODO handle new created table - // For this version we do not handle new created table after full backup. - // in next version we will perform rewrite and restore meta key to restore new created tables. - // so we can simply skip the file that doesn't have the rule here. onProgress(kvCount) summary.CollectInt("FileSkip", len(files)) log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId)) @@ -1502,7 +1499,7 @@ func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper( return wrapper.WithSplit(ctx, compactedIter, strategy), nil } -// WrapLogFilesIteratorWithSplit applies a splitting strategy to the log files iterator. +// WrapLogFilesIterWithSplitHelper applies a splitting strategy to the log files iterator. // It uses a region splitter to handle the splitting logic based on the provided rules. func (rc *LogClient) WrapLogFilesIterWithSplitHelper( ctx context.Context, diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index b7b39dfafdd55..e550a4ac311c7 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -538,7 +538,6 @@ func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() { // another goroutine is used to iterate the backup file cfg.PitrConcurrency += 1 log.Info("set restore kv files concurrency", zap.Int("concurrency", int(cfg.PitrConcurrency))) - cfg.Config.Concurrency = cfg.PitrConcurrency if cfg.ConcurrencyPerStore.Value > 0 { log.Info("set restore compacted sst files concurrency per store", zap.Int("concurrency", int(cfg.ConcurrencyPerStore.Value))) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 1bda77b74001f..ae6cf999b289e 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -483,11 +483,11 @@ func (s *streamMgr) checkStreamStartEnable(ctx context.Context) error { return nil } -type RestoreGcFunc func(string) error +type RestoreGCFunc func(string) error // DisableGC disables and returns a function that can enable gc back. // gc.ratio-threshold = "-1.0", which represents disable gc in TiKV. -func DisableGC(g glue.Glue, store kv.Storage) (RestoreGcFunc, string, error) { +func DisableGC(g glue.Glue, store kv.Storage) (RestoreGCFunc, string, error) { se, err := g.CreateSession(store) if err != nil { return nil, "", errors.Trace(err) @@ -1328,6 +1328,8 @@ func RunStreamRestore( failpoint.Return(errors.New("failpoint: failed before full restore")) }) + // restore log. + cfg.adjustRestoreConfigForStreamRestore() cfg.tiflashRecorder = tiflashrec.New() logClient, err := createLogClient(ctx, g, cfg, mgr) if err != nil { @@ -1385,8 +1387,6 @@ func RunStreamRestore( cfg.tiflashRecorder.Load(taskInfo.CheckpointInfo.Metadata.TiFlashItems) } } - // restore log. - cfg.adjustRestoreConfigForStreamRestore() logRestoreConfig := &LogRestoreConfig{ RestoreConfig: cfg, checkpointTaskInfo: taskInfo.CheckpointInfo, @@ -1495,7 +1495,7 @@ func restoreStream( // It need disable GC in TiKV when PiTR. // because the process of PITR is concurrent and kv events isn't sorted by tso. - restoreGcFunc, oldGcRatio, err := DisableGC(g, mgr.GetStorage()) + restoreGCFunc, oldGCRatio, err := DisableGC(g, mgr.GetStorage()) if err != nil { return errors.Trace(err) } @@ -1509,12 +1509,12 @@ func restoreStream( // If the oldGcRatio is negative, which is not normal status. // It should set default value "1.1" after PiTR finished. - if strings.HasPrefix(oldGcRatio, "-") { - log.Warn("the original gc-ratio is negative, reset by default value 1.1", zap.String("old-gc-ratio", oldGcRatio)) - oldGcRatio = utils.DefaultGcRatioVal + if strings.HasPrefix(oldGCRatio, "-") { + log.Warn("the original gc-ratio is negative, reset by default value 1.1", zap.String("old-gc-ratio", oldGCRatio)) + oldGCRatio = utils.DefaultGcRatioVal } - log.Info("start to restore gc", zap.String("ratio", oldGcRatio)) - if err := restoreGcFunc(oldGcRatio); err != nil { + log.Info("start to restore gc", zap.String("ratio", oldGCRatio)) + if err := restoreGCFunc(oldGCRatio); err != nil { log.Error("failed to restore gc", zap.Error(err)) } log.Info("finish restoring gc") @@ -1522,11 +1522,11 @@ func restoreStream( var sstCheckpointSets map[string]struct{} if cfg.UseCheckpoint { - gcRatioFromCheckpoint, err := client.LoadOrCreateCheckpointMetadataForLogRestore(ctx, cfg.StartTS, cfg.RestoreTS, oldGcRatio, cfg.tiflashRecorder) + gcRatioFromCheckpoint, err := client.LoadOrCreateCheckpointMetadataForLogRestore(ctx, cfg.StartTS, cfg.RestoreTS, oldGCRatio, cfg.tiflashRecorder) if err != nil { return errors.Trace(err) } - oldGcRatio = gcRatioFromCheckpoint + oldGCRatio = gcRatioFromCheckpoint sstCheckpointSets, err = client.InitCheckpointMetadataForCompactedSstRestore(ctx) if err != nil { return errors.Trace(err) @@ -1734,7 +1734,8 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr * } return nil, nil } - err = client.InitClients(ctx, u, createCheckpointSessionFn, uint(cfg.Concurrency), cfg.ConcurrencyPerStore.Value) + + err = client.InitClients(ctx, u, createCheckpointSessionFn, uint(cfg.PitrConcurrency), cfg.ConcurrencyPerStore.Value) if err != nil { return nil, errors.Trace(err) }