diff --git a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go index 5226d67503600..5b20b06902dd3 100644 --- a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go +++ b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go @@ -413,7 +413,7 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) { ) return } - var snapshots map[uint32]containers.Vector + var snapshots *logtail.SnapshotInfo var pitrs *logtail.PitrInfo pitrs, err = c.GetPITRsLocked(ctx) if err != nil { @@ -437,8 +437,6 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) { ) return } - accountSnapshots := TransformToTSList(snapshots) - logtail.CloseSnapshotList(snapshots) _, sarg, _ := fault.TriggerFault("replay error UT") if sarg != "" { err = moerr.NewInternalErrorNoCtxf("GC-REPLAY-GET-CHECKPOINT-DATA-ERROR %s", sarg) @@ -461,13 +459,12 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) { c.checkpointCli.GetCatalog().GetUsageMemo().(*logtail.TNUsageMemo), ckpBatch, c.mutation.snapshotMeta, - accountSnapshots, + snapshots, pitrs, 0) logutil.Info( "GC-REPLAY-COLLECT-SNAPSHOT-SIZE", zap.String("task", c.TaskNameLocked()), - zap.Int("size", len(accountSnapshots)), zap.Duration("duration", time.Since(start)), zap.String("checkpoint", compacted.String()), zap.Int("count", ckpBatch.RowCount()), @@ -774,7 +771,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( ctx context.Context, checkpointLowWaterMark *types.TS, memoryBuffer *containers.OneSchemaBatchBuffer, - accountSnapshots map[uint32][]types.TS, + snapshots *logtail.SnapshotInfo, pitrs *logtail.PitrInfo, gcFileCount int, ) (err error) { @@ -885,7 +882,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( c.checkpointCli.GetCatalog().GetUsageMemo().(*logtail.TNUsageMemo), newCkpData, c.mutation.snapshotMeta, - accountSnapshots, + snapshots, pitrs, gcFileCount) if newCkp == nil { @@ -1066,10 +1063,9 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( 
memoryBuffer *containers.OneSchemaBatchBuffer, ) (err error) { now := time.Now() - var snapshots map[uint32]containers.Vector + var snapshots *logtail.SnapshotInfo var extraErrMsg string defer func() { - logtail.CloseSnapshotList(snapshots) logutil.Info( "GC-TRACE-TRY-GC-AGAINST-GCKP", zap.String("task", c.TaskNameLocked()), @@ -1089,9 +1085,8 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( extraErrMsg = "GetSnapshot failed" return } - accountSnapshots := TransformToTSList(snapshots) filesToGC, err := c.doGCAgainstGlobalCheckpointLocked( - ctx, gckp, accountSnapshots, pitrs, memoryBuffer, + ctx, gckp, snapshots, pitrs, memoryBuffer, ) if err != nil { extraErrMsg = "doGCAgainstGlobalCheckpointLocked failed" @@ -1129,7 +1124,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( waterMark = scanMark } err = c.mergeCheckpointFilesLocked( - ctx, &waterMark, memoryBuffer, accountSnapshots, pitrs, len(filesToGC), + ctx, &waterMark, memoryBuffer, snapshots, pitrs, len(filesToGC), ) if err != nil { extraErrMsg = fmt.Sprintf("mergeCheckpointFilesLocked %v failed", waterMark.ToString()) @@ -1142,7 +1137,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked( ctx context.Context, gckp *checkpoint.CheckpointEntry, - accountSnapshots map[uint32][]types.TS, + snapshots *logtail.SnapshotInfo, pitrs *logtail.PitrInfo, memoryBuffer *containers.OneSchemaBatchBuffer, ) ([]string, error) { @@ -1181,7 +1176,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked( if filesToGC, metafile, err = scannedWindow.ExecuteGlobalCheckpointBasedGC( ctx, gckp, - accountSnapshots, + snapshots, pitrs, c.mutation.snapshotMeta, memoryBuffer, @@ -1222,7 +1217,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked( now = time.Now() // TODO: c.updateGCWaterMark(gckp) - c.mutation.snapshotMeta.MergeTableInfo(accountSnapshots, pitrs) + c.mutation.snapshotMeta.MergeTableInfo(snapshots, pitrs) mergeDuration = 
time.Since(now) return filesToGC, nil } @@ -1314,7 +1309,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error { // TODO return err } - var snapshots map[uint32]containers.Vector + var snapshots *logtail.SnapshotInfo snapshots, err = c.GetSnapshotsLocked() if err != nil { logutil.Error( @@ -1324,7 +1319,6 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error { ) return err } - defer logtail.CloseSnapshotList(snapshots) var pitr *logtail.PitrInfo pitr, err = c.GetPITRsLocked(c.ctx) if err != nil { @@ -1338,8 +1332,6 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error { mergeWindow := c.GetScannedWindowLocked().Clone() defer mergeWindow.Close() - - accoutSnapshots := TransformToTSList(snapshots) logutil.Info( "GC-TRACE-MERGE-WINDOW", zap.String("task", c.TaskNameLocked()), @@ -1348,7 +1340,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error { if _, _, err = mergeWindow.ExecuteGlobalCheckpointBasedGC( c.ctx, gCkp, - accoutSnapshots, + snapshots, pitr, c.mutation.snapshotMeta, buffer, @@ -1370,7 +1362,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error { if _, _, err = debugWindow.ExecuteGlobalCheckpointBasedGC( c.ctx, gCkp, - accoutSnapshots, + snapshots, pitr, c.mutation.snapshotMeta, buffer, @@ -1450,7 +1442,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error { } collectObjectsFromCheckpointData(c.ctx, ckpReader, cptCkpObjects) - tList, pList := c.mutation.snapshotMeta.AccountToTableSnapshots(accoutSnapshots, pitr) + tList, pList := c.mutation.snapshotMeta.AccountToTableSnapshots(snapshots, pitr) for name, tables := range ickpObjects { for _, entry := range tables { if cptCkpObjects[name] != nil { @@ -1822,12 +1814,13 @@ func (c *checkpointCleaner) mutUpdateSnapshotMetaLocked( ) } -func (c *checkpointCleaner) GetSnapshots() (map[uint32]containers.Vector, error) { +func (c *checkpointCleaner) GetSnapshots() (*logtail.SnapshotInfo, error) { c.mutation.Lock() defer 
c.mutation.Unlock() return c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp) } -func (c *checkpointCleaner) GetSnapshotsLocked() (map[uint32]containers.Vector, error) { + +func (c *checkpointCleaner) GetSnapshotsLocked() (*logtail.SnapshotInfo, error) { return c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp) } func (c *checkpointCleaner) GetTablePK(tid uint64) string { diff --git a/pkg/vm/engine/tae/db/gc/v3/exec_v1.go b/pkg/vm/engine/tae/db/gc/v3/exec_v1.go index b7496c687f2c5..8a25a66d979a8 100644 --- a/pkg/vm/engine/tae/db/gc/v3/exec_v1.go +++ b/pkg/vm/engine/tae/db/gc/v3/exec_v1.go @@ -63,13 +63,13 @@ type CheckpointBasedGCJob struct { coarseProbility float64 canGCCacheSize int } - sourcer engine.BaseReader - snapshotMeta *logtail.SnapshotMeta - accountSnapshots map[uint32][]types.TS - pitr *logtail.PitrInfo - ts *types.TS - globalCkpLoc objectio.Location - globalCkpVer uint32 + sourcer engine.BaseReader + snapshotMeta *logtail.SnapshotMeta + snapshots *logtail.SnapshotInfo + pitr *logtail.PitrInfo + ts *types.TS + globalCkpLoc objectio.Location + globalCkpVer uint32 result struct { vecToGC *vector.Vector @@ -83,7 +83,7 @@ func NewCheckpointBasedGCJob( gckpVersion uint32, sourcer engine.BaseReader, pitr *logtail.PitrInfo, - accountSnapshots map[uint32][]types.TS, + snapshots *logtail.SnapshotInfo, snapshotMeta *logtail.SnapshotMeta, buffer *containers.OneSchemaBatchBuffer, isOwner bool, @@ -93,13 +93,13 @@ func NewCheckpointBasedGCJob( opts ...GCJobExecutorOption, ) *CheckpointBasedGCJob { e := &CheckpointBasedGCJob{ - sourcer: sourcer, - snapshotMeta: snapshotMeta, - accountSnapshots: accountSnapshots, - pitr: pitr, - ts: ts, - globalCkpLoc: globalCkpLoc, - globalCkpVer: gckpVersion, + sourcer: sourcer, + snapshotMeta: snapshotMeta, + snapshots: snapshots, + pitr: pitr, + ts: ts, + globalCkpLoc: globalCkpLoc, + globalCkpVer: gckpVersion, } for _, opt := range opts { opt(e) @@ -115,7 +115,7 @@ func (e *CheckpointBasedGCJob) Close() 
error { e.sourcer = nil } e.snapshotMeta = nil - e.accountSnapshots = nil + e.snapshots = nil e.pitr = nil e.ts = nil e.globalCkpLoc = nil @@ -165,7 +165,7 @@ func (e *CheckpointBasedGCJob) Execute(ctx context.Context) error { fineFilter, err := MakeSnapshotAndPitrFineFilter( e.ts, - e.accountSnapshots, + e.snapshots, e.pitr, e.snapshotMeta, transObjects, @@ -314,7 +314,7 @@ func MakeBloomfilterCoarseFilter( func MakeSnapshotAndPitrFineFilter( ts *types.TS, - accountSnapshots map[uint32][]types.TS, + snapshots *logtail.SnapshotInfo, pitrs *logtail.PitrInfo, snapshotMeta *logtail.SnapshotMeta, transObjects map[string]map[uint64]*ObjectEntry, @@ -323,7 +323,7 @@ func MakeSnapshotAndPitrFineFilter( err error, ) { tableSnapshots, tablePitrs := snapshotMeta.AccountToTableSnapshots( - accountSnapshots, + snapshots, pitrs, ) return func( @@ -343,7 +343,7 @@ func MakeSnapshotAndPitrFineFilter( createTS := createTSs[i] deleteTS := deleteTSs[i] - snapshots := tableSnapshots[tableID] + sp := tableSnapshots[tableID] pitr := tablePitrs[tableID] if transObjects[name] != nil { @@ -357,7 +357,7 @@ func MakeSnapshotAndPitrFineFilter( } if !logtail.ObjectIsSnapshotRefers( - entry.stats, pitr, &entry.createTS, &entry.dropTS, snapshots, + entry.stats, pitr, &entry.createTS, &entry.dropTS, sp, ) { bm.Add(uint64(i)) } @@ -374,7 +374,7 @@ func MakeSnapshotAndPitrFineFilter( continue } if !logtail.ObjectIsSnapshotRefers( - &stats, pitr, &createTS, &deleteTS, snapshots, + &stats, pitr, &createTS, &deleteTS, sp, ) { bm.Add(uint64(i)) } diff --git a/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go b/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go index 61f5a771cb488..f127ce7c86569 100644 --- a/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go +++ b/pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go @@ -19,7 +19,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/types" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" 
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" ) @@ -139,7 +138,7 @@ func (c *MockCleaner) GetMPool() *mpool.MPool { return nil } -func (c *MockCleaner) GetSnapshots() (map[uint32]containers.Vector, error) { +func (c *MockCleaner) GetSnapshots() (*logtail.SnapshotInfo, error) { return nil, nil } diff --git a/pkg/vm/engine/tae/db/gc/v3/types.go b/pkg/vm/engine/tae/db/gc/v3/types.go index 4cf053744b8b6..d2ededa0867e2 100644 --- a/pkg/vm/engine/tae/db/gc/v3/types.go +++ b/pkg/vm/engine/tae/db/gc/v3/types.go @@ -23,7 +23,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/objectio/ioutil" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" ) @@ -148,7 +147,7 @@ type Cleaner interface { DisableGC() GCEnabled() bool GetMPool() *mpool.MPool - GetSnapshots() (map[uint32]containers.Vector, error) + GetSnapshots() (*logtail.SnapshotInfo, error) GetDetails(ctx context.Context) (map[uint32]*TableStats, error) Verify(ctx context.Context) string diff --git a/pkg/vm/engine/tae/db/gc/v3/util.go b/pkg/vm/engine/tae/db/gc/v3/util.go index 79444ee5676b4..407e8175f3617 100644 --- a/pkg/vm/engine/tae/db/gc/v3/util.go +++ b/pkg/vm/engine/tae/db/gc/v3/util.go @@ -19,8 +19,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/container/types" - "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/plan" @@ -72,16 +70,6 @@ func MakeLoadFunc( }, releaseFn } -func TransformToTSList( - fromKV 
map[uint32]containers.Vector, -) map[uint32][]types.TS { - newKV := make(map[uint32][]types.TS, len(fromKV)) - for k, v := range fromKV { - newKV[k] = vector.MustFixedColWithTypeCheck[types.TS](v.GetDownstreamVector()) - } - return newKV -} - func MakeGCWindowBuffer(size int) *containers.OneSchemaBatchBuffer { return containers.NewOneSchemaBatchBuffer( size, ObjectTableAttrs, ObjectTableTypes, false, diff --git a/pkg/vm/engine/tae/db/gc/v3/window.go b/pkg/vm/engine/tae/db/gc/v3/window.go index b34180f553f23..9179252a3df27 100644 --- a/pkg/vm/engine/tae/db/gc/v3/window.go +++ b/pkg/vm/engine/tae/db/gc/v3/window.go @@ -120,7 +120,7 @@ func (w *GCWindow) MakeFilesReader( func (w *GCWindow) ExecuteGlobalCheckpointBasedGC( ctx context.Context, gCkp *checkpoint.CheckpointEntry, - accountSnapshots map[uint32][]types.TS, + snapshots *logtail.SnapshotInfo, pitrs *logtail.PitrInfo, snapshotMeta *logtail.SnapshotMeta, buffer *containers.OneSchemaBatchBuffer, @@ -140,7 +140,7 @@ func (w *GCWindow) ExecuteGlobalCheckpointBasedGC( gCkp.GetVersion(), sourcer, pitrs, - accountSnapshots, + snapshots, snapshotMeta, buffer, false, diff --git a/pkg/vm/engine/tae/db/gc/v3/window_test.go b/pkg/vm/engine/tae/db/gc/v3/window_test.go index b14fdb2ff6c84..2ad2e452fbe44 100644 --- a/pkg/vm/engine/tae/db/gc/v3/window_test.go +++ b/pkg/vm/engine/tae/db/gc/v3/window_test.go @@ -243,11 +243,11 @@ func NewMockSnapshotMeta() *MockSnapshotMeta { // AccountToTableSnapshots mocks the same method in logtail.SnapshotMeta func (m *MockSnapshotMeta) AccountToTableSnapshots( - accountSnapshots map[uint32][]types.TS, + snapshots *logtail.SnapshotInfo, pitrs *logtail.PitrInfo, -) (map[uint64][]types.TS, map[uint64][]types.TS) { +) (map[uint64][]types.TS, map[uint64]*types.TS) { tableSnapshots := make(map[uint64][]types.TS) - tablePitrs := make(map[uint64][]types.TS) + tablePitrs := make(map[uint64]*types.TS) return tableSnapshots, tablePitrs } diff --git a/pkg/vm/engine/tae/db/test/db_test.go 
b/pkg/vm/engine/tae/db/test/db_test.go index dd2d5dc965c8e..5db47a82ff40a 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -7239,12 +7239,6 @@ func TestSnapshotMeta(t *testing.T) { } //db.DiskCleaner.GetCleaner().DisableGC() - snapshots := make([]int64, 0) - for i := 0; i < 10; i++ { - time.Sleep(20 * time.Millisecond) - snapshot := time.Now().UTC().Unix() - snapshots = append(snapshots, snapshot) - } testutils.WaitExpect(10000, func() bool { return testutil.AllCheckpointsFinished(db) }) @@ -7254,6 +7248,12 @@ func TestSnapshotMeta(t *testing.T) { tae.Restart(ctx) db = tae.DB db.DiskCleaner.GetCleaner().DisableGC() + snapshots := make([]int64, 0) + for i := 0; i < 10; i++ { + time.Sleep(20 * time.Millisecond) + snapshot := time.Now().UTC().UnixNano() + snapshots = append(snapshots, snapshot) + } for i, snapshot := range snapshots { attrs := []string{"col0", "col1", "ts", "col3", "col4", "col5", "col6", "id"} vecTypes := []types.Type{types.T_uint64.ToType(), @@ -7336,11 +7336,7 @@ func TestSnapshotMeta(t *testing.T) { assert.NotNil(t, minMerged) snaps, err := db.DiskCleaner.GetCleaner().GetSnapshots() assert.Nil(t, err) - defer logtail.CloseSnapshotList(snaps) - assert.Equal(t, 1, len(snaps)) - for _, snap := range snaps { - assert.Equal(t, len(snapshots), snap.Length()) - } + assert.Equal(t, len(snapshots), len(snaps.ToTsList())) err = db.DiskCleaner.GetCleaner().DoCheck(ctx) assert.Nil(t, err) tae.RestartDisableGC(ctx) @@ -7361,11 +7357,7 @@ func TestSnapshotMeta(t *testing.T) { assert.True(t, end.GE(&minEnd)) snaps, err = db.DiskCleaner.GetCleaner().GetSnapshots() assert.Nil(t, err) - defer logtail.CloseSnapshotList(snaps) - assert.Equal(t, 1, len(snaps)) - for _, snap := range snaps { - assert.Equal(t, len(snapshots), snap.Length()) - } + assert.Equal(t, len(snapshots), len(snaps.ToTsList())) err = db.DiskCleaner.GetCleaner().DoCheck(ctx) assert.Nil(t, err) } diff --git a/pkg/vm/engine/tae/logtail/snapshot.go 
b/pkg/vm/engine/tae/logtail/snapshot.go index 67b44787d8d2a..f305621328e75 100644 --- a/pkg/vm/engine/tae/logtail/snapshot.go +++ b/pkg/vm/engine/tae/logtail/snapshot.go @@ -46,6 +46,8 @@ const ( SnapshotTypeIdx types.Enum = iota SnapshotTypeCluster SnapshotTypeAccount + SnapshotTypeDatabase + SnapshotTypeTable ) // mo_snapshot's schema @@ -170,86 +172,168 @@ type tableInfo struct { pk string } -type PitrInfo struct { - cluster types.TS - account map[uint32]types.TS - database map[uint64]types.TS - tables map[uint64]types.TS +// SnapshotInfo represents snapshot information at different levels +// Shared structure for both PITR and Snapshot functionality +type SnapshotInfo struct { + cluster []types.TS + account map[uint32][]types.TS + database map[uint64][]types.TS + tables map[uint64][]types.TS } -func (p *PitrInfo) IsEmpty() bool { - return p.cluster.IsEmpty() && +// PitrInfo is an alias for backward compatibility +type PitrInfo = SnapshotInfo + +func NewPitrInfo() *PitrInfo { + return &PitrInfo{ + cluster: make([]types.TS, 1), + account: make(map[uint32][]types.TS), + database: make(map[uint64][]types.TS), + tables: make(map[uint64][]types.TS), + } +} + +func NewSnapshotInfo() *SnapshotInfo { + return &SnapshotInfo{ + cluster: make([]types.TS, 0), + account: make(map[uint32][]types.TS), + database: make(map[uint64][]types.TS), + tables: make(map[uint64][]types.TS), + } +} + +func (p *SnapshotInfo) IsEmpty() bool { + return len(p.cluster) == 0 && len(p.account) == 0 && len(p.database) == 0 && len(p.tables) == 0 } -func (p *PitrInfo) GetTS( +// GetTS returns the earliest applicable timestamp for PITR usage +// For PITR, we only need the first (earliest) timestamp from each level +func (p *SnapshotInfo) GetTS( accountID uint32, dbID uint64, tableID uint64, ) (ts types.TS) { - ts = p.cluster - accountTS := p.account[accountID] - if !accountTS.IsEmpty() && (ts.IsEmpty() || accountTS.LT(&ts)) { - ts = accountTS + // Get the first cluster timestamp (for PITR) + if 
len(p.cluster) > 0 { + ts = p.cluster[0] } - dbTS := p.database[dbID] - if !dbTS.IsEmpty() && (ts.IsEmpty() || dbTS.LT(&ts)) { - ts = dbTS + // Get the first account timestamp + if accountTSList := p.account[accountID]; len(accountTSList) > 0 { + accountTS := accountTSList[0] + if ts.IsEmpty() || accountTS.LT(&ts) { + ts = accountTS + } } - tableTS := p.tables[tableID] - if !tableTS.IsEmpty() && (ts.IsEmpty() || tableTS.LT(&ts)) { - ts = tableTS + // Get the first database timestamp + if dbTSList := p.database[dbID]; len(dbTSList) > 0 { + dbTS := dbTSList[0] + if ts.IsEmpty() || dbTS.LT(&ts) { + ts = dbTS + } + } + + // Get the first table timestamp + if tableTSList := p.tables[tableID]; len(tableTSList) > 0 { + tableTS := tableTSList[0] + if ts.IsEmpty() || tableTS.LT(&ts) { + ts = tableTS + } } return } -func (p *PitrInfo) MinTS() (ts types.TS) { - if !p.cluster.IsEmpty() { - ts = p.cluster +// GetSnapshotsByLevel returns all snapshots for a specific level and object ID +func (p *SnapshotInfo) GetSnapshotsByLevel(level string, objID uint64) []types.TS { + switch level { + case PitrLevelCluster: + return p.cluster + case PitrLevelAccount: + return p.account[uint32(objID)] + case PitrLevelDatabase: + return p.database[objID] + case PitrLevelTable: + return p.tables[objID] + default: + return nil + } +} + +func (p *SnapshotInfo) MinTS() (ts types.TS) { + // find the minimum cluster ts + for _, clusterTS := range p.cluster { + if ts.IsEmpty() || clusterTS.LT(&ts) { + ts = clusterTS + } } // find the minimum account ts - for _, p := range p.account { - if ts.IsEmpty() || p.LT(&ts) { - ts = p + for _, tsList := range p.account { + for _, accountTS := range tsList { + if ts.IsEmpty() || accountTS.LT(&ts) { + ts = accountTS + } } } // find the minimum database ts - for _, p := range p.database { - if ts.IsEmpty() || p.LT(&ts) { - ts = p + for _, tsList := range p.database { + for _, dbTS := range tsList { + if ts.IsEmpty() || dbTS.LT(&ts) { + ts = dbTS + } } } // find 
the minimum table ts - for _, p := range p.tables { - if ts.IsEmpty() || p.LT(&ts) { - ts = p + for _, tsList := range p.tables { + for _, tableTS := range tsList { + if ts.IsEmpty() || tableTS.LT(&ts) { + ts = tableTS + } } } return } -func (p *PitrInfo) ToTsList() []types.TS { - tsList := make([]types.TS, 0, len(p.account)+len(p.database)+len(p.tables)+1) - for _, ts := range p.account { - tsList = append(tsList, ts) +func (p *SnapshotInfo) ToTsList() []types.TS { + var totalCount int + totalCount += len(p.cluster) + for _, tsList := range p.account { + totalCount += len(tsList) } - for _, ts := range p.database { - tsList = append(tsList, ts) + for _, tsList := range p.database { + totalCount += len(tsList) } - for _, ts := range p.tables { - tsList = append(tsList, ts) + for _, tsList := range p.tables { + totalCount += len(tsList) } - if !p.cluster.IsEmpty() { - tsList = append(tsList, p.cluster) + + result := make([]types.TS, 0, totalCount) + + // Add cluster timestamps + result = append(result, p.cluster...) + + // Add account timestamps + for _, tsList := range p.account { + result = append(result, tsList...) + } + + // Add database timestamps + for _, tsList := range p.database { + result = append(result, tsList...) } - return tsList + + // Add table timestamps + for _, tsList := range p.tables { + result = append(result, tsList...) 
+ } + + return result } type SnapshotMeta struct { @@ -316,17 +400,6 @@ func copyObjectsLocked( return newMap } -func (sm *SnapshotMeta) copyTablesLocked() map[uint32]map[uint64]*tableInfo { - tables := make(map[uint32]map[uint64]*tableInfo) - for k, v := range sm.tables { - tables[k] = make(map[uint64]*tableInfo) - for kk, vv := range v { - tables[k][kk] = vv - } - } - return tables -} - func IsMoTable(tid uint64) bool { return tid == catalog2.MO_TABLES_ID } @@ -777,7 +850,7 @@ func (sm *SnapshotMeta) GetSnapshot( sid string, fs fileservice.FileService, mp *mpool.MPool, -) (map[uint32]containers.Vector, error) { +) (*SnapshotInfo, error) { var err error now := time.Now() @@ -796,9 +869,8 @@ func (sm *SnapshotMeta) GetSnapshot( sm.RLock() objects := copyObjectsLocked(sm.objects) tombstones := copyObjectsLocked(sm.tombstones) - tables := sm.copyTablesLocked() sm.RUnlock() - snapshotList := make(map[uint32]containers.Vector) + snapshotInfo := NewSnapshotInfo() idxes := []uint16{ColTS, ColLevel, ColObjId} colTypes := []types.Type{ snapshotSchemaTypes[ColTS], @@ -847,60 +919,117 @@ func (sm *SnapshotMeta) GetSnapshot( for r := 0; r < bat.Vecs[0].Length(); r++ { ts := tsList[r] snapTs := types.BuildTS(ts, 0) - acct := acctList[r] + objId := acctList[r] snapshotType := typeList[r] + if snapshotType == SnapshotTypeCluster { - for account := range tables { - if snapshotList[account] == nil { - snapshotList[account] = containers.MakeVector(types.T_TS.ToType(), mp) - } - if err = vector.AppendFixed[types.TS]( - snapshotList[account].GetDownstreamVector(), snapTs, false, mp, - ); err != nil { - return nil, err - } - // TODO: info to debug - logutil.Info( - "GetSnapshot-P1", - zap.String("ts", snapTs.ToString()), - zap.Uint32("account", account), - ) + // Cluster snapshot + snapshotInfo.cluster = append(snapshotInfo.cluster, snapTs) + logutil.Debug( + "GetSnapshot-P1", + zap.String("ts", snapTs.ToString()), + ) + continue + } + + // Account snapshot + if snapshotType == 
SnapshotTypeAccount { + id := uint32(objId) + if snapshotInfo.account[id] == nil { + snapshotInfo.account[id] = make([]types.TS, 0) } + snapshotInfo.account[id] = append(snapshotInfo.account[id], snapTs) + // TODO: info to debug + logutil.Debug( + "GetSnapshot-P2", + zap.String("ts", snapTs.ToString()), + zap.Uint32("account", id), + ) continue } - id := uint32(acct) - if snapshotList[id] == nil { - snapshotList[id] = containers.MakeVector(types.T_TS.ToType(), mp) + + // Database snapshot + if snapshotType == SnapshotTypeDatabase { + id := objId + if snapshotInfo.database[id] == nil { + snapshotInfo.database[id] = make([]types.TS, 0) + } + snapshotInfo.database[id] = append(snapshotInfo.database[id], snapTs) + logutil.Debug( + "GetSnapshot-P3-Database", + zap.String("ts", snapTs.ToString()), + zap.Uint64("database", id), + ) + continue } - // TODO: info to debug - logutil.Debug( - "GetSnapshot-P2", - zap.String("ts", snapTs.ToString()), - zap.Uint32("account", id), - ) - if err = vector.AppendFixed[types.TS]( - snapshotList[id].GetDownstreamVector(), snapTs, false, mp, - ); err != nil { - return nil, err + // Table snapshot + if snapshotType == SnapshotTypeTable { + id := objId + if snapshotInfo.tables[id] == nil { + snapshotInfo.tables[id] = make([]types.TS, 0) + } + snapshotInfo.tables[id] = append(snapshotInfo.tables[id], snapTs) + logutil.Debug( + "GetSnapshot-P4-Table", + zap.String("ts", snapTs.ToString()), + zap.Uint64("table", id), + ) + continue } } } } } - for i := range snapshotList { - snapshotList[i].GetDownstreamVector().InplaceSort() - count := 0 - if snapshotList[i].GetDownstreamVector() != nil { - count = snapshotList[i].GetDownstreamVector().Length() - } + // Sort cluster snapshots + sort.Slice(snapshotInfo.cluster, func(i, j int) bool { + return snapshotInfo.cluster[i].LT(&snapshotInfo.cluster[j]) + }) + logutil.Info( + "GetSnapshot-P3-Cluster", + zap.Int("snapshot count", len(snapshotInfo.cluster)), + ) + + // Sort account snapshots + for 
accountID, tsList := range snapshotInfo.account { + sort.Slice(tsList, func(i, j int) bool { + return tsList[i].LT(&tsList[j]) + }) + snapshotInfo.account[accountID] = tsList + logutil.Info( + "GetSnapshot-P3-Account", + zap.Uint32("account", accountID), + zap.Int("snapshot count", len(tsList)), + ) + } + + // Sort database snapshots + for dbID, tsList := range snapshotInfo.database { + sort.Slice(tsList, func(i, j int) bool { + return tsList[i].LT(&tsList[j]) + }) + snapshotInfo.database[dbID] = tsList + logutil.Info( + "GetSnapshot-P3-Database", + zap.Uint64("database", dbID), + zap.Int("snapshot count", len(tsList)), + ) + } + + // Sort table snapshots + for tableID, tsList := range snapshotInfo.tables { + sort.Slice(tsList, func(i, j int) bool { + return tsList[i].LT(&tsList[j]) + }) + snapshotInfo.tables[tableID] = tsList logutil.Info( - "GetSnapshot-P3", - zap.Uint32("account", i), - zap.Int("snapshot count", count), + "GetSnapshot-P3-Table", + zap.Uint64("table", tableID), + zap.Int("snapshot count", len(tsList)), ) } - return snapshotList, nil + + return snapshotInfo, nil } func AddDate(t time.Time, year, month, day int) time.Time { @@ -928,10 +1057,10 @@ func (sm *SnapshotMeta) GetPITR( checkpointTS := types.BuildTS(time.Now().UTC().UnixNano(), 0) ds := NewSnapshotDataSource(ctx, fs, checkpointTS, tombstonesStats) pitrInfo := &PitrInfo{ - cluster: types.TS{}, - account: make(map[uint32]types.TS), - database: make(map[uint64]types.TS), - tables: make(map[uint64]types.TS), + cluster: make([]types.TS, 1), + account: make(map[uint32][]types.TS), + database: make(map[uint64][]types.TS), + tables: make(map[uint64][]types.TS), } for _, object := range sm.pitr.objects { select { @@ -975,57 +1104,63 @@ func (sm *SnapshotMeta) GetPITR( account := objIDList[r] level := bat.Vecs[0].GetStringAt(r) if level == PitrLevelCluster { - if !pitrInfo.cluster.IsEmpty() { + if !pitrInfo.cluster[0].IsEmpty() { logutil.Warn("GC-PANIC-DUP-PIRT-P1", zap.String("level", "cluster"), - 
zap.String("old", pitrInfo.cluster.ToString()), + zap.String("old", pitrInfo.cluster[0].ToString()), zap.String("new", pitrTS.ToString()), ) - if pitrInfo.cluster.LT(&pitrTS) { + if pitrInfo.cluster[0].LT(&pitrTS) { continue } } - pitrInfo.cluster = pitrTS + pitrInfo.cluster[0] = pitrTS } else if level == PitrLevelAccount { id := uint32(account) - p := pitrInfo.account[id] + if len(pitrInfo.account[id]) == 0 { + pitrInfo.account[id] = make([]types.TS, 1) + } + p := pitrInfo.account[id][0] if !p.IsEmpty() && p.LT(&pitrTS) { continue } - pitrInfo.account[id] = pitrTS + pitrInfo.account[id][0] = pitrTS } else if level == PitrLevelDatabase { id := uint64(account) - p := pitrInfo.database[id] - if !p.IsEmpty() { + if len(pitrInfo.database[id]) > 0 { + p := pitrInfo.database[id][0] logutil.Warn("GC-PANIC-DUP-PIRT-P2", zap.String("level", "database"), zap.Uint64("id", id), zap.String("old", p.ToString()), zap.String("new", pitrTS.ToString()), ) - if p.LT(&pitrTS) { + if !p.IsEmpty() && p.LT(&pitrTS) { continue } + } else { + pitrInfo.database[id] = make([]types.TS, 1) } - pitrInfo.database[id] = pitrTS + pitrInfo.database[id][0] = pitrTS } else if level == PitrLevelTable { id := uint64(account) - p := pitrInfo.tables[id] - if !p.IsEmpty() { + if len(pitrInfo.tables[id]) > 0 { + p := pitrInfo.tables[id][0] logutil.Warn("GC-PANIC-DUP-PIRT-P3", zap.String("level", "table"), zap.Uint64("id", id), zap.String("old", p.ToString()), zap.String("new", pitrTS.ToString()), ) - if p.LT(&pitrTS) { + if !p.IsEmpty() && p.LT(&pitrTS) { continue } + } else { + pitrInfo.tables[id] = make([]types.TS, 1) } - pitrInfo.tables[id] = pitrTS + pitrInfo.tables[id][0] = pitrTS } - // TODO: info to debug logutil.Info( "GC-GetPITR", zap.String("level", level), @@ -1511,19 +1646,19 @@ func (sm *SnapshotMeta) TableInfoString() string { return buf.String() } -func (sm *SnapshotMeta) GetSnapshotListLocked(snapshotList map[uint32][]types.TS, tid uint64) []types.TS { +func (sm *SnapshotMeta) 
GetSnapshotListLocked(snapshots *SnapshotInfo, tid uint64) []types.TS { if sm.tableIDIndex[tid] == nil { return nil } accID := sm.tableIDIndex[tid].accountID - return snapshotList[accID] + return snapshots.account[accID] } // AccountToTableSnapshots returns a map from table id to its snapshots. -// The snapshotList is a map from account id to its snapshots. +// The snapshots parameter contains all levels of snapshots. // The pitr is the pitr info. func (sm *SnapshotMeta) AccountToTableSnapshots( - accountSnapshots map[uint32][]types.TS, + snapshots *SnapshotInfo, pitr *PitrInfo, ) ( tableSnapshots map[uint64][]types.TS, @@ -1532,20 +1667,12 @@ func (sm *SnapshotMeta) AccountToTableSnapshots( tableSnapshots = make(map[uint64][]types.TS, 100) tablePitrs = make(map[uint64]*types.TS, 100) - // 1. for system tables, flatten the accountSnapshots to tableSnapshots + // 1. for system tables, flatten all snapshots to tableSnapshots var flattenSnapshots []types.TS { - var cnt int - for _, tss := range accountSnapshots { - cnt += len(tss) - } - flattenSnapshots = make([]types.TS, 0, cnt) - - for _, tss := range accountSnapshots { - flattenSnapshots = append(flattenSnapshots, tss...) 
- } + allSnapshots := snapshots.ToTsList() flattenSnapshots = compute.SortAndDedup( - flattenSnapshots, + allSnapshots, func(a, b *types.TS) bool { return a.LT(b) }, @@ -1565,16 +1692,88 @@ func (sm *SnapshotMeta) AccountToTableSnapshots( tablePitrs[catalog2.MO_TABLES_ID] = &sysPitr tablePitrs[catalog2.MO_COLUMNS_ID] = &sysPitr + // First, collect all table snapshots that should be applied to all tables in the same database + dbTableSnapshots := make(map[uint64][]types.TS) // dbID -> []types.TS + for tableID, tableTSList := range snapshots.tables { + if len(tableTSList) > 0 { + if info := sm.tableIDIndex[tableID]; info != nil { + dbID := info.dbID + if dbTableSnapshots[dbID] == nil { + dbTableSnapshots[dbID] = make([]types.TS, 0) + } + dbTableSnapshots[dbID] = append(dbTableSnapshots[dbID], tableTSList...) + delete(snapshots.tables, tableID) + } + } + } + + // Sort and deduplicate database-level table snapshots + for dbID, tsList := range dbTableSnapshots { + dbTableSnapshots[dbID] = compute.SortAndDedup( + tsList, + func(a, b *types.TS) bool { + return a.LT(b) + }, + func(a, b *types.TS) bool { + return a.EQ(b) + }, + ) + } + for tid, info := range sm.tableIDIndex { if catalog2.IsSystemTable(tid) { continue } - // use the account snapshots as the table snapshots + + // Collect all applicable snapshots for this table (table + database + account + cluster) + var allApplicableSnapshots []types.TS + + // 1. Add table-specific snapshots + if tableTSList := snapshots.tables[tid]; len(tableTSList) > 0 { + logutil.Warn("GC-PANIC-DUP-TABLE-SNAP", + zap.String("level", "table"), + zap.Uint64("id", tid), + zap.Int("count", len(tableTSList)), + ) + allApplicableSnapshots = append(allApplicableSnapshots, tableTSList...) + } + + // 2. Add snapshots from other tables in the same database (if any table in this DB has snapshots) + if dbTableTSList := dbTableSnapshots[info.dbID]; len(dbTableTSList) > 0 { + allApplicableSnapshots = append(allApplicableSnapshots, dbTableTSList...) 
+ } + + // 3. Add database-specific snapshots + if dbTSList := snapshots.database[info.dbID]; len(dbTSList) > 0 { + allApplicableSnapshots = append(allApplicableSnapshots, dbTSList...) + } + + // 4. Add account-specific snapshots accountID := info.accountID - tableSnapshots[tid] = accountSnapshots[accountID] + if accountTSList := snapshots.account[accountID]; len(accountTSList) > 0 { + allApplicableSnapshots = append(allApplicableSnapshots, accountTSList...) + } + + // 5. Add cluster snapshots + if clusterTSList := snapshots.cluster; len(clusterTSList) > 0 { + allApplicableSnapshots = append(allApplicableSnapshots, clusterTSList...) + } + + // Sort and deduplicate the combined snapshots + if len(allApplicableSnapshots) > 0 { + tableSnapshots[tid] = compute.SortAndDedup( + allApplicableSnapshots, + func(a, b *types.TS) bool { + return a.LT(b) + }, + func(a, b *types.TS) bool { + return a.EQ(b) + }, + ) + } // get the pitr for the table - ts := pitr.GetTS(accountID, info.dbID, tid) + ts := pitr.GetTS(info.accountID, info.dbID, tid) tablePitrs[tid] = &ts } return @@ -1592,7 +1791,7 @@ func (sm *SnapshotMeta) GetPitrByTable( } func (sm *SnapshotMeta) MergeTableInfo( - accountSnapshots map[uint32][]types.TS, + snapshots *SnapshotInfo, pitr *PitrInfo, ) error { sm.Lock() @@ -1600,9 +1799,80 @@ func (sm *SnapshotMeta) MergeTableInfo( if len(sm.tables) == 0 { return nil } + + // First, collect all table snapshots that should be applied to all tables in the same database + dbTableSnapshots := make(map[uint64][]types.TS) // dbID -> []types.TS + for tableID, tableTSList := range snapshots.tables { + if len(tableTSList) > 0 { + if info := sm.tableIDIndex[tableID]; info != nil { + dbID := info.dbID + if dbTableSnapshots[dbID] == nil { + dbTableSnapshots[dbID] = make([]types.TS, 0) + } + dbTableSnapshots[dbID] = append(dbTableSnapshots[dbID], tableTSList...) 
+ delete(snapshots.tables, tableID) + } + } + } + + // Sort and deduplicate database-level table snapshots + for dbID, tsList := range dbTableSnapshots { + dbTableSnapshots[dbID] = compute.SortAndDedup( + tsList, + func(a, b *types.TS) bool { + return a.LT(b) + }, + func(a, b *types.TS) bool { + return a.EQ(b) + }, + ) + } + for accID, tables := range sm.tables { - if accountSnapshots[accID] == nil && pitr.IsEmpty() { - for _, table := range tables { + for _, table := range tables { + // Get a list of snapshots available for the table + // (including snapshots from other tables in the same database) + var applicableSnapshots []types.TS + + // 1. Add table-specific snapshots + if tableSnapshots := snapshots.tables[table.tid]; len(tableSnapshots) > 0 { + applicableSnapshots = append(applicableSnapshots, tableSnapshots...) + } + + // 2. Add snapshots from other tables in the same database (if any table in this DB has snapshots) + if dbTableTSList := dbTableSnapshots[table.dbID]; len(dbTableTSList) > 0 { + applicableSnapshots = append(applicableSnapshots, dbTableTSList...) + } + + // 3. Add database-specific snapshots + if dbSnapshots := snapshots.database[table.dbID]; len(dbSnapshots) > 0 { + applicableSnapshots = append(applicableSnapshots, dbSnapshots...) + } + + // 4. Add account-specific snapshots + if accountSnapshots := snapshots.account[accID]; len(accountSnapshots) > 0 { + applicableSnapshots = append(applicableSnapshots, accountSnapshots...) + } + + // 5. Add cluster snapshots + if clusterSnapshots := snapshots.cluster; len(clusterSnapshots) > 0 { + applicableSnapshots = append(applicableSnapshots, clusterSnapshots...) 
+ } + // Sort and deduplicate the combined snapshots + if len(applicableSnapshots) > 0 { + applicableSnapshots = compute.SortAndDedup( + applicableSnapshots, + func(a, b *types.TS) bool { + return a.LT(b) + }, + func(a, b *types.TS) bool { + return a.EQ(b) + }, + ) + } + + // If there is no snapshot and PITR is empty, delete the deleted table + if len(applicableSnapshots) == 0 && pitr.IsEmpty() { if !table.deleteAt.IsEmpty() { delete(sm.tables[accID], table.tid) delete(sm.tableIDIndex, table.tid) @@ -1610,13 +1880,13 @@ func (sm *SnapshotMeta) MergeTableInfo( delete(sm.objects, table.tid) } } + continue } - continue - } - for _, table := range tables { + + // Check if the table is referenced by the snapshot ts := sm.GetPitrByTable(pitr, table.dbID, table.tid) if !table.deleteAt.IsEmpty() && - !isSnapshotRefers(table, accountSnapshots[accID], ts) { + !isSnapshotRefers(table, applicableSnapshots, ts) { delete(sm.tables[accID], table.tid) delete(sm.tableIDIndex, table.tid) if sm.objects[table.tid] != nil { @@ -1761,9 +2031,3 @@ func ObjectIsSnapshotRefers( } return false } - -func CloseSnapshotList(snapshots map[uint32]containers.Vector) { - for _, snapshot := range snapshots { - snapshot.Close() - } -} diff --git a/pkg/vm/engine/tae/logtail/snapshot_test.go b/pkg/vm/engine/tae/logtail/snapshot_test.go new file mode 100644 index 0000000000000..a5f753cab0cc6 --- /dev/null +++ b/pkg/vm/engine/tae/logtail/snapshot_test.go @@ -0,0 +1,418 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package logtail + +import ( + "github.com/matrixorigin/matrixone/pkg/objectio" + "testing" + + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestSnapshotInfo tests the basic functionality of SnapshotInfo +func TestSnapshotInfo(t *testing.T) { + t.Run("NewSnapshotInfo", func(t *testing.T) { + info := NewSnapshotInfo() + assert.NotNil(t, info) + assert.True(t, info.IsEmpty()) + assert.NotNil(t, info.cluster) + assert.NotNil(t, info.account) + assert.NotNil(t, info.database) + assert.NotNil(t, info.tables) + }) + + t.Run("AddSnapshots", func(t *testing.T) { + info := NewSnapshotInfo() + ts1 := types.BuildTS(1000, 0) + ts2 := types.BuildTS(2000, 0) + ts3 := types.BuildTS(3000, 0) + + // Add cluster snapshots + info.cluster = append(info.cluster, ts1, ts2) + assert.False(t, info.IsEmpty()) + + // Add account snapshots + info.account[1] = []types.TS{ts1, ts3} + info.account[2] = []types.TS{ts2} + + // Add database snapshots + info.database[100] = []types.TS{ts1} + info.database[200] = []types.TS{ts2, ts3} + + // Add table snapshots + info.tables[1001] = []types.TS{ts1} + info.tables[1002] = []types.TS{ts2} + + // Test GetTS (should return first timestamp for PITR compatibility) + assert.Equal(t, ts1, info.GetTS(1, 100, 1001)) // cluster level + assert.Equal(t, ts1, info.GetTS(1, 0, 0)) // account level + assert.Equal(t, ts1, info.GetTS(0, 100, 0)) // database level + assert.Equal(t, ts1, info.GetTS(0, 0, 1001)) // table level + + // Test MinTS + minTS := info.MinTS() + assert.Equal(t, ts1, minTS) + + // Test ToTsList + allTS := info.ToTsList() + assert.Contains(t, allTS, ts1) + assert.Contains(t, allTS, ts2) + assert.Contains(t, allTS, ts3) + }) +} + +// TestAccountToTableSnapshots tests the core logic of snapshot distribution +func TestAccountToTableSnapshots(t 
*testing.T) { + // Create a mock SnapshotMeta + sm := &SnapshotMeta{ + tableIDIndex: make(map[uint64]*tableInfo), + } + + // Setup test data: 2 accounts, 3 databases, 4 tables + // Account 1: DB 100 (Table 1001, 1002), DB 200 (Table 2001) + // Account 2: DB 300 (Table 3001) + sm.tableIDIndex[1001] = &tableInfo{accountID: 1, dbID: 100, tid: 1001} + sm.tableIDIndex[1002] = &tableInfo{accountID: 1, dbID: 100, tid: 1002} + sm.tableIDIndex[2001] = &tableInfo{accountID: 1, dbID: 200, tid: 2001} + sm.tableIDIndex[3001] = &tableInfo{accountID: 2, dbID: 300, tid: 3001} + + t.Run("TableSnapshotAppliedToAllTablesInDatabase", func(t *testing.T) { + // Create snapshots with table-level snapshot for table 1001 + snapshots := NewSnapshotInfo() + ts1 := types.BuildTS(1000, 0) + ts2 := types.BuildTS(2000, 0) + + // Add table snapshot for table 1001 (in DB 100) + snapshots.tables[1001] = []types.TS{ts1} + // Add account snapshot for account 1 + snapshots.account[1] = []types.TS{ts2} + + pitr := NewSnapshotInfo() + + tableSnapshots, tablePitrs := sm.AccountToTableSnapshots(snapshots, pitr) + + // Verify that table 1001 has both its own snapshot and account snapshot + require.Contains(t, tableSnapshots, uint64(1001)) + assert.Contains(t, tableSnapshots[1001], ts1) // table snapshot + assert.Contains(t, tableSnapshots[1001], ts2) // account snapshot + + // CRITICAL: Verify that table 1002 (in same DB 100) also gets table 1001's snapshot + require.Contains(t, tableSnapshots, uint64(1002)) + assert.Contains(t, tableSnapshots[1002], ts1) // table snapshot from 1001 + assert.Contains(t, tableSnapshots[1002], ts2) // account snapshot + + // Verify that table 2001 (in different DB 200) only gets account snapshot + require.Contains(t, tableSnapshots, uint64(2001)) + assert.NotContains(t, tableSnapshots[2001], ts1) // should NOT have table snapshot from 1001 + assert.Contains(t, tableSnapshots[2001], ts2) // account snapshot + + // Verify that table 3001 (different account) doesn't get any of
these snapshots + if snapshots3001, exists := tableSnapshots[3001]; exists { + assert.NotContains(t, snapshots3001, ts1) // should NOT have table snapshot from 1001 + assert.NotContains(t, snapshots3001, ts2) // should NOT have account snapshot from account 1 + } + + // Verify PITR info is set correctly + assert.NotNil(t, tablePitrs[1001]) + assert.NotNil(t, tablePitrs[1002]) + assert.NotNil(t, tablePitrs[2001]) + assert.NotNil(t, tablePitrs[3001]) + }) + + t.Run("MultipleTableSnapshotsInSameDatabase", func(t *testing.T) { + // Create snapshots with table-level snapshots for both tables in DB 100 + snapshots := NewSnapshotInfo() + ts1 := types.BuildTS(1000, 0) + ts2 := types.BuildTS(2000, 0) + ts3 := types.BuildTS(3000, 0) + + // Add table snapshots for both tables in DB 100 + snapshots.tables[1001] = []types.TS{ts1} + snapshots.tables[1002] = []types.TS{ts2} + // Add account snapshot + snapshots.account[1] = []types.TS{ts3} + + pitr := NewPitrInfo() + + tableSnapshots, _ := sm.AccountToTableSnapshots(snapshots, pitr) + + // Both tables should have all snapshots from their database + require.Contains(t, tableSnapshots, uint64(1001)) + require.Contains(t, tableSnapshots, uint64(1002)) + + // Table 1001 should have: its own snapshot + table 1002's snapshot + account snapshot + assert.Contains(t, tableSnapshots[1001], ts1) // its own + assert.Contains(t, tableSnapshots[1001], ts2) // from table 1002 + assert.Contains(t, tableSnapshots[1001], ts3) // account + + // Table 1002 should have: its own snapshot + table 1001's snapshot + account snapshot + assert.Contains(t, tableSnapshots[1002], ts1) // from table 1001 + assert.Contains(t, tableSnapshots[1002], ts2) // its own + assert.Contains(t, tableSnapshots[1002], ts3) // account + + // Verify snapshots are sorted and deduplicated + assert.True(t, len(tableSnapshots[1001]) >= 3) + assert.True(t, len(tableSnapshots[1002]) >= 3) + }) + + t.Run("DatabaseSnapshotTest", func(t *testing.T) { + // Test database-level snapshots 
+ snapshots := NewSnapshotInfo() + ts1 := types.BuildTS(1000, 0) + ts2 := types.BuildTS(2000, 0) + + // Add database snapshot for DB 100 + snapshots.database[100] = []types.TS{ts1} + // Add account snapshot + snapshots.account[1] = []types.TS{ts2} + + pitr := NewPitrInfo() + + tableSnapshots, _ := sm.AccountToTableSnapshots(snapshots, pitr) + + // Both tables in DB 100 should have database snapshot + require.Contains(t, tableSnapshots, uint64(1001)) + require.Contains(t, tableSnapshots, uint64(1002)) + assert.Contains(t, tableSnapshots[1001], ts1) // database snapshot + assert.Contains(t, tableSnapshots[1001], ts2) // account snapshot + assert.Contains(t, tableSnapshots[1002], ts1) // database snapshot + assert.Contains(t, tableSnapshots[1002], ts2) // account snapshot + + // Table in DB 200 should only have account snapshot + require.Contains(t, tableSnapshots, uint64(2001)) + assert.NotContains(t, tableSnapshots[2001], ts1) // should NOT have DB 100 snapshot + assert.Contains(t, tableSnapshots[2001], ts2) // account snapshot + }) + + t.Run("ClusterSnapshotTest", func(t *testing.T) { + // Test cluster-level snapshots + snapshots := NewSnapshotInfo() + ts1 := types.BuildTS(1000, 0) + + // Add cluster snapshot + snapshots.cluster = []types.TS{ts1} + + pitr := NewPitrInfo() + + tableSnapshots, _ := sm.AccountToTableSnapshots(snapshots, pitr) + + // All tables should have cluster snapshot + for _, tid := range []uint64{1001, 1002, 2001, 3001} { + require.Contains(t, tableSnapshots, tid) + assert.Contains(t, tableSnapshots[tid], ts1, "Table %d should have cluster snapshot", tid) + } + }) + + t.Run("SnapshotPriorityTest", func(t *testing.T) { + // Test that all levels of snapshots are combined correctly + snapshots := NewSnapshotInfo() + tsCluster := types.BuildTS(1000, 0) + tsAccount := types.BuildTS(2000, 0) + tsDatabase := types.BuildTS(3000, 0) + tsTable := types.BuildTS(4000, 0) + + // Add all levels of snapshots + snapshots.cluster = []types.TS{tsCluster} + 
snapshots.account[1] = []types.TS{tsAccount} + snapshots.database[100] = []types.TS{tsDatabase} + snapshots.tables[1001] = []types.TS{tsTable} + + pitr := NewPitrInfo() + + tableSnapshots, _ := sm.AccountToTableSnapshots(snapshots, pitr) + + // Table 1001 should have all snapshots + require.Contains(t, tableSnapshots, uint64(1001)) + snapshots1001 := tableSnapshots[1001] + assert.Contains(t, snapshots1001, tsCluster) + assert.Contains(t, snapshots1001, tsAccount) + assert.Contains(t, snapshots1001, tsDatabase) + assert.Contains(t, snapshots1001, tsTable) + + // Table 1002 (same DB) should have all except direct table snapshot, but should have table 1001's snapshot + require.Contains(t, tableSnapshots, uint64(1002)) + snapshots1002 := tableSnapshots[1002] + assert.Contains(t, snapshots1002, tsCluster) + assert.Contains(t, snapshots1002, tsAccount) + assert.Contains(t, snapshots1002, tsDatabase) + assert.Contains(t, snapshots1002, tsTable) // from table 1001 in same DB + }) +} + +// TestMergeTableInfo tests the MergeTableInfo functionality +func TestMergeTableInfo(t *testing.T) { + // Create a mock SnapshotMeta with some tables + sm := &SnapshotMeta{ + tables: make(map[uint32]map[uint64]*tableInfo), + tableIDIndex: make(map[uint64]*tableInfo), + objects: make(map[uint64]map[objectio.Segmentid]*objectInfo), + } + + // Setup test tables + deleteTS := types.BuildTS(6000, 0) // deleted timestamp + sm.tables[1] = make(map[uint64]*tableInfo) + sm.tables[1][1001] = &tableInfo{accountID: 1, dbID: 100, tid: 1001, deleteAt: deleteTS} + sm.tables[1][1002] = &tableInfo{accountID: 1, dbID: 100, tid: 1002, deleteAt: deleteTS} + sm.tables[1][2001] = &tableInfo{accountID: 1, dbID: 200, tid: 2001, deleteAt: deleteTS} + + sm.tableIDIndex[1001] = sm.tables[1][1001] + sm.tableIDIndex[1002] = sm.tables[1][1002] + sm.tableIDIndex[2001] = sm.tables[1][2001] + + t.Run("TableSnapshotProtectsAllTablesInDatabase", func(t *testing.T) { + // Create snapshots with table snapshot that should 
protect the table + snapshots := NewSnapshotInfo() + protectTS := types.BuildTS(5000, 0) // after delete, should protect + + // Add table snapshot for table 1001 + snapshots.tables[1001] = []types.TS{protectTS} + + pitr := NewPitrInfo() + + // Before merge, all tables exist + assert.Contains(t, sm.tables[1], uint64(1001)) + assert.Contains(t, sm.tables[1], uint64(1002)) + assert.Contains(t, sm.tables[1], uint64(2001)) + + err := sm.MergeTableInfo(snapshots, pitr) + require.NoError(t, err) + + // After merge, tables in DB 100 should be protected by table 1001's snapshot + assert.Contains(t, sm.tables[1], uint64(1001), "Table 1001 should be protected by its own snapshot") + assert.Contains(t, sm.tables[1], uint64(1002), "Table 1002 should be protected by table 1001's snapshot (same DB)") + + // Table in different DB should be deleted (no protection) + assert.NotContains(t, sm.tables[1], uint64(2001), "Table 2001 should be deleted (different DB, no protection)") + }) + + t.Run("NoSnapshotAllowsDeletion", func(t *testing.T) { + // Reset tables + sm.tables[1] = make(map[uint64]*tableInfo) + sm.tables[1][1001] = &tableInfo{accountID: 1, dbID: 100, tid: 1001, deleteAt: deleteTS} + sm.tables[1][1002] = &tableInfo{accountID: 1, dbID: 100, tid: 1002, deleteAt: deleteTS} + + // Create empty snapshots and PITR + snapshots := NewSnapshotInfo() + pitr := NewPitrInfo() + + err := sm.MergeTableInfo(snapshots, pitr) + require.NoError(t, err) + + // All deleted tables should be removed + assert.NotContains(t, sm.tables[1], uint64(1001)) + assert.NotContains(t, sm.tables[1], uint64(1002)) + }) + + t.Run("DatabaseSnapshotProtectsAllTablesInDatabase", func(t *testing.T) { + // Reset tables + sm.tables[1] = make(map[uint64]*tableInfo) + sm.tables[1][1001] = &tableInfo{accountID: 1, dbID: 100, tid: 1001, deleteAt: deleteTS} + sm.tables[1][1002] = &tableInfo{accountID: 1, dbID: 100, tid: 1002, deleteAt: deleteTS} + sm.tables[1][2001] = &tableInfo{accountID: 1, dbID: 200, tid: 2001, 
deleteAt: deleteTS} + + // Create database snapshot + snapshots := NewSnapshotInfo() + protectTS := types.BuildTS(5000, 0) + snapshots.database[100] = []types.TS{protectTS} + + pitr := NewPitrInfo() + + err := sm.MergeTableInfo(snapshots, pitr) + require.NoError(t, err) + + // Tables in DB 100 should be protected + assert.Contains(t, sm.tables[1], uint64(1001)) + assert.Contains(t, sm.tables[1], uint64(1002)) + + // Table in different DB should be deleted + assert.NotContains(t, sm.tables[1], uint64(2001)) + }) +} + +// TestSnapshotDeduplication tests that snapshots are properly deduplicated +func TestSnapshotDeduplication(t *testing.T) { + sm := &SnapshotMeta{ + tableIDIndex: make(map[uint64]*tableInfo), + } + + // Setup test data + sm.tableIDIndex[1001] = &tableInfo{accountID: 1, dbID: 100, tid: 1001} + sm.tableIDIndex[1002] = &tableInfo{accountID: 1, dbID: 100, tid: 1002} + + snapshots := NewSnapshotInfo() + ts1 := types.BuildTS(1000, 0) + ts2 := types.BuildTS(2000, 0) + + // Add duplicate timestamps at different levels + snapshots.cluster = []types.TS{ts1, ts2} + snapshots.account[1] = []types.TS{ts1, ts2} // duplicates + snapshots.database[100] = []types.TS{ts1} // duplicate + snapshots.tables[1001] = []types.TS{ts2} // duplicate + + pitr := NewPitrInfo() + + tableSnapshots, _ := sm.AccountToTableSnapshots(snapshots, pitr) + + // Verify deduplication - each table should have exactly 2 unique timestamps + for _, tid := range []uint64{1001, 1002} { + require.Contains(t, tableSnapshots, tid) + snapshots := tableSnapshots[tid] + + // Count unique timestamps + uniqueTS := make(map[types.TS]bool) + for _, ts := range snapshots { + uniqueTS[ts] = true + } + + assert.Equal(t, 2, len(uniqueTS), "Table %d should have exactly 2 unique timestamps after deduplication", tid) + assert.True(t, uniqueTS[ts1], "Table %d should have ts1", tid) + assert.True(t, uniqueTS[ts2], "Table %d should have ts2", tid) + } +} + +// TestPitrCompatibility tests that PITR functionality still 
works correctly +func TestPitrCompatibility(t *testing.T) { + t.Run("GetTSReturnsFirstTimestamp", func(t *testing.T) { + info := NewSnapshotInfo() + ts1 := types.BuildTS(1000, 0) + ts2 := types.BuildTS(2000, 0) + ts3 := types.BuildTS(3000, 0) + + // Add multiple timestamps in different orders + info.cluster = []types.TS{ts3, ts1, ts2} // unsorted + info.account[1] = []types.TS{ts2, ts3} + info.database[100] = []types.TS{ts3} + info.tables[1001] = []types.TS{ts2, ts1} + + // GetTS returns the first timestamp stored at each level (slice order, not necessarily the earliest) + assert.Equal(t, ts3, info.GetTS(0, 0, 0)) // cluster (first in slice, not necessarily earliest) + assert.Equal(t, ts2, info.GetTS(1, 0, 0)) // account + assert.Equal(t, ts3, info.GetTS(0, 100, 0)) // database + assert.Equal(t, ts2, info.GetTS(0, 0, 1001)) // table + }) + + t.Run("PitrInfoAlias", func(t *testing.T) { + // Test that PitrInfo is correctly aliased to SnapshotInfo + var pitr *PitrInfo = NewPitrInfo() + assert.NotNil(t, pitr) + + ts := types.BuildTS(1000, 0) + pitr.cluster = []types.TS{ts} + assert.False(t, pitr.IsEmpty()) + assert.Equal(t, ts, pitr.GetTS(0, 0, 0)) + }) +} diff --git a/pkg/vm/engine/tae/logtail/storage_usage.go b/pkg/vm/engine/tae/logtail/storage_usage.go index d75f780ebebc0..bb3daccc4ab4a 100644 --- a/pkg/vm/engine/tae/logtail/storage_usage.go +++ b/pkg/vm/engine/tae/logtail/storage_usage.go @@ -784,7 +784,7 @@ func FillUsageBatOfCompacted( usage *TNUsageMemo, data *batch.Batch, meta *SnapshotMeta, - accountSnapshots map[uint32][]types.TS, + snapshots *SnapshotInfo, pitrs *PitrInfo, _ int, ) { @@ -798,7 +798,7 @@ func FillUsageBatOfCompacted( }() usageData := make(map[[3]uint64]UsageData) tableSnapshots, tablePitrs := meta.AccountToTableSnapshots( - accountSnapshots, + snapshots, pitrs, ) objectsName := make(map[string]struct{})