Skip to content

Commit

Permalink
Split off SelectUnmatchedLogs from DeleteExpiredLogs
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Sep 20, 2024
1 parent 069660e commit 94e15fa
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
24 changes: 24 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,8 @@ func (lp *logPoller) backgroundWorkerRun() {
blockPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(blockPruneInterval/2))
logPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(logPruneInterval/2))

successfulExpiredLogPrunes := 0

for {
select {
case <-ctx.Done():
Expand All @@ -668,6 +670,18 @@ func (lp *logPoller) backgroundWorkerRun() {
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval))
} else if successfulExpiredLogPrunes == 20 {
// Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times
// since the last time unmatched logs were pruned
if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune unmatched logs", "err", err)
} else if !allRemoved {
logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval))
} else {
successfulExpiredLogPrunes = 0
}
} else {
successfulExpiredLogPrunes += 1

Check failure on line 684 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / lint

increment-decrement: should replace successfulExpiredLogPrunes += 1 with successfulExpiredLogPrunes++ (revive)
}
}
}
Expand Down Expand Up @@ -1101,6 +1115,16 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) {
ids, err := lp.orm.SelectUnmatchedLogIds(ctx, lp.logPrunePageSize)
if err != nil {
return false, err
}
rowsRemoved, err := lp.orm.DeleteLogsByRowId(ctx, ids)

return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

// Logs returns logs matching topics and address (exactly) in the given block range,
// which are canonical at time of query.
func (lp *logPoller) Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) {
Expand Down
12 changes: 12 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64)
})
}

func (o *ObservedORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteLogsByRowId", del, func() (int64, error) {
return o.ORM.DeleteLogsByRowId(ctx, rowIds)
})
}

func (o *ObservedORM) SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) {
return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIds", func() ([]uint64, error) {
return o.ORM.SelectUnmatchedLogIds(ctx, limit)
})
}

func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
return o.ORM.DeleteExpiredLogs(ctx, limit)
Expand Down
41 changes: 36 additions & 5 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ type ORM interface {
LoadFilters(ctx context.Context) (map[string]Filter, error)
DeleteFilter(ctx context.Context, name string) error

DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error)
InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error
DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)
DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error)
DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)

GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error)
Expand Down Expand Up @@ -365,6 +367,26 @@ type Exp struct {
ShouldDelete bool
}

func (o *DSORM) SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) {
query := `
SELECT l.id FROM evm.logs l JOIN (
SELECT evm_chain_id, address, event
FROM evm.log_poller_filters
WHERE evm_chain_id = $1
GROUP BY evm_chain_id, address, event
) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event
WHERE l.evm_chain_id = $1 AND r.id IS NULL
`

if limit == 0 {
err = o.ds.SelectContext(ctx, &ids, query, ubig.New(o.chainID))
return ids, err
}
err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit))

return ids, err
}

// DeleteExpiredLogs removes any logs which either:
// - don't match any currently registered filters, or
// - have a timestamp older than any matching filter's retention, UNLESS there is at
Expand All @@ -378,14 +400,14 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro
query := fmt.Sprintf(`
WITH rows_to_delete AS (
SELECT l.id
FROM evm.logs l LEFT JOIN (
SELECT evm_chain_id, address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention
FROM evm.logs l JOIN (
SELECT evm_chain_id, address, event, MAX(retention) AS retention
FROM evm.log_poller_filters
WHERE evm_chain_id = $1
GROUP BY evm_chain_id, address, event
) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event
WHERE l.evm_chain_id = $1 AND -- needed because of LEFT JOIN
r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second'))
HAVING MIN(retention) > 0
) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event AND
l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')
%s
) DELETE FROM evm.logs WHERE id IN (SELECT id FROM rows_to_delete)`, limitClause)
result, err := o.ds.ExecContext(ctx, query, ubig.New(o.chainID))
Expand Down Expand Up @@ -1012,3 +1034,12 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitA

return logs, nil
}

// DeleteLogsByRowId accepts a list of log row id's to delete
func (o *DSORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) {
result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE id = ANY($1)`, rowIds)
if err != nil {
return 0, err
}
return result.RowsAffected()
}

0 comments on commit 94e15fa

Please sign in to comment.