Split off SelectUnmatchedLogs from DeleteExpiredLogs
reductionista committed Sep 21, 2024
1 parent 1ff9472 commit c6c76ae
Showing 5 changed files with 95 additions and 19 deletions.
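In short, the commit splits log pruning into two paths: DeleteExpiredLogs keeps removing logs whose matching filters' retention has lapsed, while a new SelectUnmatchedLogIds/DeleteLogsByRowId pair, driven by logPoller.PruneUnmatchedLogs, removes logs that no longer match any registered filter. The background worker only runs the second path after 20 clean expired-log prunes. The sketch below restates that scheduling outside the real codebase; the pruner interface, pruneLoop function, and tick channel are illustrative stand-ins rather than code from the diff, and the interval/backoff handling is simplified.

```go
package logpollersketch

import (
	"context"
	"time"
)

// pruner is a hypothetical trimmed-down view of the two pruning methods this
// commit separates; the real ones live on logPoller in log_poller.go.
type pruner interface {
	PruneExpiredLogs(ctx context.Context) (allRemoved bool, err error)
	PruneUnmatchedLogs(ctx context.Context) (allRemoved bool, err error)
}

// pruneLoop sketches the scheduling added to backgroundWorkerRun: expired logs
// are pruned on every tick, while unmatched logs are only pruned once 20
// consecutive expired-log prunes have each removed everything eligible.
func pruneLoop(ctx context.Context, p pruner, tick <-chan time.Time) {
	successfulExpiredLogPrunes := 0
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick:
			allRemoved, err := p.PruneExpiredLogs(ctx)
			if err != nil || !allRemoved {
				continue // the real code also switches to a shorter retry interval here
			}
			if successfulExpiredLogPrunes < 20 {
				successfulExpiredLogPrunes++
				continue
			}
			// After 20 clean expired-log prunes, take care of unmatched logs too.
			if allRemoved, err = p.PruneUnmatchedLogs(ctx); err == nil && allRemoved {
				successfulExpiredLogPrunes = 0
			}
		}
	}
}
```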
24 changes: 24 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
@@ -649,6 +649,8 @@ func (lp *logPoller) backgroundWorkerRun() {
blockPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(blockPruneInterval/2))
logPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(logPruneInterval/2))

successfulExpiredLogPrunes := 0

for {
select {
case <-ctx.Done():
@@ -668,6 +670,18 @@ func (lp *logPoller) backgroundWorkerRun() {
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval))
} else if successfulExpiredLogPrunes == 20 {
// Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times
// since the last time unmatched logs were pruned
if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune unmatched logs", "err", err)
} else if !allRemoved {
logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval))
} else {
successfulExpiredLogPrunes = 0
}
} else {
successfulExpiredLogPrunes += 1

Check failure on line 684 in core/chains/evm/logpoller/log_poller.go (GitHub Actions / lint): increment-decrement: should replace successfulExpiredLogPrunes += 1 with successfulExpiredLogPrunes++ (revive)
}
}
}
@@ -1101,6 +1115,16 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) {
ids, err := lp.orm.SelectUnmatchedLogIds(ctx, lp.logPrunePageSize)
if err != nil {
return false, err
}
rowsRemoved, err := lp.orm.DeleteLogsByRowId(ctx, ids)

return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

// Logs returns logs matching topics and address (exactly) in the given block range,
// which are canonical at time of query.
func (lp *logPoller) Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) {
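PruneUnmatchedLogs reports allRemoved = true only when paging is disabled (logPrunePageSize == 0) or the page it just deleted was short of a full page; that flag is what the background worker uses to decide whether to keep retrying on the short interval. Below is a self-contained sketch of that contract against a made-up in-memory ORM; fakeORM and pruneUnmatched are hypothetical stand-ins that only mirror the two calls made above.

```go
package logpollersketch

import "context"

// fakeORM is a hypothetical in-memory stand-in for the two ORM methods
// PruneUnmatchedLogs relies on; it is not part of the real codebase.
type fakeORM struct {
	unmatched []uint64 // ids of logs that no longer match any filter
}

func (f *fakeORM) SelectUnmatchedLogIds(_ context.Context, limit int64) ([]uint64, error) {
	if limit == 0 || int64(len(f.unmatched)) <= limit {
		return f.unmatched, nil
	}
	return f.unmatched[:limit], nil
}

func (f *fakeORM) DeleteLogsByRowId(_ context.Context, rowIds []uint64) (int64, error) {
	// Assumes rowIds is the prefix just returned by SelectUnmatchedLogIds.
	f.unmatched = f.unmatched[len(rowIds):]
	return int64(len(rowIds)), nil
}

// pruneUnmatched mirrors the body of logPoller.PruneUnmatchedLogs shown above:
// select one page of unmatched log ids, delete them, and report whether a
// short (or unpaged) delete means nothing eligible is left.
func pruneUnmatched(ctx context.Context, orm *fakeORM, pageSize int64) (bool, error) {
	ids, err := orm.SelectUnmatchedLogIds(ctx, pageSize)
	if err != nil {
		return false, err
	}
	rowsRemoved, err := orm.DeleteLogsByRowId(ctx, ids)
	return pageSize == 0 || rowsRemoved < pageSize, err
}
```

With five unmatched logs and a page size of two, the first two calls return false (full pages) and the third returns true, so a caller like backgroundWorkerRun keeps retrying on the short interval until the backlog is gone.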
12 changes: 12 additions & 0 deletions core/chains/evm/logpoller/observability.go
@@ -136,6 +136,18 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64)
})
}

func (o *ObservedORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteLogsByRowId", del, func() (int64, error) {
return o.ORM.DeleteLogsByRowId(ctx, rowIds)
})
}

func (o *ObservedORM) SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) {
return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIds", func() ([]uint64, error) {
return o.ORM.SelectUnmatchedLogIds(ctx, limit)
})
}

func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
return o.ORM.DeleteExpiredLogs(ctx, limit)
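Both new ObservedORM methods follow the existing wrapper pattern in this file: delegate to the underlying ORM and record metrics under a distinct query name, with deletes tracked by rows affected and selects by duration and result count. The helper below is a rough guess at the shape of a generic wrapper like withObservedQueryAndResults, written from scratch for illustration; the real helper in observability.go is assumed rather than copied, and records Prometheus metrics instead of calling plain function fields.

```go
package logpollersketch

import "time"

// observed is a hypothetical stand-in for ObservedORM's metric sinks.
type observed struct {
	queryDuration func(queryName string, d time.Duration)
	resultCount   func(queryName string, n int)
}

// withObservedQueryAndResults sketches the shape of the helper used by the new
// SelectUnmatchedLogIds wrapper: time the query, record how many rows came
// back, and pass the results through unchanged.
func withObservedQueryAndResults[T any](o *observed, queryName string, query func() ([]T, error)) ([]T, error) {
	start := time.Now()
	results, err := query()
	o.queryDuration(queryName, time.Since(start))
	if err == nil {
		o.resultCount(queryName, len(results))
	}
	return results, err
}
```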
4 changes: 2 additions & 2 deletions core/chains/evm/logpoller/observability_test.go
@@ -121,8 +121,8 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {

rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3)
require.NoError(t, err)
-require.Equal(t, int64(3), rowsAffected)
-assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete"))
+require.Equal(t, int64(0), rowsAffected)
+assert.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete"))

rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0)
require.NoError(t, err)
64 changes: 52 additions & 12 deletions core/chains/evm/logpoller/orm.go
@@ -33,9 +33,11 @@ type ORM interface {
LoadFilters(ctx context.Context) (map[string]Filter, error)
DeleteFilter(ctx context.Context, name string) error

DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error)
InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error
DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)
DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error)
DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)

GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error)
@@ -291,7 +293,7 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig
return &l, nil
}

-// DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks.
+// DeleteBlocksBefore delete blocks before end. When limit is set, it will delete at most limit blocks.
// Otherwise, it will delete all blocks at once.
func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) {
var result sql.Result
@@ -306,19 +308,28 @@ func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64)
return result.RowsAffected()
}

var limitBlock int64

err = o.ds.GetContext(ctx, &limitBlock, `SELECT MIN(block_number) FROM evm.log_poller_blocks`)
if err != nil {
return 0, err
}

var deleted int64
// Remove up to limit blocks at a time, until we've reached the limit or removed everything eligible for deletion
-for startOffset := limit; deleted < limit; startOffset += limit {
-result, err = o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks
-WHERE block_number <= $1 AND block_number < MIN(block_number) + $2 AND evm_chain_id = $3`, end, startOffset, ubig.New(o.chainID))
+for limitBlock += limit; deleted < limit; limitBlock += limit {
+if limitBlock > end {
+limitBlock = end
+}
+result, err = o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE block_number < $1 AND evm_chain_id = $2`, limitBlock, ubig.New(o.chainID))
if err != nil {
return deleted, err
}
-if rows, err := result.RowsAffected(); err != nil {
+var rows int64
+if rows, err = result.RowsAffected(); err != nil {
return deleted, err
-} else {
-deleted += rows
}
+deleted += rows
}
return deleted, err
}
Expand Down Expand Up @@ -365,6 +376,26 @@ type Exp struct {
ShouldDelete bool
}

// SelectUnmatchedLogIds returns the ids of any logs which no longer match a registered filter
func (o *DSORM) SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) {
query := `
SELECT l.id FROM evm.logs l LEFT JOIN (
SELECT evm_chain_id, address, event
FROM evm.log_poller_filters
WHERE evm_chain_id = $1
GROUP BY evm_chain_id, address, event
) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event
WHERE l.evm_chain_id = $1 AND r.address IS NULL -- filter on chain id needed because of LEFT JOIN
`

if limit == 0 {
err = o.ds.SelectContext(ctx, &ids, query, ubig.New(o.chainID))
return ids, err
}
err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit), ubig.New(o.chainID))

return ids, err
}

// DeleteExpiredLogs removes any logs which either:
// - don't match any currently registered filters, or
// - have a timestamp older than any matching filter's retention, UNLESS there is at
@@ -378,14 +409,14 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro
query := fmt.Sprintf(`
WITH rows_to_delete AS (
SELECT l.id
-FROM evm.logs l LEFT JOIN (
-SELECT evm_chain_id, address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention
+FROM evm.logs l JOIN (
+SELECT evm_chain_id, address, event, MAX(retention) AS retention
FROM evm.log_poller_filters
WHERE evm_chain_id = $1
GROUP BY evm_chain_id, address, event
-) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event
-WHERE l.evm_chain_id = $1 AND -- needed because of LEFT JOIN
-r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second'))
+HAVING MIN(retention) > 0
+) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event AND
+l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')
%s
) DELETE FROM evm.logs WHERE id IN (SELECT id FROM rows_to_delete)`, limitClause)
result, err := o.ds.ExecContext(ctx, query, ubig.New(o.chainID))
@@ -1012,3 +1043,12 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitA

return logs, nil
}

// DeleteLogsByRowId accepts a list of log row ids to delete
func (o *DSORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) {
result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE id = ANY($1)`, rowIds)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
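With unmatched logs handled separately, the rewritten DeleteExpiredLogs query only has to encode the retention rule: group the registered filters by (chain, address, event), skip any group pinned forever by a zero retention (HAVING MIN(retention) > 0), and otherwise delete logs older than the longest matching retention. The function below restates that per-log decision in plain Go, assuming this reading of the SQL is correct; it is an illustration, not code from the repository.

```go
package logpollersketch

import "time"

// shouldExpire restates the per-log decision encoded by the new DeleteExpiredLogs
// query: given the retentions of every filter matching a log's (address, event),
// the log is deleted only if no matching filter pins it forever (retention 0)
// and it is at least as old as the longest matching retention.
func shouldExpire(logAge time.Duration, matchingRetentions []time.Duration) bool {
	if len(matchingRetentions) == 0 {
		// Unmatched logs are no longer this query's job; SelectUnmatchedLogIds handles them.
		return false
	}
	longest := time.Duration(0)
	for _, r := range matchingRetentions {
		if r == 0 {
			return false // HAVING MIN(retention) > 0 drops the whole group
		}
		if r > longest {
			longest = r
		}
	}
	return logAge >= longest
}
```

That reading also matches the updated expectations in orm_test.go below: the log matching both filter1 (1 ms) and filter12 (1 hour) survives, the one matching only filter1 is old enough to be deleted, and anything matching filter0 (retention 0) is kept.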
10 changes: 5 additions & 5 deletions core/chains/evm/logpoller/orm_test.go
@@ -457,14 +457,14 @@ func TestORM(t *testing.T) {
time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period
deleted, err := o1.DeleteExpiredLogs(ctx, 0)
require.NoError(t, err)
-assert.Equal(t, int64(4), deleted)
+assert.Equal(t, int64(1), deleted)

logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber)
require.NoError(t, err)
-// It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all
-// 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1
-// of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12.
-assert.Len(t, logs, 4)
+// The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour)
+// Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything
+// matching filter12 should be kept regardless of what other filters it matches.
+assert.Len(t, logs, 7)

// Delete logs after should delete all logs.
err = o1.DeleteLogsAndBlocksAfter(ctx, 1)
