diff --git a/go.mod b/go.mod index a16efe85e..756efd3a0 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/cockroachdb/pebble v0.0.0-20220726144858-a78491c0086f github.com/filecoin-project/go-dagaggregator-unixfs v0.3.0 github.com/gammazero/channelqueue v0.2.1 + github.com/gammazero/deque v0.2.1 github.com/gammazero/targz v0.0.3 github.com/ipfs/boxo v0.15.0 github.com/ipfs/go-cid v0.4.1 @@ -89,7 +90,6 @@ require ( github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect - github.com/gammazero/deque v0.2.1 // indirect github.com/gammazero/radixtree v0.3.1 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect diff --git a/ipni-gc/cmd/ipnigc/daemon.go b/ipni-gc/cmd/ipnigc/daemon.go index 5285b5995..81b450a60 100644 --- a/ipni-gc/cmd/ipnigc/daemon.go +++ b/ipni-gc/cmd/ipnigc/daemon.go @@ -119,7 +119,6 @@ func daemonAction(cctx *cli.Context) error { reaper.WithCarCompress(cfgMirror.Compress), reaper.WithCarDelete(cfgMirror.Write), reaper.WithCarRead(true), - reaper.WithCommit(true), reaper.WithDatastoreDir(dsDir), reaper.WithDatastoreTempDir(dsTmpDir), reaper.WithPCache(pc), diff --git a/ipni-gc/cmd/ipnigc/provider.go b/ipni-gc/cmd/ipnigc/provider.go index 3e2098f81..c5601fc41 100644 --- a/ipni-gc/cmd/ipnigc/provider.go +++ b/ipni-gc/cmd/ipnigc/provider.go @@ -34,11 +34,6 @@ var providerFlags = []cli.Flag{ Aliases: []string{"b"}, Value: 1024, }, - &cli.BoolFlag{ - Name: "commit", - Usage: "Commit changes to storage if set. Otherwise, only report what GC would have deleted.", - Aliases: []string{"w"}, - }, &cli.BoolFlag{ Name: "delete-not-found", Usage: "Delete all provider indexes if provider is not found", @@ -72,6 +67,12 @@ var providerFlags = []cli.Flag{ Usage: "Set log level for other loggers that are not ipni-gc", Value: "error", }, + &cli.IntFlag{ + Name: "segment-size", + Usage: "Set advertisement chain segment size. This specifies how many ads to process at a time.", + Aliases: []string{"ss"}, + Value: 16384, + }, } func providerAction(cctx *cli.Context) error { @@ -150,12 +151,12 @@ func providerAction(cctx *cli.Context) error { reaper.WithCarCompress(cfgMirror.Compress), reaper.WithCarDelete(cfgMirror.Write), reaper.WithCarRead(true), - reaper.WithCommit(cctx.Bool("commit")), reaper.WithDatastoreDir(dsDir), reaper.WithDatastoreTempDir(dsTmpDir), reaper.WithDeleteNotFound(cctx.Bool("delete-not-found")), reaper.WithEntriesFromPublisher(cctx.Bool("ents-from-pub")), reaper.WithPCache(pc), + reaper.WithSegmentSize(cctx.Int("segment-size")), reaper.WithTopicName(cfg.Ingest.PubSubTopic), reaper.WithHttpTimeout(time.Duration(cfg.Ingest.HttpSyncTimeout)), ) @@ -164,11 +165,7 @@ func providerAction(cctx *cli.Context) error { } defer grim.Close() - if cctx.Bool("commit") { - fmt.Println("Starting IPNI GC, committing changes") - } else { - fmt.Println("Starting IPNI GC, dry-run - GC progress and changes will not be saved") - } + fmt.Println("Starting IPNI GC") var gcCount int for _, pid := range peerIDs { diff --git a/ipni-gc/reaper/option.go b/ipni-gc/reaper/option.go index 8429b1ccc..6d94acc04 100644 --- a/ipni-gc/reaper/option.go +++ b/ipni-gc/reaper/option.go @@ -12,6 +12,7 @@ import ( const ( defaultHttpTimeout = 10 * time.Second + defaultSegmentSize = 16384 defaultTopic = "/indexer/ingest/mainnet" ) @@ -19,7 +20,6 @@ type config struct { carCompAlg string carDelete bool carRead bool - commit bool deleteNotFound bool dstoreDir string dstoreTmpDir string @@ -28,6 +28,7 @@ type config struct { httpTimeout time.Duration p2pHost host.Host pcache *pcache.ProviderCache + segmentSize int topic string } @@ -41,6 +42,7 @@ func getOpts(opts []Option) (config, error) { carRead: true, entsFromPub: true, httpTimeout: defaultHttpTimeout, + segmentSize: defaultSegmentSize, topic: defaultTopic, } @@ -80,15 +82,8 @@ func WithCarDelete(del bool) Option { } } -// WithCommit tells GC to commit changes to storage. Otherwise, GC only reports -// information about what would have been collected. -func WithCommit(commit bool) Option { - return func(c *config) error { - c.commit = commit - return nil - } -} - +// WithDatastoreDir tells GC the directory to use as the parent for all +// provider-specific datastores. func WithDatastoreDir(dir string) Option { return func(c *config) error { c.dstoreDir = dir @@ -96,6 +91,8 @@ func WithDatastoreDir(dir string) Option { } } +// WithDatastoreTempDir tells GC the directory to use as the parent for all +// provider-specific temproary datastores. func WithDatastoreTempDir(dir string) Option { return func(c *config) error { c.dstoreTmpDir = dir @@ -137,6 +134,17 @@ func WithPCache(pc *pcache.ProviderCache) Option { } } +// WithSegmentSize sets the size of the segments that the ad chain is broken +// into for processing. +func WithSegmentSize(size int) Option { + return func(c *config) error { + if size > 0 { + c.segmentSize = size + } + return nil + } +} + // WithTopicName sets the topic name on which the provider announces advertised // content. Defaults to '/indexer/ingest/mainnet'. func WithTopicName(topic string) Option { diff --git a/ipni-gc/reaper/reaper.go b/ipni-gc/reaper/reaper.go index b06d2b845..e33d83b6b 100644 --- a/ipni-gc/reaper/reaper.go +++ b/ipni-gc/reaper/reaper.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/gammazero/deque" "github.com/gammazero/targz" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -55,6 +56,7 @@ type GCStats struct { CarsRemoved int CtxIDsKept int CtxIDsRemoved int + EmptyAds int IndexAdsKept int IndexAdsRemoved int IndexesRemoved int @@ -70,6 +72,7 @@ func (s GCStats) String() string { " CarsRemoved:", s.CarsRemoved, " CtxIDsKept:", s.CtxIDsKept, " CtxIDsRemoved:", s.CtxIDsRemoved, + " EmptyAds:", s.EmptyAds, " IndexAdsKept:", s.IndexAdsKept, " IndexAdsRemoved:", s.IndexAdsRemoved, " IndexesRemoved:", s.IndexesRemoved, @@ -82,7 +85,6 @@ func (s GCStats) String() string { type Reaper struct { carDelete bool carReader *carstore.CarReader - commit bool delNotFound bool dsDir string dsTmpDir string @@ -93,6 +95,7 @@ type Reaper struct { httpTimeout time.Duration indexer indexer.Interface pcache *pcache.ProviderCache + segmentSize int stats GCStats statsMutex sync.Mutex topic string @@ -120,6 +123,13 @@ type GCState struct { LastProcessedAdCid cid.Cid } +type adInfo struct { + cid cid.Cid + contextID []byte + empty bool + isRm bool +} + func dsContextPrefix(contextID string) string { return fmt.Sprintf("/ctx/%s/", contextID) } @@ -164,10 +174,13 @@ func New(idxr indexer.Interface, fileStore filestore.Interface, options ...Optio } } + if opts.dstoreTmpDir == "" { + opts.dstoreTmpDir = os.TempDir() + } + return &Reaper{ carDelete: opts.carDelete, carReader: carReader, - commit: opts.commit, delNotFound: opts.deleteNotFound, dsDir: opts.dstoreDir, dsTmpDir: opts.dstoreTmpDir, @@ -178,6 +191,7 @@ func New(idxr indexer.Interface, fileStore filestore.Interface, options ...Optio httpTimeout: opts.httpTimeout, indexer: idxr, pcache: opts.pcache, + segmentSize: opts.segmentSize, topic: opts.topic, }, nil } @@ -199,6 +213,7 @@ func (r *Reaper) AddStats(a GCStats) { r.stats.CarsRemoved += a.CarsRemoved r.stats.CtxIDsKept += a.CtxIDsKept r.stats.CtxIDsRemoved += a.CtxIDsRemoved + r.stats.EmptyAds += a.EmptyAds r.stats.IndexAdsKept += a.IndexAdsKept r.stats.IndexAdsRemoved += a.IndexAdsRemoved r.stats.IndexesRemoved += a.IndexesRemoved @@ -245,19 +260,10 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { }() // Create datastore for temporary ad data. - var tmpDir string - if r.dsTmpDir == "" { - tmpDir, err = os.MkdirTemp("", "gc-tmp-"+providerID.String()) - if err != nil { - return fmt.Errorf("cannot create temp directory for gc: %w", err) - } - defer os.RemoveAll(tmpDir) - } else { - if err = fsutil.DirWritable(r.dsTmpDir); err != nil { - return err - } - tmpDir = filepath.Join(r.dsTmpDir, "gc-tmp-"+providerID.String()) + if err = fsutil.DirWritable(r.dsTmpDir); err != nil { + return err } + tmpDir := filepath.Join(r.dsTmpDir, "gc-tmp-"+providerID.String()) dstoreTmp, err := createDatastore(tmpDir) if err != nil { return fmt.Errorf("failed to create temporary datastore: %w", err) @@ -386,30 +392,26 @@ func (r *Reaper) removeProvider(ctx context.Context, providerID peer.ID) error { dstore.Close() // Delete gc-datastore archive. - if r.commit { - name := ArchiveName(providerID) - err = r.fileStore.Delete(ctx, name) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Errorw("Cannot delete datastore archive for provider", "err", err, "name", name) - } - // Delete gc-datastore. - dstoreDir := filepath.Join(r.dsDir, dstoreDirName(providerID)) - err = os.RemoveAll(dstoreDir) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", dstoreDir) - } + name := ArchiveName(providerID) + err = r.fileStore.Delete(ctx, name) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("Cannot delete datastore archive for provider", "err", err, "name", name) + } + // Delete gc-datastore. + dstoreDir := filepath.Join(r.dsDir, dstoreDirName(providerID)) + err = os.RemoveAll(dstoreDir) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", dstoreDir) } r.AddStats(s.stats) } // Delete temporary gc-datastore. - if r.dsTmpDir != "" && r.commit { - name := filepath.Join(r.dsTmpDir, "gc-tmp-"+providerID.String()) - err = os.RemoveAll(name) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", name) - } + name := filepath.Join(r.dsTmpDir, "gc-tmp-"+providerID.String()) + err = os.RemoveAll(name) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", name) } if r.carReader == nil { @@ -427,10 +429,8 @@ func (r *Reaper) removeProvider(ctx context.Context, providerID peer.ID) error { // OK if head file does not exist. No CAR files to clean up. return nil } - if r.commit { - if err = carWriter.DeleteHead(ctx, providerID); err != nil { - log.Errorw("Failed to delete head file for provider", "err", err, "provider", providerID) - } + if err = carWriter.DeleteHead(ctx, providerID); err != nil { + log.Errorw("Failed to delete head file for provider", "err", err, "provider", providerID) } return fmt.Errorf("cannot read head advertisement for provider %s: %w", providerID, err) } @@ -479,20 +479,16 @@ func (r *Reaper) removeProvider(ctx context.Context, providerID peer.ID) error { } if adCid == cid.Undef { - if r.commit { - if err = carWriter.DeleteHead(ctx, providerID); err != nil { - err = fmt.Errorf("failed to delete head file: %w", err) - } + if err = carWriter.DeleteHead(ctx, providerID); err != nil { + err = fmt.Errorf("failed to delete head file: %w", err) } stats.TimeElapsed = time.Since(startTime) log.Infow("Finished GC for removed provider", "provider", providerID, "stats", stats.String()) } else if newHead != cid.Undef { // Did not complete. Save head where GC left off. - if r.commit { - _, err = carWriter.WriteHead(ctx, newHead, providerID) - if err != nil { - err = fmt.Errorf("failed to update head file: %w", err) - } + _, err = carWriter.WriteHead(ctx, newHead, providerID) + if err != nil { + err = fmt.Errorf("failed to update head file: %w", err) } stats.TimeElapsed = time.Since(startTime) log.Infow("Incomplete GC for removed provider", "provider", providerID, "stats", stats.String()) @@ -556,7 +552,6 @@ func (r *Reaper) cleanCarIndexes(ctx context.Context, adCid cid.Cid) (cid.Cid, i MetadataBytes: ad.Metadata, } - commit := r.commit indexer := r.indexer var mhCount int @@ -573,10 +568,8 @@ func (r *Reaper) cleanCarIndexes(ctx context.Context, adCid cid.Cid) (cid.Cid, i if len(chunk.Entries) == 0 { continue } - if commit { - if err = indexer.Remove(value, chunk.Entries...); err != nil { - return cid.Undef, mhCount, fmt.Errorf("%w: %w", errIndexerWrite, err) - } + if err = indexer.Remove(value, chunk.Entries...); err != nil { + return cid.Undef, mhCount, fmt.Errorf("%w: %w", errIndexerWrite, err) } mhCount += len(chunk.Entries) } @@ -596,10 +589,8 @@ func (r *Reaper) deleteCarFile(ctx context.Context, adCid cid.Cid) (int64, error } return 0, err } - if r.commit { - if err = r.fileStore.Delete(ctx, carPath); err != nil { - return 0, fmt.Errorf("failed to remove CAR file: %w", err) - } + if err = r.fileStore.Delete(ctx, carPath); err != nil { + return 0, fmt.Errorf("failed to remove CAR file: %w", err) } log.Infow("Deleted CAR file", "name", carPath, "size", file.Size) return file.Size, nil @@ -638,16 +629,65 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error { } }() - removedCtxSet := make(map[string]struct{}) - remaining := make(map[string][]cid.Cid) + segSize := s.reaper.segmentSize + segment := deque.New[adInfo](segSize, segSize) - for adCid := latestAdCid; adCid != gcState.LastProcessedAdCid; { - ad, err := s.loadAd(adCid) + for segEnd := gcState.LastProcessedAdCid; segEnd != latestAdCid; { + for adCid := latestAdCid; adCid != segEnd; { + ad, err := s.loadAd(adCid) + if err != nil { + return fmt.Errorf("failed to load advertisement %s: %w", adCid.String(), err) + } + if segment.Len() == segSize { + segment.PopFront() + } + segment.PushBack(adInfo{ + cid: adCid, + contextID: ad.ContextID, + empty: ad.Entries == nil || ad.Entries == schema.NoEntries, + isRm: ad.IsRm, + }) + if ad.PreviousID == nil { + break + } + adCid = ad.PreviousID.(cidlink.Link).Cid + if adCid == cid.Undef { + // This should never happen, unless the provider has a + // different chain that it did previously. + log.Errorw("Did not find last provessed advertisement in chain", "lastProcessed", segEnd, "chainEnd", adCid) + break + } + } + + segEnd = segment.Front().cid + + err = s.reapSegment(ctx, segment) if err != nil { - return fmt.Errorf("failed to load advertisement %s: %w", adCid.String(), err) + return err } - contextID := base64.StdEncoding.EncodeToString(ad.ContextID) - if ad.IsRm { + + // Update GC state. + gcState.LastProcessedAdCid = segEnd + if err = s.saveGCState(ctx, gcState); err != nil { + return err + } + } + return nil +} + +func (s *scythe) reapSegment(ctx context.Context, segment *deque.Deque[adInfo]) error { + log.Infow("Processing advertisements segment", "size", segment.Len()) + + removedCtxSet := make(map[string]struct{}) + remaining := make(map[string][]cid.Cid) + var err error + + for segment.Len() != 0 { + ad := segment.Front() + segment.PopFront() + adCid := ad.cid + contextID := base64.StdEncoding.EncodeToString(ad.contextID) + if ad.isRm { log.Debugw("Processing removal ad", "adCid", adCid) s.stats.RemovalAds++ reused, ok := remaining[contextID] @@ -667,12 +707,13 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error { if err = s.deleteCarFile(ctx, adCid); err != nil { return err } - } else if ad.Entries == nil || ad.Entries == schema.NoEntries { + } else if ad.empty { log.Debugw("Processing no-content ad", "adCid", adCid) // Delete CAR file of empty ad. if err = s.deleteCarFile(ctx, adCid); err != nil { return err } + s.stats.EmptyAds++ } else { _, ok := removedCtxSet[contextID] if ok { @@ -688,14 +729,7 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error { } } s.stats.AdsProcessed++ - - if ad.PreviousID == nil { - adCid = cid.Undef - } else { - adCid = ad.PreviousID.(cidlink.Link).Cid - } } - log.Debugw("Done processing advertisements", "processed", s.stats.AdsProcessed, "removed", s.stats.IndexAdsRemoved) // Record which ads remain undeleted. if err = s.saveRemaining(ctx, remaining); err != nil { @@ -704,9 +738,8 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error { s.stats.CtxIDsKept += len(remaining) s.stats.CtxIDsRemoved += len(removedCtxSet) - // Update GC state. - gcState.LastProcessedAdCid = latestAdCid - return s.saveGCState(ctx, gcState) + log.Infow("Done processing advertisements segment", "stats", s.stats.String()) + return nil } func (s *scythe) saveRemoved(ctx context.Context, adCid cid.Cid) error { @@ -788,8 +821,6 @@ func (s *scythe) reapPrefixedAds(ctx context.Context, prefix string) error { return err } - commit := s.reaper.commit - if len(ents) == 0 { return nil } @@ -800,33 +831,23 @@ func (s *scythe) reapPrefixedAds(ctx context.Context, prefix string) error { adCid, err := cid.Decode(path.Base(key)) if err != nil { log.Errorw("Cannot decode remaining advertisement cid", "err", err) - if commit { - if err = s.dstore.Delete(ctx, datastore.NewKey(key)); err != nil { - return err - } + if err = s.dstore.Delete(ctx, datastore.NewKey(key)); err != nil { + return err } continue } if err = s.saveRemoved(ctx, adCid); err != nil { return err } - if commit { - if err = s.dstore.Delete(ctx, datastore.NewKey(key)); err != nil { - return err - } + if err = s.dstore.Delete(ctx, datastore.NewKey(key)); err != nil { + return err } } - if !commit { - return nil - } return s.dstore.Sync(ctx, datastore.NewKey(prefix)) } func (s *scythe) saveRemaining(ctx context.Context, remaining map[string][]cid.Cid) error { - if !s.reaper.commit { - return nil - } for contextID, adCids := range remaining { ctxPrefix := dsContextPrefix(contextID) for _, adCid := range adCids { @@ -896,7 +917,7 @@ func (s *scythe) loadGCState(ctx context.Context) (GCState, error) { } func (s *scythe) saveGCState(ctx context.Context, gcState GCState) error { - if !s.reaper.commit || s.dstore == nil { + if s.dstore == nil { return nil } @@ -988,7 +1009,6 @@ func (s *scythe) removeEntriesFromCar(ctx context.Context, adCid cid.Cid) error MetadataBytes: ad.Metadata, } - commit := s.reaper.commit indexer := s.reaper.indexer for entryBlock := range adBlock.Entries { @@ -1003,10 +1023,8 @@ func (s *scythe) removeEntriesFromCar(ctx context.Context, adCid cid.Cid) error if len(chunk.Entries) == 0 { continue } - if commit { - if err = indexer.Remove(value, chunk.Entries...); err != nil { - return fmt.Errorf("%w: %w", errIndexerWrite, err) - } + if err = indexer.Remove(value, chunk.Entries...); err != nil { + return fmt.Errorf("%w: %w", errIndexerWrite, err) } s.stats.IndexesRemoved += len(chunk.Entries) } @@ -1040,7 +1058,6 @@ func (s *scythe) removeEntriesFromPublisher(ctx context.Context, adCid cid.Cid) MetadataBytes: ad.Metadata, } - commit := s.reaper.commit indexer := s.reaper.indexer for entsCid != cid.Undef { @@ -1051,13 +1068,11 @@ func (s *scythe) removeEntriesFromPublisher(ctx context.Context, adCid cid.Cid) if err != nil { return fmt.Errorf("failed to load first entry chunk: %w", err) } - if commit { + if err = indexer.Remove(value, chunk.Entries...); err != nil { + log.Errorw("Failed to remove indexes from valuestore, retrying", "err", err, "indexes", len(chunk.Entries)) + time.Sleep(100 * time.Millisecond) if err = indexer.Remove(value, chunk.Entries...); err != nil { - log.Errorw("Failed to remove indexes from valuestore, retrying", "err", err, "indexes", len(chunk.Entries)) - time.Sleep(100 * time.Millisecond) - if err = indexer.Remove(value, chunk.Entries...); err != nil { - return fmt.Errorf("%w: %w", errIndexerWrite, err) - } + return fmt.Errorf("%w: %w", errIndexerWrite, err) } } s.stats.IndexesRemoved += len(chunk.Entries) @@ -1126,9 +1141,6 @@ func (r *Reaper) unarchiveDatastore(ctx context.Context, providerID peer.ID) err // create an archive of `gc-data-PId` named `gc-data-PID.tar.gz` and copy // it to the filestore. func (s *scythe) archiveDatastore(ctx context.Context) error { - if !s.reaper.commit { - return nil - } if s.reaper.fileStore == nil { log.Warn("Filestore not available to save gc datastore to") return nil diff --git a/ipni-gc/reaper/reaper_test.go b/ipni-gc/reaper/reaper_test.go index 7a54174f8..99952d227 100644 --- a/ipni-gc/reaper/reaper_test.go +++ b/ipni-gc/reaper/reaper_test.go @@ -65,7 +65,6 @@ func TestReaper(t *testing.T) { gc, err := reaper.New(idxr, fileStore, reaper.WithCarDelete(true), reaper.WithCarRead(true), - reaper.WithCommit(true), reaper.WithDatastoreDir(dsDir), reaper.WithDatastoreTempDir(dsTmpDir), reaper.WithPCache(pc), @@ -91,7 +90,6 @@ func TestReaper(t *testing.T) { gc2, err := reaper.New(idxr, fileStore, reaper.WithCarDelete(true), reaper.WithCarRead(true), - reaper.WithCommit(true), reaper.WithDatastoreDir(dsDir), reaper.WithDatastoreTempDir(dsTmpDir), reaper.WithDeleteNotFound(true), @@ -114,7 +112,6 @@ func TestReaper(t *testing.T) { gc2, err = reaper.New(idxr, fileStore, reaper.WithCarDelete(true), reaper.WithCarRead(true), - reaper.WithCommit(true), reaper.WithDatastoreDir(dsDir), reaper.WithDatastoreTempDir(dsTmpDir), reaper.WithDeleteNotFound(true),