From 02a231800f064c975cee59796024b886e07dc15f Mon Sep 17 00:00:00 2001
From: Wenqi Mou
Date: Sun, 24 Nov 2024 19:04:12 -0500
Subject: [PATCH] some fixes

Signed-off-by: Wenqi Mou
---
 br/pkg/restore/tiflashrec/tiflash_recorder.go |  8 +--
 br/pkg/stream/rewrite_meta_rawkv.go           |  5 +-
 br/pkg/task/restore.go                        | 16 +++--
 br/pkg/task/stream.go                         | 33 +++++----
 br/tests/br_pitr_table_filter/run.sh          | 70 +++++++++++++++----
 br/tests/run_group_br_tests.sh                |  2 +-
 6 files changed, 94 insertions(+), 40 deletions(-)

diff --git a/br/pkg/restore/tiflashrec/tiflash_recorder.go b/br/pkg/restore/tiflashrec/tiflash_recorder.go
index c87f0372f86a6..8cdcfa10182f5 100644
--- a/br/pkg/restore/tiflashrec/tiflash_recorder.go
+++ b/br/pkg/restore/tiflashrec/tiflash_recorder.go
@@ -130,15 +130,15 @@ func (r *TiFlashRecorder) GenerateResetAlterTableDDLs(info infoschema.InfoSchema
 func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string {
     items := make([]string, 0, len(r.items))
-    r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
-        table, ok := info.TableByID(context.Background(), id)
+    r.Iterate(func(tableId int64, replica model.TiFlashReplicaInfo) {
+        table, ok := info.TableByID(context.Background(), tableId)
         if !ok {
-            log.Warn("Table do not exist, skipping", zap.Int64("id", id))
+            log.Warn("Table does not exist, it might have been filtered out by a custom filter, skipping", zap.Int64("tableId", tableId))
             return
         }
         schema, ok := infoschema.SchemaByTable(info, table.Meta())
         if !ok {
-            log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
+            log.Warn("Schema does not exist, skipping", zap.Int64("tableId", tableId), zap.Stringer("table", table.Meta().Name))
             return
         }
         altTableSpec, err := alterTableSpecOf(replica, false)
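Note for reviewers: the hunk above relies on an iterate-and-skip pattern, where a recorded table that is missing from the restored info schema is logged and skipped instead of failing the whole DDL generation, because a custom filter may legitimately have excluded it. A minimal, self-contained sketch of the idea (the recorder and lookup below are illustrative stand-ins, not BR's actual types):

package main

import "fmt"

// replicaInfo is a stand-in for model.TiFlashReplicaInfo.
type replicaInfo struct{ Count uint64 }

// recorder is a stand-in for tiflashrec.TiFlashRecorder.
type recorder struct{ items map[int64]replicaInfo }

// generateAlterDDLs emits one ALTER TABLE ... SET TIFLASH REPLICA statement
// per recorded table; tables absent from the lookup (e.g. excluded by the
// user's table filter) are skipped with a warning instead of aborting.
func generateAlterDDLs(r *recorder, lookup func(tableId int64) (string, bool)) []string {
    ddls := make([]string, 0, len(r.items))
    for tableId, replica := range r.items {
        name, ok := lookup(tableId)
        if !ok {
            fmt.Printf("table %d does not exist, might be filtered out, skipping\n", tableId)
            continue
        }
        ddls = append(ddls, fmt.Sprintf("ALTER TABLE %s SET TIFLASH REPLICA %d", name, replica.Count))
    }
    return ddls
}

func main() {
    r := &recorder{items: map[int64]replicaInfo{100: {Count: 1}, 101: {Count: 2}}}
    lookup := func(tableId int64) (string, bool) {
        if tableId == 100 {
            return "`test`.`t1`", true
        }
        return "", false // 101 was excluded by the user's table filter
    }
    fmt.Println(generateAlterDDLs(r, lookup))
}

ALTER TABLE ... SET TIFLASH REPLICA is real TiDB syntax; the actual statement text in BR appears to come from alterTableSpecOf rather than a format string like this.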
diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go
index 8e2c452f3941d..eff2e4be1d2c8 100644
--- a/br/pkg/stream/rewrite_meta_rawkv.go
+++ b/br/pkg/stream/rewrite_meta_rawkv.go
@@ -745,7 +745,7 @@ func (sr *SchemasReplace) tryRecordIngestIndex(job *model.Job) error {
 }
 
 // processIngestIndexAndDeleteRangeFromJob handles two special cases during log backup meta key replay.
-// 1. index ingest is not captured by the log backup, thus we need to restore them manually later
+// 1. index ingestion is not captured by the log backup, thus we need to restore it manually later
 // 2. delete range also needs to be handled to clean up dropped table since it was previously relying on GC to clean it up
 func (sr *SchemasReplace) processIngestIndexAndDeleteRangeFromJob(job *model.Job) error {
     if ddl.JobNeedGC(job) {
@@ -834,7 +834,8 @@ func (bdr *brDelRangeExecWrapper) PrepareParamsList(sz int) {
 func (bdr *brDelRangeExecWrapper) RewriteTableID(tableID int64) (int64, bool) {
     newTableID, exists := bdr.globalTableIdMap[tableID]
     if !exists {
-        log.Warn("failed to find the downstream id when rewrite delete range", zap.Int64("old tableID", tableID))
+        log.Warn("failed to find the downstream id when rewriting delete range, "+
+            "the table might have been filtered out if filters were specified", zap.Int64("old tableID", tableID))
     }
     return newTableID, exists
 }
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index 8cf278a00c339..58df02811206b 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -835,23 +835,29 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
     if err != nil {
         return errors.Trace(err)
     }
-    log.Info("found items to restore after filtering", zap.Int("files", len(fileMap)), zap.Int("tables", len(tableMap)), zap.Int("db", len(dbMap)))
+    log.Info("found items to restore after filtering",
+        zap.Int("files", len(fileMap)),
+        zap.Int("tables", len(tableMap)),
+        zap.Int("db", len(dbMap)))
+
     if cfg.logBackupTableHistory != nil {
-        // add additional tables and etc to restore in the snapshot restore phase since it will later be renamed during
-        // log restore and will fall into filter range.
+        // adjust the tables to restore in the snapshot restore phase, since some tables will later be
+        // renamed during log restore and fall into or out of the filter range.
         err := adjustTablesToRestoreAndCreateFilter(cfg.logBackupTableHistory, cfg.RestoreConfig, client, fileMap, tableMap)
         if err != nil {
             return errors.Trace(err)
         }
         log.Info("adjusted items to restore",
-            zap.Int("files", len(fileMap)), zap.Int("tables", len(tableMap)), zap.Int("db", len(dbMap)))
+            zap.Int("files", len(fileMap)),
+            zap.Int("tables", len(tableMap)),
+            zap.Int("db", len(dbMap)))
         // need to update to include all eligible table id from snapshot restore
         UpdatePiTRFilter(cfg.RestoreConfig, tableMap)
     }
 
     files, tables, dbs := convertMapsToSlices(fileMap, tableMap, dbMap)
+    // after figuring out which files to restore, check if the disk has enough space
     if cfg.CheckRequirements {
         if err := checkDiskSpace(ctx, mgr, files, tables); err != nil {
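A note on the adjustment above: adjustTablesToRestoreAndCreateFilter consults the rename history that log restore later replays. Assuming, for illustration only, that the history can be flattened into a snapshot-name to final-name map, the decision reduces to matching a table's final name against the user's filter:

package main

import (
    "fmt"
    "strings"
)

// renameHistory maps a table's name at snapshot time to its name at the
// restored point in time, as recovered from log backup DDL events.
type renameHistory map[string]string

// tablesToRestore keeps a snapshot table iff its *final* name matches the
// user's filter: tables renamed into the filter range are pulled in, and
// tables renamed out of it are skipped.
func tablesToRestore(snapshotTables []string, hist renameHistory, matches func(string) bool) []string {
    var out []string
    for _, t := range snapshotTables {
        final := t
        if renamed, ok := hist[t]; ok {
            final = renamed
        }
        if matches(final) {
            out = append(out, t)
        }
    }
    return out
}

func main() {
    hist := renameHistory{
        "test.outside": "test.log_in", // renamed into the filter during log backup
        "test.log_out": "test.other",  // renamed out of the filter during log backup
    }
    inFilter := func(name string) bool { return strings.HasPrefix(name, "test.log") }
    fmt.Println(tablesToRestore([]string{"test.outside", "test.log_out", "test.log_keep"}, hist, inFilter))
    // Output: [test.outside test.log_keep]
}

test.outside is pulled into the snapshot restore even though its snapshot-time name is outside the filter, because log restore will rename it into range; test.log_out is dropped for the symmetric reason.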
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 9b7d00a68fb5f..6cb40c7120bdc 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -1217,10 +1217,12 @@ func RunStreamRestore(
     cfg.tiflashRecorder = recorder
     // restore full snapshot.
     if taskInfo.NeedFullRestore {
-        // if table filter is specified, go through log backup DDL change and build table rename history to figure out
-        // the tables that need to be restored currently out of filter range but later renamed into the filter range.
+        // if a table filter is specified by the user, go through the log backup DDL changes and build a
+        // table rename history to figure out which tables need to be restored.
+        // currently we restore additional tables in the full snapshot phase when a table is renamed into
+        // the filter range later in the log backup.
+        // we also skip tables that are currently in the filter range but are later renamed out of it.
         var logBackupTableHistory *stream.LogBackupTableHistory
-        // TODO need to do more
         if cfg.ExplicitFilter {
             logClient, err := createLogClient(ctx, g, cfg, mgr)
             if err != nil {
@@ -1235,12 +1237,12 @@ func RunStreamRestore(
         }
         logStorage := cfg.Config.Storage
         cfg.Config.Storage = cfg.FullBackupStorage
-        // TiFlash replica is restored to down-stream on 'pitr' currently.
         snapshotRestoreConfig := SnapshotRestoreConfig{
             RestoreConfig:         cfg,
             piTRTaskInfo:          taskInfo,
             logBackupTableHistory: logBackupTableHistory,
         }
+        // TiFlash replica is restored to the downstream cluster on 'pitr' currently.
         if err = runSnapshotRestore(ctx, mgr, g, FullRestoreCmd, &snapshotRestoreConfig); err != nil {
             return errors.Trace(err)
         }
@@ -1249,7 +1251,7 @@ func RunStreamRestore(
         if err = WriteStringToConsole(g, fmt.Sprintf("%s is skipped due to checkpoint mode for restore\n", FullRestoreCmd)); err != nil {
             return errors.Trace(err)
         }
-        if taskInfo.CheckpointInfo != nil && taskInfo.CheckpointInfo.Metadata != nil && taskInfo.CheckpointInfo.Metadata.TiFlashItems != nil {
+        if taskInfo.hasTiFlashItemsInCheckpoint() {
             log.Info("load tiflash records of snapshot restore from checkpoint")
             cfg.tiflashRecorder.Load(taskInfo.CheckpointInfo.Metadata.TiFlashItems)
         }
@@ -1477,18 +1479,17 @@ func restoreStream(
         return errors.Annotate(err, "failed to clean up")
     }
 
-    // to delete range that's dropped previously
+    // replay the delete ranges (table, schema) recorded for objects dropped during log backup
     if err = client.InsertGCRows(ctx); err != nil {
         return errors.Annotate(err, "failed to insert rows into gc_delete_range")
     }
 
-    // might need to skip some index due to filtered out
+    // index ingestion is not captured by the regular log backup, so ingested indexes need to be repaired manually
    if err = client.RepairIngestIndex(ctx, ingestRecorder, g); err != nil {
         return errors.Annotate(err, "failed to repair ingest index")
     }
 
     if cfg.tiflashRecorder != nil {
-        // might need to check filter too
         sqls := cfg.tiflashRecorder.GenerateAlterTableDDLs(mgr.GetDomain().InfoSchema())
         log.Info("Generating SQLs for restoring TiFlash Replica",
             zap.Strings("sqls", sqls))
@@ -1562,12 +1563,6 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *
         return nil, errors.Trace(err)
     }
 
-    migs, err := client.GetMigrations(ctx)
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
-    client.BuildMigrations(migs)
-
     encryptionManager, err := encryption.NewManager(&cfg.LogBackupCipherInfo, &cfg.MasterKeyConfig)
     if err != nil {
         return nil, errors.Annotate(err, "failed to create encryption manager for log restore")
@@ -1576,6 +1571,12 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *
         return nil, errors.Trace(err)
     }
 
+    migs, err := client.GetMigrations(ctx)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    client.BuildMigrations(migs)
+
     return client, nil
 }
@@ -1809,6 +1810,10 @@ type PiTRTaskInfo struct {
     FullRestoreCheckErr error
 }
 
+func (p *PiTRTaskInfo) hasTiFlashItemsInCheckpoint() bool {
+    return p.CheckpointInfo != nil && p.CheckpointInfo.Metadata != nil && p.CheckpointInfo.Metadata.TiFlashItems != nil
+}
+
 func generatePiTRTaskInfo(
     ctx context.Context,
     mgr *conn.Mgr,
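Context for the InsertGCRows comment above: delete ranges recorded during log backup carry upstream table IDs, so replaying them downstream requires the ID rewrite that brDelRangeExecWrapper.RewriteTableID performs in rewrite_meta_rawkv.go, and an ID with no downstream mapping (typically a table excluded by the filter) is skipped with a warning instead of failing the replay. A simplified, illustrative sketch (the map and printouts are stand-ins, not BR's actual API):

package main

import "fmt"

// rewriteTableID mirrors the shape of brDelRangeExecWrapper.RewriteTableID:
// return the downstream ID if one exists, otherwise report the miss so the
// caller can skip this delete-range entry.
func rewriteTableID(globalTableIdMap map[int64]int64, tableID int64) (int64, bool) {
    newID, exists := globalTableIdMap[tableID]
    if !exists {
        // the table was probably filtered out of this restore
        fmt.Printf("no downstream id for upstream table %d, skipping its delete-range\n", tableID)
    }
    return newID, exists
}

func main() {
    idMap := map[int64]int64{100: 2100}
    for _, upstream := range []int64{100, 101} {
        if downstream, ok := rewriteTableID(idMap, upstream); ok {
            fmt.Printf("would insert gc_delete_range row for table %d\n", downstream)
        }
    }
}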
run_sql "drop table $DB.${prefix}_${i};" + done +} + test_basic_filter() { + restart_services || { echo "Failed to restart services"; exit 1; } + echo "start basic filter testing" run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log" @@ -68,6 +79,7 @@ test_basic_filter() { echo "write initial data and do snapshot backup" create_tables_with_values "full_backup" 3 + create_tables_with_values "table_to_drop" 3 run_br backup full -f "$DB.*" -s "local://$TEST_DIR/$TASK_NAME/full" --pd $PD_ADDR @@ -75,6 +87,7 @@ test_basic_filter() { create_tables_with_values "log_backup_lower" 3 create_tables_with_values "LOG_BACKUP_UPPER" 3 create_tables_with_values "other" 3 + drop_tables "table_to_drop" 3 . "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME" @@ -88,6 +101,7 @@ test_basic_filter() { verify_tables "LOG_BACKUP_UPPER" 3 true verify_tables "full_backup" 3 true verify_tables "other" 3 true + verify_tables "table_to_drop" 3 false echo "case 2 with log backup table filter" run_sql "drop schema $DB;" @@ -97,6 +111,7 @@ test_basic_filter() { verify_tables "LOG_BACKUP_UPPER" 3 true verify_tables "full_backup" 3 false verify_tables "other" 3 false + verify_tables "table_to_drop" 3 false echo "case 3 with multiple filters" run_sql "drop schema $DB;" @@ -106,6 +121,7 @@ test_basic_filter() { verify_tables "LOG_BACKUP_UPPER" 3 true verify_tables "full_backup" 3 true verify_tables "other" 3 false + verify_tables "table_to_drop" 3 false echo "case 4 with negative filters" run_sql "drop schema $DB;" @@ -116,11 +132,27 @@ test_basic_filter() { verify_tables "LOG_BACKUP_UPPER" 3 false verify_tables "full_backup" 3 true verify_tables "other" 3 true + verify_tables "table_to_drop" 3 false + + echo "case 5 restore dropped table" + run_sql "drop schema $DB;" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -f "$DB.table*" + + verify_tables "log_backup_lower" 3 false + verify_tables "LOG_BACKUP_UPPER" 3 false + verify_tables "full_backup" 3 false + verify_tables "other" 3 false + verify_tables "table_to_drop" 3 false + + # cleanup + rm -rf "$TEST_DIR/$TASK_NAME" echo "basic filter test cases passed" } test_table_rename() { + restart_services || { echo "Failed to restart services"; exit 1; } + echo "start table rename with filter testing" run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log" @@ -155,10 +187,15 @@ test_table_rename() { verify_tables "renamed_out" 3 false verify_tables "log_renamed_out" 3 false + # cleanup + rm -rf "$TEST_DIR/$TASK_NAME" + echo "table rename with filter passed" } test_with_checkpoint() { + restart_services || { echo "Failed to restart services"; exit 1; } + echo "start table filter with checkpoint" run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log" @@ -181,43 +218,40 @@ test_with_checkpoint() { # restart services to clean up the cluster restart_services || { echo "Failed to restart services"; exit 1; } - run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -f "$DB.log*" - # Using single quotes to prevent shell interpretation export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return("corrupt-last-table-files")' restore_fail=0 - run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log" || 
@@ -155,10 +187,15 @@
     verify_tables "renamed_out" 3 false
     verify_tables "log_renamed_out" 3 false
 
+    # cleanup
+    rm -rf "$TEST_DIR/$TASK_NAME"
+
     echo "table rename with filter passed"
 }
 
 test_with_checkpoint() {
+    restart_services || { echo "Failed to restart services"; exit 1; }
+
     echo "start table filter with checkpoint"
     run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"
 
@@ -181,43 +218,40 @@
     # restart services to clean up the cluster
     restart_services || { echo "Failed to restart services"; exit 1; }
 
-    run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -f "$DB.log*"
-
     # Using single quotes to prevent shell interpretation
     export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return("corrupt-last-table-files")'
     restore_fail=0
-    run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log" || restore_fail=1
+    run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -s "local://$TEST_DIR/$TASK_NAME/log" -f "$DB.log*" || restore_fail=1
     export GO_FAILPOINTS=""
     if [ $restore_fail -ne 1 ]; then
-        echo 'expecting failure but success'
+        echo 'expected restore to fail on corrupted full backup files but it succeeded'
         exit 1
     fi
 
     # PITR with checkpoint but failed in the log restore metakv stage
-    export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return(\"only-last-table-files\");\
-    github.com/pingcap/tidb/br/pkg/restore/log_client/failed-after-id-maps-saved=return(true)"
+    export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return("only-last-table-files");github.com/pingcap/tidb/br/pkg/restore/log_client/failed-after-id-maps-saved=return(true)'
     restore_fail=0
-    run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log" || restore_fail=1
+    run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -s "local://$TEST_DIR/$TASK_NAME/log" -f "$DB.log*" || restore_fail=1
     export GO_FAILPOINTS=""
     if [ $restore_fail -ne 1 ]; then
-        echo 'expecting failure but success'
+        echo 'expected restore to fail after id maps were saved but it succeeded'
         exit 1
     fi
 
     # PITR with checkpoint but failed in the log restore datakv stage
     # skip the snapshot restore stage
-    export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/corrupt-files=return(\"corrupt-last-table-files\")"
+    export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/task/corrupt-files=return("corrupt-last-table-files")'
     restore_fail=0
-    run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log" || restore_fail=1
+    run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -s "local://$TEST_DIR/$TASK_NAME/log" -f "$DB.log*" || restore_fail=1
     export GO_FAILPOINTS=""
     if [ $restore_fail -ne 1 ]; then
-        echo 'expecting failure but success'
+        echo 'expected restore to fail on corrupted log restore files but it succeeded'
         exit 1
     fi
 
     # PITR with checkpoint
-    export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/corrupt-files=return(\"only-last-table-files\")"
-    run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log"
+    export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/task/corrupt-files=return("only-last-table-files")'
+    run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -s "local://$TEST_DIR/$TASK_NAME/log" -f "$DB.log*"
     export GO_FAILPOINTS=""
 
     verify_tables "log_backup" 3 true
@@ -230,10 +264,15 @@
     verify_tables "renamed_out" 3 false
     verify_tables "log_renamed_out" 3 false
 
-    echo "table rename with filter passed"
+    # cleanup
+    rm -rf "$TEST_DIR/$TASK_NAME"
+
+    echo "table filter checkpoint passed"
 }
 
 test_exchange_partition() {
+    restart_services || { echo "Failed to restart services"; exit 1; }
+
     echo "start testing exchange partition with filter"
     run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"
 
@@ -287,6 +326,9 @@
         exit 1
     fi
 
+    # cleanup
+    rm -rf "$TEST_DIR/$TASK_NAME"
+
     echo "exchange partition with filter test passed"
 }
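The checkpoint cases above drive failure injection through GO_FAILPOINTS, e.g. corrupt-files=return("corrupt-last-table-files"). In the product code these names are pingcap/failpoint injection points that get wired in at build time; the sketch below is only a plain-Go emulation of the mechanism, reading a hypothetical FAILPOINTS variable, to show how a named failpoint carries a return value into the code path the test wants to break:

package main

import (
    "errors"
    "fmt"
    "os"
    "strings"
)

// inject emulates a failpoint: if name is activated via the FAILPOINTS env
// var (a simplification of GO_FAILPOINTS), run body with the configured
// value. The real github.com/pingcap/failpoint library does this via
// build-time code rewriting; this stand-in only illustrates the flow.
func inject(name string, body func(value string)) {
    for _, kv := range strings.Split(os.Getenv("FAILPOINTS"), ";") {
        if k, v, ok := strings.Cut(kv, "="); ok && k == name {
            body(strings.Trim(strings.TrimPrefix(v, "return("), `()"`))
        }
    }
}

// restoreFiles pretends to restore a batch of table files, corrupting the
// last one when the failpoint is active, like the corrupt-files cases above.
func restoreFiles(files []string) error {
    var failMode string
    inject("corrupt-files", func(v string) { failMode = v })
    for i, f := range files {
        if failMode == "corrupt-last-table-files" && i == len(files)-1 {
            return errors.New("checksum mismatch on " + f)
        }
        fmt.Println("restored", f)
    }
    return nil
}

func main() {
    os.Setenv("FAILPOINTS", `corrupt-files=return("corrupt-last-table-files")`)
    if err := restoreFiles([]string{"t1.sst", "t2.sst"}); err != nil {
        fmt.Println("restore failed as the test expects:", err)
    }
}

A later run with FAILPOINTS unset would restore both files, which is why each test case clears GO_FAILPOINTS before the next restore attempt.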
diff --git a/br/tests/run_group_br_tests.sh b/br/tests/run_group_br_tests.sh
index 5064127142204..36e3fb5c5c075 100755
--- a/br/tests/run_group_br_tests.sh
+++ b/br/tests/run_group_br_tests.sh
@@ -26,7 +26,7 @@ groups=(
     ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index'
     ["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
     ["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index'
-    ["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption'
+    ["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption br_pitr_table_filter'
     ["G07"]='br_pitr'
     ["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom br_file_corruption br_tiflash_conflict'
 )