diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index bfd977afe1fa6..474578a05e8dc 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -354,6 +354,10 @@ func (rc *LogClient) SetCurrentTS(ts uint64) error { return nil } +func (rc *LogClient) CurrentTS() uint64 { + return rc.currentTS +} + // GetClusterID gets the cluster id from down-stream cluster. func (rc *LogClient) GetClusterID(ctx context.Context) uint64 { if rc.clusterID <= 0 { @@ -906,22 +910,22 @@ type FullBackupStorageConfig struct { Opts *storage.ExternalStorageOptions } -type InitSchemaConfig struct { +type BuildTableMappingManagerConfig struct { // required - IsNewTask bool - TableFilter filter.Filter + CurrentIdMapSaved bool + TableFilter filter.Filter // optional - TiFlashRecorder *tiflashrec.TiFlashRecorder FullBackupStorage *FullBackupStorageConfig + CipherInfo *backuppb.CipherInfo + Files []*backuppb.DataFileInfo } const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL" func (rc *LogClient) generateDBReplacesFromFullBackupStorage( ctx context.Context, - cfg *InitSchemaConfig, - cipherInfo *backuppb.CipherInfo, + cfg *BuildTableMappingManagerConfig, ) (map[stream.UpstreamID]*stream.DBReplace, error) { dbReplaces := make(map[stream.UpstreamID]*stream.DBReplace) if cfg.FullBackupStorage == nil { @@ -936,7 +940,7 @@ func (rc *LogClient) generateDBReplacesFromFullBackupStorage( if err != nil { return nil, errors.Trace(err) } - fullBackupTables, err := initFullBackupTables(ctx, s, cfg.TableFilter, cipherInfo) + fullBackupTables, err := initFullBackupTables(ctx, s, cfg.TableFilter, cfg.CipherInfo) if err != nil { return nil, errors.Trace(err) } @@ -974,25 +978,24 @@ func (rc *LogClient) generateDBReplacesFromFullBackupStorage( return dbReplaces, nil } -// InitSchemasReplaceForDDL gets schemas information Mapping from old schemas to new schemas. -// It is used to rewrite meta kv-event. -func (rc *LogClient) InitSchemasReplaceForDDL( +// BuildTableMappingManager builds the table mapping manager. It reads the full backup storage to get the full backup +// table info to initialize the manager, or it reads the id map from previous task, +// or it loads the saved mapping from last time of run of the same task. +func (rc *LogClient) BuildTableMappingManager( ctx context.Context, - cfg *InitSchemaConfig, - cipherInfo *backuppb.CipherInfo, -) (*stream.SchemasReplace, error) { + cfg *BuildTableMappingManagerConfig, +) (*stream.TableMappingManager, error) { var ( err error dbMaps []*backuppb.PitrDBMap // the id map doesn't need to construct only when it is not the first execution needConstructIdMap bool - - dbReplaces map[stream.UpstreamID]*stream.DBReplace + dbReplaces map[stream.UpstreamID]*stream.DBReplace ) - // not new task, load schemas map from external storage - if !cfg.IsNewTask { - log.Info("try to load pitr id maps") + // this is a retry, id map saved last time, load it from external storage + if cfg.CurrentIdMapSaved { + log.Info("try to load previously saved pitr id maps") needConstructIdMap = false dbMaps, err = rc.initSchemasMap(ctx, rc.restoreTS) if err != nil { @@ -1024,16 +1027,16 @@ func (rc *LogClient) InitSchemasReplaceForDDL( if len(dbMaps) <= 0 { log.Info("no id maps, build the table replaces from cluster and full backup schemas") needConstructIdMap = true - dbReplaces, err = rc.generateDBReplacesFromFullBackupStorage(ctx, cfg, cipherInfo) + dbReplaces, err = rc.generateDBReplacesFromFullBackupStorage(ctx, cfg) if err != nil { return nil, errors.Trace(err) } } else { - dbReplaces = stream.FromSchemaMaps(dbMaps) + dbReplaces = stream.FromDBMapProto(dbMaps) } for oldDBID, dbReplace := range dbReplaces { - log.Info("replace info", func() []zapcore.Field { + log.Info("base replace info", func() []zapcore.Field { fields := make([]zapcore.Field, 0, (len(dbReplace.TableMap)+1)*3) fields = append(fields, zap.String("dbName", dbReplace.Name), @@ -1049,10 +1052,16 @@ func (rc *LogClient) InitSchemasReplaceForDDL( }()...) } - rp := stream.NewSchemasReplace( - dbReplaces, needConstructIdMap, cfg.TiFlashRecorder, rc.currentTS, cfg.TableFilter, rc.GenGlobalID, rc.GenGlobalIDs, - rc.RecordDeleteRange) - return rp, nil + tableMappingManager := stream.NewTableMappingManager(dbReplaces, rc.GenGlobalID) + + // not loaded from previously saved, need to iter meta kv and build and save the map + if needConstructIdMap { + if err = rc.IterMetaKVToBuildAndSaveIdMap(ctx, tableMappingManager, cfg.Files); err != nil { + return nil, errors.Trace(err) + } + } + + return tableMappingManager, nil } func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo { @@ -1068,8 +1077,8 @@ func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo { return files } -// RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup. -func (rc *LogClient) RestoreMetaKVFiles( +// RestoreAndRewriteMetaKVFiles tries to restore files about meta kv-event from stream-backup. +func (rc *LogClient) RestoreAndRewriteMetaKVFiles( ctx context.Context, files []*backuppb.DataFileInfo, schemasReplace *stream.SchemasReplace, @@ -1100,30 +1109,11 @@ func (rc *LogClient) RestoreMetaKVFiles( filesInDefaultCF = SortMetaKVFiles(filesInDefaultCF) filesInWriteCF = SortMetaKVFiles(filesInWriteCF) - failpoint.Inject("failed-before-id-maps-saved", func(_ failpoint.Value) { - failpoint.Return(errors.New("failpoint: failed before id maps saved")) - }) - log.Info("start to restore meta files", zap.Int("total files", len(files)), zap.Int("default files", len(filesInDefaultCF)), zap.Int("write files", len(filesInWriteCF))) - if schemasReplace.NeedConstructIdMap() { - // Preconstruct the map and save it into external storage. - if err := rc.PreConstructAndSaveIDMap( - ctx, - filesInWriteCF, - filesInDefaultCF, - schemasReplace, - ); err != nil { - return errors.Trace(err) - } - } - failpoint.Inject("failed-after-id-maps-saved", func(_ failpoint.Value) { - failpoint.Return(errors.New("failpoint: failed after id maps saved")) - }) - // run the rewrite and restore meta-kv into TiKV cluster. if err := RestoreMetaKVFilesWithBatchMethod( ctx, @@ -1144,31 +1134,84 @@ func (rc *LogClient) RestoreMetaKVFiles( return nil } -// PreConstructAndSaveIDMap constructs id mapping and save it. -func (rc *LogClient) PreConstructAndSaveIDMap( +// IterMetaKVToBuildAndSaveIdMap iterates meta kv and builds id mapping and saves it to storage. +func (rc *LogClient) IterMetaKVToBuildAndSaveIdMap( ctx context.Context, - fsInWriteCF, fsInDefaultCF []*backuppb.DataFileInfo, - sr *stream.SchemasReplace, + tableMappingManager *stream.TableMappingManager, + files []*backuppb.DataFileInfo, ) error { - sr.SetPreConstructMapStatus() + filesInDefaultCF := make([]*backuppb.DataFileInfo, 0, len(files)) + // need to look at write cf for "short value", which inlines the actual values without redirecting to default cf + filesInWriteCF := make([]*backuppb.DataFileInfo, 0, len(files)) + + for _, f := range files { + if f.Type == backuppb.FileType_Delete { + // it should not happen + // only do some preventive checks here. + log.Warn("internal error: detected delete file of meta key, skip it", zap.Any("file", f)) + continue + } + if f.Cf == stream.WriteCF { + filesInWriteCF = append(filesInWriteCF, f) + continue + } + if f.Cf == stream.DefaultCF { + filesInDefaultCF = append(filesInDefaultCF, f) + } + } + + filesInDefaultCF = SortMetaKVFiles(filesInDefaultCF) + filesInWriteCF = SortMetaKVFiles(filesInWriteCF) + + failpoint.Inject("failed-before-id-maps-saved", func(_ failpoint.Value) { + failpoint.Return(errors.New("failpoint: failed before id maps saved")) + }) + + log.Info("start to iterate meta kv and build id map", + zap.Int("total files", len(files)), + zap.Int("default files", len(filesInDefaultCF)), + zap.Int("write files", len(filesInWriteCF))) + + // build the map and save it into external storage. + if err := rc.buildAndSaveIDMap( + ctx, + filesInDefaultCF, + filesInWriteCF, + tableMappingManager, + ); err != nil { + return errors.Trace(err) + } + failpoint.Inject("failed-after-id-maps-saved", func(_ failpoint.Value) { + failpoint.Return(errors.New("failpoint: failed after id maps saved")) + }) + return nil +} - if err := rc.constructIDMap(ctx, fsInWriteCF, sr); err != nil { +// buildAndSaveIDMap build id mapping and save it. +func (rc *LogClient) buildAndSaveIDMap( + ctx context.Context, + fsInDefaultCF []*backuppb.DataFileInfo, + fsInWriteCF []*backuppb.DataFileInfo, + tableMappingManager *stream.TableMappingManager, +) error { + if err := rc.iterAndBuildIDMap(ctx, fsInWriteCF, tableMappingManager); err != nil { return errors.Trace(err) } - if err := rc.constructIDMap(ctx, fsInDefaultCF, sr); err != nil { + + if err := rc.iterAndBuildIDMap(ctx, fsInDefaultCF, tableMappingManager); err != nil { return errors.Trace(err) } - if err := rc.saveIDMap(ctx, sr); err != nil { + if err := rc.saveIDMap(ctx, tableMappingManager); err != nil { return errors.Trace(err) } return nil } -func (rc *LogClient) constructIDMap( +func (rc *LogClient) iterAndBuildIDMap( ctx context.Context, fs []*backuppb.DataFileInfo, - sr *stream.SchemasReplace, + tableMappingManager *stream.TableMappingManager, ) error { for _, f := range fs { entries, _, err := rc.ReadAllEntries(ctx, f, math.MaxUint64) @@ -1177,7 +1220,7 @@ func (rc *LogClient) constructIDMap( } for _, entry := range entries { - if _, err := sr.RewriteKvEntry(&entry.E, f.GetCf()); err != nil { + if err := tableMappingManager.ParseMetaKvAndUpdateIdMapping(&entry.E, f.GetCf()); err != nil { return errors.Trace(err) } } @@ -1218,8 +1261,6 @@ func RestoreMetaKVFilesWithBatchMethod( defaultKvEntries = make([]*KvEntryWithTS, 0) writeKvEntries = make([]*KvEntryWithTS, 0) ) - // Set restoreKV to SchemaReplace. - schemasReplace.SetRestoreKVStatus() for i, f := range defaultFiles { if i == 0 { @@ -1322,11 +1363,9 @@ func (rc *LogClient) RestoreBatchMetaKVFiles( return nextKvEntries, errors.Trace(err) } - if schemasReplace.IsRestoreKVStatus() { - updateStats(kvCount, size) - for i := 0; i < len(files); i++ { - progressInc() - } + updateStats(kvCount, size) + for i := 0; i < len(files); i++ { + progressInc() } return nextKvEntries, nil } @@ -1777,9 +1816,9 @@ const PITRIdMapBlockSize int = 524288 // saveIDMap saves the id mapping information. func (rc *LogClient) saveIDMap( ctx context.Context, - sr *stream.SchemasReplace, + manager *stream.TableMappingManager, ) error { - backupmeta := &backuppb.BackupMeta{DbMaps: sr.TidySchemaMaps()} + backupmeta := &backuppb.BackupMeta{DbMaps: manager.ToProto()} data, err := proto.Marshal(backupmeta) if err != nil { return errors.Trace(err) diff --git a/br/pkg/restore/log_client/client_test.go b/br/pkg/restore/log_client/client_test.go index 504d7bb798d72..1b16b25ecfa46 100644 --- a/br/pkg/restore/log_client/client_test.go +++ b/br/pkg/restore/log_client/client_test.go @@ -140,13 +140,10 @@ func MockEmptySchemasReplace() *stream.SchemasReplace { dbMap := make(map[stream.UpstreamID]*stream.DBReplace) return stream.NewSchemasReplace( dbMap, - true, nil, 1, filter.All(), nil, - nil, - nil, ) } @@ -1387,16 +1384,16 @@ func TestInitSchemasReplaceForDDL(t *testing.T) { { client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), fakeSession{}) - cfg := &logclient.InitSchemaConfig{IsNewTask: false} - _, err := client.InitSchemasReplaceForDDL(ctx, cfg, nil) + cfg := &logclient.BuildTableMappingManagerConfig{CurrentIdMapSaved: false} + _, err := client.BuildTableMappingManager(ctx, cfg) require.Error(t, err) require.Regexp(t, "failed to get pitr id map from mysql.tidb_pitr_id_map.* [2, 1]", err.Error()) } { client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), fakeSession{}) - cfg := &logclient.InitSchemaConfig{IsNewTask: true} - _, err := client.InitSchemasReplaceForDDL(ctx, cfg, nil) + cfg := &logclient.BuildTableMappingManagerConfig{CurrentIdMapSaved: true} + _, err := client.BuildTableMappingManager(ctx, cfg) require.Error(t, err) require.Regexp(t, "failed to get pitr id map from mysql.tidb_pitr_id_map.* [1, 1]", err.Error()) } @@ -1409,8 +1406,8 @@ func TestInitSchemasReplaceForDDL(t *testing.T) { se, err := g.CreateSession(s.Mock.Storage) require.NoError(t, err) client := logclient.TEST_NewLogClient(123, 1, 2, 1, domain.NewMockDomain(), se) - cfg := &logclient.InitSchemaConfig{IsNewTask: true} - _, err = client.InitSchemasReplaceForDDL(ctx, cfg, nil) + cfg := &logclient.BuildTableMappingManagerConfig{CurrentIdMapSaved: true} + _, err = client.BuildTableMappingManager(ctx, cfg) require.Error(t, err) require.Contains(t, err.Error(), "miss upstream table information at `start-ts`(1) but the full backup path is not specified") } @@ -1480,10 +1477,10 @@ func TestPITRIDMap(t *testing.T) { se, err := g.CreateSession(s.Mock.Storage) require.NoError(t, err) client := logclient.TEST_NewLogClient(123, 1, 2, 3, nil, se) - baseSchemaReplaces := &stream.SchemasReplace{ - DbMap: getDBMap(), + baseTableMappingManager := &stream.TableMappingManager{ + DbReplaceMap: getDBMap(), } - err = client.TEST_saveIDMap(ctx, baseSchemaReplaces) + err = client.TEST_saveIDMap(ctx, baseTableMappingManager) require.NoError(t, err) newSchemaReplaces, err := client.TEST_initSchemasMap(ctx, 1) require.NoError(t, err) @@ -1495,9 +1492,9 @@ func TestPITRIDMap(t *testing.T) { newSchemaReplaces, err = client.TEST_initSchemasMap(ctx, 2) require.NoError(t, err) - require.Equal(t, len(baseSchemaReplaces.DbMap), len(newSchemaReplaces)) + require.Equal(t, len(baseTableMappingManager.DbReplaceMap), len(newSchemaReplaces)) for _, dbMap := range newSchemaReplaces { - baseDbMap := baseSchemaReplaces.DbMap[dbMap.IdMap.UpstreamId] + baseDbMap := baseTableMappingManager.DbReplaceMap[dbMap.IdMap.UpstreamId] require.NotNil(t, baseDbMap) require.Equal(t, baseDbMap.DbID, dbMap.IdMap.DownstreamId) require.Equal(t, baseDbMap.Name, dbMap.Name) diff --git a/br/pkg/restore/log_client/export_test.go b/br/pkg/restore/log_client/export_test.go index 9c95409c9d754..f78a54bf50c8a 100644 --- a/br/pkg/restore/log_client/export_test.go +++ b/br/pkg/restore/log_client/export_test.go @@ -63,9 +63,9 @@ func (m *PhysicalWithMigrations) Physical() *backuppb.DataFileGroup { func (rc *LogClient) TEST_saveIDMap( ctx context.Context, - sr *stream.SchemasReplace, + m *stream.TableMappingManager, ) error { - return rc.saveIDMap(ctx, sr) + return rc.saveIDMap(ctx, m) } func (rc *LogClient) TEST_initSchemasMap( diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel index 225d50cb5a9a9..252f789c78b75 100644 --- a/br/pkg/stream/BUILD.bazel +++ b/br/pkg/stream/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "stream_metas.go", "stream_mgr.go", "stream_status.go", + "table_mapping.go", "util.go", ], importpath = "github.com/pingcap/tidb/br/pkg/stream", @@ -61,11 +62,12 @@ go_test( "search_test.go", "stream_metas_test.go", "stream_misc_test.go", + "table_mapping_test.go", "util_test.go", ], embed = [":stream"], flaky = True, - shard_count = 48, + shard_count = 47, deps = [ "//br/pkg/storage", "//br/pkg/streamhelper", diff --git a/br/pkg/stream/meta_kv.go b/br/pkg/stream/meta_kv.go index 590c16b40aff3..1d150a87baf07 100644 --- a/br/pkg/stream/meta_kv.go +++ b/br/pkg/stream/meta_kv.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/errors" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" ) @@ -53,6 +54,14 @@ func ParseTxnMetaKeyFrom(txnKey kv.Key) (*RawMetaKey, error) { }, nil } +func ParseDBIDFromTableKey(key []byte) (int64, error) { + rawMetaKey, err := ParseTxnMetaKeyFrom(key) + if err != nil { + return 0, errors.Trace(err) + } + return meta.ParseDBKey(rawMetaKey.Key) +} + // UpdateKey updates `key` field in `RawMetaKey` struct. func (k *RawMetaKey) UpdateKey(key []byte) { k.Key = key diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index e0b7ac6252958..94e05221b2424 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/pingcap/errors" - backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/restore/ingestrec" @@ -39,13 +38,6 @@ const ( WriteCF = "write" ) -type RewriteStatus int - -const ( - RewriteStatusPreConstructMap = iota // represents construct map status. - RewriteStatusRestoreKV // represents restore meta kv status. -) - type UpstreamID = int64 type DownstreamID = int64 @@ -66,10 +58,7 @@ type DBReplace struct { // SchemasReplace specifies schemas information mapping from up-stream cluster to up-stream cluster. type SchemasReplace struct { - status RewriteStatus - DbMap map[UpstreamID]*DBReplace - globalTableIdMap map[UpstreamID]DownstreamID - needConstructIdMap bool + DbMap map[UpstreamID]*DBReplace delRangeRecorder *brDelRangeExecWrapper ingestRecorder *ingestrec.IngestRecorder @@ -77,9 +66,6 @@ type SchemasReplace struct { RewriteTS uint64 // used to rewrite commit ts in meta kv. TableFilter filter.Filter // used to filter schema/table - genGenGlobalID func(ctx context.Context) (int64, error) - genGenGlobalIDs func(ctx context.Context, n int) ([]int64, error) - AfterTableRewritten func(deleted bool, tableInfo *model.TableInfo) } @@ -105,12 +91,9 @@ func NewDBReplace(name string, newID DownstreamID) *DBReplace { // NewSchemasReplace creates a SchemasReplace struct. func NewSchemasReplace( dbMap map[UpstreamID]*DBReplace, - needConstructIdMap bool, tiflashRecorder *tiflashrec.TiFlashRecorder, restoreTS uint64, tableFilter filter.Filter, - genID func(ctx context.Context) (int64, error), - genIDs func(ctx context.Context, n int) ([]int64, error), recordDeleteRange func(*PreDelRangeQuery), ) *SchemasReplace { globalTableIdMap := make(map[UpstreamID]DownstreamID) @@ -124,99 +107,13 @@ func NewSchemasReplace( } return &SchemasReplace{ - DbMap: dbMap, - globalTableIdMap: globalTableIdMap, - needConstructIdMap: needConstructIdMap, - delRangeRecorder: newDelRangeExecWrapper(globalTableIdMap, recordDeleteRange), - ingestRecorder: ingestrec.New(), - TiflashRecorder: tiflashRecorder, - RewriteTS: restoreTS, - TableFilter: tableFilter, - genGenGlobalID: genID, - genGenGlobalIDs: genIDs, - } -} - -func (sr *SchemasReplace) NeedConstructIdMap() bool { - return sr.needConstructIdMap -} - -// TidySchemaMaps produces schemas id maps from up-stream to down-stream. -func (sr *SchemasReplace) TidySchemaMaps() []*backuppb.PitrDBMap { - dbMaps := make([]*backuppb.PitrDBMap, 0, len(sr.DbMap)) - - for dbID, dr := range sr.DbMap { - dbm := backuppb.PitrDBMap{ - Name: dr.Name, - IdMap: &backuppb.IDMap{ - UpstreamId: dbID, - DownstreamId: dr.DbID, - }, - Tables: make([]*backuppb.PitrTableMap, 0, len(dr.TableMap)), - } - - for tblID, tr := range dr.TableMap { - tm := backuppb.PitrTableMap{ - Name: tr.Name, - IdMap: &backuppb.IDMap{ - UpstreamId: tblID, - DownstreamId: tr.TableID, - }, - Partitions: make([]*backuppb.IDMap, 0, len(tr.PartitionMap)), - } - - for upID, downID := range tr.PartitionMap { - pm := backuppb.IDMap{ - UpstreamId: upID, - DownstreamId: downID, - } - tm.Partitions = append(tm.Partitions, &pm) - } - dbm.Tables = append(dbm.Tables, &tm) - } - dbMaps = append(dbMaps, &dbm) + DbMap: dbMap, + delRangeRecorder: newDelRangeExecWrapper(globalTableIdMap, recordDeleteRange), + ingestRecorder: ingestrec.New(), + TiflashRecorder: tiflashRecorder, + RewriteTS: restoreTS, + TableFilter: tableFilter, } - - return dbMaps -} - -func FromSchemaMaps(dbMaps []*backuppb.PitrDBMap) map[UpstreamID]*DBReplace { - dbReplaces := make(map[UpstreamID]*DBReplace) - - for _, db := range dbMaps { - dr := NewDBReplace(db.Name, db.IdMap.DownstreamId) - dbReplaces[db.IdMap.UpstreamId] = dr - - for _, tbl := range db.Tables { - tr := NewTableReplace(tbl.Name, tbl.IdMap.DownstreamId) - dr.TableMap[tbl.IdMap.UpstreamId] = tr - for _, p := range tbl.Partitions { - tr.PartitionMap[p.UpstreamId] = p.DownstreamId - } - } - } - - return dbReplaces -} - -// IsPreConsturctMapStatus checks the status is PreConsturctMap. -func (sr *SchemasReplace) IsPreConsturctMapStatus() bool { - return sr.status == RewriteStatusPreConstructMap -} - -// IsRestoreKVStatus checks the status is RestoreKV. -func (sr *SchemasReplace) IsRestoreKVStatus() bool { - return sr.status == RewriteStatusRestoreKV -} - -// SetPreConstructMapStatus sets the PreConstructMap status. -func (sr *SchemasReplace) SetPreConstructMapStatus() { - sr.status = RewriteStatusPreConstructMap -} - -// SetRestoreKVStatus sets the RestoreKV status. -func (sr *SchemasReplace) SetRestoreKVStatus() { - sr.status = RewriteStatusRestoreKV } func (sr *SchemasReplace) rewriteKeyForDB(key []byte, cf string) ([]byte, error) { @@ -230,21 +127,10 @@ func (sr *SchemasReplace) rewriteKeyForDB(key []byte, cf string) ([]byte, error) return nil, errors.Trace(err) } - if sr.IsPreConsturctMapStatus() { - if _, exist := sr.DbMap[dbID]; !exist { - newID, err := sr.genGenGlobalID(context.Background()) - if err != nil { - return nil, errors.Trace(err) - } - sr.DbMap[dbID] = NewDBReplace("", newID) - sr.globalTableIdMap[dbID] = newID - } - return nil, nil - } - dbMap, exist := sr.DbMap[dbID] if !exist { - return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find id:%v in maps", dbID) + // db filtered out + return nil, nil } rawMetaKey.UpdateField(meta.DBkey(dbMap.DbID)) @@ -260,22 +146,10 @@ func (sr *SchemasReplace) rewriteDBInfo(value []byte) ([]byte, error) { return nil, errors.Trace(err) } - if sr.IsPreConsturctMapStatus() { - if dr, exist := sr.DbMap[dbInfo.ID]; !exist { - newID, err := sr.genGenGlobalID(context.Background()) - if err != nil { - return nil, errors.Trace(err) - } - sr.DbMap[dbInfo.ID] = NewDBReplace(dbInfo.Name.O, newID) - } else { - dr.Name = dbInfo.Name.O - } - return nil, nil - } - dbMap, exist := sr.DbMap[dbInfo.ID] if !exist { - return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find id:%v in maps", dbInfo.ID) + // db filtered out + return nil, nil } dbInfo.ID = dbMap.DbID @@ -307,14 +181,6 @@ func (sr *SchemasReplace) rewriteEntryForDB(e *kv.Entry, cf string) (*kv.Entry, return &kv.Entry{Key: newKey, Value: newValue}, nil } -func (sr *SchemasReplace) getDBIDFromTableKey(key []byte) (int64, error) { - rawMetaKey, err := ParseTxnMetaKeyFrom(key) - if err != nil { - return 0, errors.Trace(err) - } - return meta.ParseDBKey(rawMetaKey.Key) -} - func (sr *SchemasReplace) rewriteKeyForTable( key []byte, cf string, @@ -342,37 +208,13 @@ func (sr *SchemasReplace) rewriteKeyForTable( dbReplace, exist := sr.DbMap[dbID] if !exist { - if !sr.IsPreConsturctMapStatus() { - return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find id:%v in maps", dbID) - } - newID, err := sr.genGenGlobalID(context.Background()) - if err != nil { - return nil, errors.Trace(err) - } - dbReplace = NewDBReplace("", newID) - sr.DbMap[dbID] = dbReplace + // db filtered out + return nil, nil } tableReplace, exist := dbReplace.TableMap[tableID] if !exist { - newID, exist := sr.globalTableIdMap[tableID] - if !exist { - if sr.IsRestoreKVStatus() { - return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find id:%v in maps", tableID) - } - - newID, err = sr.genGenGlobalID(context.Background()) - if err != nil { - return nil, errors.Trace(err) - } - sr.globalTableIdMap[tableID] = newID - } - - tableReplace = NewTableReplace("", newID) - dbReplace.TableMap[tableID] = tableReplace - } - - if sr.IsPreConsturctMapStatus() { + // table filtered out return nil, nil } @@ -399,37 +241,14 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, er // construct or find the id map. dbReplace, exist = sr.DbMap[dbID] if !exist { - if sr.IsRestoreKVStatus() { - return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find id:%v in maps", dbID) - } - - newID, err := sr.genGenGlobalID(context.Background()) - if err != nil { - return nil, errors.Trace(err) - } - dbReplace = NewDBReplace("", newID) - sr.DbMap[dbID] = dbReplace + // db filtered out + return nil, nil } tableReplace, exist = dbReplace.TableMap[tableInfo.ID] if !exist { - newID, exist := sr.globalTableIdMap[tableInfo.ID] - if !exist { - if sr.IsRestoreKVStatus() { - return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find id:%v in maps", tableInfo.ID) - } - - newID, err = sr.genGenGlobalID(context.Background()) - if err != nil { - return nil, errors.Trace(err) - } - sr.globalTableIdMap[tableInfo.ID] = newID - } - - tableReplace = NewTableReplace(tableInfo.Name.O, newID) - dbReplace.TableMap[tableInfo.ID] = tableReplace - } else { - tableReplace.Name = tableInfo.Name.O + // table filtered out + return nil, nil } // update table ID and partition ID. @@ -439,28 +258,13 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, er for i, tbl := range partitions.Definitions { newID, exist := tableReplace.PartitionMap[tbl.ID] if !exist { - newID, exist = sr.globalTableIdMap[tbl.ID] - if !exist { - if sr.IsRestoreKVStatus() { - return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find id:%v in maps", tbl.ID) - } - - newID, err = sr.genGenGlobalID(context.Background()) - if err != nil { - return nil, errors.Trace(err) - } - sr.globalTableIdMap[tbl.ID] = newID - } - tableReplace.PartitionMap[tbl.ID] = newID + log.Error("expect partition info in table replace but got none", zap.Int64("partitionID", tbl.ID)) + return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find partition id:%v in replace maps", tbl.ID) } partitions.Definitions[i].ID = newID } } - if sr.IsPreConsturctMapStatus() { - return nil, nil - } - // Force to disable TTL_ENABLE when restore if tableInfo.TTLInfo != nil { tableInfo.TTLInfo.Enable = false @@ -478,7 +282,7 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, er } func (sr *SchemasReplace) rewriteEntryForTable(e *kv.Entry, cf string) (*kv.Entry, error) { - dbID, err := sr.getDBIDFromTableKey(e.Key) + dbID, err := ParseDBIDFromTableKey(e.Key) if err != nil { return nil, errors.Trace(err) } @@ -503,9 +307,6 @@ func (sr *SchemasReplace) rewriteEntryForTable(e *kv.Entry, cf string) (*kv.Entr return nil, errors.Trace(err) } - if sr.IsPreConsturctMapStatus() { - return nil, nil - } // NOTE: the normal path is in the `SchemaReplace.rewriteTableInfo` // for now, we rewrite key and value separately hence we cannot // get a view of (is_delete, table_id, table_info) at the same time :(. @@ -637,7 +438,7 @@ func (sr *SchemasReplace) GetIngestRecorder() *ingestrec.IngestRecorder { func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, error) { // skip mDDLJob if !IsMetaDBKey(e.Key) { - if sr.IsRestoreKVStatus() && cf == DefaultCF && IsMetaDDLJobHistoryKey(e.Key) { // mDDLJobHistory + if cf == DefaultCF && IsMetaDDLJobHistoryKey(e.Key) { // mDDLJobHistory job := &model.Job{} if err := job.Decode(e.Value); err != nil { log.Debug("failed to decode the job", diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go index fa8e1cf84bb2b..81d21e5e5b5a7 100644 --- a/br/pkg/stream/rewrite_meta_rawkv_test.go +++ b/br/pkg/stream/rewrite_meta_rawkv_test.go @@ -3,7 +3,6 @@ package stream import ( - "context" "encoding/hex" "encoding/json" "testing" @@ -20,13 +19,6 @@ import ( "github.com/stretchr/testify/require" ) -var increaseID int64 = 100 - -func mockGenGenGlobalID(ctx context.Context) (int64, error) { - increaseID++ - return increaseID, nil -} - func MockEmptySchemasReplace(midr *mockInsertDeleteRange, dbMap map[UpstreamID]*DBReplace) *SchemasReplace { if dbMap == nil { dbMap = make(map[UpstreamID]*DBReplace) @@ -36,12 +28,9 @@ func MockEmptySchemasReplace(midr *mockInsertDeleteRange, dbMap map[UpstreamID]* } return NewSchemasReplace( dbMap, - true, nil, 9527, filter.All(), - mockGenGenGlobalID, - nil, midr.mockRecordDeleteRange, ) } @@ -63,94 +52,32 @@ func produceTableInfoValue(tableName string, tableID int64) ([]byte, error) { return json.Marshal(&tableInfo) } -func TestTidySchemaMaps(t *testing.T) { - var ( - dbName, tblName string = "db1", "t1" - oldDBID UpstreamID = 100 - newDBID DownstreamID = 200 - oldTblID, oldPID1, oldPID2 UpstreamID = 101, 102, 103 - newTblID, newPID1, newPID2 DownstreamID = 201, 202, 203 - ) - - // create table Replace - tr := NewTableReplace(tblName, newTblID) - tr.PartitionMap[oldPID1] = newPID1 - tr.PartitionMap[oldPID2] = newPID2 - - dr := NewDBReplace(dbName, newDBID) - dr.TableMap[oldTblID] = tr - - drs := make(map[UpstreamID]*DBReplace) - drs[oldDBID] = dr - - // create schemas replace and test TidySchemaMaps(). - sr := NewSchemasReplace(drs, true, nil, 0, filter.All(), nil, nil, nil) - globalTableIdMap := sr.globalTableIdMap - require.Equal(t, len(globalTableIdMap), 3) - require.Equal(t, globalTableIdMap[oldTblID], newTblID) - require.Equal(t, globalTableIdMap[oldPID1], newPID1) - require.Equal(t, globalTableIdMap[oldPID2], newPID2) - - dbMap := sr.TidySchemaMaps() - require.Equal(t, len(dbMap), 1) - require.Equal(t, dbMap[0].Name, dbName) - require.Equal(t, dbMap[0].IdMap.UpstreamId, oldDBID) - require.Equal(t, dbMap[0].IdMap.DownstreamId, newDBID) - - tableMap := dbMap[0].Tables - require.Equal(t, len(tableMap), 1) - require.Equal(t, tableMap[0].Name, tblName) - require.Equal(t, tableMap[0].IdMap.UpstreamId, oldTblID) - require.Equal(t, tableMap[0].IdMap.DownstreamId, newTblID) - - partitionMap := tableMap[0].Partitions - require.Equal(t, len(partitionMap), 2) - - if partitionMap[0].UpstreamId == oldPID1 { - require.Equal(t, partitionMap[0].DownstreamId, newPID1) - require.Equal(t, partitionMap[1].UpstreamId, oldPID2) - require.Equal(t, partitionMap[1].DownstreamId, newPID2) - } else { - require.Equal(t, partitionMap[0].DownstreamId, newPID2) - require.Equal(t, partitionMap[1].UpstreamId, oldPID1) - require.Equal(t, partitionMap[1].DownstreamId, newPID1) - } - - // test FromSchemaMaps() - drs2 := FromSchemaMaps(dbMap) - require.Equal(t, drs2, drs) -} - func TestRewriteKeyForDB(t *testing.T) { var ( - dbID int64 = 1 - ts uint64 = 1234 - mDbs = []byte("DBs") + dbID int64 = 1 + dbName = "db" + ts uint64 = 1234 + mDbs = []byte("DBs") ) encodedKey := encodeTxnMetaKey(mDbs, meta.DBkey(dbID), ts) - // create schemasReplace. - sr := MockEmptySchemasReplace(nil, nil) + dbMap := make(map[UpstreamID]*DBReplace) + downstreamID := dbID + 100 + dbMap[dbID] = NewDBReplace(dbName, downstreamID) - // preConstruct Map information. - sr.SetPreConstructMapStatus() - newKey, err := sr.rewriteKeyForDB(encodedKey, WriteCF) - require.Nil(t, err) - require.Nil(t, newKey) - require.Equal(t, len(sr.DbMap[dbID].TableMap), 0) - downID := sr.DbMap[dbID].DbID + // create schemasReplace. + sr := MockEmptySchemasReplace(nil, dbMap) // set restoreKV status and rewrite it. - sr.SetRestoreKVStatus() - newKey, err = sr.rewriteKeyForDB(encodedKey, DefaultCF) + newKey, err := sr.rewriteKeyForDB(encodedKey, DefaultCF) require.Nil(t, err) decodedKey, err := ParseTxnMetaKeyFrom(newKey) require.Nil(t, err) require.Equal(t, decodedKey.Ts, ts) newDBID, err := meta.ParseDBKey(decodedKey.Field) require.Nil(t, err) - require.Equal(t, newDBID, downID) + require.Equal(t, newDBID, downstreamID) // rewrite it again, and get the same result. newKey, err = sr.rewriteKeyForDB(encodedKey, WriteCF) @@ -160,7 +87,7 @@ func TestRewriteKeyForDB(t *testing.T) { require.Equal(t, decodedKey.Ts, sr.RewriteTS) newDBID, err = meta.ParseDBKey(decodedKey.Field) require.Nil(t, err) - require.Equal(t, newDBID, downID) + require.Equal(t, newDBID, downstreamID) } func TestRewriteDBInfo(t *testing.T) { @@ -173,31 +100,20 @@ func TestRewriteDBInfo(t *testing.T) { value, err := produceDBInfoValue(dbName, dbID) require.Nil(t, err) - // create schemasReplace. - sr := MockEmptySchemasReplace(nil, nil) - - // rewrite it directly without preConstruct Map, it will get failed result. - sr.SetRestoreKVStatus() - _, err = sr.rewriteDBInfo(value) - require.Error(t, err) + dbMap := make(map[UpstreamID]*DBReplace) + dbMap[dbID] = NewDBReplace(dbName, dbID+100) - // ConstructMap status. - sr.SetPreConstructMapStatus() - newValue, err := sr.rewriteDBInfo(value) - require.Nil(t, err) - require.Nil(t, newValue) - dr := sr.DbMap[dbID] - require.Equal(t, dr.Name, dbName) + // create schemasReplace. + sr := MockEmptySchemasReplace(nil, dbMap) // set restoreKV status and rewrite it. - sr.SetRestoreKVStatus() - newValue, err = sr.rewriteDBInfo(value) + newValue, err := sr.rewriteDBInfo(value) require.Nil(t, err) err = json.Unmarshal(newValue, &DBInfo) require.Nil(t, err) require.Equal(t, DBInfo.ID, sr.DbMap[dbID].DbID) - // rewrite agagin, and get the same result. + // rewrite again, and get the same result. newId := sr.DbMap[dbID].DbID newValue, err = sr.rewriteDBInfo(value) require.Nil(t, err) @@ -209,9 +125,11 @@ func TestRewriteDBInfo(t *testing.T) { func TestRewriteKeyForTable(t *testing.T) { var ( - dbID int64 = 1 - tableID int64 = 57 - ts uint64 = 400036290571534337 + dbID int64 = 1 + dbName = "db" + tableID int64 = 57 + tableName = "table" + ts uint64 = 400036290571534337 ) cases := []struct { encodeTableFn func(int64) []byte @@ -241,22 +159,18 @@ func TestRewriteKeyForTable(t *testing.T) { for _, ca := range cases { encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), ca.encodeTableFn(tableID), ts) - // create schemasReplace. - sr := MockEmptySchemasReplace(nil, nil) - // set preConstruct status and construct map information. - sr.SetPreConstructMapStatus() - newKey, err := sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn) - require.Nil(t, err) - require.Nil(t, newKey) - require.Equal(t, len(sr.DbMap), 1) - require.Equal(t, len(sr.DbMap[dbID].TableMap), 1) - downStreamDbID := sr.DbMap[dbID].DbID - downStreamTblID := sr.DbMap[dbID].TableMap[tableID].TableID + dbMap := make(map[UpstreamID]*DBReplace) + downStreamDbID := dbID + 100 + dbMap[dbID] = NewDBReplace(dbName, downStreamDbID) + downStreamTblID := tableID + 100 + dbMap[dbID].TableMap[tableID] = NewTableReplace(tableName, downStreamTblID) + + // create schemasReplace. + sr := MockEmptySchemasReplace(nil, dbMap) // set restoreKV status and rewrite it. - sr.SetRestoreKVStatus() - newKey, err = sr.rewriteKeyForTable(encodedKey, DefaultCF, ca.decodeTableFn, ca.encodeTableFn) + newKey, err := sr.rewriteKeyForTable(encodedKey, DefaultCF, ca.decodeTableFn, ca.encodeTableFn) require.Nil(t, err) decodedKey, err := ParseTxnMetaKeyFrom(newKey) require.Nil(t, err) @@ -288,6 +202,7 @@ func TestRewriteKeyForTable(t *testing.T) { func TestRewriteTableInfo(t *testing.T) { var ( dbId int64 = 40 + dbName = "db" tableID int64 = 100 tableName = "t1" tableInfo model.TableInfo @@ -296,8 +211,12 @@ func TestRewriteTableInfo(t *testing.T) { value, err := produceTableInfoValue(tableName, tableID) require.Nil(t, err) + dbMap := make(map[UpstreamID]*DBReplace) + dbMap[dbId] = NewDBReplace(dbName, dbId+100) + dbMap[dbId].TableMap[tableID] = NewTableReplace(tableName, tableID+100) + // create schemasReplace. - sr := MockEmptySchemasReplace(nil, nil) + sr := MockEmptySchemasReplace(nil, dbMap) tableCount := 0 sr.AfterTableRewritten = func(deleted bool, tableInfo *model.TableInfo) { tableCount++ @@ -306,20 +225,8 @@ func TestRewriteTableInfo(t *testing.T) { } } - // rewrite it directly without preConstruct Map, it will get failed result. - sr.SetRestoreKVStatus() - _, err = sr.rewriteTableInfo(value, dbId) - require.Error(t, err) - - // ConstructMap status. - sr.SetPreConstructMapStatus() - newValue, err := sr.rewriteTableInfo(value, dbId) - require.Nil(t, err) - require.Nil(t, newValue) - // set restoreKV status, rewrite it. - sr.SetRestoreKVStatus() - newValue, err = sr.rewriteTableInfo(value, dbId) + newValue, err := sr.rewriteTableInfo(value, dbId) require.Nil(t, err) err = json.Unmarshal(newValue, &tableInfo) require.Nil(t, err) @@ -340,6 +247,7 @@ func TestRewriteTableInfo(t *testing.T) { func TestRewriteTableInfoForPartitionTable(t *testing.T) { var ( dbId int64 = 40 + dbName = "db" tableID int64 = 100 pt1ID int64 = 101 pt2ID int64 = 102 @@ -374,16 +282,22 @@ func TestRewriteTableInfoForPartitionTable(t *testing.T) { value, err := json.Marshal(&tbl) require.Nil(t, err) - // create schemasReplace, and preConstructMap. - sr := MockEmptySchemasReplace(nil, nil) - sr.SetPreConstructMapStatus() - newValue, err := sr.rewriteTableInfo(value, dbId) - require.Nil(t, err) - require.Nil(t, newValue) + dbMap := make(map[UpstreamID]*DBReplace) + dbMap[dbId] = NewDBReplace(dbName, dbId+100) + dbMap[dbId].TableMap[tableID] = NewTableReplace(tableName, tableID+100) + dbMap[dbId].TableMap[tableID].PartitionMap[pt1ID] = pt1ID + 100 + dbMap[dbId].TableMap[tableID].PartitionMap[pt2ID] = pt2ID + 100 + + sr := NewSchemasReplace( + dbMap, + nil, + 0, + filter.All(), + nil, + ) // set restoreKV status, and rewrite it. - sr.SetRestoreKVStatus() - newValue, err = sr.rewriteTableInfo(value, dbId) + newValue, err := sr.rewriteTableInfo(value, dbId) require.Nil(t, err) err = json.Unmarshal(newValue, &tableInfo) require.Nil(t, err) @@ -488,26 +402,28 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) { dbMap[dbID2] = NewDBReplace(db2.Name.O, dbID2+100) dbMap[dbID2].TableMap[tableID2] = NewTableReplace(t2.Name.O, tableID2+100) + tc := NewTableMappingManager(dbMap, mockGenGenGlobalID) + + //exchange partition, t1 partition0 with the t2 + t1Copy := t1.Clone() + t2Copy := t2.Clone() + t1Copy.Partition.Definitions[0].ID = tableID2 + t2Copy.ID = pt1ID + value, err := json.Marshal(&t1Copy) + require.Nil(t, err) + + err = tc.parseTableValueAndUpdateIdMapping(dbID1, value) + require.Nil(t, err) + sr := NewSchemasReplace( - dbMap, - true, + tc.DbReplaceMap, nil, 0, filter.All(), - mockGenGenGlobalID, - nil, nil, ) - sr.SetRestoreKVStatus() - //exchange partition, t1 parition0 with the t2 - t1Copy := t1.Clone() - t2Copy := t2.Clone() - t1Copy.Partition.Definitions[0].ID = tableID2 - t2Copy.ID = pt1ID // rewrite partition table - value, err := json.Marshal(&t1Copy) - require.Nil(t, err) value, err = sr.rewriteTableInfo(value, dbID1) require.Nil(t, err) err = json.Unmarshal(value, &tableInfo) @@ -519,6 +435,8 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) { // rewrite no partition table value, err = json.Marshal(&t2Copy) require.Nil(t, err) + err = tc.parseTableValueAndUpdateIdMapping(dbID2, value) + require.Nil(t, err) value, err = sr.rewriteTableInfo(value, dbID2) require.Nil(t, err) err = json.Unmarshal(value, &tableInfo) @@ -529,6 +447,7 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) { func TestRewriteTableInfoForTTLTable(t *testing.T) { var ( dbId int64 = 40 + dbName = "db" tableID int64 = 100 colID int64 = 1000 colName = "t" @@ -556,18 +475,15 @@ func TestRewriteTableInfoForTTLTable(t *testing.T) { value, err := json.Marshal(&tbl) require.Nil(t, err) - // create empty schemasReplace - sr := MockEmptySchemasReplace(nil, nil) + dbMap := make(map[UpstreamID]*DBReplace) + dbMap[dbId] = NewDBReplace(dbName, dbId+100) + dbMap[dbId].TableMap[tableID] = NewTableReplace(tableName, tableID+100) - // preConsutruct Map information. - sr.SetPreConstructMapStatus() - newValue, err := sr.rewriteTableInfo(value, dbId) - require.Nil(t, err) - require.Nil(t, newValue) + // create empty schemasReplace + sr := MockEmptySchemasReplace(nil, dbMap) // set restoreKV status and rewrite it. - sr.SetRestoreKVStatus() - newValue, err = sr.rewriteTableInfo(value, dbId) + newValue, err := sr.rewriteTableInfo(value, dbId) require.Nil(t, err) err = json.Unmarshal(newValue, &tableInfo) @@ -581,18 +497,6 @@ func TestRewriteTableInfoForTTLTable(t *testing.T) { require.False(t, tableInfo.TTLInfo.Enable) } -func TestIsPreConsturctMapStatus(t *testing.T) { - // create empty schemasReplace - sr := MockEmptySchemasReplace(nil, nil) - sr.SetPreConstructMapStatus() - require.True(t, sr.IsPreConsturctMapStatus()) - require.False(t, sr.IsRestoreKVStatus()) - - sr.SetRestoreKVStatus() - require.False(t, sr.IsPreConsturctMapStatus()) - require.True(t, sr.IsRestoreKVStatus()) -} - // db:70->80 - // | - t0:71->81 - // | | - p0:72->82 diff --git a/br/pkg/stream/table_mapping.go b/br/pkg/stream/table_mapping.go new file mode 100644 index 0000000000000..ff44f1fdc7b35 --- /dev/null +++ b/br/pkg/stream/table_mapping.go @@ -0,0 +1,257 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stream + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/meta/model" +) + +// TableMappingManager iterates on log backup meta kvs and generate new id for DB, table and partition for +// downstream cluster. It maintains the id mapping and passes down later to the rewrite logic. +type TableMappingManager struct { + DbReplaceMap map[UpstreamID]*DBReplace + globalIdMap map[UpstreamID]DownstreamID + genGlobalIdFn func(ctx context.Context) (int64, error) +} + +func NewTableMappingManager( + dbReplaceMap map[UpstreamID]*DBReplace, + genGlobalIdFn func(ctx context.Context) (int64, error)) *TableMappingManager { + if dbReplaceMap == nil { + dbReplaceMap = make(map[UpstreamID]*DBReplace) + } + + globalTableIdMap := make(map[UpstreamID]DownstreamID) + for _, dr := range dbReplaceMap { + for tblID, tr := range dr.TableMap { + globalTableIdMap[tblID] = tr.TableID + for oldpID, newpID := range tr.PartitionMap { + globalTableIdMap[oldpID] = newpID + } + } + } + + return &TableMappingManager{ + DbReplaceMap: dbReplaceMap, + globalIdMap: globalTableIdMap, + genGlobalIdFn: genGlobalIdFn, + } +} + +// ParseMetaKvAndUpdateIdMapping collect table information +func (tc *TableMappingManager) ParseMetaKvAndUpdateIdMapping(e *kv.Entry, cf string) error { + if !IsMetaDBKey(e.Key) { + return nil + } + + rawKey, err := ParseTxnMetaKeyFrom(e.Key) + if err != nil { + return errors.Trace(err) + } + + value, err := extractValue(e, cf) + if err != nil { + return errors.Trace(err) + } + // sanity check + if value == nil { + log.Warn("entry suggests having short value but is nil") + return nil + } + + if meta.IsDBkey(rawKey.Field) { + return tc.parseDBValueAndUpdateIdMapping(value) + } else if !meta.IsDBkey(rawKey.Key) { + return nil + } + + if meta.IsTableKey(rawKey.Field) { + dbID, err := ParseDBIDFromTableKey(e.Key) + if err != nil { + return errors.Trace(err) + } + return tc.parseTableValueAndUpdateIdMapping(dbID, value) + } + return nil +} + +func (tc *TableMappingManager) parseDBValueAndUpdateIdMapping(value []byte) error { + dbInfo := new(model.DBInfo) + if err := json.Unmarshal(value, dbInfo); err != nil { + return errors.Trace(err) + } + + if dr, exist := tc.DbReplaceMap[dbInfo.ID]; !exist { + newID, err := tc.genGlobalIdFn(context.Background()) + if err != nil { + return errors.Trace(err) + } + tc.DbReplaceMap[dbInfo.ID] = NewDBReplace(dbInfo.Name.O, newID) + tc.globalIdMap[dbInfo.ID] = newID + } else { + dr.Name = dbInfo.Name.O + } + return nil +} + +func (tc *TableMappingManager) parseTableValueAndUpdateIdMapping(dbID int64, value []byte) error { + var ( + tableInfo model.TableInfo + err error + exist bool + dbReplace *DBReplace + tableReplace *TableReplace + ) + + if err := json.Unmarshal(value, &tableInfo); err != nil { + return errors.Trace(err) + } + + // construct or find the id map. + dbReplace, exist = tc.DbReplaceMap[dbID] + if !exist { + newID, err := tc.genGlobalIdFn(context.Background()) + if err != nil { + return errors.Trace(err) + } + tc.globalIdMap[dbID] = newID + dbReplace = NewDBReplace("", newID) + tc.DbReplaceMap[dbID] = dbReplace + } + + tableReplace, exist = dbReplace.TableMap[tableInfo.ID] + if !exist { + newID, exist := tc.globalIdMap[tableInfo.ID] + if !exist { + newID, err = tc.genGlobalIdFn(context.Background()) + if err != nil { + return errors.Trace(err) + } + tc.globalIdMap[tableInfo.ID] = newID + } + + tableReplace = NewTableReplace(tableInfo.Name.O, newID) + dbReplace.TableMap[tableInfo.ID] = tableReplace + } else { + tableReplace.Name = tableInfo.Name.O + } + + // update table ID and partition ID. + tableInfo.ID = tableReplace.TableID + partitions := tableInfo.GetPartitionInfo() + if partitions != nil { + for i, partition := range partitions.Definitions { + newID, exist := tableReplace.PartitionMap[partition.ID] + if !exist { + newID, exist = tc.globalIdMap[partition.ID] + if !exist { + newID, err = tc.genGlobalIdFn(context.Background()) + if err != nil { + return errors.Trace(err) + } + tc.globalIdMap[partition.ID] = newID + } + tableReplace.PartitionMap[partition.ID] = newID + } + partitions.Definitions[i].ID = newID + } + } + return nil +} + +// ToProto produces schemas id maps from up-stream to down-stream. +func (tc *TableMappingManager) ToProto() []*backuppb.PitrDBMap { + dbMaps := make([]*backuppb.PitrDBMap, 0, len(tc.DbReplaceMap)) + + for dbID, dr := range tc.DbReplaceMap { + dbm := backuppb.PitrDBMap{ + Name: dr.Name, + IdMap: &backuppb.IDMap{ + UpstreamId: dbID, + DownstreamId: dr.DbID, + }, + Tables: make([]*backuppb.PitrTableMap, 0, len(dr.TableMap)), + } + + for tblID, tr := range dr.TableMap { + tm := backuppb.PitrTableMap{ + Name: tr.Name, + IdMap: &backuppb.IDMap{ + UpstreamId: tblID, + DownstreamId: tr.TableID, + }, + Partitions: make([]*backuppb.IDMap, 0, len(tr.PartitionMap)), + } + + for upID, downID := range tr.PartitionMap { + pm := backuppb.IDMap{ + UpstreamId: upID, + DownstreamId: downID, + } + tm.Partitions = append(tm.Partitions, &pm) + } + dbm.Tables = append(dbm.Tables, &tm) + } + dbMaps = append(dbMaps, &dbm) + } + + return dbMaps +} + +func FromDBMapProto(dbMaps []*backuppb.PitrDBMap) map[UpstreamID]*DBReplace { + dbReplaces := make(map[UpstreamID]*DBReplace) + + for _, db := range dbMaps { + dr := NewDBReplace(db.Name, db.IdMap.DownstreamId) + dbReplaces[db.IdMap.UpstreamId] = dr + + for _, tbl := range db.Tables { + tr := NewTableReplace(tbl.Name, tbl.IdMap.DownstreamId) + dr.TableMap[tbl.IdMap.UpstreamId] = tr + for _, p := range tbl.Partitions { + tr.PartitionMap[p.UpstreamId] = p.DownstreamId + } + } + } + + return dbReplaces +} + +func extractValue(e *kv.Entry, cf string) ([]byte, error) { + switch cf { + case DefaultCF: + return e.Value, nil + case WriteCF: + rawWriteCFValue := new(RawWriteCFValue) + if err := rawWriteCFValue.ParseFrom(e.Value); err != nil { + return nil, errors.Trace(err) + } + if rawWriteCFValue.HasShortValue() { + return rawWriteCFValue.shortValue, nil + } + return nil, nil + default: + panic(fmt.Sprintf("not support cf:%s", cf)) + } +} diff --git a/br/pkg/stream/table_mapping_test.go b/br/pkg/stream/table_mapping_test.go new file mode 100644 index 0000000000000..3f816c5399665 --- /dev/null +++ b/br/pkg/stream/table_mapping_test.go @@ -0,0 +1,82 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stream + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +var increaseID int64 = 100 + +func mockGenGenGlobalID(_ctx context.Context) (int64, error) { + increaseID++ + return increaseID, nil +} + +func TestToProto(t *testing.T) { + var ( + dbName, tblName string = "db1", "t1" + oldDBID UpstreamID = 100 + newDBID DownstreamID = 200 + oldTblID, oldPID1, oldPID2 UpstreamID = 101, 102, 103 + newTblID, newPID1, newPID2 DownstreamID = 201, 202, 203 + ) + + // create table Replace + tr := NewTableReplace(tblName, newTblID) + tr.PartitionMap[oldPID1] = newPID1 + tr.PartitionMap[oldPID2] = newPID2 + + dr := NewDBReplace(dbName, newDBID) + dr.TableMap[oldTblID] = tr + + drs := make(map[UpstreamID]*DBReplace) + drs[oldDBID] = dr + + // create schemas replace and test ToProto(). + tc := NewTableMappingManager(drs, mockGenGenGlobalID) + + dbMap := tc.ToProto() + require.Equal(t, len(dbMap), 1) + require.Equal(t, dbMap[0].Name, dbName) + require.Equal(t, dbMap[0].IdMap.UpstreamId, oldDBID) + require.Equal(t, dbMap[0].IdMap.DownstreamId, newDBID) + + tableMap := dbMap[0].Tables + require.Equal(t, len(tableMap), 1) + require.Equal(t, tableMap[0].Name, tblName) + require.Equal(t, tableMap[0].IdMap.UpstreamId, oldTblID) + require.Equal(t, tableMap[0].IdMap.DownstreamId, newTblID) + + partitionMap := tableMap[0].Partitions + require.Equal(t, len(partitionMap), 2) + + if partitionMap[0].UpstreamId == oldPID1 { + require.Equal(t, partitionMap[0].DownstreamId, newPID1) + require.Equal(t, partitionMap[1].UpstreamId, oldPID2) + require.Equal(t, partitionMap[1].DownstreamId, newPID2) + } else { + require.Equal(t, partitionMap[0].DownstreamId, newPID2) + require.Equal(t, partitionMap[1].UpstreamId, oldPID1) + require.Equal(t, partitionMap[1].DownstreamId, newPID1) + } + + // test FromDBMapProto() + drs2 := FromDBMapProto(dbMap) + require.Equal(t, drs2, drs) +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 07990cc382d34..5bf2c0eddc51a 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1419,21 +1419,31 @@ func restoreStream( return errors.Trace(err) } // load the id maps only when the checkpoint mode is used and not the first execution - newTask := true + currentIdMapSaved := false if taskInfo != nil && taskInfo.Progress == checkpoint.InLogRestoreAndIdMapPersist { - newTask = false + currentIdMapSaved = true } + + ddlFiles, err := client.LoadDDLFilesAndCountDMLFiles(ctx) + if err != nil { + return err + } + // get the schemas ID replace information. // since targeted full backup storage, need to use the full backup cipher - schemasReplace, err := client.InitSchemasReplaceForDDL(ctx, &logclient.InitSchemaConfig{ - IsNewTask: newTask, + tableMappingManager, err := client.BuildTableMappingManager(ctx, &logclient.BuildTableMappingManagerConfig{ + CurrentIdMapSaved: currentIdMapSaved, TableFilter: cfg.TableFilter, - TiFlashRecorder: cfg.tiflashRecorder, FullBackupStorage: fullBackupStorage, - }, &cfg.Config.CipherInfo) + CipherInfo: &cfg.Config.CipherInfo, + Files: ddlFiles, + }) if err != nil { return errors.Trace(err) } + + schemasReplace := stream.NewSchemasReplace(tableMappingManager.DbReplaceMap, cfg.tiflashRecorder, + client.CurrentTS(), cfg.TableFilter, client.RecordDeleteRange) schemasReplace.AfterTableRewritten = func(deleted bool, tableInfo *model.TableInfo) { // When the table replica changed to 0, the tiflash replica might be set to `nil`. // We should remove the table if we meet. @@ -1452,14 +1462,11 @@ func restoreStream( totalKVCount += kvCount totalSize += size } - ddlFiles, err := client.LoadDDLFilesAndCountDMLFiles(ctx) - if err != nil { - return err - } + pm := g.StartProgress(ctx, "Restore Meta Files", int64(len(ddlFiles)), !cfg.LogProgress) if err = withProgress(pm, func(p glue.Progress) error { client.RunGCRowsLoader(ctx) - return client.RestoreMetaKVFiles(ctx, ddlFiles, schemasReplace, updateStats, p.Inc) + return client.RestoreAndRewriteMetaKVFiles(ctx, ddlFiles, schemasReplace, updateStats, p.Inc) }); err != nil { return errors.Annotate(err, "failed to restore meta files") }