Skip to content

Commit

Permalink
Provess ad chain in segments during GC.
Browse files Browse the repository at this point in the history
If the amount of the ad chain to process is very long, then do not process all the ads at the same time. This may lead to keeping too many CIDs and context IDs in memory resulting in an out-of-memory condition. Instead process a segment of the chain at a time, starting from the oldest to the newest segment.
  • Loading branch information
gammazero committed Dec 21, 2023
1 parent 4ee3e51 commit 2989e92
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 18 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ecr-publisher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:
- 'doc/**'
branches:
- main
- gc-by-segment

jobs:
publisher:
Expand Down
7 changes: 7 additions & 0 deletions ipni-gc/cmd/ipnigc/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ var providerFlags = []cli.Flag{
Usage: "Set log level for other loggers that are not ipni-gc",
Value: "error",
},
&cli.IntFlag{
Name: "segment-size",
Usage: "Set advertisement chain segment size. This specifies how many ads to process at a time.",
Aliases: []string{"ss"},
Value: 16384,
},
}

func providerAction(cctx *cli.Context) error {
Expand Down Expand Up @@ -156,6 +162,7 @@ func providerAction(cctx *cli.Context) error {
reaper.WithDeleteNotFound(cctx.Bool("delete-not-found")),
reaper.WithEntriesFromPublisher(cctx.Bool("ents-from-pub")),
reaper.WithPCache(pc),
reaper.WithSegmentSize(cctx.Int("segment-size")),
reaper.WithTopicName(cfg.Ingest.PubSubTopic),
reaper.WithHttpTimeout(time.Duration(cfg.Ingest.HttpSyncTimeout)),
)
Expand Down
14 changes: 14 additions & 0 deletions ipni-gc/reaper/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

const (
defaultHttpTimeout = 10 * time.Second
defaultSegmentSize = 16384
defaultTopic = "/indexer/ingest/mainnet"
)

Expand All @@ -28,6 +29,7 @@ type config struct {
httpTimeout time.Duration
p2pHost host.Host
pcache *pcache.ProviderCache
segmentSize int
topic string
}

Expand All @@ -41,6 +43,7 @@ func getOpts(opts []Option) (config, error) {
carRead: true,
entsFromPub: true,
httpTimeout: defaultHttpTimeout,
segmentSize: defaultSegmentSize,
topic: defaultTopic,
}

Expand Down Expand Up @@ -137,6 +140,17 @@ func WithPCache(pc *pcache.ProviderCache) Option {
}
}

// WithSegmentSize sets the size of the segments that the ad chain is broken
// into for processing.
func WithSegmentSize(size int) Option {
return func(c *config) error {
if size > 0 {
c.segmentSize = size
}
return nil
}
}

// WithTopicName sets the topic name on which the provider announces advertised
// content. Defaults to '/indexer/ingest/mainnet'.
func WithTopicName(topic string) Option {
Expand Down
91 changes: 73 additions & 18 deletions ipni-gc/reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"github.com/gammazero/deque"
"github.com/gammazero/targz"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -93,6 +94,7 @@ type Reaper struct {
httpTimeout time.Duration
indexer indexer.Interface
pcache *pcache.ProviderCache
segmentSize int
stats GCStats
statsMutex sync.Mutex
topic string
Expand Down Expand Up @@ -120,6 +122,13 @@ type GCState struct {
LastProcessedAdCid cid.Cid
}

type adInfo struct {
cid cid.Cid
contextID []byte
empty bool
isRm bool
}

func dsContextPrefix(contextID string) string {
return fmt.Sprintf("/ctx/%s/", contextID)
}
Expand Down Expand Up @@ -178,6 +187,7 @@ func New(idxr indexer.Interface, fileStore filestore.Interface, options ...Optio
httpTimeout: opts.httpTimeout,
indexer: idxr,
pcache: opts.pcache,
segmentSize: opts.segmentSize,
topic: opts.topic,
}, nil
}
Expand Down Expand Up @@ -638,16 +648,69 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error {
}
}()

removedCtxSet := make(map[string]struct{})
remaining := make(map[string][]cid.Cid)
segSize := s.reaper.segmentSize
segment := deque.New[adInfo](segSize, segSize)

for segEnd := gcState.LastProcessedAdCid; segEnd != latestAdCid; {
for adCid := latestAdCid; adCid != segEnd; {
ad, err := s.loadAd(adCid)
if err != nil {
return fmt.Errorf("failed to load advertisement %s: %w", adCid.String(), err)
}
var empty bool
if !ad.IsRm && (ad.Entries == nil || ad.Entries == schema.NoEntries) {
empty = true
}
if segment.Len() == segSize {
segment.PopFront()
}
ai := adInfo{
cid: adCid,
contextID: ad.ContextID,
empty: empty,
isRm: ad.IsRm,
}
segment.PushBack(ai)
if ad.PreviousID == nil {
break
}
adCid = ad.PreviousID.(cidlink.Link).Cid
if adCid == cid.Undef {
// This should never happen, unless the provider has a
// different chain that it did previously.
log.Errorw("Did not find last provessed advertisement in chain", "lastProcessed", segEnd, "chainEnd", adCid)
break
}
}

for adCid := latestAdCid; adCid != gcState.LastProcessedAdCid; {
ad, err := s.loadAd(adCid)
segEnd = segment.Front().cid

err = s.reapSegment(ctx, segment)
if err != nil {
return fmt.Errorf("failed to load advertisement %s: %w", adCid.String(), err)
return err
}
contextID := base64.StdEncoding.EncodeToString(ad.ContextID)
if ad.IsRm {

// Update GC state.
gcState.LastProcessedAdCid = segEnd
if err = s.saveGCState(ctx, gcState); err != nil {
return err
}
}
return nil
}

func (s *scythe) reapSegment(ctx context.Context, segment *deque.Deque[adInfo]) error {
removedCtxSet := make(map[string]struct{})
remaining := make(map[string][]cid.Cid)
segSize := segment.Len()

var err error
for segment.Len() != 0 {
ad := segment.Front()
segment.PopFront()
adCid := ad.cid
contextID := base64.StdEncoding.EncodeToString(ad.contextID)
if ad.isRm {
log.Debugw("Processing removal ad", "adCid", adCid)
s.stats.RemovalAds++
reused, ok := remaining[contextID]
Expand All @@ -667,7 +730,7 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error {
if err = s.deleteCarFile(ctx, adCid); err != nil {
return err
}
} else if ad.Entries == nil || ad.Entries == schema.NoEntries {
} else if ad.empty {
log.Debugw("Processing no-content ad", "adCid", adCid)
// Delete CAR file of empty ad.
if err = s.deleteCarFile(ctx, adCid); err != nil {
Expand All @@ -688,14 +751,8 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error {
}
}
s.stats.AdsProcessed++

if ad.PreviousID == nil {
adCid = cid.Undef
} else {
adCid = ad.PreviousID.(cidlink.Link).Cid
}
}
log.Debugw("Done processing advertisements", "processed", s.stats.AdsProcessed, "removed", s.stats.IndexAdsRemoved)
log.Debugw("Done processing advertisements segment", "size", segSize, "processed", s.stats.AdsProcessed, "removed", s.stats.IndexAdsRemoved)

// Record which ads remain undeleted.
if err = s.saveRemaining(ctx, remaining); err != nil {
Expand All @@ -704,9 +761,7 @@ func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error {
s.stats.CtxIDsKept += len(remaining)
s.stats.CtxIDsRemoved += len(removedCtxSet)

// Update GC state.
gcState.LastProcessedAdCid = latestAdCid
return s.saveGCState(ctx, gcState)
return nil
}

func (s *scythe) saveRemoved(ctx context.Context, adCid cid.Cid) error {
Expand Down

0 comments on commit 2989e92

Please sign in to comment.