[BCFR-900] Add id column as PRIMARY KEY for evm.logs & evm.log_poller_blocks (#14451)

* Add id column as PRIMARY KEY for evm.logs & evm.log_poller_blocks

Also:
   - Add UNIQUE INDEXes to replace previous primary keys (still necessary, both for
     optimizing queries and for enforcing uniqueness constraints)
   - Replace all SELECT *'s with helper functions for selecting all columns
   - Refactor nestedBlockQuery into withConfs, and make a bit more use of it
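
  For illustration, a migration of this shape might look roughly like the following (a sketch only; the
  actual migration file is among the 14 changed files but is not shown in this excerpt, and the constraint
  and index names here are hypothetical):

    -- Sketch: swap the composite primary key for a surrogate id, keeping uniqueness via an index
    ALTER TABLE evm.logs DROP CONSTRAINT logs_pkey;
    ALTER TABLE evm.logs ADD COLUMN id BIGSERIAL PRIMARY KEY;
    CREATE UNIQUE INDEX idx_logs_chain_block_hash_log_index ON evm.logs (evm_chain_id, block_hash, log_index);
    ALTER TABLE evm.log_poller_blocks DROP CONSTRAINT log_poller_blocks_pkey;
    ALTER TABLE evm.log_poller_blocks ADD COLUMN id BIGSERIAL PRIMARY KEY;
    CREATE UNIQUE INDEX idx_log_poller_blocks_chain_block ON evm.log_poller_blocks (evm_chain_id, block_number);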

* Clean up db indexes

Some of the columns in these indexes (such as created_at) are no longer used.
Others were not optimized for the queries we need.

* Fix 2 unrelated bugs I noticed

* Update ExpiredLogs query to use id column

* Update test for fromBlock >= :block_number

Previously it was using fromBlock > :block_number, which is inconsistent with the other fromBlock comparisons in queries.

* Increase staggering of initial pruning runs

* Update go version 1.22.5 -> 1.22.7

* Update DeleteBlocksBefore query to use block_number index instead of LIMIT
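
  As a rough illustration of the change in query shape (sketch only; the real query lives in orm.go, which is
  not shown in this excerpt, and parameter names are placeholders):

    -- Before (sketch): page through candidate rows with LIMIT
    DELETE FROM evm.log_poller_blocks
     WHERE block_number IN (
       SELECT block_number FROM evm.log_poller_blocks
        WHERE evm_chain_id = :evm_chain_id AND block_number <= :end_block
        LIMIT :limit);

    -- After (sketch): delete a bounded range, served by the (evm_chain_id, block_number) index
    DELETE FROM evm.log_poller_blocks
     WHERE evm_chain_id = :evm_chain_id
       AND block_number >= :start_block AND block_number <= :end_block;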

* Split off SelectUnmatchedLogs from DeleteExpiredLogs
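
  The Go side of this split appears below as PruneUnmatchedLogs, which pairs SelectUnmatchedLogIDs with
  DeleteLogsByRowID. On the SQL side, the two halves might look roughly like this (sketch only; the filters
  table and its column names are assumptions, and the real queries are in orm.go, not shown in this excerpt):

    -- Sketch: find logs whose (chain, address, event) no longer match any registered filter
    SELECT l.id
      FROM evm.logs l
     WHERE NOT EXISTS (
       SELECT 1 FROM evm.log_poller_filters f
        WHERE f.evm_chain_id = l.evm_chain_id
          AND f.address = l.address
          AND f.event = l.event_sig)
     LIMIT :limit;

    -- Sketch: then delete the selected rows by their new primary key
    DELETE FROM evm.logs WHERE id = ANY(:ids);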

* Rename Id -> ID for linter

* Add helper function for tickers

* Stagger initial unmatched logs prune

* Reorganize sql migration script, adding comments to clarify

Also adds a missing CREATE INDEX line that was left out of the Goose Down section:
   CREATE INDEX idx_evm_log_poller_blocks_order_by_block ON evm_log_poller_blocks (evm_chain_id, block_number DESC);

And removes an extraneous DROP line that was in the Goose Up section:
  DROP INDEX IF EXISTS evm.idx_logs_chain_address_event_block_logindex;

* Remove log_index from idx_logs_chain_block_logindex to save some disk space
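
  That is, roughly the following (sketch only; the replacement index name is illustrative):

    DROP INDEX IF EXISTS evm.idx_logs_chain_block_logindex;
    CREATE INDEX idx_logs_chain_block ON evm.logs (evm_chain_id, block_number);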

* Add WHERE evm_chain_id clause and handle sql.ErrNoRows

* Fix lint
reductionista committed Oct 1, 2024
1 parent 4b97702 commit e96210b
Showing 14 changed files with 482 additions and 309 deletions.
2 changes: 1 addition & 1 deletion .tool-versions
@@ -1,4 +1,4 @@
golang 1.22.5
golang 1.22.7
mockery 2.43.2
nodejs 20.13.1
pnpm 9.4.0
57 changes: 49 additions & 8 deletions core/chains/evm/logpoller/log_poller.go
@@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"math/big"
"math/rand"
"sort"
"strings"
"sync"
@@ -24,8 +25,8 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
@@ -558,6 +559,15 @@ func (lp *logPoller) loadFilters(ctx context.Context) error {
return nil
}

// tickStaggeredDelay chooses a uniformly random amount of time to delay between minDelay and minDelay + period
func tickStaggeredDelay(minDelay time.Duration, period time.Duration) <-chan time.Time {
return time.After(minDelay + timeutil.JitterPct(1.0).Apply(period/2))
}

func tickWithDefaultJitter(interval time.Duration) <-chan time.Time {
return time.After(services.DefaultJitter.Apply(interval))
}

func (lp *logPoller) run() {
defer lp.wg.Done()
ctx, cancel := lp.stopCh.NewCtx()
@@ -635,31 +645,52 @@ func (lp *logPoller) backgroundWorkerRun() {
ctx, cancel := lp.stopCh.NewCtx()
defer cancel()

blockPruneShortInterval := lp.pollPeriod * 100
blockPruneInterval := blockPruneShortInterval * 10
logPruneShortInterval := lp.pollPeriod * 241 // no common factors with 100
logPruneInterval := logPruneShortInterval * 10

// Avoid putting too much pressure on the database by staggering the pruning of old blocks and logs.
// Usually, node after restart will have some work to boot the plugins and other services.
// Deferring first prune by minutes reduces risk of putting too much pressure on the database.
blockPruneTick := time.After(5 * time.Minute)
logPruneTick := time.After(10 * time.Minute)
// Deferring first prune by at least 5 mins reduces risk of putting too much pressure on the database.
blockPruneTick := tickStaggeredDelay(5*time.Minute, blockPruneInterval)
logPruneTick := tickStaggeredDelay(5*time.Minute, logPruneInterval)

// Start initial prune of unmatched logs after 5-15 successful expired log prunes, so that not all chains start
// around the same time. After that, every 20 successful expired log prunes.
successfulExpiredLogPrunes := 5 + rand.Intn(10) //nolint:gosec

for {
select {
case <-ctx.Done():
return
case <-blockPruneTick:
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 1000))
blockPruneTick = tickWithDefaultJitter(blockPruneInterval)
if allRemoved, err := lp.PruneOldBlocks(ctx); err != nil {
lp.lggr.Errorw("Unable to prune old blocks", "err", err)
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new blocks
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 100))
blockPruneTick = tickWithDefaultJitter(blockPruneShortInterval)
}
case <-logPruneTick:
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000
logPruneTick = tickWithDefaultJitter(logPruneInterval)
if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241))
logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
} else if successfulExpiredLogPrunes == 20 {
// Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times
// since the last time unmatched logs were pruned
if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune unmatched logs", "err", err)
} else if !allRemoved {
logPruneTick = tickWithDefaultJitter(logPruneShortInterval)
} else {
successfulExpiredLogPrunes = 0
}
} else {
successfulExpiredLogPrunes++
}
}
}
@@ -1097,6 +1128,16 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) {
ids, err := lp.orm.SelectUnmatchedLogIDs(ctx, lp.logPrunePageSize)
if err != nil {
return false, err
}
rowsRemoved, err := lp.orm.DeleteLogsByRowID(ctx, ids)

return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

// Logs returns logs matching topics and address (exactly) in the given block range,
// which are canonical at time of query.
func (lp *logPoller) Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) {
12 changes: 12 additions & 0 deletions core/chains/evm/logpoller/observability.go
@@ -136,6 +136,18 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64)
})
}

func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) {
return o.ORM.DeleteLogsByRowID(ctx, rowIDs)
})
}

func (o *ObservedORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) {
return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIDs", func() ([]uint64, error) {
return o.ORM.SelectUnmatchedLogIDs(ctx, limit)
})
}

func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
return o.ORM.DeleteExpiredLogs(ctx, limit)
4 changes: 2 additions & 2 deletions core/chains/evm/logpoller/observability_test.go
@@ -120,8 +120,8 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {

rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3)
require.NoError(t, err)
require.Equal(t, int64(3), rowsAffected)
assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete"))
require.Equal(t, int64(0), rowsAffected)
assert.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete"))

rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0)
require.NoError(t, err)
