Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BCFR-899] MaxLogsKept implementation #14574

Merged
merged 15 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/flat-horses-argue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added LogPoller MaxLogsKept feature: recency count-based instead of time based log retention
94 changes: 80 additions & 14 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ type logPoller struct {
// Usually the only way to recover is to manually remove the offending logs and block from the database.
// LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should
// recover automatically without needing to restart the LogPoller.
finalityViolated *atomic.Bool
finalityViolated atomic.Bool
countBasedLogPruningActive atomic.Bool
}

type Opts struct {
Expand Down Expand Up @@ -179,7 +180,6 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracke
clientErrors: opts.ClientErrors,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
}
}

Expand Down Expand Up @@ -217,6 +217,12 @@ func (filter *Filter) Contains(other *Filter) bool {
if other == nil {
return true
}
if other.Retention != filter.Retention {
return false
}
if other.MaxLogsKept != filter.MaxLogsKept {
return false
}
reductionista marked this conversation as resolved.
Show resolved Hide resolved
addresses := make(map[common.Address]interface{})
for _, addr := range filter.Addresses {
addresses[addr] = struct{}{}
Expand Down Expand Up @@ -282,14 +288,17 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error {
lp.lggr.Warnw("Filter already present, no-op", "name", filter.Name, "filter", filter)
return nil
}
lp.lggr.Warnw("Updating existing filter with more events or addresses", "name", filter.Name, "filter", filter)
lp.lggr.Warnw("Updating existing filter", "name", filter.Name, "filter", filter)
}

if err := lp.orm.InsertFilter(ctx, filter); err != nil {
return pkgerrors.Wrap(err, "error inserting filter")
}
lp.filters[filter.Name] = filter
lp.filterDirty = true
if filter.MaxLogsKept > 0 {
lp.countBasedLogPruningActive.Store(true)
reductionista marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

Expand Down Expand Up @@ -545,18 +554,38 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i
return mathutil.Min(requested, lastProcessed.BlockNumber), nil
}

// loadFilters loads the filters from db, and activates count-based Log Pruning
// if required by any of the filters
func (lp *logPoller) loadFilters(ctx context.Context) error {
filters, err := lp.lockAndLoadFilters(ctx)
if err != nil {
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
}
if lp.countBasedLogPruningActive.Load() {
return nil
}
for _, filter := range filters {
if filter.MaxLogsKept != 0 {
lp.countBasedLogPruningActive.Store(true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, but you can return immediately when setting that to true, right? Don't need to keep iterating

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

return nil
}
}
return nil
}

// lockAndLoadFilters is the part of loadFilters() requiring a filterMu lock
func (lp *logPoller) lockAndLoadFilters(ctx context.Context) (filters map[string]Filter, err error) {
lp.filterMu.Lock()
defer lp.filterMu.Unlock()
filters, err := lp.orm.LoadFilters(ctx)

filters, err = lp.orm.LoadFilters(ctx)
if err != nil {
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
return filters, err
}

lp.filters = filters
lp.filterDirty = true
return nil
return filters, nil
}

// tickStaggeredDelay chooses a uniformly random amount of time to delay between minDelay and minDelay + period
Expand Down Expand Up @@ -665,31 +694,41 @@ func (lp *logPoller) backgroundWorkerRun() {
case <-ctx.Done():
return
case <-blockPruneTick:
lp.lggr.Infow("pruning old blocks")
blockPruneTick = tickWithDefaultJitter(blockPruneInterval)
if allRemoved, err := lp.PruneOldBlocks(ctx); err != nil {
lp.lggr.Errorw("Unable to prune old blocks", "err", err)
lp.lggr.Errorw("unable to prune old blocks", "err", err)
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new blocks
blockPruneTick = tickWithDefaultJitter(blockPruneShortInterval)
lp.lggr.Warnw("reached page limit while pruning old blocks")
} else {
lp.lggr.Debugw("finished pruning old blocks")
}
case <-logPruneTick:
logPruneTick = tickWithDefaultJitter(logPruneInterval)
lp.lggr.Infof("pruning expired logs")
if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
lp.lggr.Errorw("unable to prune expired logs", "err", err)
} else if !allRemoved {
lp.lggr.Warnw("reached page limit while pruning expired logs")
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
} else if successfulExpiredLogPrunes == 20 {
} else if successfulExpiredLogPrunes >= 20 {
reductionista marked this conversation as resolved.
Show resolved Hide resolved
// Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times
// since the last time unmatched logs were pruned
lp.lggr.Infof("finished pruning expired logs: pruning unmatched logs")
if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune unmatched logs", "err", err)
lp.lggr.Errorw("unable to prune unmatched logs", "err", err)
} else if !allRemoved {
lp.lggr.Warnw("reached page limit while pruning unmatched logs")
logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
} else {
lp.lggr.Debugw("finished pruning unmatched logs")
successfulExpiredLogPrunes = 0
}
} else {
lp.lggr.Debugw("finished pruning expired logs")
successfulExpiredLogPrunes++
}
}
Expand Down Expand Up @@ -1097,7 +1136,8 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
}

// PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, then it
// will always return true unless there is an actual error.
func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
latestBlock, err := lp.orm.SelectLatestBlock(ctx)
if err != nil {
Expand All @@ -1121,13 +1161,39 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

// PruneExpiredLogs logs that are older than their retention period defined in Filter.
// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
// PruneExpiredLogs will attempt to remove any logs which have passed their retention period. Returns whether all expired
// logs were removed. If logPrunePageSize is set to 0, it will always return true unless an actual error is encountered
func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
done := true

rowsRemoved, err := lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize)
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
if err != nil {
lp.lggr.Errorw("Unable to find excess logs for pruning", "err", err)
return false, err
} else if lp.logPrunePageSize != 0 && rowsRemoved == lp.logPrunePageSize {
done = false
}

if !lp.countBasedLogPruningActive.Load() {
reductionista marked this conversation as resolved.
Show resolved Hide resolved
return done, err
}

rowIDs, err := lp.orm.SelectExcessLogIDs(ctx, lp.logPrunePageSize)
dhaidashenko marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
lp.lggr.Errorw("Unable to find excess logs for pruning", "err", err)
return false, err
}
rowsRemoved, err = lp.orm.DeleteLogsByRowID(ctx, rowIDs)
if err != nil {
lp.lggr.Errorw("Unable to prune excess logs", "err", err)
} else if lp.logPrunePageSize != 0 && rowsRemoved == lp.logPrunePageSize {
done = false
}
return done, err
}

// PruneUnmatchedLogs will attempt to remove any logs which no longer match a registered filter. Returns whether all unmatched
// logs were removed. If logPrunePageSize is set to 0, it will always return true unless an actual error is encountered
func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) {
ids, err := lp.orm.SelectUnmatchedLogIDs(ctx, lp.logPrunePageSize)
if err != nil {
Expand Down
18 changes: 12 additions & 6 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64)
})
}

func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) {
return o.ORM.DeleteLogsByRowID(ctx, rowIDs)
func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
return o.ORM.DeleteExpiredLogs(ctx, limit)
})
}

Expand All @@ -148,9 +148,15 @@ func (o *ObservedORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (i
})
}

func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
return o.ORM.DeleteExpiredLogs(ctx, limit)
func (o *ObservedORM) SelectExcessLogIDs(ctx context.Context, limit int64) ([]uint64, error) {
return withObservedQueryAndResults[uint64](o, "SelectExcessLogIDs", func() ([]uint64, error) {
return o.ORM.SelectExcessLogIDs(ctx, limit)
})
}

func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) {
return o.ORM.DeleteLogsByRowID(ctx, rowIDs)
})
}

Expand Down
Loading
Loading