diff --git a/pkg/executor/brie.go b/pkg/executor/brie.go index 8ad6ec5b9b2f9..f2a544d4bec1b 100644 --- a/pkg/executor/brie.go +++ b/pkg/executor/brie.go @@ -372,11 +372,16 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) case len(s.Tables) != 0: tables := make([]filter.Table, 0, len(s.Tables)) for _, tbl := range s.Tables { - tables = append(tables, filter.Table{Name: tbl.Name.O, Schema: tbl.Schema.O}) + table := filter.Table{Name: tbl.Name.O, Schema: tbl.Schema.O} + tables = append(tables, table) + cfg.FilterStr = append(cfg.FilterStr, table.String()) } cfg.TableFilter = filter.NewTablesFilter(tables...) case len(s.Schemas) != 0: cfg.TableFilter = filter.NewSchemasFilter(s.Schemas...) + for _, schema := range s.Schemas { + cfg.FilterStr = append(cfg.FilterStr, fmt.Sprintf("`%s`.*", schema)) + } default: cfg.TableFilter = filter.All() } diff --git a/tests/realtikvtest/brietest/pitr_test.go b/tests/realtikvtest/brietest/pitr_test.go index 54d28ef3542a4..3d7410c082bed 100644 --- a/tests/realtikvtest/brietest/pitr_test.go +++ b/tests/realtikvtest/brietest/pitr_test.go @@ -7,6 +7,7 @@ import ( "math" "os" "path/filepath" + "regexp" "strings" "testing" "time" @@ -218,10 +219,11 @@ func (kit *LogBackupKit) forceFlush() { cfg := task.DefaultConfig() cfg.PD = append(cfg.PD, config.GetGlobalConfig().Path) err := operator.RunForceFlush(ctx, &operator.ForceFlushConfig{ - Config: cfg, + Config: cfg, + StoresPattern: regexp.MustCompile(".*"), }) if err != nil { - log.Warn("It seems this version of TiKV doesn't support force flush, the test may be much more slower.", + log.Warn("[TEST.forceFlush] It seems this version of TiKV doesn't support force flush, the test may be much more slower.", logutil.ShortError(err)) } return nil @@ -233,7 +235,7 @@ func (kit *LogBackupKit) forceFlushAndWait(taskName string) { kit.forceFlush() require.Eventually(kit.t, func() bool { ckpt := kit.CheckpointTSOf(taskName) - log.Info("checkpoint", zap.Uint64("checkpoint", ckpt), zap.Uint64("ts", ts)) + log.Info("[TEST.forceFlushAndWait] checkpoint", zap.Uint64("checkpoint", ckpt), zap.Uint64("ts", ts)) return ckpt >= ts }, 300*time.Second, 1*time.Second) time.Sleep(6 * time.Second) // Wait the storage checkpoint uploaded... @@ -255,26 +257,35 @@ func verifySimpleData(kit *LogBackupKit) { } func cleanSimpleData(kit *LogBackupKit) { - kit.tk.MustExec(fmt.Sprintf("DROP TABLE test.%s", kit.t.Name())) + kit.tk.MustExec(fmt.Sprintf("DROP TABLE IF EXISTS test.%s", kit.t.Name())) } -func TestPiTR(t *testing.T) { +func TestPiTRAndBackupInSQL(t *testing.T) { kit := NewLogBackupKit(t) - - taskName := "simple" createSimpleTableWithData(kit) - - ts := kit.TSO() - kit.RunFullBackup(func(bc *task.BackupConfig) { bc.BackupTS = ts }) - kit.RunLogStart(taskName, func(sc *task.StreamConfig) { sc.StartTS = ts }) - insertSimpleIncreaseData(kit) - kit.forceFlushAndWait(taskName) + taskName := t.Name() + kit.RunFullBackup(func(bc *task.BackupConfig) {}) cleanSimpleData(kit) + ts := kit.TSO() + kit.RunFullBackup(func(bc *task.BackupConfig) { + bc.Storage = "local://" + kit.base + "/full2" + bc.BackupTS = ts + }) + kit.RunLogStart(taskName, func(sc *task.StreamConfig) { + sc.StartTS = ts + }) + _ = kit.tk.MustQuery(fmt.Sprintf("RESTORE TABLE test.%s FROM '%s'", t.Name(), "local://"+kit.base+"/full")) + verifySimpleData(kit) + kit.forceFlushAndWait(taskName) + + cleanSimpleData(kit) kit.StopTaskIfExists(taskName) - kit.RunStreamRestore(func(rc *task.RestoreConfig) {}) + kit.RunStreamRestore(func(rc *task.RestoreConfig) { + rc.FullBackupStorage = "local://" + kit.base + "/full2" + }) verifySimpleData(kit) }