From 2989e923e0097ae2e60dfc4db6c1f51468dcdf3f Mon Sep 17 00:00:00 2001 From: gammazero Date: Wed, 20 Dec 2023 16:58:49 -0800 Subject: [PATCH] Provess ad chain in segments during GC. If the amount of the ad chain to process is very long, then do not process all the ads at the same time. This may lead to keeping too many CIDs and context IDs in memory resulting in an out-of-memory condition. Instead process a segment of the chain at a time, starting from the oldest to the newest segment. --- .github/workflows/ecr-publisher.yml | 1 + ipni-gc/cmd/ipnigc/provider.go | 7 +++ ipni-gc/reaper/option.go | 14 +++++ ipni-gc/reaper/reaper.go | 91 +++++++++++++++++++++++------ 4 files changed, 95 insertions(+), 18 deletions(-) diff --git a/.github/workflows/ecr-publisher.yml b/.github/workflows/ecr-publisher.yml index fcf4bb316..ef89a27e0 100644 --- a/.github/workflows/ecr-publisher.yml +++ b/.github/workflows/ecr-publisher.yml @@ -14,6 +14,7 @@ on: - 'doc/**' branches: - main + - gc-by-segment jobs: publisher: diff --git a/ipni-gc/cmd/ipnigc/provider.go b/ipni-gc/cmd/ipnigc/provider.go index 3e2098f81..8ccadd4b5 100644 --- a/ipni-gc/cmd/ipnigc/provider.go +++ b/ipni-gc/cmd/ipnigc/provider.go @@ -72,6 +72,12 @@ var providerFlags = []cli.Flag{ Usage: "Set log level for other loggers that are not ipni-gc", Value: "error", }, + &cli.IntFlag{ + Name: "segment-size", + Usage: "Set advertisement chain segment size. This specifies how many ads to process at a time.", + Aliases: []string{"ss"}, + Value: 16384, + }, } func providerAction(cctx *cli.Context) error { @@ -156,6 +162,7 @@ func providerAction(cctx *cli.Context) error { reaper.WithDeleteNotFound(cctx.Bool("delete-not-found")), reaper.WithEntriesFromPublisher(cctx.Bool("ents-from-pub")), reaper.WithPCache(pc), + reaper.WithSegmentSize(cctx.Int("segment-size")), reaper.WithTopicName(cfg.Ingest.PubSubTopic), reaper.WithHttpTimeout(time.Duration(cfg.Ingest.HttpSyncTimeout)), ) diff --git a/ipni-gc/reaper/option.go b/ipni-gc/reaper/option.go index 8429b1ccc..e7e828225 100644 --- a/ipni-gc/reaper/option.go +++ b/ipni-gc/reaper/option.go @@ -12,6 +12,7 @@ import ( const ( defaultHttpTimeout = 10 * time.Second + defaultSegmentSize = 16384 defaultTopic = "/indexer/ingest/mainnet" ) @@ -28,6 +29,7 @@ type config struct { httpTimeout time.Duration p2pHost host.Host pcache *pcache.ProviderCache + segmentSize int topic string } @@ -41,6 +43,7 @@ func getOpts(opts []Option) (config, error) { carRead: true, entsFromPub: true, httpTimeout: defaultHttpTimeout, + segmentSize: defaultSegmentSize, topic: defaultTopic, } @@ -137,6 +140,17 @@ func WithPCache(pc *pcache.ProviderCache) Option { } } +// WithSegmentSize sets the size of the segments that the ad chain is broken +// into for processing. +func WithSegmentSize(size int) Option { + return func(c *config) error { + if size > 0 { + c.segmentSize = size + } + return nil + } +} + // WithTopicName sets the topic name on which the provider announces advertised // content. Defaults to '/indexer/ingest/mainnet'. func WithTopicName(topic string) Option { diff --git a/ipni-gc/reaper/reaper.go b/ipni-gc/reaper/reaper.go index b06d2b845..4f71ec332 100644 --- a/ipni-gc/reaper/reaper.go +++ b/ipni-gc/reaper/reaper.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/gammazero/deque" "github.com/gammazero/targz" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -93,6 +94,7 @@ type Reaper struct { httpTimeout time.Duration indexer indexer.Interface pcache *pcache.ProviderCache + segmentSize int stats GCStats statsMutex sync.Mutex topic string @@ -120,6 +122,13 @@ type GCState struct { LastProcessedAdCid cid.Cid } +type adInfo struct { + cid cid.Cid + contextID []byte + empty bool + isRm bool +} + func dsContextPrefix(contextID string) string { return fmt.Sprintf("/ctx/%s/", contextID) } @@ -178,6 +187,7 @@ func New(idxr indexer.Interface, fileStore filestore.Interface, options ...Optio httpTimeout: opts.httpTimeout, indexer: idxr, pcache: opts.pcache, + segmentSize: opts.segmentSize, topic: opts.topic, }, nil } @@ -638,16 +648,69 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error { } }() - removedCtxSet := make(map[string]struct{}) - remaining := make(map[string][]cid.Cid) + segSize := s.reaper.segmentSize + segment := deque.New[adInfo](segSize, segSize) + + for segEnd := gcState.LastProcessedAdCid; segEnd != latestAdCid; { + for adCid := latestAdCid; adCid != segEnd; { + ad, err := s.loadAd(adCid) + if err != nil { + return fmt.Errorf("failed to load advertisement %s: %w", adCid.String(), err) + } + var empty bool + if !ad.IsRm && (ad.Entries == nil || ad.Entries == schema.NoEntries) { + empty = true + } + if segment.Len() == segSize { + segment.PopFront() + } + ai := adInfo{ + cid: adCid, + contextID: ad.ContextID, + empty: empty, + isRm: ad.IsRm, + } + segment.PushBack(ai) + if ad.PreviousID == nil { + break + } + adCid = ad.PreviousID.(cidlink.Link).Cid + if adCid == cid.Undef { + // This should never happen, unless the provider has a + // different chain that it did previously. + log.Errorw("Did not find last provessed advertisement in chain", "lastProcessed", segEnd, "chainEnd", adCid) + break + } + } - for adCid := latestAdCid; adCid != gcState.LastProcessedAdCid; { - ad, err := s.loadAd(adCid) + segEnd = segment.Front().cid + + err = s.reapSegment(ctx, segment) if err != nil { - return fmt.Errorf("failed to load advertisement %s: %w", adCid.String(), err) + return err } - contextID := base64.StdEncoding.EncodeToString(ad.ContextID) - if ad.IsRm { + + // Update GC state. + gcState.LastProcessedAdCid = segEnd + if err = s.saveGCState(ctx, gcState); err != nil { + return err + } + } + return nil +} + +func (s *scythe) reapSegment(ctx context.Context, segment *deque.Deque[adInfo]) error { + removedCtxSet := make(map[string]struct{}) + remaining := make(map[string][]cid.Cid) + segSize := segment.Len() + + var err error + for segment.Len() != 0 { + ad := segment.Front() + segment.PopFront() + adCid := ad.cid + contextID := base64.StdEncoding.EncodeToString(ad.contextID) + if ad.isRm { log.Debugw("Processing removal ad", "adCid", adCid) s.stats.RemovalAds++ reused, ok := remaining[contextID] @@ -667,7 +730,7 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error { if err = s.deleteCarFile(ctx, adCid); err != nil { return err } - } else if ad.Entries == nil || ad.Entries == schema.NoEntries { + } else if ad.empty { log.Debugw("Processing no-content ad", "adCid", adCid) // Delete CAR file of empty ad. if err = s.deleteCarFile(ctx, adCid); err != nil { @@ -688,14 +751,8 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error { } } s.stats.AdsProcessed++ - - if ad.PreviousID == nil { - adCid = cid.Undef - } else { - adCid = ad.PreviousID.(cidlink.Link).Cid - } } - log.Debugw("Done processing advertisements", "processed", s.stats.AdsProcessed, "removed", s.stats.IndexAdsRemoved) + log.Debugw("Done processing advertisements segment", "size", segSize, "processed", s.stats.AdsProcessed, "removed", s.stats.IndexAdsRemoved) // Record which ads remain undeleted. if err = s.saveRemaining(ctx, remaining); err != nil { @@ -704,9 +761,7 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error { s.stats.CtxIDsKept += len(remaining) s.stats.CtxIDsRemoved += len(removedCtxSet) - // Update GC state. - gcState.LastProcessedAdCid = latestAdCid - return s.saveGCState(ctx, gcState) + return nil } func (s *scythe) saveRemoved(ctx context.Context, adCid cid.Cid) error {