
Commit

some fixes
Signed-off-by: Wenqi Mou <[email protected]>
Tristan1900 committed Nov 25, 2024
1 parent fddc0ce commit 02a2318
Showing 6 changed files with 94 additions and 40 deletions.
8 changes: 4 additions & 4 deletions br/pkg/restore/tiflashrec/tiflash_recorder.go
@@ -130,15 +130,15 @@ func (r *TiFlashRecorder) GenerateResetAlterTableDDLs(info infoschema.InfoSchema

func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string {
items := make([]string, 0, len(r.items))
r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
table, ok := info.TableByID(context.Background(), id)
r.Iterate(func(tableId int64, replica model.TiFlashReplicaInfo) {
table, ok := info.TableByID(context.Background(), tableId)
if !ok {
log.Warn("Table do not exist, skipping", zap.Int64("id", id))
log.Warn("Table does not exist, might get filtered out if a custom filter is specified, skipping", zap.Int64("tableId", tableId))
return
}
schema, ok := infoschema.SchemaByTable(info, table.Meta())
if !ok {
log.Warn("Schema do not exist, skipping", zap.Int64("id", id), zap.Stringer("table", table.Meta().Name))
log.Warn("Schema do not exist, skipping", zap.Int64("tableId", tableId), zap.Stringer("table", table.Meta().Name))
return
}
altTableSpec, err := alterTableSpecOf(replica, false)
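
For context, the DDLs generated above are ALTER TABLE ... SET TIFLASH REPLICA statements built from the recorded replica info. Below is a minimal, self-contained sketch of that formatting step; the simplified replicaInfo struct and the buildAlterSQL helper are illustrative assumptions, not BR's actual alterTableSpecOf output.

package main

import (
	"fmt"
	"strings"
)

// replicaInfo is a simplified stand-in for model.TiFlashReplicaInfo,
// keeping only the fields needed for this sketch.
type replicaInfo struct {
	Count          uint64
	LocationLabels []string
}

// buildAlterSQL mimics the statement GenerateAlterTableDDLs produces for each
// recorded replica; the exact clause layout in BR (alterTableSpecOf) may differ.
func buildAlterSQL(db, table string, r replicaInfo) string {
	spec := fmt.Sprintf("SET TIFLASH REPLICA %d", r.Count)
	if len(r.LocationLabels) > 0 {
		spec += fmt.Sprintf(" LOCATION LABELS '%s'", strings.Join(r.LocationLabels, "','"))
	}
	return fmt.Sprintf("ALTER TABLE `%s`.`%s` %s", db, table, spec)
}

func main() {
	fmt.Println(buildAlterSQL("test", "t1", replicaInfo{Count: 1, LocationLabels: []string{"zone"}}))
	// ALTER TABLE `test`.`t1` SET TIFLASH REPLICA 1 LOCATION LABELS 'zone'
}
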
5 changes: 3 additions & 2 deletions br/pkg/stream/rewrite_meta_rawkv.go
@@ -745,7 +745,7 @@ func (sr *SchemasReplace) tryRecordIngestIndex(job *model.Job) error {
}

// processIngestIndexAndDeleteRangeFromJob handles two special cases during log backup meta key replay.
// 1. index ingest is not captured by the log backup, thus we need to restore them manually later
// 1. index ingestion is not captured by the log backup, so the affected indexes need to be restored manually later
// 2. delete range also needs to be handled to clean up dropped table since it was previously relying on GC to clean it up
func (sr *SchemasReplace) processIngestIndexAndDeleteRangeFromJob(job *model.Job) error {
if ddl.JobNeedGC(job) {
@@ -834,7 +834,8 @@ func (bdr *brDelRangeExecWrapper) PrepareParamsList(sz int) {
func (bdr *brDelRangeExecWrapper) RewriteTableID(tableID int64) (int64, bool) {
newTableID, exists := bdr.globalTableIdMap[tableID]
if !exists {
log.Warn("failed to find the downstream id when rewrite delete range", zap.Int64("old tableID", tableID))
log.Warn("failed to find the downstream id when rewrite delete range, "+
"it might due to table has been filtered out if filters have been specified", zap.Int64("old tableID", tableID))
}
return newTableID, exists
}
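
The RewriteTableID change above is part of replaying delete ranges: each upstream table ID recorded for gc_delete_range must be mapped to its downstream ID, and entries whose table was filtered out of the restore have no mapping and are skipped. A minimal sketch of that pattern, assuming a plain map in place of BR's brDelRangeExecWrapper:

package main

import "fmt"

// rewriteDeleteRanges maps upstream table IDs to downstream IDs and skips
// entries with no mapping (for example, tables excluded by a table filter)
// instead of inserting them into gc_delete_range. The map and loop here are
// illustrative, not BR's actual implementation.
func rewriteDeleteRanges(globalTableIdMap map[int64]int64, upstreamIDs []int64) []int64 {
	rewritten := make([]int64, 0, len(upstreamIDs))
	for _, oldID := range upstreamIDs {
		newID, exists := globalTableIdMap[oldID]
		if !exists {
			// the table may have been filtered out, so no downstream id exists
			fmt.Printf("skip delete range, no downstream id for table %d\n", oldID)
			continue
		}
		rewritten = append(rewritten, newID)
	}
	return rewritten
}

func main() {
	ids := rewriteDeleteRanges(map[int64]int64{101: 5101}, []int64{101, 102})
	fmt.Println(ids) // [5101]; table 102 has no downstream mapping and is skipped
}
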
16 changes: 11 additions & 5 deletions br/pkg/task/restore.go
@@ -835,23 +835,29 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
if err != nil {
return errors.Trace(err)
}
log.Info("found items to restore after filtering", zap.Int("files", len(fileMap)), zap.Int("tables", len(tableMap)), zap.Int("db", len(dbMap)))
log.Info("found items to restore after filtering",
zap.Int("files", len(fileMap)),
zap.Int("tables", len(tableMap)),
zap.Int("db", len(dbMap)))

if cfg.logBackupTableHistory != nil {
// add additional tables and etc to restore in the snapshot restore phase since it will later be renamed during
// log restore and will fall into filter range.
// adjust tables to restore in the snapshot restore phase since some tables will later be renamed during
// log restore and fall into or out of the filter range.
err := adjustTablesToRestoreAndCreateFilter(cfg.logBackupTableHistory, cfg.RestoreConfig, client, fileMap, tableMap)
if err != nil {
return errors.Trace(err)
}

log.Info("adjusted items to restore",
zap.Int("files", len(fileMap)), zap.Int("tables", len(tableMap)), zap.Int("db", len(dbMap)))
zap.Int("files", len(fileMap)),
zap.Int("tables", len(tableMap)),
zap.Int("db", len(dbMap)))

// need to update to include all eligible table ids from snapshot restore
UpdatePiTRFilter(cfg.RestoreConfig, tableMap)
}

files, tables, dbs := convertMapsToSlices(fileMap, tableMap, dbMap)

// after figuring out what files to restore, check if disk has enough space
if cfg.CheckRequirements {
if err := checkDiskSpace(ctx, mgr, files, tables); err != nil {
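
The file, table and db collections above are kept as maps keyed by ID so the filter-adjustment step can add or drop entries without duplicates, and convertMapsToSlices flattens them once the final restore set is known. A rough sketch with placeholder element types (the real signature and types in BR differ):

package main

import "fmt"

// Simplified placeholder element types; the real maps hold BR's backup file,
// table and database metadata keyed by ID.
type file struct{ name string }
type table struct{ id int64 }
type db struct{ id int64 }

// convertMapsToSlices mirrors the call in the hunk above: once filtering and
// adjustment are done, the maps are flattened into slices for the rest of the
// restore pipeline. Signature and types here are an illustrative guess.
func convertMapsToSlices(fileMap map[string]*file, tableMap map[int64]*table, dbMap map[int64]*db) ([]*file, []*table, []*db) {
	files := make([]*file, 0, len(fileMap))
	for _, f := range fileMap {
		files = append(files, f)
	}
	tables := make([]*table, 0, len(tableMap))
	for _, t := range tableMap {
		tables = append(tables, t)
	}
	dbs := make([]*db, 0, len(dbMap))
	for _, d := range dbMap {
		dbs = append(dbs, d)
	}
	return files, tables, dbs
}

func main() {
	files, tables, dbs := convertMapsToSlices(
		map[string]*file{"1.sst": {name: "1.sst"}},
		map[int64]*table{11: {id: 11}},
		map[int64]*db{1: {id: 1}},
	)
	fmt.Println(len(files), len(tables), len(dbs)) // 1 1 1
}
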
33 changes: 19 additions & 14 deletions br/pkg/task/stream.go
@@ -1217,10 +1217,12 @@ func RunStreamRestore(
cfg.tiflashRecorder = recorder
// restore full snapshot.
if taskInfo.NeedFullRestore {
// if table filter is specified, go through log backup DDL change and build table rename history to figure out
// the tables that need to be restored currently out of filter range but later renamed into the filter range.
// if a table filter is specified by the user, go through the log backup DDL changes and build a table
// rename history to figure out which tables need to be restored.
// currently we restore additional tables in the full snapshot phase when they are later renamed into
// the filter range during log backup.
// we also ignore tables that are currently in the filter range but are later renamed out of it.
var logBackupTableHistory *stream.LogBackupTableHistory
// TODO need to do more
if cfg.ExplicitFilter {
logClient, err := createLogClient(ctx, g, cfg, mgr)
if err != nil {
@@ -1235,12 +1237,12 @@ func RunStreamRestore(
}
logStorage := cfg.Config.Storage
cfg.Config.Storage = cfg.FullBackupStorage
// TiFlash replica is restored to down-stream on 'pitr' currently.
snapshotRestoreConfig := SnapshotRestoreConfig{
RestoreConfig: cfg,
piTRTaskInfo: taskInfo,
logBackupTableHistory: logBackupTableHistory,
}
// TiFlash replica is restored to downstream on 'pitr' currently.
if err = runSnapshotRestore(ctx, mgr, g, FullRestoreCmd, &snapshotRestoreConfig); err != nil {
return errors.Trace(err)
}
@@ -1249,7 +1251,7 @@ func RunStreamRestore(
if err = WriteStringToConsole(g, fmt.Sprintf("%s is skipped due to checkpoint mode for restore\n", FullRestoreCmd)); err != nil {
return errors.Trace(err)
}
if taskInfo.CheckpointInfo != nil && taskInfo.CheckpointInfo.Metadata != nil && taskInfo.CheckpointInfo.Metadata.TiFlashItems != nil {
if taskInfo.hasTiFlashItemsInCheckpoint() {
log.Info("load tiflash records of snapshot restore from checkpoint")
cfg.tiflashRecorder.Load(taskInfo.CheckpointInfo.Metadata.TiFlashItems)
}
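
The comment in the hunk above explains why extra tables may be pulled into (or dropped from) the snapshot restore: the filter is evaluated against a table's final name after the renames seen in the log backup, not its name at snapshot time. A minimal sketch of that decision, assuming a simplified rename-history map and a plain predicate in place of BR's table filter and stream.LogBackupTableHistory:

package main

import (
	"fmt"
	"strings"
)

// finalName follows the rename history from the log backup DDLs to the
// table's last known name; the map-based history and cycle guard are an
// illustrative simplification.
func finalName(name string, renames map[string]string) string {
	seen := map[string]bool{}
	for !seen[name] {
		seen[name] = true
		next, ok := renames[name]
		if !ok {
			break
		}
		name = next
	}
	return name
}

// needSnapshotRestore reports whether a table present in the snapshot should
// be restored: it is needed if its final name after log replay matches the
// user filter, even when its snapshot-time name does not.
func needSnapshotRestore(snapshotName string, renames map[string]string, matchesFilter func(string) bool) bool {
	return matchesFilter(finalName(snapshotName, renames))
}

func main() {
	renames := map[string]string{
		"test.renamed_in":    "test.log_backup_t1",
		"test.log_backup_t2": "other.renamed_out",
	}
	matchesFilter := func(name string) bool { return strings.HasPrefix(name, "test.log_backup") }
	fmt.Println(needSnapshotRestore("test.renamed_in", renames, matchesFilter))    // true: renamed into the filter later
	fmt.Println(needSnapshotRestore("test.log_backup_t2", renames, matchesFilter)) // false: renamed out of the filter later
}
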
@@ -1477,18 +1479,17 @@ func restoreStream(
return errors.Annotate(err, "failed to clean up")
}

// to delete range that's dropped previously
// insert delete ranges for tables and schemas dropped during log backup
if err = client.InsertGCRows(ctx); err != nil {
return errors.Annotate(err, "failed to insert rows into gc_delete_range")
}

// might need to skip some index due to filtered out
// index ingestion is not captured by regular log backup, so we need to manually ingest again
if err = client.RepairIngestIndex(ctx, ingestRecorder, g); err != nil {
return errors.Annotate(err, "failed to repair ingest index")
}

if cfg.tiflashRecorder != nil {
// might need to check filter too
sqls := cfg.tiflashRecorder.GenerateAlterTableDDLs(mgr.GetDomain().InfoSchema())
log.Info("Generating SQLs for restoring TiFlash Replica",
zap.Strings("sqls", sqls))
@@ -1562,12 +1563,6 @@ func createLogClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, mgr *
return nil, errors.Trace(err)
}

migs, err := client.GetMigrations(ctx)
if err != nil {
return nil, errors.Trace(err)
}
client.BuildMigrations(migs)

encryptionManager, err := encryption.NewManager(&cfg.LogBackupCipherInfo, &cfg.MasterKeyConfig)
if err != nil {
return nil, errors.Annotate(err, "failed to create encryption manager for log restore")
@@ -1576,6 +1571,12 @@
return nil, errors.Trace(err)
}

migs, err := client.GetMigrations(ctx)
if err != nil {
return nil, errors.Trace(err)
}
client.BuildMigrations(migs)

return client, nil
}

@@ -1809,6 +1810,10 @@ type PiTRTaskInfo struct {
FullRestoreCheckErr error
}

func (p *PiTRTaskInfo) hasTiFlashItemsInCheckpoint() bool {
return p.CheckpointInfo != nil && p.CheckpointInfo.Metadata != nil && p.CheckpointInfo.Metadata.TiFlashItems != nil
}

func generatePiTRTaskInfo(
ctx context.Context,
mgr *conn.Mgr,
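
The new hasTiFlashItemsInCheckpoint helper replaces the inline three-level nil check at the call site (taskInfo.CheckpointInfo != nil && ...Metadata != nil && ...TiFlashItems != nil). A self-contained sketch of the same nil-safe accessor pattern, using simplified placeholder types rather than BR's checkpoint structs:

package main

import "fmt"

// Simplified stand-ins for the checkpoint types; field names mirror the diff
// above, but the real structs live in BR's checkpoint package.
type checkpointMetadata struct {
	TiFlashItems map[int64]struct{}
}

type checkpointInfo struct {
	Metadata *checkpointMetadata
}

type piTRTaskInfo struct {
	CheckpointInfo *checkpointInfo
}

// hasTiFlashItemsInCheckpoint collapses the nil checks that previously sat
// inline at the call site into a single nil-safe accessor.
func (p *piTRTaskInfo) hasTiFlashItemsInCheckpoint() bool {
	return p.CheckpointInfo != nil && p.CheckpointInfo.Metadata != nil && p.CheckpointInfo.Metadata.TiFlashItems != nil
}

func main() {
	info := &piTRTaskInfo{}
	fmt.Println(info.hasTiFlashItemsInCheckpoint()) // false: no checkpoint loaded
	info.CheckpointInfo = &checkpointInfo{Metadata: &checkpointMetadata{TiFlashItems: map[int64]struct{}{1: {}}}}
	fmt.Println(info.hasTiFlashItemsInCheckpoint()) // true
}
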
70 changes: 56 additions & 14 deletions br/tests/br_pitr_table_filter/run.sh
@@ -60,21 +60,34 @@ rename_tables() {
done
}

drop_tables() {
local prefix=$1 # table name prefix
local count=$2 # number of tables to drop

for i in $(seq 1 $count); do
run_sql "drop table $DB.${prefix}_${i};"
done
}

test_basic_filter() {
restart_services || { echo "Failed to restart services"; exit 1; }

echo "start basic filter testing"
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"

run_sql "create schema $DB;"

echo "write initial data and do snapshot backup"
create_tables_with_values "full_backup" 3
create_tables_with_values "table_to_drop" 3

run_br backup full -f "$DB.*" -s "local://$TEST_DIR/$TASK_NAME/full" --pd $PD_ADDR

echo "write more data and wait for log backup to catch up"
create_tables_with_values "log_backup_lower" 3
create_tables_with_values "LOG_BACKUP_UPPER" 3
create_tables_with_values "other" 3
drop_tables "table_to_drop" 3

. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME"

@@ -88,6 +101,7 @@ test_basic_filter() {
verify_tables "LOG_BACKUP_UPPER" 3 true
verify_tables "full_backup" 3 true
verify_tables "other" 3 true
verify_tables "table_to_drop" 3 false

echo "case 2 with log backup table filter"
run_sql "drop schema $DB;"
@@ -97,6 +111,7 @@ test_basic_filter() {
verify_tables "LOG_BACKUP_UPPER" 3 true
verify_tables "full_backup" 3 false
verify_tables "other" 3 false
verify_tables "table_to_drop" 3 false

echo "case 3 with multiple filters"
run_sql "drop schema $DB;"
@@ -106,6 +121,7 @@ test_basic_filter() {
verify_tables "LOG_BACKUP_UPPER" 3 true
verify_tables "full_backup" 3 true
verify_tables "other" 3 false
verify_tables "table_to_drop" 3 false

echo "case 4 with negative filters"
run_sql "drop schema $DB;"
@@ -116,11 +132,27 @@ test_basic_filter() {
verify_tables "LOG_BACKUP_UPPER" 3 false
verify_tables "full_backup" 3 true
verify_tables "other" 3 true
verify_tables "table_to_drop" 3 false

echo "case 5 restore dropped table"
run_sql "drop schema $DB;"
run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -f "$DB.table*"

verify_tables "log_backup_lower" 3 false
verify_tables "LOG_BACKUP_UPPER" 3 false
verify_tables "full_backup" 3 false
verify_tables "other" 3 false
verify_tables "table_to_drop" 3 false

# cleanup
rm -rf "$TEST_DIR/$TASK_NAME"

echo "basic filter test cases passed"
}

test_table_rename() {
restart_services || { echo "Failed to restart services"; exit 1; }

echo "start table rename with filter testing"
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"

@@ -155,10 +187,15 @@ test_table_rename() {
verify_tables "renamed_out" 3 false
verify_tables "log_renamed_out" 3 false

# cleanup
rm -rf "$TEST_DIR/$TASK_NAME"

echo "table rename with filter passed"
}

test_with_checkpoint() {
restart_services || { echo "Failed to restart services"; exit 1; }

echo "start table filter with checkpoint"
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"

@@ -181,43 +218,40 @@ test_with_checkpoint() {
# restart services to clean up the cluster
restart_services || { echo "Failed to restart services"; exit 1; }

run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -f "$DB.log*"

# Using single quotes to prevent shell interpretation
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return("corrupt-last-table-files")'
restore_fail=0
run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log" || restore_fail=1
run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -s "local://$TEST_DIR/$TASK_NAME/log" -f "$DB.log*" || restore_fail=1
export GO_FAILPOINTS=""
if [ $restore_fail -ne 1 ]; then
echo 'expecting failure but success'
echo 'expected restore to fail on corrupted last table files in full backup, but it succeeded'
exit 1
fi

# PITR with checkpoint but failed in the log restore metakv stage
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return(\"only-last-table-files\");\
github.com/pingcap/tidb/br/pkg/restore/log_client/failed-after-id-maps-saved=return(true)"
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return("only-last-table-files");github.com/pingcap/tidb/br/pkg/restore/log_client/failed-after-id-maps-saved=return(true)'
restore_fail=0
run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log" || restore_fail=1
run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -s "local://$TEST_DIR/$TASK_NAME/log" -f "$DB.log*" || restore_fail=1
export GO_FAILPOINTS=""
if [ $restore_fail -ne 1 ]; then
echo 'expecting failure but success'
echo 'expected restore to fail after id maps were saved, but it succeeded'
exit 1
fi

# PITR with checkpoint but failed in the log restore datakv stage
# skip the snapshot restore stage
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/corrupt-files=return(\"corrupt-last-table-files\")"
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/task/corrupt-files=return("corrupt-last-table-files")'
restore_fail=0
run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log" || restore_fail=1
run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -s "local://$TEST_DIR/$TASK_NAME/log" -f "$DB.log*" || restore_fail=1
export GO_FAILPOINTS=""
if [ $restore_fail -ne 1 ]; then
echo 'expecting failure but success'
echo 'expected restore to fail on corrupted last table files in log restore, but it succeeded'
exit 1
fi

# PITR with checkpoint
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/corrupt-files=return(\"only-last-table-files\")"
run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log"
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/task/corrupt-files=return("only-last-table-files")'
run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -s "local://$TEST_DIR/$TASK_NAME/log" -f "$DB.log*"
export GO_FAILPOINTS=""

verify_tables "log_backup" 3 true
Expand All @@ -230,10 +264,15 @@ test_with_checkpoint() {
verify_tables "renamed_out" 3 false
verify_tables "log_renamed_out" 3 false

echo "table rename with filter passed"
# cleanup
rm -rf "$TEST_DIR/$TASK_NAME"

echo "table filter checkpoint passed"
}

test_exchange_partition() {
restart_services || { echo "Failed to restart services"; exit 1; }

echo "start testing exchange partition with filter"
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"

@@ -287,6 +326,9 @@ test_exchange_partition() {
exit 1
fi

# cleanup
rm -rf "$TEST_DIR/$TASK_NAME"

echo "exchange partition with filter test passed"
}

2 changes: 1 addition & 1 deletion br/tests/run_group_br_tests.sh
@@ -26,7 +26,7 @@ groups=(
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index'
["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index'
["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption'
["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption br_pitr_table_filter'
["G07"]='br_pitr'
["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom br_file_corruption br_tiflash_conflict'
)
