Skip to content

Commit b34561d

Browse files
committed
Revert "Snapshot support for databases and tables (matrixorigin#22635) (matrixorigin#22641)"
This reverts commit 7abfd01.
1 parent 16797d9 commit b34561d

File tree

11 files changed

+232
-885
lines changed

11 files changed

+232
-885
lines changed

pkg/vm/engine/tae/db/gc/v3/checkpoint.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
416416
)
417417
return
418418
}
419-
var snapshots *logtail.SnapshotInfo
419+
var snapshots map[uint32]containers.Vector
420420
var pitrs *logtail.PitrInfo
421421
pitrs, err = c.GetPITRsLocked(ctx)
422422
if err != nil {
@@ -440,6 +440,8 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
440440
)
441441
return
442442
}
443+
accountSnapshots := TransformToTSList(snapshots)
444+
logtail.CloseSnapshotList(snapshots)
443445
_, sarg, _ := fault.TriggerFault("replay error UT")
444446
if sarg != "" {
445447
err = moerr.NewInternalErrorNoCtxf("GC-REPLAY-GET-CHECKPOINT-DATA-ERROR %s", sarg)
@@ -462,12 +464,13 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
462464
c.checkpointCli.GetCatalog().GetUsageMemo().(*logtail.TNUsageMemo),
463465
ckpBatch,
464466
c.mutation.snapshotMeta,
465-
snapshots,
467+
accountSnapshots,
466468
pitrs,
467469
0)
468470
logutil.Info(
469471
"GC-REPLAY-COLLECT-SNAPSHOT-SIZE",
470472
zap.String("task", c.TaskNameLocked()),
473+
zap.Int("size", len(accountSnapshots)),
471474
zap.Duration("duration", time.Since(start)),
472475
zap.String("checkpoint", compacted.String()),
473476
zap.Int("count", ckpBatch.RowCount()),
@@ -774,7 +777,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked(
774777
ctx context.Context,
775778
checkpointLowWaterMark *types.TS,
776779
memoryBuffer *containers.OneSchemaBatchBuffer,
777-
snapshots *logtail.SnapshotInfo,
780+
accountSnapshots map[uint32][]types.TS,
778781
pitrs *logtail.PitrInfo,
779782
gcFileCount int,
780783
) (err error) {
@@ -885,7 +888,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked(
885888
c.checkpointCli.GetCatalog().GetUsageMemo().(*logtail.TNUsageMemo),
886889
newCkpData,
887890
c.mutation.snapshotMeta,
888-
snapshots,
891+
accountSnapshots,
889892
pitrs,
890893
gcFileCount)
891894
if newCkp == nil {
@@ -1066,9 +1069,10 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
10661069
memoryBuffer *containers.OneSchemaBatchBuffer,
10671070
) (err error) {
10681071
now := time.Now()
1069-
var snapshots *logtail.SnapshotInfo
1072+
var snapshots map[uint32]containers.Vector
10701073
var extraErrMsg string
10711074
defer func() {
1075+
logtail.CloseSnapshotList(snapshots)
10721076
logutil.Info(
10731077
"GC-TRACE-TRY-GC-AGAINST-GCKP",
10741078
zap.String("task", c.TaskNameLocked()),
@@ -1088,8 +1092,9 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
10881092
extraErrMsg = "GetSnapshot failed"
10891093
return
10901094
}
1095+
accountSnapshots := TransformToTSList(snapshots)
10911096
filesToGC, err := c.doGCAgainstGlobalCheckpointLocked(
1092-
ctx, gckp, snapshots, pitrs, memoryBuffer,
1097+
ctx, gckp, accountSnapshots, pitrs, memoryBuffer,
10931098
)
10941099
if err != nil {
10951100
extraErrMsg = "doGCAgainstGlobalCheckpointLocked failed"
@@ -1127,7 +1132,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
11271132
waterMark = scanMark
11281133
}
11291134
err = c.mergeCheckpointFilesLocked(
1130-
ctx, &waterMark, memoryBuffer, snapshots, pitrs, len(filesToGC),
1135+
ctx, &waterMark, memoryBuffer, accountSnapshots, pitrs, len(filesToGC),
11311136
)
11321137
if err != nil {
11331138
extraErrMsg = fmt.Sprintf("mergeCheckpointFilesLocked %v failed", waterMark.ToString())
@@ -1140,7 +1145,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
11401145
func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
11411146
ctx context.Context,
11421147
gckp *checkpoint.CheckpointEntry,
1143-
snapshots *logtail.SnapshotInfo,
1148+
accountSnapshots map[uint32][]types.TS,
11441149
pitrs *logtail.PitrInfo,
11451150
memoryBuffer *containers.OneSchemaBatchBuffer,
11461151
) ([]string, error) {
@@ -1184,7 +1189,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
11841189
if filesToGC, metafile, err = scannedWindow.ExecuteGlobalCheckpointBasedGC(
11851190
ctx,
11861191
gckp,
1187-
snapshots,
1192+
accountSnapshots,
11881193
pitrs,
11891194
c.mutation.snapshotMeta,
11901195
iscp,
@@ -1227,7 +1232,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
12271232
now = time.Now()
12281233
// TODO:
12291234
c.updateGCWaterMark(gckp)
1230-
c.mutation.snapshotMeta.MergeTableInfo(snapshots, pitrs)
1235+
c.mutation.snapshotMeta.MergeTableInfo(accountSnapshots, pitrs)
12311236
mergeDuration = time.Since(now)
12321237
return filesToGC, nil
12331238
}
@@ -1319,7 +1324,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13191324
// TODO
13201325
return err
13211326
}
1322-
var snapshots *logtail.SnapshotInfo
1327+
var snapshots map[uint32]containers.Vector
13231328
snapshots, err = c.GetSnapshotsLocked()
13241329
if err != nil {
13251330
logutil.Error(
@@ -1329,6 +1334,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13291334
)
13301335
return err
13311336
}
1337+
defer logtail.CloseSnapshotList(snapshots)
13321338
var pitr *logtail.PitrInfo
13331339
pitr, err = c.GetPITRsLocked(c.ctx)
13341340
if err != nil {
@@ -1342,6 +1348,8 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13421348

13431349
mergeWindow := c.GetScannedWindowLocked().Clone()
13441350
defer mergeWindow.Close()
1351+
1352+
accoutSnapshots := TransformToTSList(snapshots)
13451353
logutil.Info(
13461354
"GC-TRACE-MERGE-WINDOW",
13471355
zap.String("task", c.TaskNameLocked()),
@@ -1354,7 +1362,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13541362
if _, _, err = mergeWindow.ExecuteGlobalCheckpointBasedGC(
13551363
c.ctx,
13561364
gCkp,
1357-
snapshots,
1365+
accoutSnapshots,
13581366
pitr,
13591367
c.mutation.snapshotMeta,
13601368
iscp,
@@ -1378,7 +1386,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13781386
if _, _, err = debugWindow.ExecuteGlobalCheckpointBasedGC(
13791387
c.ctx,
13801388
gCkp,
1381-
snapshots,
1389+
accoutSnapshots,
13821390
pitr,
13831391
c.mutation.snapshotMeta,
13841392
iscp,
@@ -1460,7 +1468,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
14601468
}
14611469
collectObjectsFromCheckpointData(c.ctx, ckpReader, cptCkpObjects)
14621470

1463-
tList, pList := c.mutation.snapshotMeta.AccountToTableSnapshots(snapshots, pitr)
1471+
tList, pList := c.mutation.snapshotMeta.AccountToTableSnapshots(accoutSnapshots, pitr)
14641472
for name, tables := range ickpObjects {
14651473
for _, entry := range tables {
14661474
if cptCkpObjects[name] != nil {
@@ -1832,13 +1840,12 @@ func (c *checkpointCleaner) mutUpdateSnapshotMetaLocked(
18321840
)
18331841
}
18341842

1835-
func (c *checkpointCleaner) GetSnapshots() (*logtail.SnapshotInfo, error) {
1843+
func (c *checkpointCleaner) GetSnapshots() (map[uint32]containers.Vector, error) {
18361844
c.mutation.Lock()
18371845
defer c.mutation.Unlock()
18381846
return c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp)
18391847
}
1840-
1841-
func (c *checkpointCleaner) GetSnapshotsLocked() (*logtail.SnapshotInfo, error) {
1848+
func (c *checkpointCleaner) GetSnapshotsLocked() (map[uint32]containers.Vector, error) {
18421849
return c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp)
18431850
}
18441851
func (c *checkpointCleaner) GetTablePK(tid uint64) string {

pkg/vm/engine/tae/db/gc/v3/exec_v1.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,15 @@ type CheckpointBasedGCJob struct {
6363
coarseProbility float64
6464
canGCCacheSize int
6565
}
66-
sourcer engine.BaseReader
67-
snapshotMeta *logtail.SnapshotMeta
68-
snapshots *logtail.SnapshotInfo
69-
iscpTables map[uint64]types.TS
70-
pitr *logtail.PitrInfo
71-
ts *types.TS
72-
globalCkpLoc objectio.Location
73-
globalCkpVer uint32
74-
checkpointCli checkpoint.Runner // Added to access catalog
66+
sourcer engine.BaseReader
67+
snapshotMeta *logtail.SnapshotMeta
68+
accountSnapshots map[uint32][]types.TS
69+
iscpTables map[uint64]types.TS
70+
pitr *logtail.PitrInfo
71+
ts *types.TS
72+
globalCkpLoc objectio.Location
73+
globalCkpVer uint32
74+
checkpointCli checkpoint.Runner // Added to access catalog
7575

7676
result struct {
7777
vecToGC *vector.Vector
@@ -85,7 +85,7 @@ func NewCheckpointBasedGCJob(
8585
gckpVersion uint32,
8686
sourcer engine.BaseReader,
8787
pitr *logtail.PitrInfo,
88-
snapshots *logtail.SnapshotInfo,
88+
accountSnapshots map[uint32][]types.TS,
8989
iscpTables map[uint64]types.TS,
9090
snapshotMeta *logtail.SnapshotMeta,
9191
checkpointCli checkpoint.Runner,
@@ -97,15 +97,15 @@ func NewCheckpointBasedGCJob(
9797
opts ...GCJobExecutorOption,
9898
) *CheckpointBasedGCJob {
9999
e := &CheckpointBasedGCJob{
100-
sourcer: sourcer,
101-
snapshotMeta: snapshotMeta,
102-
snapshots: snapshots,
103-
pitr: pitr,
104-
ts: ts,
105-
globalCkpLoc: globalCkpLoc,
106-
globalCkpVer: gckpVersion,
107-
iscpTables: iscpTables,
108-
checkpointCli: checkpointCli,
100+
sourcer: sourcer,
101+
snapshotMeta: snapshotMeta,
102+
accountSnapshots: accountSnapshots,
103+
pitr: pitr,
104+
ts: ts,
105+
globalCkpLoc: globalCkpLoc,
106+
globalCkpVer: gckpVersion,
107+
iscpTables: iscpTables,
108+
checkpointCli: checkpointCli,
109109
}
110110
for _, opt := range opts {
111111
opt(e)
@@ -121,7 +121,7 @@ func (e *CheckpointBasedGCJob) Close() error {
121121
e.sourcer = nil
122122
}
123123
e.snapshotMeta = nil
124-
e.snapshots = nil
124+
e.accountSnapshots = nil
125125
e.pitr = nil
126126
e.ts = nil
127127
e.globalCkpLoc = nil
@@ -171,7 +171,7 @@ func (e *CheckpointBasedGCJob) Execute(ctx context.Context) error {
171171

172172
fineFilter, err := MakeSnapshotAndPitrFineFilter(
173173
e.ts,
174-
e.snapshots,
174+
e.accountSnapshots,
175175
e.pitr,
176176
e.snapshotMeta,
177177
transObjects,
@@ -356,7 +356,7 @@ func buildTableExistenceMap(snapshotMeta *logtail.SnapshotMeta, checkpointCli ch
356356

357357
func MakeSnapshotAndPitrFineFilter(
358358
ts *types.TS,
359-
snapshots *logtail.SnapshotInfo,
359+
accountSnapshots map[uint32][]types.TS,
360360
pitrs *logtail.PitrInfo,
361361
snapshotMeta *logtail.SnapshotMeta,
362362
transObjects map[string]map[uint64]*ObjectEntry,
@@ -373,7 +373,7 @@ func MakeSnapshotAndPitrFineFilter(
373373
}
374374

375375
tableSnapshots, tablePitrs := snapshotMeta.AccountToTableSnapshots(
376-
snapshots,
376+
accountSnapshots,
377377
pitrs,
378378
)
379379
return func(
@@ -393,7 +393,7 @@ func MakeSnapshotAndPitrFineFilter(
393393
createTS := createTSs[i]
394394
deleteTS := deleteTSs[i]
395395

396-
sp := tableSnapshots[tableID]
396+
snapshots := tableSnapshots[tableID]
397397
pitr := tablePitrs[tableID]
398398

399399
if transObjects[name] != nil {
@@ -407,7 +407,7 @@ func MakeSnapshotAndPitrFineFilter(
407407
}
408408

409409
if !logtail.ObjectIsSnapshotRefers(
410-
entry.stats, pitr, &entry.createTS, &entry.dropTS, sp,
410+
entry.stats, pitr, &entry.createTS, &entry.dropTS, snapshots,
411411
) {
412412
if iscpTables == nil {
413413
bm.Add(uint64(i))
@@ -436,7 +436,7 @@ func MakeSnapshotAndPitrFineFilter(
436436
continue
437437
}
438438
if !logtail.ObjectIsSnapshotRefers(
439-
&stats, pitr, &createTS, &deleteTS, sp,
439+
&stats, pitr, &createTS, &deleteTS, snapshots,
440440
) {
441441
if iscpTables == nil {
442442
bm.Add(uint64(i))

pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2121
"github.com/matrixorigin/matrixone/pkg/container/types"
22+
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
2223
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
2324
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
2425
)
@@ -138,7 +139,7 @@ func (c *MockCleaner) GetMPool() *mpool.MPool {
138139
return nil
139140
}
140141

141-
func (c *MockCleaner) GetSnapshots() (*logtail.SnapshotInfo, error) {
142+
func (c *MockCleaner) GetSnapshots() (map[uint32]containers.Vector, error) {
142143
return nil, nil
143144
}
144145

pkg/vm/engine/tae/db/gc/v3/types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/matrixorigin/matrixone/pkg/container/vector"
2424
"github.com/matrixorigin/matrixone/pkg/objectio"
2525
"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
26+
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
2627
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
2728
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
2829
)
@@ -147,7 +148,7 @@ type Cleaner interface {
147148
DisableGC()
148149
GCEnabled() bool
149150
GetMPool() *mpool.MPool
150-
GetSnapshots() (*logtail.SnapshotInfo, error)
151+
GetSnapshots() (map[uint32]containers.Vector, error)
151152
GetDetails(ctx context.Context) (map[uint32]*TableStats, error)
152153
Verify(ctx context.Context) string
153154
ISCPTables() (map[uint64]types.TS, error)

pkg/vm/engine/tae/db/gc/v3/util.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919

2020
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2121
"github.com/matrixorigin/matrixone/pkg/container/batch"
22+
"github.com/matrixorigin/matrixone/pkg/container/types"
23+
"github.com/matrixorigin/matrixone/pkg/container/vector"
2224
"github.com/matrixorigin/matrixone/pkg/fileservice"
2325
"github.com/matrixorigin/matrixone/pkg/objectio"
2426
"github.com/matrixorigin/matrixone/pkg/pb/plan"
@@ -70,6 +72,16 @@ func MakeLoadFunc(
7072
}, releaseFn
7173
}
7274

75+
func TransformToTSList(
76+
fromKV map[uint32]containers.Vector,
77+
) map[uint32][]types.TS {
78+
newKV := make(map[uint32][]types.TS, len(fromKV))
79+
for k, v := range fromKV {
80+
newKV[k] = vector.MustFixedColWithTypeCheck[types.TS](v.GetDownstreamVector())
81+
}
82+
return newKV
83+
}
84+
7385
func MakeGCWindowBuffer(size int) *containers.OneSchemaBatchBuffer {
7486
return containers.NewOneSchemaBatchBuffer(
7587
size, ObjectTableAttrs, ObjectTableTypes, false,

pkg/vm/engine/tae/db/gc/v3/window.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (w *GCWindow) MakeFilesReader(
120120
func (w *GCWindow) ExecuteGlobalCheckpointBasedGC(
121121
ctx context.Context,
122122
gCkp *checkpoint.CheckpointEntry,
123-
snapshots *logtail.SnapshotInfo,
123+
accountSnapshots map[uint32][]types.TS,
124124
pitrs *logtail.PitrInfo,
125125
snapshotMeta *logtail.SnapshotMeta,
126126
iscpTables map[uint64]types.TS,
@@ -142,7 +142,7 @@ func (w *GCWindow) ExecuteGlobalCheckpointBasedGC(
142142
gCkp.GetVersion(),
143143
sourcer,
144144
pitrs,
145-
snapshots,
145+
accountSnapshots,
146146
iscpTables,
147147
snapshotMeta,
148148
checkpointCli,

pkg/vm/engine/tae/db/gc/v3/window_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,11 @@ func NewMockSnapshotMeta() *MockSnapshotMeta {
243243

244244
// AccountToTableSnapshots mocks the same method in logtail.SnapshotMeta
245245
func (m *MockSnapshotMeta) AccountToTableSnapshots(
246-
snapshots *logtail.SnapshotInfo,
246+
accountSnapshots map[uint32][]types.TS,
247247
pitrs *logtail.PitrInfo,
248-
) (map[uint64][]types.TS, map[uint64]*types.TS) {
248+
) (map[uint64][]types.TS, map[uint64][]types.TS) {
249249
tableSnapshots := make(map[uint64][]types.TS)
250-
tablePitrs := make(map[uint64]*types.TS)
250+
tablePitrs := make(map[uint64][]types.TS)
251251
return tableSnapshots, tablePitrs
252252
}
253253

0 commit comments

Comments
 (0)