br: separate nested logic of table rewrite and building id map (#58112)
close #57859
Tristan1900 authored Dec 19, 2024
1 parent a3c9b79 commit 27a9a7d
Showing 10 changed files with 586 additions and 488 deletions.
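
The commit splits what was previously nested inside InitSchemasReplaceForDDL and RestoreMetaKVFiles into two explicit phases: building and persisting the id map through a stream.TableMappingManager, and rewriting meta kv in a separate step. Below is a minimal caller-side sketch of the new first phase, using only the config fields and method names visible in this diff; the wrapper function, its inputs, and the exact import paths are assumptions for illustration, not part of the commit.

package restoresketch // hypothetical caller inside br

import (
	"context"

	"github.com/pingcap/errors"
	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
	"github.com/pingcap/tidb/br/pkg/stream"
	filter "github.com/pingcap/tidb/pkg/util/table-filter"
)

// buildIDMap builds (or, on retry, reloads) the table mapping before any meta kv rewrite.
// All parameters here are assumed inputs for the sketch.
func buildIDMap(
	ctx context.Context,
	client *logclient.LogClient,
	tableFilter filter.Filter,
	metaKVFiles []*backuppb.DataFileInfo,
	cipher *backuppb.CipherInfo,
	fullBackupCfg *logclient.FullBackupStorageConfig,
	idMapAlreadySaved bool,
) (*stream.TableMappingManager, error) {
	cfg := &logclient.BuildTableMappingManagerConfig{
		// true means a previous run of the same task already saved the map,
		// so it is loaded instead of being rebuilt from meta kv
		CurrentIdMapSaved: idMapAlreadySaved,
		TableFilter:       tableFilter,
		FullBackupStorage: fullBackupCfg, // optional; nil falls back to the previous task's map
		CipherInfo:        cipher,
		Files:             metaKVFiles,
	}
	// Building and saving the id map now happens here; the meta kv rewrite is a
	// separate, later call (RestoreAndRewriteMetaKVFiles).
	manager, err := client.BuildTableMappingManager(ctx, cfg)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return manager, nil
}

Keeping the id map build separate means a retried task can skip meta kv iteration entirely when CurrentIdMapSaved is true.
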
171 changes: 105 additions & 66 deletions br/pkg/restore/log_client/client.go
@@ -354,6 +354,10 @@ func (rc *LogClient) SetCurrentTS(ts uint64) error {
return nil
}

func (rc *LogClient) CurrentTS() uint64 {
return rc.currentTS
}

// GetClusterID gets the cluster id from down-stream cluster.
func (rc *LogClient) GetClusterID(ctx context.Context) uint64 {
if rc.clusterID <= 0 {
@@ -906,22 +910,22 @@ type FullBackupStorageConfig struct {
Opts *storage.ExternalStorageOptions
}

type InitSchemaConfig struct {
type BuildTableMappingManagerConfig struct {
// required
IsNewTask bool
TableFilter filter.Filter
CurrentIdMapSaved bool
TableFilter filter.Filter

// optional
TiFlashRecorder *tiflashrec.TiFlashRecorder
FullBackupStorage *FullBackupStorageConfig
CipherInfo *backuppb.CipherInfo
Files []*backuppb.DataFileInfo
}

const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"

func (rc *LogClient) generateDBReplacesFromFullBackupStorage(
ctx context.Context,
cfg *InitSchemaConfig,
cipherInfo *backuppb.CipherInfo,
cfg *BuildTableMappingManagerConfig,
) (map[stream.UpstreamID]*stream.DBReplace, error) {
dbReplaces := make(map[stream.UpstreamID]*stream.DBReplace)
if cfg.FullBackupStorage == nil {
@@ -936,7 +940,7 @@ func (rc *LogClient) generateDBReplacesFromFullBackupStorage(
if err != nil {
return nil, errors.Trace(err)
}
fullBackupTables, err := initFullBackupTables(ctx, s, cfg.TableFilter, cipherInfo)
fullBackupTables, err := initFullBackupTables(ctx, s, cfg.TableFilter, cfg.CipherInfo)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -974,25 +978,24 @@ func (rc *LogClient) generateDBReplacesFromFullBackupStorage(
return dbReplaces, nil
}

// InitSchemasReplaceForDDL gets schemas information Mapping from old schemas to new schemas.
// It is used to rewrite meta kv-event.
func (rc *LogClient) InitSchemasReplaceForDDL(
// BuildTableMappingManager builds the table mapping manager. It initializes the manager from the full backup
// table info read from the full backup storage, or from the id map of a previous task,
// or from the mapping saved by an earlier run of the same task.
func (rc *LogClient) BuildTableMappingManager(
ctx context.Context,
cfg *InitSchemaConfig,
cipherInfo *backuppb.CipherInfo,
) (*stream.SchemasReplace, error) {
cfg *BuildTableMappingManagerConfig,
) (*stream.TableMappingManager, error) {
var (
err error
dbMaps []*backuppb.PitrDBMap
// the id map does not need to be constructed when it can be loaded from a previous save
needConstructIdMap bool

dbReplaces map[stream.UpstreamID]*stream.DBReplace
dbReplaces map[stream.UpstreamID]*stream.DBReplace
)

// not new task, load schemas map from external storage
if !cfg.IsNewTask {
log.Info("try to load pitr id maps")
// this is a retry; the id map was saved last time, so load it from external storage
if cfg.CurrentIdMapSaved {
log.Info("try to load previously saved pitr id maps")
needConstructIdMap = false
dbMaps, err = rc.initSchemasMap(ctx, rc.restoreTS)
if err != nil {
@@ -1024,16 +1027,16 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
if len(dbMaps) <= 0 {
log.Info("no id maps, build the table replaces from cluster and full backup schemas")
needConstructIdMap = true
dbReplaces, err = rc.generateDBReplacesFromFullBackupStorage(ctx, cfg, cipherInfo)
dbReplaces, err = rc.generateDBReplacesFromFullBackupStorage(ctx, cfg)
if err != nil {
return nil, errors.Trace(err)
}
} else {
dbReplaces = stream.FromSchemaMaps(dbMaps)
dbReplaces = stream.FromDBMapProto(dbMaps)
}

for oldDBID, dbReplace := range dbReplaces {
log.Info("replace info", func() []zapcore.Field {
log.Info("base replace info", func() []zapcore.Field {
fields := make([]zapcore.Field, 0, (len(dbReplace.TableMap)+1)*3)
fields = append(fields,
zap.String("dbName", dbReplace.Name),
@@ -1049,10 +1052,16 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
}()...)
}

rp := stream.NewSchemasReplace(
dbReplaces, needConstructIdMap, cfg.TiFlashRecorder, rc.currentTS, cfg.TableFilter, rc.GenGlobalID, rc.GenGlobalIDs,
rc.RecordDeleteRange)
return rp, nil
tableMappingManager := stream.NewTableMappingManager(dbReplaces, rc.GenGlobalID)

// the map was not loaded from a previous save, so iterate meta kv to build and save it
if needConstructIdMap {
if err = rc.IterMetaKVToBuildAndSaveIdMap(ctx, tableMappingManager, cfg.Files); err != nil {
return nil, errors.Trace(err)
}
}

return tableMappingManager, nil
}

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo {
@@ -1068,8 +1077,8 @@ func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo {
return files
}

// RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.
func (rc *LogClient) RestoreMetaKVFiles(
// RestoreAndRewriteMetaKVFiles tries to restore meta kv-event files from the stream backup.
func (rc *LogClient) RestoreAndRewriteMetaKVFiles(
ctx context.Context,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
@@ -1100,30 +1109,11 @@ func (rc *LogClient) RestoreMetaKVFiles(
filesInDefaultCF = SortMetaKVFiles(filesInDefaultCF)
filesInWriteCF = SortMetaKVFiles(filesInWriteCF)

failpoint.Inject("failed-before-id-maps-saved", func(_ failpoint.Value) {
failpoint.Return(errors.New("failpoint: failed before id maps saved"))
})

log.Info("start to restore meta files",
zap.Int("total files", len(files)),
zap.Int("default files", len(filesInDefaultCF)),
zap.Int("write files", len(filesInWriteCF)))

if schemasReplace.NeedConstructIdMap() {
// Preconstruct the map and save it into external storage.
if err := rc.PreConstructAndSaveIDMap(
ctx,
filesInWriteCF,
filesInDefaultCF,
schemasReplace,
); err != nil {
return errors.Trace(err)
}
}
failpoint.Inject("failed-after-id-maps-saved", func(_ failpoint.Value) {
failpoint.Return(errors.New("failpoint: failed after id maps saved"))
})

// run the rewrite and restore meta-kv into TiKV cluster.
if err := RestoreMetaKVFilesWithBatchMethod(
ctx,
@@ -1144,31 +1134,84 @@ func (rc *LogClient) RestoreMetaKVFiles(
return nil
}

// PreConstructAndSaveIDMap constructs id mapping and save it.
func (rc *LogClient) PreConstructAndSaveIDMap(
// IterMetaKVToBuildAndSaveIdMap iterates the meta kv, builds the id mapping, and saves it to storage.
func (rc *LogClient) IterMetaKVToBuildAndSaveIdMap(
ctx context.Context,
fsInWriteCF, fsInDefaultCF []*backuppb.DataFileInfo,
sr *stream.SchemasReplace,
tableMappingManager *stream.TableMappingManager,
files []*backuppb.DataFileInfo,
) error {
sr.SetPreConstructMapStatus()
filesInDefaultCF := make([]*backuppb.DataFileInfo, 0, len(files))
// the write cf must also be scanned because "short values" inline the actual value instead of redirecting to the default cf
filesInWriteCF := make([]*backuppb.DataFileInfo, 0, len(files))

for _, f := range files {
if f.Type == backuppb.FileType_Delete {
// this should not happen; only a preventive check here
log.Warn("internal error: detected delete file of meta key, skip it", zap.Any("file", f))
continue
}
if f.Cf == stream.WriteCF {
filesInWriteCF = append(filesInWriteCF, f)
continue
}
if f.Cf == stream.DefaultCF {
filesInDefaultCF = append(filesInDefaultCF, f)
}
}

filesInDefaultCF = SortMetaKVFiles(filesInDefaultCF)
filesInWriteCF = SortMetaKVFiles(filesInWriteCF)

failpoint.Inject("failed-before-id-maps-saved", func(_ failpoint.Value) {
failpoint.Return(errors.New("failpoint: failed before id maps saved"))
})

log.Info("start to iterate meta kv and build id map",
zap.Int("total files", len(files)),
zap.Int("default files", len(filesInDefaultCF)),
zap.Int("write files", len(filesInWriteCF)))

// build the map and save it into external storage.
if err := rc.buildAndSaveIDMap(
ctx,
filesInDefaultCF,
filesInWriteCF,
tableMappingManager,
); err != nil {
return errors.Trace(err)
}
failpoint.Inject("failed-after-id-maps-saved", func(_ failpoint.Value) {
failpoint.Return(errors.New("failpoint: failed after id maps saved"))
})
return nil
}

if err := rc.constructIDMap(ctx, fsInWriteCF, sr); err != nil {
// buildAndSaveIDMap builds the id mapping and saves it.
func (rc *LogClient) buildAndSaveIDMap(
ctx context.Context,
fsInDefaultCF []*backuppb.DataFileInfo,
fsInWriteCF []*backuppb.DataFileInfo,
tableMappingManager *stream.TableMappingManager,
) error {
if err := rc.iterAndBuildIDMap(ctx, fsInWriteCF, tableMappingManager); err != nil {
return errors.Trace(err)
}
if err := rc.constructIDMap(ctx, fsInDefaultCF, sr); err != nil {

if err := rc.iterAndBuildIDMap(ctx, fsInDefaultCF, tableMappingManager); err != nil {
return errors.Trace(err)
}

if err := rc.saveIDMap(ctx, sr); err != nil {
if err := rc.saveIDMap(ctx, tableMappingManager); err != nil {
return errors.Trace(err)
}
return nil
}

func (rc *LogClient) constructIDMap(
func (rc *LogClient) iterAndBuildIDMap(
ctx context.Context,
fs []*backuppb.DataFileInfo,
sr *stream.SchemasReplace,
tableMappingManager *stream.TableMappingManager,
) error {
for _, f := range fs {
entries, _, err := rc.ReadAllEntries(ctx, f, math.MaxUint64)
@@ -1177,7 +1220,7 @@ func (rc *LogClient) constructIDMap(
}

for _, entry := range entries {
if _, err := sr.RewriteKvEntry(&entry.E, f.GetCf()); err != nil {
if err := tableMappingManager.ParseMetaKvAndUpdateIdMapping(&entry.E, f.GetCf()); err != nil {
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -1218,8 +1261,6 @@ func RestoreMetaKVFilesWithBatchMethod(
defaultKvEntries = make([]*KvEntryWithTS, 0)
writeKvEntries = make([]*KvEntryWithTS, 0)
)
// Set restoreKV to SchemaReplace.
schemasReplace.SetRestoreKVStatus()

for i, f := range defaultFiles {
if i == 0 {
@@ -1322,11 +1363,9 @@ func (rc *LogClient) RestoreBatchMetaKVFiles(
return nextKvEntries, errors.Trace(err)
}

if schemasReplace.IsRestoreKVStatus() {
updateStats(kvCount, size)
for i := 0; i < len(files); i++ {
progressInc()
}
updateStats(kvCount, size)
for i := 0; i < len(files); i++ {
progressInc()
}
return nextKvEntries, nil
}
Expand Down Expand Up @@ -1777,9 +1816,9 @@ const PITRIdMapBlockSize int = 524288
// saveIDMap saves the id mapping information.
func (rc *LogClient) saveIDMap(
ctx context.Context,
sr *stream.SchemasReplace,
manager *stream.TableMappingManager,
) error {
backupmeta := &backuppb.BackupMeta{DbMaps: sr.TidySchemaMaps()}
backupmeta := &backuppb.BackupMeta{DbMaps: manager.ToProto()}
data, err := proto.Marshal(backupmeta)
if err != nil {
return errors.Trace(err)
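saveIDMap at the end of this file now persists manager.ToProto() inside BackupMeta.DbMaps, and BuildTableMappingManager reloads it through stream.FromDBMapProto. A small round-trip sketch follows, under assumed package paths and made-up ids; the DbReplaceMap field and both conversion functions come from the diff, everything else is illustrative.

package restoresketch

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/stream"
)

func roundTripIDMap() {
	manager := &stream.TableMappingManager{
		DbReplaceMap: map[stream.UpstreamID]*stream.DBReplace{
			// made-up ids: upstream db 100 maps to downstream db 200
			100: {Name: "test_db", DbID: 200},
		},
	}

	// saveIDMap persists BackupMeta{DbMaps: manager.ToProto()} to storage.
	dbMaps := manager.ToProto()

	// On reload, BuildTableMappingManager rebuilds the replace map from the protos.
	restored := stream.FromDBMapProto(dbMaps)
	fmt.Println(len(restored) == len(manager.DbReplaceMap)) // true
}

Because the persisted form is just the PitrDBMap protos, a retry at the same restore timestamp can rebuild the same DBReplace map without re-reading the full backup or the meta kv files.
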
25 changes: 11 additions & 14 deletions br/pkg/restore/log_client/client_test.go
@@ -140,13 +140,10 @@ func MockEmptySchemasReplace() *stream.SchemasReplace {
dbMap := make(map[stream.UpstreamID]*stream.DBReplace)
return stream.NewSchemasReplace(
dbMap,
true,
nil,
1,
filter.All(),
nil,
nil,
nil,
)
}

@@ -1387,16 +1384,16 @@ func TestInitSchemasReplaceForDDL(t *testing.T) {

{
client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), fakeSession{})
cfg := &logclient.InitSchemaConfig{IsNewTask: false}
_, err := client.InitSchemasReplaceForDDL(ctx, cfg, nil)
cfg := &logclient.BuildTableMappingManagerConfig{CurrentIdMapSaved: false}
_, err := client.BuildTableMappingManager(ctx, cfg)
require.Error(t, err)
require.Regexp(t, "failed to get pitr id map from mysql.tidb_pitr_id_map.* [2, 1]", err.Error())
}

{
client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), fakeSession{})
cfg := &logclient.InitSchemaConfig{IsNewTask: true}
_, err := client.InitSchemasReplaceForDDL(ctx, cfg, nil)
cfg := &logclient.BuildTableMappingManagerConfig{CurrentIdMapSaved: true}
_, err := client.BuildTableMappingManager(ctx, cfg)
require.Error(t, err)
require.Regexp(t, "failed to get pitr id map from mysql.tidb_pitr_id_map.* [1, 1]", err.Error())
}
@@ -1409,8 +1406,8 @@ func TestInitSchemasReplaceForDDL(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), se)
cfg := &logclient.InitSchemaConfig{IsNewTask: true}
_, err = client.InitSchemasReplaceForDDL(ctx, cfg, nil)
cfg := &logclient.BuildTableMappingManagerConfig{CurrentIdMapSaved: true}
_, err = client.BuildTableMappingManager(ctx, cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "miss upstream table information at `start-ts`(1) but the full backup path is not specified")
}
Expand Down Expand Up @@ -1480,10 +1477,10 @@ func TestPITRIDMap(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
client := logclient.TEST_NewLogClient(123, 1, 2, 3, nil, se)
baseSchemaReplaces := &stream.SchemasReplace{
DbMap: getDBMap(),
baseTableMappingManager := &stream.TableMappingManager{
DbReplaceMap: getDBMap(),
}
err = client.TEST_saveIDMap(ctx, baseSchemaReplaces)
err = client.TEST_saveIDMap(ctx, baseTableMappingManager)
require.NoError(t, err)
newSchemaReplaces, err := client.TEST_initSchemasMap(ctx, 1)
require.NoError(t, err)
@@ -1495,9 +1492,9 @@ func TestPITRIDMap(t *testing.T) {
newSchemaReplaces, err = client.TEST_initSchemasMap(ctx, 2)
require.NoError(t, err)

require.Equal(t, len(baseSchemaReplaces.DbMap), len(newSchemaReplaces))
require.Equal(t, len(baseTableMappingManager.DbReplaceMap), len(newSchemaReplaces))
for _, dbMap := range newSchemaReplaces {
baseDbMap := baseSchemaReplaces.DbMap[dbMap.IdMap.UpstreamId]
baseDbMap := baseTableMappingManager.DbReplaceMap[dbMap.IdMap.UpstreamId]
require.NotNil(t, baseDbMap)
require.Equal(t, baseDbMap.DbID, dbMap.IdMap.DownstreamId)
require.Equal(t, baseDbMap.Name, dbMap.Name)
4 changes: 2 additions & 2 deletions br/pkg/restore/log_client/export_test.go
@@ -63,9 +63,9 @@ func (m *PhysicalWithMigrations) Physical() *backuppb.DataFileGroup {

func (rc *LogClient) TEST_saveIDMap(
ctx context.Context,
sr *stream.SchemasReplace,
m *stream.TableMappingManager,
) error {
return rc.saveIDMap(ctx, sr)
return rc.saveIDMap(ctx, m)
}

func (rc *LogClient) TEST_initSchemasMap(