Skip to content

Commit

Permalink
fixed BRIE via SQL
Browse files Browse the repository at this point in the history
Signed-off-by: hillium <[email protected]>
  • Loading branch information
YuJuncen committed Dec 23, 2024
1 parent c1d7800 commit 5b262a4
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 15 deletions.
7 changes: 6 additions & 1 deletion pkg/executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,16 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
case len(s.Tables) != 0:
tables := make([]filter.Table, 0, len(s.Tables))
for _, tbl := range s.Tables {
tables = append(tables, filter.Table{Name: tbl.Name.O, Schema: tbl.Schema.O})
table := filter.Table{Name: tbl.Name.O, Schema: tbl.Schema.O}
tables = append(tables, table)
cfg.FilterStr = append(cfg.FilterStr, table.String())
}
cfg.TableFilter = filter.NewTablesFilter(tables...)
case len(s.Schemas) != 0:
cfg.TableFilter = filter.NewSchemasFilter(s.Schemas...)
for _, schema := range s.Schemas {
cfg.FilterStr = append(cfg.FilterStr, fmt.Sprintf("`%s`.*", schema))
}
default:
cfg.TableFilter = filter.All()
}
Expand Down
39 changes: 25 additions & 14 deletions tests/realtikvtest/brietest/pitr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"os"
"path/filepath"
"regexp"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -218,10 +219,11 @@ func (kit *LogBackupKit) forceFlush() {
cfg := task.DefaultConfig()
cfg.PD = append(cfg.PD, config.GetGlobalConfig().Path)
err := operator.RunForceFlush(ctx, &operator.ForceFlushConfig{
Config: cfg,
Config: cfg,
StoresPattern: regexp.MustCompile(".*"),
})
if err != nil {
log.Warn("It seems this version of TiKV doesn't support force flush, the test may be much more slower.",
log.Warn("[TEST.forceFlush] It seems this version of TiKV doesn't support force flush, the test may be much more slower.",
logutil.ShortError(err))
}
return nil
Expand All @@ -233,7 +235,7 @@ func (kit *LogBackupKit) forceFlushAndWait(taskName string) {
kit.forceFlush()
require.Eventually(kit.t, func() bool {
ckpt := kit.CheckpointTSOf(taskName)
log.Info("checkpoint", zap.Uint64("checkpoint", ckpt), zap.Uint64("ts", ts))
log.Info("[TEST.forceFlushAndWait] checkpoint", zap.Uint64("checkpoint", ckpt), zap.Uint64("ts", ts))
return ckpt >= ts
}, 300*time.Second, 1*time.Second)
time.Sleep(6 * time.Second) // Wait the storage checkpoint uploaded...
Expand All @@ -255,26 +257,35 @@ func verifySimpleData(kit *LogBackupKit) {
}

func cleanSimpleData(kit *LogBackupKit) {
kit.tk.MustExec(fmt.Sprintf("DROP TABLE test.%s", kit.t.Name()))
kit.tk.MustExec(fmt.Sprintf("DROP TABLE IF EXISTS test.%s", kit.t.Name()))
}

func TestPiTR(t *testing.T) {
func TestPiTRAndBackupInSQL(t *testing.T) {
kit := NewLogBackupKit(t)

taskName := "simple"
createSimpleTableWithData(kit)

ts := kit.TSO()
kit.RunFullBackup(func(bc *task.BackupConfig) { bc.BackupTS = ts })
kit.RunLogStart(taskName, func(sc *task.StreamConfig) { sc.StartTS = ts })

insertSimpleIncreaseData(kit)

kit.forceFlushAndWait(taskName)
taskName := t.Name()
kit.RunFullBackup(func(bc *task.BackupConfig) {})
cleanSimpleData(kit)

ts := kit.TSO()
kit.RunFullBackup(func(bc *task.BackupConfig) {
bc.Storage = "local://" + kit.base + "/full2"
bc.BackupTS = ts
})
kit.RunLogStart(taskName, func(sc *task.StreamConfig) {
sc.StartTS = ts
})
_ = kit.tk.MustQuery(fmt.Sprintf("RESTORE TABLE test.%s FROM '%s'", t.Name(), "local://"+kit.base+"/full"))
verifySimpleData(kit)
kit.forceFlushAndWait(taskName)

cleanSimpleData(kit)
kit.StopTaskIfExists(taskName)
kit.RunStreamRestore(func(rc *task.RestoreConfig) {})
kit.RunStreamRestore(func(rc *task.RestoreConfig) {
rc.FullBackupStorage = "local://" + kit.base + "/full2"
})
verifySimpleData(kit)
}

Expand Down

0 comments on commit 5b262a4

Please sign in to comment.