From a99e37ff273ac2b8e14103eaeb190c3016ceb847 Mon Sep 17 00:00:00 2001
From: Domino Valdano <2644901+reductionista@users.noreply.github.com>
Date: Thu, 19 Sep 2024 21:26:27 -0700
Subject: [PATCH] Split off SelectUnmatchedLogs from DeleteExpiredLogs

---
 core/chains/evm/logpoller/log_poller.go    | 27 +++++++++++++
 core/chains/evm/logpoller/observability.go | 12 ++++++
 core/chains/evm/logpoller/orm.go           | 44 +++++++++++++++++++---
 3 files changed, 78 insertions(+), 5 deletions(-)

diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go
index d5d96f0090..1b3fb37191 100644
--- a/core/chains/evm/logpoller/log_poller.go
+++ b/core/chains/evm/logpoller/log_poller.go
@@ -649,6 +649,8 @@ func (lp *logPoller) backgroundWorkerRun() {
 	blockPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(blockPruneInterval/2))
 	logPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(logPruneInterval/2))
 
+	successfulExpiredLogPrunes := 0
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -668,6 +670,18 @@ func (lp *logPoller) backgroundWorkerRun() {
 			} else if !allRemoved {
 				// Tick faster when cleanup can't keep up with the pace of new logs
 				logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval))
+			} else if successfulExpiredLogPrunes == 20 {
+				// Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times
+				// since the last time unmatched logs were pruned
+				if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil {
+					lp.lggr.Errorw("Unable to prune unmatched logs", "err", err)
+				} else if !allRemoved {
+					logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval))
+				} else {
+					successfulExpiredLogPrunes = 0
+				}
+			} else {
+				successfulExpiredLogPrunes++
 			}
 		}
 	}
@@ -1101,6 +1115,19 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
 	return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
 }
 
+// PruneUnmatchedLogs deletes up to one page (logPrunePageSize) of logs returned by SelectUnmatchedLogIds.
+// It returns true if paging is disabled (logPrunePageSize == 0) or fewer rows than a full page were
+// removed, meaning no unmatched logs are left to prune.
+func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) {
+	ids, err := lp.orm.SelectUnmatchedLogIds(ctx, lp.logPrunePageSize)
+	if err != nil {
+		return false, err
+	}
+	rowsRemoved, err := lp.orm.DeleteLogsByRowId(ctx, ids)
+
+	return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
+}
+
 // Logs returns logs matching topics and address (exactly) in the given block range,
 // which are canonical at time of query.
 func (lp *logPoller) Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) {
diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go
index 782307e7d0..cfc872d142 100644
--- a/core/chains/evm/logpoller/observability.go
+++ b/core/chains/evm/logpoller/observability.go
@@ -136,6 +136,18 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64)
 	})
 }
 
+func (o *ObservedORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) {
+	return withObservedExecAndRowsAffected(o, "DeleteLogsByRowId", del, func() (int64, error) {
+		return o.ORM.DeleteLogsByRowId(ctx, rowIds)
+	})
+}
+
+func (o *ObservedORM) SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) {
+	return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIds", func() ([]uint64, error) {
+		return o.ORM.SelectUnmatchedLogIds(ctx, limit)
+	})
+}
+
 func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
 	return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
 		return o.ORM.DeleteExpiredLogs(ctx, limit)
diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go
index d8e7ed7406..a5401d8784 100644
--- a/core/chains/evm/logpoller/orm.go
+++ b/core/chains/evm/logpoller/orm.go
@@ -33,9 +33,11 @@ type ORM interface {
 	LoadFilters(ctx context.Context) (map[string]Filter, error)
 	DeleteFilter(ctx context.Context, name string) error
 
+	DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error)
 	InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error
 	DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)
 	DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
+	SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error)
 	DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)
 
 	GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error)
@@ -365,6 +367,29 @@ type Exp struct {
 	ShouldDelete bool
 }
 
+// SelectUnmatchedLogIds returns the row ids of logs on this chain whose (address, event_sig) pair
+// does not match any filter currently registered in evm.log_poller_filters. At most limit ids are
+// returned; a limit of 0 means no limit is applied.
+func (o *DSORM) SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) {
+	query := `
+		SELECT l.id FROM evm.logs l LEFT JOIN (
+			SELECT evm_chain_id, address, event
+			FROM evm.log_poller_filters
+				WHERE evm_chain_id = $1
+				GROUP BY evm_chain_id, address, event
+		) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event
+		WHERE l.evm_chain_id = $1 AND r.evm_chain_id IS NULL
+	`
+
+	if limit == 0 {
+		err = o.ds.SelectContext(ctx, &ids, query, ubig.New(o.chainID))
+		return ids, err
+	}
+	err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit), ubig.New(o.chainID))
+
+	return ids, err
+}
+
 // DeleteExpiredLogs removes any logs which either:
 // - don't match any currently registered filters, or
 // - have a timestamp older than any matching filter's retention, UNLESS there is at
@@ -378,14 +403,14 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro
 	query := fmt.Sprintf(`
 		WITH rows_to_delete AS (
 			SELECT l.id
-			FROM evm.logs l LEFT JOIN (
-				SELECT evm_chain_id, address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention
+			FROM evm.logs l JOIN (
+				SELECT evm_chain_id, address, event, MAX(retention) AS retention
 					FROM evm.log_poller_filters
 					WHERE evm_chain_id = $1
 					GROUP BY evm_chain_id, address, event
-			) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event
-			WHERE l.evm_chain_id = $1 AND -- needed because of LEFT JOIN
-				r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second'))
+					HAVING MIN(retention) > 0
+			) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event AND
+			l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')
 			%s
 		) DELETE FROM evm.logs WHERE id IN (SELECT id FROM rows_to_delete)`, limitClause)
 	result, err := o.ds.ExecContext(ctx, query, ubig.New(o.chainID))
@@ -1012,3 +1037,12 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitA
 
 	return logs, nil
 }
+
+// DeleteLogsByRowId accepts a list of log row id's to delete
+func (o *DSORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) {
+	result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE id = ANY($1)`, rowIds)
+	if err != nil {
+		return 0, err
+	}
+	return result.RowsAffected()
+}