Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 17 additions & 24 deletions pkg/vm/engine/tae/db/gc/v3/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
)
return
}
var snapshots map[uint32]containers.Vector
var snapshots *logtail.SnapshotInfo
var pitrs *logtail.PitrInfo
pitrs, err = c.GetPITRsLocked(ctx)
if err != nil {
Expand All @@ -437,8 +437,6 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
)
return
}
accountSnapshots := TransformToTSList(snapshots)
logtail.CloseSnapshotList(snapshots)
_, sarg, _ := fault.TriggerFault("replay error UT")
if sarg != "" {
err = moerr.NewInternalErrorNoCtxf("GC-REPLAY-GET-CHECKPOINT-DATA-ERROR %s", sarg)
Expand All @@ -461,13 +459,12 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
c.checkpointCli.GetCatalog().GetUsageMemo().(*logtail.TNUsageMemo),
ckpBatch,
c.mutation.snapshotMeta,
accountSnapshots,
snapshots,
pitrs,
0)
logutil.Info(
"GC-REPLAY-COLLECT-SNAPSHOT-SIZE",
zap.String("task", c.TaskNameLocked()),
zap.Int("size", len(accountSnapshots)),
zap.Duration("duration", time.Since(start)),
zap.String("checkpoint", compacted.String()),
zap.Int("count", ckpBatch.RowCount()),
Expand Down Expand Up @@ -774,7 +771,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked(
ctx context.Context,
checkpointLowWaterMark *types.TS,
memoryBuffer *containers.OneSchemaBatchBuffer,
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
pitrs *logtail.PitrInfo,
gcFileCount int,
) (err error) {
Expand Down Expand Up @@ -885,7 +882,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked(
c.checkpointCli.GetCatalog().GetUsageMemo().(*logtail.TNUsageMemo),
newCkpData,
c.mutation.snapshotMeta,
accountSnapshots,
snapshots,
pitrs,
gcFileCount)
if newCkp == nil {
Expand Down Expand Up @@ -1066,10 +1063,9 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
memoryBuffer *containers.OneSchemaBatchBuffer,
) (err error) {
now := time.Now()
var snapshots map[uint32]containers.Vector
var snapshots *logtail.SnapshotInfo
var extraErrMsg string
defer func() {
logtail.CloseSnapshotList(snapshots)
logutil.Info(
"GC-TRACE-TRY-GC-AGAINST-GCKP",
zap.String("task", c.TaskNameLocked()),
Expand All @@ -1089,9 +1085,8 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
extraErrMsg = "GetSnapshot failed"
return
}
accountSnapshots := TransformToTSList(snapshots)
filesToGC, err := c.doGCAgainstGlobalCheckpointLocked(
ctx, gckp, accountSnapshots, pitrs, memoryBuffer,
ctx, gckp, snapshots, pitrs, memoryBuffer,
)
if err != nil {
extraErrMsg = "doGCAgainstGlobalCheckpointLocked failed"
Expand Down Expand Up @@ -1129,7 +1124,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
waterMark = scanMark
}
err = c.mergeCheckpointFilesLocked(
ctx, &waterMark, memoryBuffer, accountSnapshots, pitrs, len(filesToGC),
ctx, &waterMark, memoryBuffer, snapshots, pitrs, len(filesToGC),
)
if err != nil {
extraErrMsg = fmt.Sprintf("mergeCheckpointFilesLocked %v failed", waterMark.ToString())
Expand All @@ -1142,7 +1137,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
ctx context.Context,
gckp *checkpoint.CheckpointEntry,
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
pitrs *logtail.PitrInfo,
memoryBuffer *containers.OneSchemaBatchBuffer,
) ([]string, error) {
Expand Down Expand Up @@ -1181,7 +1176,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
if filesToGC, metafile, err = scannedWindow.ExecuteGlobalCheckpointBasedGC(
ctx,
gckp,
accountSnapshots,
snapshots,
pitrs,
c.mutation.snapshotMeta,
memoryBuffer,
Expand Down Expand Up @@ -1222,7 +1217,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
now = time.Now()
// TODO:
c.updateGCWaterMark(gckp)
c.mutation.snapshotMeta.MergeTableInfo(accountSnapshots, pitrs)
c.mutation.snapshotMeta.MergeTableInfo(snapshots, pitrs)
mergeDuration = time.Since(now)
return filesToGC, nil
}
Expand Down Expand Up @@ -1314,7 +1309,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
// TODO
return err
}
var snapshots map[uint32]containers.Vector
var snapshots *logtail.SnapshotInfo
snapshots, err = c.GetSnapshotsLocked()
if err != nil {
logutil.Error(
Expand All @@ -1324,7 +1319,6 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
)
return err
}
defer logtail.CloseSnapshotList(snapshots)
var pitr *logtail.PitrInfo
pitr, err = c.GetPITRsLocked(c.ctx)
if err != nil {
Expand All @@ -1338,8 +1332,6 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {

mergeWindow := c.GetScannedWindowLocked().Clone()
defer mergeWindow.Close()

accoutSnapshots := TransformToTSList(snapshots)
logutil.Info(
"GC-TRACE-MERGE-WINDOW",
zap.String("task", c.TaskNameLocked()),
Expand All @@ -1348,7 +1340,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
if _, _, err = mergeWindow.ExecuteGlobalCheckpointBasedGC(
c.ctx,
gCkp,
accoutSnapshots,
snapshots,
pitr,
c.mutation.snapshotMeta,
buffer,
Expand All @@ -1370,7 +1362,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
if _, _, err = debugWindow.ExecuteGlobalCheckpointBasedGC(
c.ctx,
gCkp,
accoutSnapshots,
snapshots,
pitr,
c.mutation.snapshotMeta,
buffer,
Expand Down Expand Up @@ -1450,7 +1442,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
}
collectObjectsFromCheckpointData(c.ctx, ckpReader, cptCkpObjects)

tList, pList := c.mutation.snapshotMeta.AccountToTableSnapshots(accoutSnapshots, pitr)
tList, pList := c.mutation.snapshotMeta.AccountToTableSnapshots(snapshots, pitr)
for name, tables := range ickpObjects {
for _, entry := range tables {
if cptCkpObjects[name] != nil {
Expand Down Expand Up @@ -1822,12 +1814,13 @@ func (c *checkpointCleaner) mutUpdateSnapshotMetaLocked(
)
}

func (c *checkpointCleaner) GetSnapshots() (map[uint32]containers.Vector, error) {
func (c *checkpointCleaner) GetSnapshots() (*logtail.SnapshotInfo, error) {
c.mutation.Lock()
defer c.mutation.Unlock()
return c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp)
}
func (c *checkpointCleaner) GetSnapshotsLocked() (map[uint32]containers.Vector, error) {

func (c *checkpointCleaner) GetSnapshotsLocked() (*logtail.SnapshotInfo, error) {
return c.mutation.snapshotMeta.GetSnapshot(c.ctx, c.sid, c.fs, c.mp)
}
func (c *checkpointCleaner) GetTablePK(tid uint64) string {
Expand Down
44 changes: 22 additions & 22 deletions pkg/vm/engine/tae/db/gc/v3/exec_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ type CheckpointBasedGCJob struct {
coarseProbility float64
canGCCacheSize int
}
sourcer engine.BaseReader
snapshotMeta *logtail.SnapshotMeta
accountSnapshots map[uint32][]types.TS
pitr *logtail.PitrInfo
ts *types.TS
globalCkpLoc objectio.Location
globalCkpVer uint32
sourcer engine.BaseReader
snapshotMeta *logtail.SnapshotMeta
snapshots *logtail.SnapshotInfo
pitr *logtail.PitrInfo
ts *types.TS
globalCkpLoc objectio.Location
globalCkpVer uint32

result struct {
vecToGC *vector.Vector
Expand All @@ -83,7 +83,7 @@ func NewCheckpointBasedGCJob(
gckpVersion uint32,
sourcer engine.BaseReader,
pitr *logtail.PitrInfo,
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
snapshotMeta *logtail.SnapshotMeta,
buffer *containers.OneSchemaBatchBuffer,
isOwner bool,
Expand All @@ -93,13 +93,13 @@ func NewCheckpointBasedGCJob(
opts ...GCJobExecutorOption,
) *CheckpointBasedGCJob {
e := &CheckpointBasedGCJob{
sourcer: sourcer,
snapshotMeta: snapshotMeta,
accountSnapshots: accountSnapshots,
pitr: pitr,
ts: ts,
globalCkpLoc: globalCkpLoc,
globalCkpVer: gckpVersion,
sourcer: sourcer,
snapshotMeta: snapshotMeta,
snapshots: snapshots,
pitr: pitr,
ts: ts,
globalCkpLoc: globalCkpLoc,
globalCkpVer: gckpVersion,
}
for _, opt := range opts {
opt(e)
Expand All @@ -115,7 +115,7 @@ func (e *CheckpointBasedGCJob) Close() error {
e.sourcer = nil
}
e.snapshotMeta = nil
e.accountSnapshots = nil
e.snapshots = nil
e.pitr = nil
e.ts = nil
e.globalCkpLoc = nil
Expand Down Expand Up @@ -165,7 +165,7 @@ func (e *CheckpointBasedGCJob) Execute(ctx context.Context) error {

fineFilter, err := MakeSnapshotAndPitrFineFilter(
e.ts,
e.accountSnapshots,
e.snapshots,
e.pitr,
e.snapshotMeta,
transObjects,
Expand Down Expand Up @@ -314,7 +314,7 @@ func MakeBloomfilterCoarseFilter(

func MakeSnapshotAndPitrFineFilter(
ts *types.TS,
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
pitrs *logtail.PitrInfo,
snapshotMeta *logtail.SnapshotMeta,
transObjects map[string]map[uint64]*ObjectEntry,
Expand All @@ -323,7 +323,7 @@ func MakeSnapshotAndPitrFineFilter(
err error,
) {
tableSnapshots, tablePitrs := snapshotMeta.AccountToTableSnapshots(
accountSnapshots,
snapshots,
pitrs,
)
return func(
Expand All @@ -343,7 +343,7 @@ func MakeSnapshotAndPitrFineFilter(
createTS := createTSs[i]
deleteTS := deleteTSs[i]

snapshots := tableSnapshots[tableID]
sp := tableSnapshots[tableID]
pitr := tablePitrs[tableID]

if transObjects[name] != nil {
Expand All @@ -357,7 +357,7 @@ func MakeSnapshotAndPitrFineFilter(
}

if !logtail.ObjectIsSnapshotRefers(
entry.stats, pitr, &entry.createTS, &entry.dropTS, snapshots,
entry.stats, pitr, &entry.createTS, &entry.dropTS, sp,
) {
bm.Add(uint64(i))
}
Expand All @@ -374,7 +374,7 @@ func MakeSnapshotAndPitrFineFilter(
continue
}
if !logtail.ObjectIsSnapshotRefers(
&stats, pitr, &createTS, &deleteTS, snapshots,
&stats, pitr, &createTS, &deleteTS, sp,
) {
bm.Add(uint64(i))
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
)
Expand Down Expand Up @@ -139,7 +138,7 @@ func (c *MockCleaner) GetMPool() *mpool.MPool {
return nil
}

func (c *MockCleaner) GetSnapshots() (map[uint32]containers.Vector, error) {
func (c *MockCleaner) GetSnapshots() (*logtail.SnapshotInfo, error) {
return nil, nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/vm/engine/tae/db/gc/v3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
)
Expand Down Expand Up @@ -148,7 +147,7 @@ type Cleaner interface {
DisableGC()
GCEnabled() bool
GetMPool() *mpool.MPool
GetSnapshots() (map[uint32]containers.Vector, error)
GetSnapshots() (*logtail.SnapshotInfo, error)
GetDetails(ctx context.Context) (map[uint32]*TableStats, error)
Verify(ctx context.Context) string

Expand Down
12 changes: 0 additions & 12 deletions pkg/vm/engine/tae/db/gc/v3/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
Expand Down Expand Up @@ -72,16 +70,6 @@ func MakeLoadFunc(
}, releaseFn
}

func TransformToTSList(
fromKV map[uint32]containers.Vector,
) map[uint32][]types.TS {
newKV := make(map[uint32][]types.TS, len(fromKV))
for k, v := range fromKV {
newKV[k] = vector.MustFixedColWithTypeCheck[types.TS](v.GetDownstreamVector())
}
return newKV
}

func MakeGCWindowBuffer(size int) *containers.OneSchemaBatchBuffer {
return containers.NewOneSchemaBatchBuffer(
size, ObjectTableAttrs, ObjectTableTypes, false,
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/db/gc/v3/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (w *GCWindow) MakeFilesReader(
func (w *GCWindow) ExecuteGlobalCheckpointBasedGC(
ctx context.Context,
gCkp *checkpoint.CheckpointEntry,
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
pitrs *logtail.PitrInfo,
snapshotMeta *logtail.SnapshotMeta,
buffer *containers.OneSchemaBatchBuffer,
Expand All @@ -140,7 +140,7 @@ func (w *GCWindow) ExecuteGlobalCheckpointBasedGC(
gCkp.GetVersion(),
sourcer,
pitrs,
accountSnapshots,
snapshots,
snapshotMeta,
buffer,
false,
Expand Down
6 changes: 3 additions & 3 deletions pkg/vm/engine/tae/db/gc/v3/window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ func NewMockSnapshotMeta() *MockSnapshotMeta {

// AccountToTableSnapshots mocks the same method in logtail.SnapshotMeta
func (m *MockSnapshotMeta) AccountToTableSnapshots(
accountSnapshots map[uint32][]types.TS,
snapshots *logtail.SnapshotInfo,
pitrs *logtail.PitrInfo,
) (map[uint64][]types.TS, map[uint64][]types.TS) {
) (map[uint64][]types.TS, map[uint64]*types.TS) {
tableSnapshots := make(map[uint64][]types.TS)
tablePitrs := make(map[uint64][]types.TS)
tablePitrs := make(map[uint64]*types.TS)
return tableSnapshots, tablePitrs
}

Expand Down
Loading
Loading