Commit 6ebc531

run prepare
Signed-off-by: Wenqi Mou <[email protected]>
Tristan1900 committed Nov 25, 2024
1 parent c29b578 commit 6ebc531
Showing 12 changed files with 186 additions and 234 deletions.
3 changes: 2 additions & 1 deletion br/pkg/checkpoint/log_restore.go
@@ -264,7 +264,8 @@ func ExistsCheckpointProgress(
 		TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointProgressTableName))
 }
 
-// CheckpointTaskInfoForLogRestore is tied to a specific cluster. It represents the last restore task executed this cluster.
+// CheckpointTaskInfoForLogRestore is tied to a specific cluster.
+// It represents the last restore task executed in this cluster.
 type CheckpointTaskInfoForLogRestore struct {
 	Metadata            *CheckpointMetadataForLogRestore
 	HasSnapshotMetadata bool
2 changes: 1 addition & 1 deletion br/pkg/restore/log_client/BUILD.bazel
@@ -105,6 +105,7 @@ go_test(
         "//br/pkg/storage",
         "//br/pkg/stream",
         "//br/pkg/utils",
+        "//br/pkg/utils/consts",
         "//br/pkg/utils/iter",
         "//br/pkg/utiltest",
         "//pkg/domain",
@@ -119,7 +120,6 @@
         "//pkg/util/chunk",
         "//pkg/util/codec",
         "//pkg/util/sqlexec",
-        "//pkg/util/table-filter",
         "@com_github_docker_go_units//:go-units",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
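A go_test target's deps list must mirror the imports of the package's test sources, so this swap implies the tests now reference //br/pkg/utils/consts instead of //pkg/util/table-filter. A hypothetical fragment (not from this commit) showing the kind of test import that obliges the new dep, assuming the usual TiDB import path for that Bazel label:

package logclient_test

import (
	"testing"

	// Importing this package from any _test.go file is what requires the
	// matching "//br/pkg/utils/consts" entry in the go_test deps above.
	"github.com/pingcap/tidb/br/pkg/utils/consts"
)

func TestColumnFamilyConstants(t *testing.T) {
	// These are the column-family constants the client.go hunks below use.
	if consts.DefaultCF == consts.WriteCF {
		t.Fatal("column family constants must be distinct")
	}
}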
8 changes: 4 additions & 4 deletions br/pkg/restore/log_client/batch_file_processor.go
@@ -29,8 +29,8 @@ import (
 
 // BatchFileProcessor defines how to process a batch of files
 type BatchFileProcessor interface {
-	// process a batch of files and with a filterTS and return what's not processed for next iteration
-	processBatch(
+	// ProcessBatch processes a batch of files with a filterTS and returns what is not processed for the next iteration
+	ProcessBatch(
 		ctx context.Context,
 		files []*backuppb.DataFileInfo,
 		entries []*KvEntryWithTS,
@@ -47,7 +47,7 @@ type RestoreProcessor struct {
 	progressInc func()
 }
 
-func (rp *RestoreProcessor) processBatch(
+func (rp *RestoreProcessor) ProcessBatch(
 	ctx context.Context,
 	files []*backuppb.DataFileInfo,
 	entries []*KvEntryWithTS,
@@ -71,7 +71,7 @@ type DDLCollector struct {
 	tableRenameInfo *stream.LogBackupTableHistory
 }
 
-func (dc *DDLCollector) processBatch(
+func (dc *DDLCollector) ProcessBatch(
 	ctx context.Context,
 	files []*backuppb.DataFileInfo,
 	entries []*KvEntryWithTS,
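With processBatch exported as ProcessBatch, code outside the original call path can supply its own implementation. Below is a minimal sketch of such an implementation, not part of this commit: it assumes, from the client.go call sites further down, that the truncated signature ends with filterTS uint64 and cf string and returns ([]*KvEntryWithTS, error), and that KvEntryWithTS exposes a Ts field; the type name is hypothetical.

package logclient

import (
	"context"

	backuppb "github.com/pingcap/kvproto/pkg/brpb"
)

// CountingProcessor counts batches and defers all real work; it exists only
// to show the shape of a BatchFileProcessor implementation.
type CountingProcessor struct {
	batches int
}

func (cp *CountingProcessor) ProcessBatch(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	entries []*KvEntryWithTS,
	filterTS uint64,
	cf string,
) ([]*KvEntryWithTS, error) {
	cp.batches++
	// Hand back the entries at or after filterTS so the caller can carry
	// them into the next iteration, per the interface comment.
	remaining := make([]*KvEntryWithTS, 0, len(entries))
	for _, e := range entries {
		if e.Ts >= filterTS {
			remaining = append(remaining, e)
		}
	}
	return remaining, nil
}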
19 changes: 12 additions & 7 deletions br/pkg/restore/log_client/client.go
@@ -1016,15 +1016,17 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
 		dbReplaces = stream.FromSchemaMaps(dbMaps)
 		if len(dbReplaces) <= 0 {
 			envVal, ok := os.LookupEnv(UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL)
-			if ok && len(envVal) > 0 {
-				log.Info(fmt.Sprintf("the environment variable %s is active, skip loading the base schemas.", UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL))
-			} else {
+			if !ok || len(envVal) <= 0 {
 				return nil, errors.Errorf("miss upstream table information at `start-ts`(%d) but the full backup path is not specified", rc.startTS)
 			}
+			log.Info(fmt.Sprintf("the environment variable %s is active, skip loading the base schemas.", UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL))
 		}
 	} else {
 		log.Info("building table replaces from full backup storage")
 		dbReplaces, err = rc.generateDBReplacesFromFullBackupStorage(ctx, cfg, cipherInfo)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
 	}
 }
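The rewrite above flattens an if/else into a guard clause: the failure path returns first, and the escape-hatch log line lands on the straight-line path. A self-contained sketch of the same shape, with hypothetical names standing in for the BR ones:

package main

import (
	"errors"
	"fmt"
	"os"
)

// loadBaseSchemas mimics the guard-clause form the hunk above adopts: bail
// out when the unsafe escape-hatch variable is unset, otherwise log and go on.
func loadBaseSchemas() error {
	envVal, ok := os.LookupEnv("EXAMPLE_UNSAFE_SKIP_BASE_SCHEMAS")
	if !ok || len(envVal) <= 0 {
		return errors.New("missing upstream table information and no full backup path specified")
	}
	fmt.Println("escape hatch active, skipping base schema load")
	return nil
}

func main() {
	if err := loadBaseSchemas(); err != nil {
		fmt.Println("error:", err)
	}
}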
@@ -1272,7 +1274,7 @@ func LoadAndProcessMetaKVFilesInBatch(
 			} else {
 				// Either f.MinTS > rangeMax or f.MinTs is the filterTs we need.
 				// So it is ok to pass f.MinTs as filterTs.
-				defaultKvEntries, err = processor.processBatch(ctx, defaultFiles[defaultIdx:i], defaultKvEntries, f.MinTs, consts.DefaultCF)
+				defaultKvEntries, err = processor.ProcessBatch(ctx, defaultFiles[defaultIdx:i], defaultKvEntries, f.MinTs, consts.DefaultCF)
 				if err != nil {
 					return errors.Trace(err)
 				}
@@ -1289,7 +1291,7 @@
 					break
 				}
 			}
-			writeKvEntries, err = processor.processBatch(ctx, writeFiles[writeIdx:toWriteIdx], writeKvEntries, f.MinTs, consts.WriteCF)
+			writeKvEntries, err = processor.ProcessBatch(ctx, writeFiles[writeIdx:toWriteIdx], writeKvEntries, f.MinTs, consts.WriteCF)
 			if err != nil {
 				return errors.Trace(err)
 			}
@@ -1301,11 +1303,11 @@
 	// restore the left meta kv files and entries
 	// Notice: restoreBatch needs to realize the parameter `files` and `kvEntries` might be empty
 	// Assert: defaultIdx <= len(defaultFiles) && writeIdx <= len(writeFiles)
-	_, err = processor.processBatch(ctx, defaultFiles[defaultIdx:], defaultKvEntries, math.MaxUint64, consts.DefaultCF)
+	_, err = processor.ProcessBatch(ctx, defaultFiles[defaultIdx:], defaultKvEntries, math.MaxUint64, consts.DefaultCF)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	_, err = processor.processBatch(ctx, writeFiles[writeIdx:], writeKvEntries, math.MaxUint64, consts.WriteCF)
+	_, err = processor.ProcessBatch(ctx, writeFiles[writeIdx:], writeKvEntries, math.MaxUint64, consts.WriteCF)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -1327,6 +1329,9 @@ func (rc *LogClient) RestoreBatchMetaKVFiles(
 	cf string,
 ) ([]*KvEntryWithTS, error) {
 	curSortedKvEntries, filteredOutKvEntries, err := rc.filterAndSortKvEntriesFromFiles(ctx, files, kvEntries, filterTS)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
 
 	// restore and rewrite these entries to TiKV with rawPut() method.
 	kvCount, size, err := rc.restoreAndRewriteMetaKvEntries(ctx, schemasReplace, curSortedKvEntries, cf)
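The last hunk adds a previously missing error check: without it, the err from filterAndSortKvEntriesFromFiles would be overwritten by the next assignment and the failure silently lost. A standalone illustration of that bug class, with hypothetical helper names:

package main

import (
	"errors"
	"fmt"
)

func step1() (int, error) { return 0, errors.New("step1 failed") }

func step2(v int) (int, error) { return v + 1, nil }

func main() {
	// Bug shape: err from step1 is reassigned by step2 before being checked,
	// so this prints "1 <nil>" and the failure goes unnoticed.
	v, err := step1()
	w, err := step2(v)
	fmt.Println(w, err)

	// Fixed shape, as in the hunk above: check err immediately after the call.
	if v, err = step1(); err != nil {
		fmt.Println("caught:", err)
		return
	}
	_, _ = step2(v)
}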