@@ -416,7 +416,7 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
416416 )
417417 return
418418 }
419- var snapshots * logtail. SnapshotInfo
419+ var snapshots map [ uint32 ]containers. Vector
420420 var pitrs * logtail.PitrInfo
421421 pitrs , err = c .GetPITRsLocked (ctx )
422422 if err != nil {
@@ -440,6 +440,8 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
440440 )
441441 return
442442 }
443+ accountSnapshots := TransformToTSList (snapshots )
444+ logtail .CloseSnapshotList (snapshots )
443445 _ , sarg , _ := fault .TriggerFault ("replay error UT" )
444446 if sarg != "" {
445447 err = moerr .NewInternalErrorNoCtxf ("GC-REPLAY-GET-CHECKPOINT-DATA-ERROR %s" , sarg )
@@ -462,12 +464,13 @@ func (c *checkpointCleaner) Replay(inputCtx context.Context) (err error) {
462464 c .checkpointCli .GetCatalog ().GetUsageMemo ().(* logtail.TNUsageMemo ),
463465 ckpBatch ,
464466 c .mutation .snapshotMeta ,
465- snapshots ,
467+ accountSnapshots ,
466468 pitrs ,
467469 0 )
468470 logutil .Info (
469471 "GC-REPLAY-COLLECT-SNAPSHOT-SIZE" ,
470472 zap .String ("task" , c .TaskNameLocked ()),
473+ zap .Int ("size" , len (accountSnapshots )),
471474 zap .Duration ("duration" , time .Since (start )),
472475 zap .String ("checkpoint" , compacted .String ()),
473476 zap .Int ("count" , ckpBatch .RowCount ()),
@@ -774,7 +777,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked(
774777 ctx context.Context ,
775778 checkpointLowWaterMark * types.TS ,
776779 memoryBuffer * containers.OneSchemaBatchBuffer ,
777- snapshots * logtail. SnapshotInfo ,
780+ accountSnapshots map [ uint32 ][]types. TS ,
778781 pitrs * logtail.PitrInfo ,
779782 gcFileCount int ,
780783) (err error ) {
@@ -885,7 +888,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked(
885888 c .checkpointCli .GetCatalog ().GetUsageMemo ().(* logtail.TNUsageMemo ),
886889 newCkpData ,
887890 c .mutation .snapshotMeta ,
888- snapshots ,
891+ accountSnapshots ,
889892 pitrs ,
890893 gcFileCount )
891894 if newCkp == nil {
@@ -1066,9 +1069,10 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
10661069 memoryBuffer * containers.OneSchemaBatchBuffer ,
10671070) (err error ) {
10681071 now := time .Now ()
1069- var snapshots * logtail. SnapshotInfo
1072+ var snapshots map [ uint32 ]containers. Vector
10701073 var extraErrMsg string
10711074 defer func () {
1075+ logtail .CloseSnapshotList (snapshots )
10721076 logutil .Info (
10731077 "GC-TRACE-TRY-GC-AGAINST-GCKP" ,
10741078 zap .String ("task" , c .TaskNameLocked ()),
@@ -1088,8 +1092,9 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
10881092 extraErrMsg = "GetSnapshot failed"
10891093 return
10901094 }
1095+ accountSnapshots := TransformToTSList (snapshots )
10911096 filesToGC , err := c .doGCAgainstGlobalCheckpointLocked (
1092- ctx , gckp , snapshots , pitrs , memoryBuffer ,
1097+ ctx , gckp , accountSnapshots , pitrs , memoryBuffer ,
10931098 )
10941099 if err != nil {
10951100 extraErrMsg = "doGCAgainstGlobalCheckpointLocked failed"
@@ -1127,7 +1132,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
11271132 waterMark = scanMark
11281133 }
11291134 err = c .mergeCheckpointFilesLocked (
1130- ctx , & waterMark , memoryBuffer , snapshots , pitrs , len (filesToGC ),
1135+ ctx , & waterMark , memoryBuffer , accountSnapshots , pitrs , len (filesToGC ),
11311136 )
11321137 if err != nil {
11331138 extraErrMsg = fmt .Sprintf ("mergeCheckpointFilesLocked %v failed" , waterMark .ToString ())
@@ -1140,7 +1145,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked(
11401145func (c * checkpointCleaner ) doGCAgainstGlobalCheckpointLocked (
11411146 ctx context.Context ,
11421147 gckp * checkpoint.CheckpointEntry ,
1143- snapshots * logtail. SnapshotInfo ,
1148+ accountSnapshots map [ uint32 ][]types. TS ,
11441149 pitrs * logtail.PitrInfo ,
11451150 memoryBuffer * containers.OneSchemaBatchBuffer ,
11461151) ([]string , error ) {
@@ -1184,7 +1189,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
11841189 if filesToGC , metafile , err = scannedWindow .ExecuteGlobalCheckpointBasedGC (
11851190 ctx ,
11861191 gckp ,
1187- snapshots ,
1192+ accountSnapshots ,
11881193 pitrs ,
11891194 c .mutation .snapshotMeta ,
11901195 iscp ,
@@ -1227,7 +1232,7 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked(
12271232 now = time .Now ()
12281233 // TODO:
12291234 c .updateGCWaterMark (gckp )
1230- c .mutation .snapshotMeta .MergeTableInfo (snapshots , pitrs )
1235+ c .mutation .snapshotMeta .MergeTableInfo (accountSnapshots , pitrs )
12311236 mergeDuration = time .Since (now )
12321237 return filesToGC , nil
12331238}
@@ -1319,7 +1324,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13191324 // TODO
13201325 return err
13211326 }
1322- var snapshots * logtail. SnapshotInfo
1327+ var snapshots map [ uint32 ]containers. Vector
13231328 snapshots , err = c .GetSnapshotsLocked ()
13241329 if err != nil {
13251330 logutil .Error (
@@ -1329,6 +1334,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13291334 )
13301335 return err
13311336 }
1337+ defer logtail .CloseSnapshotList (snapshots )
13321338 var pitr * logtail.PitrInfo
13331339 pitr , err = c .GetPITRsLocked (c .ctx )
13341340 if err != nil {
@@ -1342,6 +1348,8 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13421348
13431349 mergeWindow := c .GetScannedWindowLocked ().Clone ()
13441350 defer mergeWindow .Close ()
1351+
1352+ accoutSnapshots := TransformToTSList (snapshots )
13451353 logutil .Info (
13461354 "GC-TRACE-MERGE-WINDOW" ,
13471355 zap .String ("task" , c .TaskNameLocked ()),
@@ -1354,7 +1362,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13541362 if _ , _ , err = mergeWindow .ExecuteGlobalCheckpointBasedGC (
13551363 c .ctx ,
13561364 gCkp ,
1357- snapshots ,
1365+ accoutSnapshots ,
13581366 pitr ,
13591367 c .mutation .snapshotMeta ,
13601368 iscp ,
@@ -1378,7 +1386,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
13781386 if _ , _ , err = debugWindow .ExecuteGlobalCheckpointBasedGC (
13791387 c .ctx ,
13801388 gCkp ,
1381- snapshots ,
1389+ accoutSnapshots ,
13821390 pitr ,
13831391 c .mutation .snapshotMeta ,
13841392 iscp ,
@@ -1460,7 +1468,7 @@ func (c *checkpointCleaner) DoCheck(ctx context.Context) error {
14601468 }
14611469 collectObjectsFromCheckpointData (c .ctx , ckpReader , cptCkpObjects )
14621470
1463- tList , pList := c .mutation .snapshotMeta .AccountToTableSnapshots (snapshots , pitr )
1471+ tList , pList := c .mutation .snapshotMeta .AccountToTableSnapshots (accoutSnapshots , pitr )
14641472 for name , tables := range ickpObjects {
14651473 for _ , entry := range tables {
14661474 if cptCkpObjects [name ] != nil {
@@ -1832,13 +1840,12 @@ func (c *checkpointCleaner) mutUpdateSnapshotMetaLocked(
18321840 )
18331841}
18341842
1835- func (c * checkpointCleaner ) GetSnapshots () (* logtail. SnapshotInfo , error ) {
1843+ func (c * checkpointCleaner ) GetSnapshots () (map [ uint32 ]containers. Vector , error ) {
18361844 c .mutation .Lock ()
18371845 defer c .mutation .Unlock ()
18381846 return c .mutation .snapshotMeta .GetSnapshot (c .ctx , c .sid , c .fs , c .mp )
18391847}
1840-
1841- func (c * checkpointCleaner ) GetSnapshotsLocked () (* logtail.SnapshotInfo , error ) {
1848+ func (c * checkpointCleaner ) GetSnapshotsLocked () (map [uint32 ]containers.Vector , error ) {
18421849 return c .mutation .snapshotMeta .GetSnapshot (c .ctx , c .sid , c .fs , c .mp )
18431850}
18441851func (c * checkpointCleaner ) GetTablePK (tid uint64 ) string {
0 commit comments