Skip to content

Commit

Permalink
Re-use block-range paging from DeleteBlocksBefore for SelectExcessLogs
Browse files Browse the repository at this point in the history
Also: add block_number >= lower
  • Loading branch information
reductionista committed Sep 30, 2024
1 parent dd6e0fe commit 0b02ede
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 59 deletions.
129 changes: 70 additions & 59 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,23 +295,19 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig
return &l, nil
}

// DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks.
// Otherwise, it will delete all blocks at once.
func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) {
var result sql.Result
var err error

// ExecPagedQuery runs a query accepting an upper limit block (end) in a fast paged way. limit is the maximum number
// of results to be returned, but it is also used to break the query up into smaller queries restricted to limit # of blocks.
// The first range of blocks will be from MIN(block_number) to MIN(block_number) + limit. The iterative process ends either once
// the limit on results is reached or block_number = end. The query will never be exeucted on blocks where
// block_number > end, and it will never be executed on block_number = B unless it has also been executed on all
// blocks with block_number < B
func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, query func(lower, upper int64) (int64, error)) (numResults int64, err error) {
if limit == 0 {
result, err = o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks
WHERE block_number <= $1 AND evm_chain_id = $2`, end, ubig.New(o.chainID))
if err != nil {
return 0, err
}
return result.RowsAffected()
return query(0, end)
}

var limitBlock int64
err = o.ds.GetContext(ctx, &limitBlock, `SELECT MIN(block_number) FROM evm.log_poller_blocks
var start int64
err = o.ds.GetContext(ctx, &start, `SELECT MIN(block_number) FROM evm.log_poller_blocks
WHERE evm_chain_id = $1`, ubig.New(o.chainID))
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
Expand All @@ -321,27 +317,35 @@ func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64)
}

// Remove up to limit blocks at a time, until we've reached the limit or removed everything eligible for deletion
var deleted, rows int64
for limitBlock += (limit - 1); deleted < limit; limitBlock += limit {
if limitBlock > end {
limitBlock = end
}
result, err = o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE block_number <= $1 AND evm_chain_id = $2`, limitBlock, ubig.New(o.chainID))
if err != nil {
return deleted, err
for lower, upper := start, start+limit-1; numResults < limit; lower = upper + 1 {
upper = lower + limit - 1
if upper > end {
upper = end
}

if rows, err = result.RowsAffected(); err != nil {
return deleted, err
rows, err2 := query(lower, upper)
if err2 != nil {
return numResults, err
}
numResults += rows

deleted += rows

if limitBlock == end {
if upper == end {
break
}
}
return deleted, err
return numResults, nil
}

// DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks.
// Otherwise, it will delete all blocks at once.
func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) {
return o.ExecPagedQuery(ctx, limit, end, func(lower, upper int64) (int64, error) {
result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`,
ubig.New(o.chainID), lower, upper)
if err != nil {
return 0, err
}
return result.RowsAffected()
})
}

func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error {
Expand Down Expand Up @@ -407,42 +411,49 @@ func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []u
}

// SelectExcessLogIDs finds any logs old enough that MaxLogsKept has been exceeded for every filter they match.
func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (rowIDs []uint64, err error) {
var limitClause string
if limit > 0 {
// We have to count the logs in descending order first, to know which ones to keep. But then reverse the order
// of them all if we want to delete only the oldest {limit} # of logs. Omitting this 2nd ORDER BY is fine if we're
// deleting all prunable logs, but if paging is enabled we don't want to have non-contiguous
// block ranges of logs in the db while waiting for the next page to be pruned
limitClause = fmt.Sprintf(" ORDER BY block_number, log_index LIMIT %d", limit)
}

// Allow SELECT query to run for up to 3 minutes. DELETE query will still have default 10s timeout
selectCtx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), 3*time.Minute)
defer cancel()

query := `
WITH filters AS (SELECT name,
func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results []uint64, err error) {
withSubQuery := ` -- Roll up the filter table into 1 row per filter
SELECT name,
ARRAY_AGG(address) AS addresses, ARRAY_AGG(event) AS events,
(ARRAY_AGG(topic2) FILTER(WHERE topic2 IS NOT NULL)) AS topic2,
(ARRAY_AGG(topic3) FILTER(WHERE topic3 IS NOT NULL)) AS topic3,
(ARRAY_AGG(topic4) FILTER(WHERE topic4 IS NOT NULL)) AS topic4,
MAX(max_logs_kept) AS max_logs_kept -- Should all be the same, but just in case use MAX
MAX(max_logs_kept) AS max_logs_kept -- Should all be the same, just need MAX for GROUP BY
FROM evm.log_poller_filters WHERE evm_chain_id=$1
GROUP BY name
) SELECT id FROM (
SELECT l.id, block_number, log_index, max_logs_kept != 0 AND
GROUP BY name`

countLogsSubQuery := ` -- Count logs matching each filter in reverse order, labeling anything after the filter.max_logs_kept'th with old=true
SELECT l.id, block_number, log_index, max_logs_kept != 0 AND
ROW_NUMBER() OVER(PARTITION BY f.name ORDER BY block_number, log_index DESC) > max_logs_kept AS old
FROM filters f JOIN evm.logs l ON l.evm_chain_id=$1 AND
l.address = ANY(f.addresses) AND
l.event_sig = ANY(f.events) AND
(f.topic2 IS NULL OR l.topics[1] = ANY(f.topic2)) AND
(f.topic3 IS NULL OR l.topics[2] = ANY(f.topic3)) AND
(f.topic4 IS NULL OR l.topics[3] = ANY(f.topic4))
) x GROUP BY id, block_number, log_index HAVING BOOL_AND(old)` + limitClause

err = o.ds.SelectContext(selectCtx, &rowIDs, query, ubig.New(o.chainID))
return rowIDs, err
FROM filters f JOIN evm.logs l ON
l.address = ANY(f.addresses) AND
l.event_sig = ANY(f.events) AND
(f.topic2 IS NULL OR l.topics[1] = ANY(f.topic2)) AND
(f.topic3 IS NULL OR l.topics[2] = ANY(f.topic3)) AND
(f.topic4 IS NULL OR l.topics[3] = ANY(f.topic4))
WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3
`

// Return all logs considered "old" by every filter they match
query := fmt.Sprintf(`WITH filters AS ( %s ) SELECT id FROM ( %s ) x GROUP BY id, block_number, log_index HAVING BOOL_AND(old)`,
withSubQuery, countLogsSubQuery)

latestBlock, err := o.SelectLatestBlock(ctx)
if err != nil {
return results, err
}

o.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber, func(lower, upper int64) (int64, error) {
var rowIDs []uint64
err = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), lower, upper)
if err != nil {
return 0, err
}
results = append(results, rowIDs...)
return int64(len(rowIDs)), err
})

return results, err
}

// DeleteExpiredLogs removes any logs which either:
Expand Down
66 changes: 66 additions & 0 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
"math"
"math/big"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/mock"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -1166,6 +1169,69 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) {
assertion(t, logs, err, startBlock, endBlock)
}

type mockQueryExecutor struct {
mock.Mock
}

func (m *mockQueryExecutor) Exec(lower, upper int64) (int64, error) {
res := m.Called(lower, upper)
return int64(res.Int(0)), res.Error(1)
}

func TestORM_ExecPagedQuery(t *testing.T) {
t.Parallel()
ctx := testutils.Context(t)
lggr := logger.Test(t)
chainID := testutils.NewRandomEVMChainID()
db := pgtest.NewSqlxDB(t)
o := logpoller.NewORM(chainID, db, lggr)

m := mockQueryExecutor{}

queryError := errors.New("some error")
m.On("Exec", int64(0), int64(0)).Return(0, queryError).Once()

// Should handle errors gracefully
_, err := o.ExecPagedQuery(ctx, 0, 0, m.Exec)
assert.ErrorIs(t, err, queryError)

m.On("Exec", int64(0), int64(60)).Return(4, nil).Once()

// Query should only get executed once with limitBlock=end if called with limit=0
numResults, err := o.ExecPagedQuery(ctx, 0, 60, m.Exec)
require.NoError(t, err)
assert.Equal(t, int64(4), numResults)

// Should report actual db errors
_, err = o.ExecPagedQuery(ctx, 300, 1000, m.Exec)
assert.Error(t, err)

o.InsertBlock(ctx, common.HexToHash("0x1234"), 42, time.Now(), 0)

m.On("Exec", mock.Anything, mock.Anything).Return(3, nil)

// Should get called with limitBlock = 342, 642, 942, 1000
numResults, err = o.ExecPagedQuery(ctx, 300, 1000, m.Exec)
require.NoError(t, err)
assert.Equal(t, int64(12), numResults) // 3 results in each of 4 calls
m.AssertNumberOfCalls(t, "Exec", 6) // 4 new calls, plus the prior 2
expectedLimitBlocks := [][]int64{{42, 341}, {342, 641}, {642, 941}, {942, 1000}}
for _, expected := range expectedLimitBlocks {
m.AssertCalled(t, "Exec", expected[0], expected[1])
}

// Should not go all the way to 1000, but stop after ~ 13 results have
// been returned
numResults, err = o.ExecPagedQuery(ctx, 15, 1000, m.Exec)
require.NoError(t, err)
assert.Equal(t, int64(15), numResults)
m.AssertNumberOfCalls(t, "Exec", 11)
expectedLimitBlocks = [][]int64{{42, 56}, {57, 71}, {72, 86}, {87, 101}, {102, 116}} // upper[n] = 42 + 15 * n - 1 for n = 1, 2, 3, 4, 5, lower[n] = upper[n-1] + 1
for _, expected := range expectedLimitBlocks {
m.AssertCalled(t, "Exec", expected[0], expected[1])
}
}

func TestORM_DeleteBlocksBefore(t *testing.T) {
th := SetupTH(t, lpOpts)
o1 := th.ORM
Expand Down

0 comments on commit 0b02ede

Please sign in to comment.