Skip to content

Commit

Permalink
fix sql, some rename
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Tristan1900 committed Jan 22, 2025
1 parent b6c7dea commit 6940958
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 19 deletions.
7 changes: 2 additions & 5 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,7 @@ func (rc *LogClient) RestoreKVFiles(
}
}()

log.Info("starting to restore kv files")
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.RestoreKVFiles", opentracing.ChildOf(span.Context()))
defer span1.Finish()
Expand All @@ -843,10 +844,6 @@ func (rc *LogClient) RestoreKVFiles(
// because the tableID of files is the same.
rule, ok := rules[files[0].TableId]
if !ok {
// TODO handle new created table
// For this version we do not handle new created table after full backup.
// in next version we will perform rewrite and restore meta key to restore new created tables.
// so we can simply skip the file that doesn't have the rule here.
onProgress(kvCount)
summary.CollectInt("FileSkip", len(files))
log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId))
Expand Down Expand Up @@ -1502,7 +1499,7 @@ func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper(
return wrapper.WithSplit(ctx, compactedIter, strategy), nil
}

// WrapLogFilesIteratorWithSplit applies a splitting strategy to the log files iterator.
// WrapLogFilesIterWithSplitHelper applies a splitting strategy to the log files iterator.
// It uses a region splitter to handle the splitting logic based on the provided rules.
func (rc *LogClient) WrapLogFilesIterWithSplitHelper(
ctx context.Context,
Expand Down
1 change: 0 additions & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,6 @@ func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() {
// another goroutine is used to iterate the backup file
cfg.PitrConcurrency += 1
log.Info("set restore kv files concurrency", zap.Int("concurrency", int(cfg.PitrConcurrency)))
cfg.Config.Concurrency = cfg.PitrConcurrency
if cfg.ConcurrencyPerStore.Value > 0 {
log.Info("set restore compacted sst files concurrency per store",
zap.Int("concurrency", int(cfg.ConcurrencyPerStore.Value)))
Expand Down
27 changes: 14 additions & 13 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,11 @@ func (s *streamMgr) checkStreamStartEnable(ctx context.Context) error {
return nil
}

type RestoreGcFunc func(string) error
type RestoreGCFunc func(string) error

// DisableGC disables and returns a function that can enable gc back.
// gc.ratio-threshold = "-1.0", which represents disable gc in TiKV.
func DisableGC(g glue.Glue, store kv.Storage) (RestoreGcFunc, string, error) {
func DisableGC(g glue.Glue, store kv.Storage) (RestoreGCFunc, string, error) {
se, err := g.CreateSession(store)
if err != nil {
return nil, "", errors.Trace(err)
Expand Down Expand Up @@ -1328,6 +1328,8 @@ func RunStreamRestore(
failpoint.Return(errors.New("failpoint: failed before full restore"))
})

// restore log.
cfg.adjustRestoreConfigForStreamRestore()
cfg.tiflashRecorder = tiflashrec.New()
logClient, err := createLogClient(ctx, g, cfg, mgr)
if err != nil {
Expand Down Expand Up @@ -1385,8 +1387,6 @@ func RunStreamRestore(
cfg.tiflashRecorder.Load(taskInfo.CheckpointInfo.Metadata.TiFlashItems)
}
}
// restore log.
cfg.adjustRestoreConfigForStreamRestore()
logRestoreConfig := &LogRestoreConfig{
RestoreConfig: cfg,
checkpointTaskInfo: taskInfo.CheckpointInfo,
Expand Down Expand Up @@ -1495,7 +1495,7 @@ func restoreStream(

// It need disable GC in TiKV when PiTR.
// because the process of PITR is concurrent and kv events isn't sorted by tso.
restoreGcFunc, oldGcRatio, err := DisableGC(g, mgr.GetStorage())
restoreGCFunc, oldGCRatio, err := DisableGC(g, mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1509,24 +1509,24 @@ func restoreStream(

// If the oldGcRatio is negative, which is not normal status.
// It should set default value "1.1" after PiTR finished.
if strings.HasPrefix(oldGcRatio, "-") {
log.Warn("the original gc-ratio is negative, reset by default value 1.1", zap.String("old-gc-ratio", oldGcRatio))
oldGcRatio = utils.DefaultGcRatioVal
if strings.HasPrefix(oldGCRatio, "-") {
log.Warn("the original gc-ratio is negative, reset by default value 1.1", zap.String("old-gc-ratio", oldGCRatio))
oldGCRatio = utils.DefaultGcRatioVal
}
log.Info("start to restore gc", zap.String("ratio", oldGcRatio))
if err := restoreGcFunc(oldGcRatio); err != nil {
log.Info("start to restore gc", zap.String("ratio", oldGCRatio))
if err := restoreGCFunc(oldGCRatio); err != nil {
log.Error("failed to restore gc", zap.Error(err))
}
log.Info("finish restoring gc")
}()

var sstCheckpointSets map[string]struct{}
if cfg.UseCheckpoint {
gcRatioFromCheckpoint, err := client.LoadOrCreateCheckpointMetadataForLogRestore(ctx, cfg.StartTS, cfg.RestoreTS, oldGcRatio, cfg.tiflashRecorder)
gcRatioFromCheckpoint, err := client.LoadOrCreateCheckpointMetadataForLogRestore(ctx, cfg.StartTS, cfg.RestoreTS, oldGCRatio, cfg.tiflashRecorder)
if err != nil {
return errors.Trace(err)
}
oldGcRatio = gcRatioFromCheckpoint
oldGCRatio = gcRatioFromCheckpoint
sstCheckpointSets, err = client.InitCheckpointMetadataForCompactedSstRestore(ctx)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1734,7 +1734,8 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *
}
return nil, nil
}
err = client.InitClients(ctx, u, createCheckpointSessionFn, uint(cfg.Concurrency), cfg.ConcurrencyPerStore.Value)

err = client.InitClients(ctx, u, createCheckpointSessionFn, uint(cfg.PitrConcurrency), cfg.ConcurrencyPerStore.Value)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down

0 comments on commit 6940958

Please sign in to comment.