Skip to content

Commit 7abfd01

Browse files
authored
Snapshot support for databases and tables (#22635) (#22641)
Snapshot support for databases and tables. Approved by: @XuPeng-SH
1 parent caf1ce5 commit 7abfd01

File tree

11 files changed

+887
-233
lines changed

11 files changed

+887
-233
lines changed

pkg/vm/engine/tae/db/gc/v3/checkpoint.go

Lines changed: 17 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -416,7 +416,7 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
416416
)
417417
return
418418
}
419-
var snapshots map[uint32]containers.Vector
419+
var snapshots *logtail.SnapshotInfo
420420
var pitrs *logtail.PitrInfo
421421
pitrs, err = c.GetPITRsLocked(ctx)
422422
if err != nil {
@@ -440,8 +440,6 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
440440
)
441441
return
442442
}
443-
accountSnapshots := TransformToTSList(snapshots)
444-
logtail.CloseSnapshotList(snapshots)
445443
_, sarg, _ := fault.TriggerFault("replay error UT")
446444
if sarg != "" {
447445
err = moerr.NewInternalErrorNoCtxf("GC-REPLAY-GET-CHECKPOINT-DATA-ERROR %s", sarg)
@@ -464,13 +462,12 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
464462
c.checkpointCli.GetCatalog().GetUsageMemo().(*logtail.TNUsageMemo),
465463
ckpBatch,
466464
c.mutation.snapshotMeta,
467-
accountSnapshots,
465+
snapshots,
468466
pitrs,
469467
0)
470468
logutil.Info(
471469
"GC-REPLAY-COLLECT-SNAPSHOT-SIZE",
472470
zap.String("task", c.TaskNameLocked()),
473-
zap.Int("size", len(accountSnapshots)),
474471
zap.Duration("duration", time.Since(start)),
475472
zap.String("checkpoint", compacted.String()),
476473
zap.Int("count", ckpBatch.RowCount()),
@@ -777,7 +774,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked(
777774
ctx context.Context,
778775
checkpointLowWaterMark *types.TS,
779776
memoryBuffer *containers.OneSchemaBatchBuffer,
780-
accountSnapshots map[uint32][]types.TS,
777+
snapshots *logtail.SnapshotInfo,
781778
pitrs *logtail.PitrInfo,
782779
gcFileCount int,
783780
) (err error) {
@@ -888,7 +885,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked(
888885
c.checkpointCli.GetCatalog().GetUsageMemo().(*logtail.TNUsageMemo),
889886
newCkpData,
890887
c.mutation.snapshotMeta,
891-
accountSnapshots,
888+
snapshots,
892889
pitrs,
893890
gcFileCount)
894891
if newCkp == nil {
@@ -1069,10 +1066,9 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
10691066
memoryBuffer *containers.OneSchemaBatchBuffer,
10701067
) (err error) {
10711068
now := time.Now()
1072-
var snapshots map[uint32]containers.Vector
1069+
var snapshots *logtail.SnapshotInfo
10731070
var extraErrMsg string
10741071
defer func() {
1075-
logtail.CloseSnapshotList(snapshots)
10761072
logutil.Info(
10771073
"GC-TRACE-TRY-GC-AGAINST-GCKP",
10781074
zap.String("task", c.TaskNameLocked()),
@@ -1092,9 +1088,8 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
10921088
extraErrMsg = "GetSnapshot failed"
10931089
return
10941090
}
1095-
accountSnapshots := TransformToTSList(snapshots)
10961091
filesToGC, err := c.doGCAgainstGlobalCheckpointLocked(
1097-
ctx, gckp, accountSnapshots, pitrs, memoryBuffer,
1092+
ctx, gckp, snapshots, pitrs, memoryBuffer,
10981093
)
10991094
if err != nil {
11001095
extraErrMsg = "doGCAgainstGlobalCheckpointLocked failed"
@@ -1132,7 +1127,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
11321127
waterMark = scanMark
11331128
}
11341129
err = c.mergeCheckpointFilesLocked(
1135-
ctx, &waterMark, memoryBuffer, accountSnapshots, pitrs, len(filesToGC),
1130+
ctx, &waterMark, memoryBuffer, snapshots, pitrs, len(filesToGC),
11361131
)
11371132
if err != nil {
11381133
extraErrMsg = fmt.Sprintf("mergeCheckpointFilesLocked %v failed", waterMark.ToString())
@@ -1145,7 +1140,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
11451140
func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
11461141
ctx context.Context,
11471142
gckp *checkpoint.CheckpointEntry,
1148-
accountSnapshots map[uint32][]types.TS,
1143+
snapshots *logtail.SnapshotInfo,
11491144
pitrs *logtail.PitrInfo,
11501145
memoryBuffer *containers.OneSchemaBatchBuffer,
11511146
) ([]string, error) {
@@ -1189,7 +1184,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
11891184
if filesToGC, metafile, err = scannedWindow.ExecuteGlobalCheckpointBasedGC(
11901185
ctx,
11911186
gckp,
1192-
accountSnapshots,
1187+
snapshots,
11931188
pitrs,
11941189
c.mutation.snapshotMeta,
11951190
iscp,
@@ -1232,7 +1227,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
12321227
now = time.Now()
12331228
// TODO:
12341229
c.updateGCWaterMark(gckp)
1235-
c.mutation.snapshotMeta.MergeTableInfo(accountSnapshots, pitrs)
1230+
c.mutation.snapshotMeta.MergeTableInfo(snapshots, pitrs)
12361231
mergeDuration = time.Since(now)
12371232
return filesToGC, nil
12381233
}
@@ -1324,7 +1319,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13241319
// TODO
13251320
return err
13261321
}
1327-
var snapshots map[uint32]containers.Vector
1322+
var snapshots *logtail.SnapshotInfo
13281323
snapshots, err = c.GetSnapshotsLocked()
13291324
if err != nil {
13301325
logutil.Error(
@@ -1334,7 +1329,6 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13341329
)
13351330
return err
13361331
}
1337-
defer logtail.CloseSnapshotList(snapshots)
13381332
var pitr *logtail.PitrInfo
13391333
pitr, err = c.GetPITRsLocked(c.ctx)
13401334
if err != nil {
@@ -1348,8 +1342,6 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13481342

13491343
mergeWindow := c.GetScannedWindowLocked().Clone()
13501344
defer mergeWindow.Close()
1351-
1352-
accoutSnapshots := TransformToTSList(snapshots)
13531345
logutil.Info(
13541346
"GC-TRACE-MERGE-WINDOW",
13551347
zap.String("task", c.TaskNameLocked()),
@@ -1362,7 +1354,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13621354
if _, _, err = mergeWindow.ExecuteGlobalCheckpointBasedGC(
13631355
c.ctx,
13641356
gCkp,
1365-
accoutSnapshots,
1357+
snapshots,
13661358
pitr,
13671359
c.mutation.snapshotMeta,
13681360
iscp,
@@ -1386,7 +1378,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13861378
if _, _, err = debugWindow.ExecuteGlobalCheckpointBasedGC(
13871379
c.ctx,
13881380
gCkp,
1389-
accoutSnapshots,
1381+
snapshots,
13901382
pitr,
13911383
c.mutation.snapshotMeta,
13921384
iscp,
@@ -1468,7 +1460,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
14681460
}
14691461
collectObjectsFromCheckpointData(c.ctx, ckpReader, cptCkpObjects)
14701462

1471-
tList, pList := c.mutation.snapshotMeta.AccountToTableSnapshots(accoutSnapshots, pitr)
1463+
tList, pList := c.mutation.snapshotMeta.AccountToTableSnapshots(snapshots, pitr)
14721464
for name, tables := range ickpObjects {
14731465
for _, entry := range tables {
14741466
if cptCkpObjects[name] != nil {
@@ -1840,12 +1832,13 @@ func (c *checkpointCleaner) mutUpdateSnapshotMetaLocked(
18401832
)
18411833
}
18421834

1843-
func (c *checkpointCleaner) GetSnapshots() (map[uint32]containers.Vector, error) {
1835+
func (c *checkpointCleaner) GetSnapshots() (*logtail.SnapshotInfo, error) {
18441836
c.mutation.Lock()
18451837
defer c.mutation.Unlock()
18461838
return c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp)
18471839
}
1848-
func (c *checkpointCleaner) GetSnapshotsLocked() (map[uint32]containers.Vector, error) {
1840+
1841+
func (c *checkpointCleaner) GetSnapshotsLocked() (*logtail.SnapshotInfo, error) {
18491842
return c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp)
18501843
}
18511844
func (c *checkpointCleaner) GetTablePK(tid uint64) string {

pkg/vm/engine/tae/db/gc/v3/exec_v1.go

Lines changed: 26 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -63,15 +63,15 @@ type CheckpointBasedGCJob struct {
6363
coarseProbility float64
6464
canGCCacheSize int
6565
}
66-
sourcer engine.BaseReader
67-
snapshotMeta *logtail.SnapshotMeta
68-
accountSnapshots map[uint32][]types.TS
69-
iscpTables map[uint64]types.TS
70-
pitr *logtail.PitrInfo
71-
ts *types.TS
72-
globalCkpLoc objectio.Location
73-
globalCkpVer uint32
74-
checkpointCli checkpoint.Runner // Added to access catalog
66+
sourcer engine.BaseReader
67+
snapshotMeta *logtail.SnapshotMeta
68+
snapshots *logtail.SnapshotInfo
69+
iscpTables map[uint64]types.TS
70+
pitr *logtail.PitrInfo
71+
ts *types.TS
72+
globalCkpLoc objectio.Location
73+
globalCkpVer uint32
74+
checkpointCli checkpoint.Runner // Added to access catalog
7575

7676
result struct {
7777
vecToGC *vector.Vector
@@ -85,7 +85,7 @@ func NewCheckpointBasedGCJob(
8585
gckpVersion uint32,
8686
sourcer engine.BaseReader,
8787
pitr *logtail.PitrInfo,
88-
accountSnapshots map[uint32][]types.TS,
88+
snapshots *logtail.SnapshotInfo,
8989
iscpTables map[uint64]types.TS,
9090
snapshotMeta *logtail.SnapshotMeta,
9191
checkpointCli checkpoint.Runner,
@@ -97,15 +97,15 @@ func NewCheckpointBasedGCJob(
9797
opts ...GCJobExecutorOption,
9898
) *CheckpointBasedGCJob {
9999
e := &CheckpointBasedGCJob{
100-
sourcer: sourcer,
101-
snapshotMeta: snapshotMeta,
102-
accountSnapshots: accountSnapshots,
103-
pitr: pitr,
104-
ts: ts,
105-
globalCkpLoc: globalCkpLoc,
106-
globalCkpVer: gckpVersion,
107-
iscpTables: iscpTables,
108-
checkpointCli: checkpointCli,
100+
sourcer: sourcer,
101+
snapshotMeta: snapshotMeta,
102+
snapshots: snapshots,
103+
pitr: pitr,
104+
ts: ts,
105+
globalCkpLoc: globalCkpLoc,
106+
globalCkpVer: gckpVersion,
107+
iscpTables: iscpTables,
108+
checkpointCli: checkpointCli,
109109
}
110110
for _, opt := range opts {
111111
opt(e)
@@ -121,7 +121,7 @@ func (e *CheckpointBasedGCJob) Close() error {
121121
e.sourcer = nil
122122
}
123123
e.snapshotMeta = nil
124-
e.accountSnapshots = nil
124+
e.snapshots = nil
125125
e.pitr = nil
126126
e.ts = nil
127127
e.globalCkpLoc = nil
@@ -171,7 +171,7 @@ func (e *CheckpointBasedGCJob) Execute(ctx context.Context) error {
171171

172172
fineFilter, err := MakeSnapshotAndPitrFineFilter(
173173
e.ts,
174-
e.accountSnapshots,
174+
e.snapshots,
175175
e.pitr,
176176
e.snapshotMeta,
177177
transObjects,
@@ -356,7 +356,7 @@ func buildTableExistenceMap(snapshotMeta *logtail.SnapshotMeta, checkpointCli ch
356356

357357
func MakeSnapshotAndPitrFineFilter(
358358
ts *types.TS,
359-
accountSnapshots map[uint32][]types.TS,
359+
snapshots *logtail.SnapshotInfo,
360360
pitrs *logtail.PitrInfo,
361361
snapshotMeta *logtail.SnapshotMeta,
362362
transObjects map[string]map[uint64]*ObjectEntry,
@@ -373,7 +373,7 @@ func MakeSnapshotAndPitrFineFilter(
373373
}
374374

375375
tableSnapshots, tablePitrs := snapshotMeta.AccountToTableSnapshots(
376-
accountSnapshots,
376+
snapshots,
377377
pitrs,
378378
)
379379
return func(
@@ -393,7 +393,7 @@ func MakeSnapshotAndPitrFineFilter(
393393
createTS := createTSs[i]
394394
deleteTS := deleteTSs[i]
395395

396-
snapshots := tableSnapshots[tableID]
396+
sp := tableSnapshots[tableID]
397397
pitr := tablePitrs[tableID]
398398

399399
if transObjects[name] != nil {
@@ -407,7 +407,7 @@ func MakeSnapshotAndPitrFineFilter(
407407
}
408408

409409
if !logtail.ObjectIsSnapshotRefers(
410-
entry.stats, pitr, &entry.createTS, &entry.dropTS, snapshots,
410+
entry.stats, pitr, &entry.createTS, &entry.dropTS, sp,
411411
) {
412412
if iscpTables == nil {
413413
bm.Add(uint64(i))
@@ -436,7 +436,7 @@ func MakeSnapshotAndPitrFineFilter(
436436
continue
437437
}
438438
if !logtail.ObjectIsSnapshotRefers(
439-
&stats, pitr, &createTS, &deleteTS, snapshots,
439+
&stats, pitr, &createTS, &deleteTS, sp,
440440
) {
441441
if iscpTables == nil {
442442
bm.Add(uint64(i))

pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,6 @@ import (
1919

2020
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2121
"github.com/matrixorigin/matrixone/pkg/container/types"
22-
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
2322
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
2423
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
2524
)
@@ -139,7 +138,7 @@ func (c *MockCleaner) GetMPool() *mpool.MPool {
139138
return nil
140139
}
141140

142-
func (c *MockCleaner) GetSnapshots() (map[uint32]containers.Vector, error) {
141+
func (c *MockCleaner) GetSnapshots() (*logtail.SnapshotInfo, error) {
143142
return nil, nil
144143
}
145144

pkg/vm/engine/tae/db/gc/v3/types.go

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/matrixorigin/matrixone/pkg/container/vector"
2424
"github.com/matrixorigin/matrixone/pkg/objectio"
2525
"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
26-
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
2726
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
2827
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
2928
)
@@ -148,7 +147,7 @@ type Cleaner interface {
148147
DisableGC()
149148
GCEnabled() bool
150149
GetMPool() *mpool.MPool
151-
GetSnapshots() (map[uint32]containers.Vector, error)
150+
GetSnapshots() (*logtail.SnapshotInfo, error)
152151
GetDetails(ctx context.Context) (map[uint32]*TableStats, error)
153152
Verify(ctx context.Context) string
154153
ISCPTables() (map[uint64]types.TS, error)

pkg/vm/engine/tae/db/gc/v3/util.go

Lines changed: 0 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -19,8 +19,6 @@ import (
1919

2020
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2121
"github.com/matrixorigin/matrixone/pkg/container/batch"
22-
"github.com/matrixorigin/matrixone/pkg/container/types"
23-
"github.com/matrixorigin/matrixone/pkg/container/vector"
2422
"github.com/matrixorigin/matrixone/pkg/fileservice"
2523
"github.com/matrixorigin/matrixone/pkg/objectio"
2624
"github.com/matrixorigin/matrixone/pkg/pb/plan"
@@ -72,16 +70,6 @@ func MakeLoadFunc(
7270
}, releaseFn
7371
}
7472

75-
func TransformToTSList(
76-
fromKV map[uint32]containers.Vector,
77-
) map[uint32][]types.TS {
78-
newKV := make(map[uint32][]types.TS, len(fromKV))
79-
for k, v := range fromKV {
80-
newKV[k] = vector.MustFixedColWithTypeCheck[types.TS](v.GetDownstreamVector())
81-
}
82-
return newKV
83-
}
84-
8573
func MakeGCWindowBuffer(size int) *containers.OneSchemaBatchBuffer {
8674
return containers.NewOneSchemaBatchBuffer(
8775
size, ObjectTableAttrs, ObjectTableTypes, false,

pkg/vm/engine/tae/db/gc/v3/window.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -120,7 +120,7 @@ func (w *GCWindow) MakeFilesReader(
120120
func (w *GCWindow) ExecuteGlobalCheckpointBasedGC(
121121
ctx context.Context,
122122
gCkp *checkpoint.CheckpointEntry,
123-
accountSnapshots map[uint32][]types.TS,
123+
snapshots *logtail.SnapshotInfo,
124124
pitrs *logtail.PitrInfo,
125125
snapshotMeta *logtail.SnapshotMeta,
126126
iscpTables map[uint64]types.TS,
@@ -142,7 +142,7 @@ func (w *GCWindow) ExecuteGlobalCheckpointBasedGC(
142142
gCkp.GetVersion(),
143143
sourcer,
144144
pitrs,
145-
accountSnapshots,
145+
snapshots,
146146
iscpTables,
147147
snapshotMeta,
148148
checkpointCli,

pkg/vm/engine/tae/db/gc/v3/window_test.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -243,11 +243,11 @@ func NewMockSnapshotMeta() *MockSnapshotMeta {
243243

244244
// AccountToTableSnapshots mocks the same method in logtail.SnapshotMeta
245245
func (m *MockSnapshotMeta) AccountToTableSnapshots(
246-
accountSnapshots map[uint32][]types.TS,
246+
snapshots *logtail.SnapshotInfo,
247247
pitrs *logtail.PitrInfo,
248-
) (map[uint64][]types.TS, map[uint64][]types.TS) {
248+
) (map[uint64][]types.TS, map[uint64]*types.TS) {
249249
tableSnapshots := make(map[uint64][]types.TS)
250-
tablePitrs := make(map[uint64][]types.TS)
250+
tablePitrs := make(map[uint64]*types.TS)
251251
return tableSnapshots, tablePitrs
252252
}
253253

0 commit comments

Comments
 (0)