diff --git a/.tool-versions b/.tool-versions index 6cd01e3f913..345e10e2afa 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,4 +1,4 @@ -golang 1.22.5 +golang 1.22.7 mockery 2.43.2 nodejs 20.13.1 pnpm 9.4.0 diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index dd7e0c5242b..b19e3f53c39 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "math/big" + "math/rand" "sort" "strings" "sync" @@ -24,8 +25,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" "github.com/smartcontractkit/chainlink-common/pkg/types/query" - "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -558,6 +559,15 @@ func (lp *logPoller) loadFilters(ctx context.Context) error { return nil } +// tickStaggeredDelay chooses a uniformly random amount of time to delay between minDelay and minDelay + period +func tickStaggeredDelay(minDelay time.Duration, period time.Duration) <-chan time.Time { + return time.After(minDelay + timeutil.JitterPct(1.0).Apply(period/2)) +} + +func tickWithDefaultJitter(interval time.Duration) <-chan time.Time { + return time.After(services.DefaultJitter.Apply(interval)) +} + func (lp *logPoller) run() { defer lp.wg.Done() ctx, cancel := lp.stopCh.NewCtx() @@ -635,31 +645,52 @@ func (lp *logPoller) backgroundWorkerRun() { ctx, cancel := lp.stopCh.NewCtx() defer cancel() + blockPruneShortInterval := lp.pollPeriod * 100 + blockPruneInterval := blockPruneShortInterval * 10 + logPruneShortInterval := lp.pollPeriod * 241 // no common factors with 100 + logPruneInterval := logPruneShortInterval * 10 + // Avoid putting too much pressure on the database by staggering the 
pruning of old blocks and logs. // Usually, node after restart will have some work to boot the plugins and other services. - // Deferring first prune by minutes reduces risk of putting too much pressure on the database. - blockPruneTick := time.After(5 * time.Minute) - logPruneTick := time.After(10 * time.Minute) + // Deferring first prune by at least 5 mins reduces risk of putting too much pressure on the database. + blockPruneTick := tickStaggeredDelay(5*time.Minute, blockPruneInterval) + logPruneTick := tickStaggeredDelay(5*time.Minute, logPruneInterval) + + // Start initial prune of unmatched logs after 5-15 successful expired log prunes, so that not all chains start + // around the same time. After that, every 20 successful expired log prunes. + successfulExpiredLogPrunes := 5 + rand.Intn(10) //nolint:gosec for { select { case <-ctx.Done(): return case <-blockPruneTick: - blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 1000)) + blockPruneTick = tickWithDefaultJitter(blockPruneInterval) if allRemoved, err := lp.PruneOldBlocks(ctx); err != nil { lp.lggr.Errorw("Unable to prune old blocks", "err", err) } else if !allRemoved { // Tick faster when cleanup can't keep up with the pace of new blocks - blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 100)) + blockPruneTick = tickWithDefaultJitter(blockPruneShortInterval) } case <-logPruneTick: - logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000 + logPruneTick = tickWithDefaultJitter(logPruneInterval) if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil { lp.lggr.Errorw("Unable to prune expired logs", "err", err) } else if !allRemoved { // Tick faster when cleanup can't keep up with the pace of new logs - logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241)) + logPruneTick = tickWithDefaultJitter(logPruneShortInterval) + } else if successfulExpiredLogPrunes == 20 { + // Only prune unmatched logs if we've successfully 
pruned all expired logs at least 20 times + // since the last time unmatched logs were pruned + if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil { + lp.lggr.Errorw("Unable to prune unmatched logs", "err", err) + } else if !allRemoved { + logPruneTick = tickWithDefaultJitter(logPruneShortInterval) + } else { + successfulExpiredLogPrunes = 0 + } + } else { + successfulExpiredLogPrunes++ } } } @@ -1097,6 +1128,16 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) { return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err } +func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) { + ids, err := lp.orm.SelectUnmatchedLogIDs(ctx, lp.logPrunePageSize) + if err != nil { + return false, err + } + rowsRemoved, err := lp.orm.DeleteLogsByRowID(ctx, ids) + + return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err +} + // Logs returns logs matching topics and address (exactly) in the given block range, // which are canonical at time of query. 
func (lp *logPoller) Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) { diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index e0ed0cc4786..59b93fffdaf 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -136,6 +136,18 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) }) } +func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) { + return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) { + return o.ORM.DeleteLogsByRowID(ctx, rowIDs) + }) +} + +func (o *ObservedORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) { + return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIDs", func() ([]uint64, error) { + return o.ORM.SelectUnmatchedLogIDs(ctx, limit) + }) +} + func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) { return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) { return o.ORM.DeleteExpiredLogs(ctx, limit) diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 4ea7adceab0..27b8a3c3225 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -120,8 +120,8 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3) require.NoError(t, err) - require.Equal(t, int64(3), rowsAffected) - assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete")) + require.Equal(t, int64(0), rowsAffected) + assert.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete")) rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0) require.NoError(t, err) diff 
--git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index e40fb80f108..30cd19e0447 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -3,6 +3,7 @@ package logpoller import ( "context" "database/sql" + "errors" "fmt" "math/big" "strings" @@ -33,9 +34,11 @@ type ORM interface { LoadFilters(ctx context.Context) (map[string]Filter, error) DeleteFilter(ctx context.Context, name string) error + DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error + SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error) @@ -102,9 +105,9 @@ func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNum if err != nil { return err } - query := `INSERT INTO evm.log_poller_blocks - (evm_chain_id, block_hash, block_number, block_timestamp, finalized_block_number, created_at) - VALUES (:evm_chain_id, :block_hash, :block_number, :block_timestamp, :finalized_block_number, NOW()) + query := `INSERT INTO evm.log_poller_blocks + (evm_chain_id, block_hash, block_number, block_timestamp, finalized_block_number, created_at) + VALUES (:evm_chain_id, :block_hash, :block_number, :block_timestamp, :finalized_block_number, NOW()) ON CONFLICT DO NOTHING` _, err = o.ds.NamedExecContext(ctx, query, args) return err @@ -166,7 +169,7 @@ func (o *DSORM) DeleteFilter(ctx context.Context, name string) error { // LoadFilters returns all filters for this chain func (o *DSORM) LoadFilters(ctx context.Context) (map[string]Filter, error) { query := `SELECT name, - 
ARRAY_AGG(DISTINCT address)::BYTEA[] AS addresses, + ARRAY_AGG(DISTINCT address)::BYTEA[] AS addresses, ARRAY_AGG(DISTINCT event)::BYTEA[] AS event_sigs, ARRAY_AGG(DISTINCT topic2 ORDER BY topic2) FILTER(WHERE topic2 IS NOT NULL) AS topic2, ARRAY_AGG(DISTINCT topic3 ORDER BY topic3) FILTER(WHERE topic3 IS NOT NULL) AS topic3, @@ -185,9 +188,52 @@ func (o *DSORM) LoadFilters(ctx context.Context) (map[string]Filter, error) { return filters, err } +func blocksQuery(clause string) string { + return fmt.Sprintf(`SELECT %s FROM evm.log_poller_blocks %s`, strings.Join(blocksFields[:], ", "), clause) +} +func logsQuery(clause string) string { + return fmt.Sprintf(`SELECT %s FROM evm.logs %s`, strings.Join(logsFields[:], ", "), clause) +} + +func logsQueryWithTablePrefix(tableAlias string, clause string) string { + var s strings.Builder + for i, field := range logsFields { + if i > 0 { + s.WriteString(", ") + } + s.WriteString(fmt.Sprintf("%s.%s", tableAlias, field)) + } + return fmt.Sprintf(`SELECT %s FROM evm.logs AS %s %s`, s.String(), tableAlias, clause) +} + +func withConfs(query string, tableAlias string, confs evmtypes.Confirmations) string { + var lastConfirmedBlock string + + var tablePrefix string + if tableAlias != "" { + tablePrefix = tableAlias + "." 
+ } + if confs == evmtypes.Finalized { + lastConfirmedBlock = `finalized_block_number` + } else { + lastConfirmedBlock = `block_number - :confs` + } + return fmt.Sprintf(`%s %sblock_number <= ( + SELECT %s + FROM evm.log_poller_blocks + WHERE evm_chain_id = :evm_chain_id + ORDER BY block_number DESC LIMIT 1)`, query, tablePrefix, lastConfirmedBlock) +} + +func logsQueryWithConfs(clause string, confs evmtypes.Confirmations) string { + return withConfs(logsQuery(clause), "", confs) +} + func (o *DSORM) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE block_hash = $1 AND evm_chain_id = $2`, hash.Bytes(), ubig.New(o.chainID)); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE block_hash = $1 AND evm_chain_id = $2`), + hash.Bytes(), ubig.New(o.chainID)); err != nil { return nil, err } return &b, nil @@ -195,7 +241,9 @@ func (o *DSORM) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPo func (o *DSORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE block_number = $1 AND evm_chain_id = $2`, n, ubig.New(o.chainID)); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE block_number = $1 AND evm_chain_id = $2`), n, ubig.New(o.chainID), + ); err != nil { return nil, err } return &b, nil @@ -203,7 +251,9 @@ func (o *DSORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlo func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1`, ubig.New(o.chainID)); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 
1`), ubig.New(o.chainID), + ); err != nil { return nil, err } return &b, nil @@ -211,7 +261,10 @@ func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`, ubig.New(o.chainID), minAllowedBlockNumber); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`), + ubig.New(o.chainID), minAllowedBlockNumber, + ); err != nil { return nil, err } return &b, nil @@ -224,15 +277,11 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig if err != nil { return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id + query := logsQueryWithConfs( + `WHERE evm_chain_id = :evm_chain_id AND event_sig = :event_sig - AND address = :address - AND block_number <= %s - ORDER BY block_number desc, log_index DESC - LIMIT 1 - `, nestedBlockNumberQuery(confs)) + AND address = :address AND `, confs) + + `ORDER BY block_number desc, log_index DESC LIMIT 1` var l Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -248,28 +297,50 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig // DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks. // Otherwise, it will delete all blocks at once. 
func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) { - if limit > 0 { - result, err := o.ds.ExecContext(ctx, - `DELETE FROM evm.log_poller_blocks - WHERE block_number IN ( - SELECT block_number FROM evm.log_poller_blocks - WHERE block_number <= $1 - AND evm_chain_id = $2 - LIMIT $3 - ) - AND evm_chain_id = $2`, - end, ubig.New(o.chainID), limit) + var result sql.Result + var err error + + if limit == 0 { + result, err = o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks + WHERE block_number <= $1 AND evm_chain_id = $2`, end, ubig.New(o.chainID)) if err != nil { return 0, err } return result.RowsAffected() } - result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks - WHERE block_number <= $1 AND evm_chain_id = $2`, end, ubig.New(o.chainID)) + + var limitBlock int64 + err = o.ds.GetContext(ctx, &limitBlock, `SELECT MIN(block_number) FROM evm.log_poller_blocks + WHERE evm_chain_id = $1`, ubig.New(o.chainID)) if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } return 0, err } - return result.RowsAffected() + + // Remove up to limit blocks at a time, until we've reached the limit or removed everything eligible for deletion + var deleted, rows int64 + for limitBlock += (limit - 1); deleted < limit; limitBlock += limit { + if limitBlock > end { + limitBlock = end + } + result, err = o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE block_number <= $1 AND evm_chain_id = $2`, limitBlock, ubig.New(o.chainID)) + if err != nil { + return deleted, err + } + + if rows, err = result.RowsAffected(); err != nil { + return deleted, err + } + + deleted += rows + + if limitBlock == end { + break + } + } + return deleted, err } func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { @@ -281,11 +352,11 @@ func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error // If not applied, these queries can become very slow. 
After some critical number // of logs, Postgres will try to scan all the logs in the index by block_number. // Latency without upper bound filter can be orders of magnitude higher for large number of logs. - _, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks + _, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 - AND block_number >= $2 - AND block_number <= (SELECT MAX(block_number) - FROM evm.log_poller_blocks + AND block_number >= $2 + AND block_number <= (SELECT MAX(block_number) + FROM evm.log_poller_blocks WHERE evm_chain_id = $1)`, ubig.New(o.chainID), start) if err != nil { @@ -293,8 +364,8 @@ func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error return err } - _, err = o.ds.ExecContext(ctx, `DELETE FROM evm.logs - WHERE evm_chain_id = $1 + _, err = o.ds.ExecContext(ctx, `DELETE FROM evm.logs + WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= (SELECT MAX(block_number) FROM evm.logs WHERE evm_chain_id = $1)`, ubig.New(o.chainID), start) @@ -314,32 +385,49 @@ type Exp struct { ShouldDelete bool } +func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) { + query := ` + SELECT l.id FROM evm.logs l LEFT JOIN ( + SELECT evm_chain_id, address, event + FROM evm.log_poller_filters + WHERE evm_chain_id = $1 + GROUP BY evm_chain_id, address, event + ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event + WHERE l.evm_chain_id = $1 AND r.evm_chain_id IS NULL + ` + + if limit == 0 { + err = o.ds.SelectContext(ctx, &ids, query, ubig.New(o.chainID)) + return ids, err + } + err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit), ubig.New(o.chainID)) + + return ids, err +} + // DeleteExpiredLogs removes any logs which either: // - don't match any currently registered filters, or // - have a timestamp older than any matching filter's retention, UNLESS there is at // least one 
matching filter with retention=0 func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) { - var err error - var result sql.Result - query := `DELETE FROM evm.logs - WHERE (evm_chain_id, address, event_sig, block_number) IN ( - SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number - FROM evm.logs l - LEFT JOIN ( - SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention - FROM evm.log_poller_filters - WHERE evm_chain_id = $1 - GROUP BY evm_chain_id, address, event - ) r ON l.address = r.address AND l.event_sig = r.event - WHERE l.evm_chain_id = $1 AND -- Must be WHERE rather than ON due to LEFT JOIN - r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)` - + limitClause := "" if limit > 0 { - result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit) - } else { - result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID)) + limitClause = fmt.Sprintf("LIMIT %d", limit) } + query := fmt.Sprintf(` + WITH rows_to_delete AS ( + SELECT l.id + FROM evm.logs l JOIN ( + SELECT evm_chain_id, address, event, MAX(retention) AS retention + FROM evm.log_poller_filters + WHERE evm_chain_id = $1 + GROUP BY evm_chain_id, address, event + HAVING MIN(retention) > 0 + ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event AND + l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second') %s + ) DELETE FROM evm.logs WHERE id IN (SELECT id FROM rows_to_delete)`, limitClause) + result, err := o.ds.ExecContext(ctx, query, ubig.New(o.chainID)) if err != nil { return 0, err } @@ -384,10 +472,10 @@ func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.D end = len(logs) } - query := `INSERT INTO evm.logs - (evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, 
topics, tx_hash, data, created_at) - VALUES - (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW()) + query := `INSERT INTO evm.logs + (evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at) + VALUES + (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW()) ON CONFLICT DO NOTHING` _, err := tx.NamedExecContext(ctx, query, logs[start:end]) @@ -422,11 +510,11 @@ func (o *DSORM) SelectLogsByBlockRange(ctx context.Context, start, end int64) ([ return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND block_number >= :start_block - AND block_number <= :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND block_number >= :start_block + AND block_number <= :end_block + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -451,13 +539,13 @@ func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address common return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND block_number >= :start_block - AND block_number <= :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND block_number >= :start_block + AND block_number <= :end_block + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -482,14 +570,12 @@ func (o *DSORM) SelectLogsCreatedAfter(ctx context.Context, address common.Addre return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND 
block_timestamp > :block_timestamp_after - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs( + `WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND block_timestamp > :block_timestamp_after AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -516,12 +602,12 @@ func (o *DSORM) SelectLogsWithSigs(ctx context.Context, start, end int64, addres return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = ANY(:event_sig_array) - AND block_number BETWEEN :start_block AND :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = ANY(:event_sig_array) + AND block_number BETWEEN :start_block AND :end_block + ORDER BY block_number, log_index`) query, sqlArgs, err := o.ds.BindNamed(query, args) if err != nil { @@ -544,11 +630,11 @@ func (o *DSORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]L return nil, err } - query := `SELECT * FROM evm.log_poller_blocks - WHERE block_number >= :start_block + query := blocksQuery(` + WHERE block_number >= :start_block AND block_number <= :end_block AND evm_chain_id = :evm_chain_id - ORDER BY block_number ASC` + ORDER BY block_number ASC`) var blocks []LogPollerBlock query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -575,17 +661,17 @@ func (o *DSORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, from return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs WHERE (block_number, address, event_sig) IN ( - SELECT MAX(block_number), address, event_sig FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND event_sig = ANY(:event_sig_array) - AND address = ANY(:address_array) - AND block_number > :start_block - AND block_number <= %s - GROUP BY event_sig, 
address - ) - ORDER BY block_number ASC`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE id IN ( + SELECT LAST_VALUE(id) OVER( + PARTITION BY evm_chain_id, address, event_sig + ORDER BY block_number, log_index + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) FROM evm.logs + WHERE evm_chain_id = :evm_chain_id + AND event_sig = ANY(:event_sig_array) + AND address = ANY(:address_array) + AND block_number >= :start_block AND `, confs) + ` + )` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -610,13 +696,12 @@ func (o *DSORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, if err != nil { return 0, err } - query := fmt.Sprintf(` - SELECT COALESCE(MAX(block_number), 0) FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND event_sig = ANY(:event_sig_array) - AND address = ANY(:address_array) - AND block_number > :start_block - AND block_number <= %s`, nestedBlockNumberQuery(confs)) + + query := withConfs(`SELECT COALESCE(MAX(block_number), 0) FROM evm.logs + WHERE evm_chain_id = :evm_chain_id + AND event_sig = ANY(:event_sig_array) + AND address = ANY(:address_array) + AND block_number >= :start_block AND `, "", confs) var blockNumber int64 query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -641,14 +726,12 @@ func (o *DSORM) SelectLogsDataWordRange(ctx context.Context, address common.Addr return nil, err } - query := fmt.Sprintf(`SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND substring(data from 32*:word_index+1 for 32) >= :word_value_min - AND substring(data from 32*:word_index+1 for 32) <= :word_value_max - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index+1 for 32) >= :word_value_min + AND substring(data from 
32*:word_index+1 for 32) <= :word_value_max AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -672,14 +755,12 @@ func (o *DSORM) SelectLogsDataWordGreaterThan(ctx context.Context, address commo return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND substring(data from 32*:word_index+1 for 32) >= :word_value_min - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index+1 for 32) >= :word_value_min AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -703,15 +784,14 @@ func (o *DSORM) SelectLogsDataWordBetween(ctx context.Context, address common.Ad if err != nil { return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND substring(data from 32*:word_index_min+1 for 32) <= :word_value - AND substring(data from 32*:word_index_max+1 for 32) >= :word_value - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index_min+1 for 32) <= :word_value + AND substring(data from 32*:word_index_max+1 for 32) >= :word_value AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -735,14 +815,11 @@ func (o *DSORM) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address c return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = 
:evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] >= :topic_value_min - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] >= :topic_value_min AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -767,15 +844,12 @@ func (o *DSORM) SelectIndexedLogsTopicRange(ctx context.Context, address common. return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] >= :topic_value_min - AND topics[:topic_index] <= :topic_value_max - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] >= :topic_value_min + AND topics[:topic_index] <= :topic_value_max AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -799,14 +873,12 @@ func (o *DSORM) SelectIndexedLogs(ctx context.Context, address common.Address, e return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] = ANY(:topic_values) - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] = ANY(:topic_values) AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -832,14 +904,14 @@ func (o 
*DSORM) SelectIndexedLogsByBlockRange(ctx context.Context, start, end in return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] = ANY(:topic_values) - AND block_number >= :start_block - AND block_number <= :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] = ANY(:topic_values) + AND block_number >= :start_block + AND block_number <= :end_block + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -865,16 +937,13 @@ func (o *DSORM) SelectIndexedLogsCreatedAfter(ctx context.Context, address commo return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] = ANY(:topic_values) - AND block_timestamp > :block_timestamp_after - AND block_number <= %s - ORDER BY block_number, log_index - `, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] = ANY(:topic_values) + AND block_timestamp > :block_timestamp_after AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -898,12 +967,12 @@ func (o *DSORM) SelectIndexedLogsByTxHash(ctx context.Context, address common.Ad return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND tx_hash = :tx_hash - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND tx_hash = :tx_hash + ORDER BY block_number, log_index`) var logs []Log query, 
sqlArgs, err := o.ds.BindNamed(query, args) @@ -933,25 +1002,22 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si return nil, err } - nestedQuery := nestedBlockNumberQuery(confs) - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :sigA - AND block_number BETWEEN :start_block AND :end_block - AND block_number <= %s - EXCEPT - SELECT a.* FROM evm.logs AS a - INNER JOIN evm.logs B - ON a.evm_chain_id = b.evm_chain_id - AND a.address = b.address - AND a.topics[:topic_index] = b.topics[:topic_index] - AND a.event_sig = :sigA - AND b.event_sig = :sigB - AND b.block_number BETWEEN :start_block AND :end_block - AND b.block_number <= %s - ORDER BY block_number, log_index`, nestedQuery, nestedQuery) + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :sigA + AND block_number BETWEEN :start_block AND :end_block AND `, confs) + + ` EXCEPT ` + + withConfs(logsQueryWithTablePrefix("a", ` + INNER JOIN evm.logs AS b + ON a.evm_chain_id = b.evm_chain_id + AND a.address = b.address + AND a.topics[:topic_index] = b.topics[:topic_index] + AND a.event_sig = :sigA + AND b.event_sig = :sigB + AND b.block_number BETWEEN :start_block AND :end_block + AND `), "b", confs) + + ` ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -989,19 +1055,11 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim return logs, nil } -func nestedBlockNumberQuery(confs evmtypes.Confirmations) string { - if confs == evmtypes.Finalized { - return ` - (SELECT finalized_block_number - FROM evm.log_poller_blocks - WHERE evm_chain_id = :evm_chain_id - ORDER BY block_number DESC LIMIT 1) ` - } - // Intentionally wrap with greatest() function and don't return negative block numbers when :confs > :block_number - // It doesn't impact logic of the outer query, because 
block numbers are never less or equal to 0 (guarded by log_poller_blocks_block_number_check) - return ` - (SELECT greatest(block_number - :confs, 0) - FROM evm.log_poller_blocks - WHERE evm_chain_id = :evm_chain_id - ORDER BY block_number DESC LIMIT 1) ` +// DeleteLogsByRowID accepts a list of log row id's to delete +func (o *DSORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) { + result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE id = ANY($1)`, rowIDs) + if err != nil { + return 0, err + } + return result.RowsAffected() } diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 14f6bbeb6aa..7ebc97bb835 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -321,10 +321,22 @@ func TestORM(t *testing.T) { Data: []byte("hello short retention"), BlockTimestamp: time.Now(), }, + { + EvmChainId: ubig.New(th.ChainID), + LogIndex: 7, + BlockHash: common.HexToHash("0x1239"), + BlockNumber: int64(17), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1236"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello2 short retention"), + BlockTimestamp: time.Now(), + }, { EvmChainId: ubig.New(th.ChainID), LogIndex: 8, - BlockHash: common.HexToHash("0x1238"), + BlockHash: common.HexToHash("0x1239"), BlockNumber: int64(17), EventSig: topic2, Topics: [][]byte{topic2[:]}, @@ -365,10 +377,9 @@ func TestORM(t *testing.T) { }, })) - t.Log(latest.BlockNumber) logs, err := o1.SelectLogsByBlockRange(ctx, 1, 17) require.NoError(t, err) - require.Len(t, logs, 8) + require.Len(t, logs, 9) logs, err = o1.SelectLogsByBlockRange(ctx, 10, 10) require.NoError(t, err) @@ -483,18 +494,33 @@ func TestORM(t *testing.T) { require.Equal(t, int64(17), latest.BlockNumber) logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - require.Len(t, logs, 8) + require.Len(t, logs, 9) // Delete expired logs with page 
limit time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period - deleted, err := o1.DeleteExpiredLogs(ctx, 2) + deleted, err := o1.DeleteExpiredLogs(ctx, 1) require.NoError(t, err) - assert.Equal(t, int64(2), deleted) + assert.Equal(t, int64(1), deleted) // Delete expired logs without page limit deleted, err = o1.DeleteExpiredLogs(ctx, 0) require.NoError(t, err) - assert.Equal(t, int64(2), deleted) + assert.Equal(t, int64(1), deleted) + + // Select unmatched logs with page limit + ids, err := o1.SelectUnmatchedLogIDs(ctx, 2) + require.NoError(t, err) + assert.Len(t, ids, 2) + + // Select unmatched logs without page limit + ids, err = o1.SelectUnmatchedLogIDs(ctx, 0) + require.NoError(t, err) + assert.Len(t, ids, 3) + + // Delete logs by row id + deleted, err = o1.DeleteLogsByRowID(ctx, ids) + require.NoError(t, err) + assert.Equal(t, int64(3), deleted) // Ensure that both of the logs from the second chain are still there logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x1236"), topic2) @@ -506,9 +532,9 @@ func TestORM(t *testing.T) { logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - // It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all - // 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1 - // of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12. + // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) + // Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything + // matching filter12 should be kept regardless of what other filters it matches. assert.Len(t, logs, 4) // Delete logs after should delete all logs. 
@@ -1600,7 +1626,7 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { events: []common.Hash{event1, event2}, addrs: []common.Address{address1, address2}, confs: 0, - fromBlock: 3, + fromBlock: 4, expectedBlockNumber: 0, }, { diff --git a/core/chains/evm/logpoller/parser.go b/core/chains/evm/logpoller/parser.go index 19302c89192..9dfa00eaf3e 100644 --- a/core/chains/evm/logpoller/parser.go +++ b/core/chains/evm/logpoller/parser.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -26,6 +27,10 @@ const ( var ( ErrUnexpectedCursorFormat = errors.New("unexpected cursor format") + logsFields = [...]string{"evm_chain_id", "log_index", "block_hash", "block_number", + "address", "event_sig", "topics", "tx_hash", "data", "created_at", "block_timestamp"} + blocksFields = [...]string{"evm_chain_id", "block_hash", "block_number", "block_timestamp", + "finalized_block_number", "created_at"} ) // The parser builds SQL expressions piece by piece for each Accept function call and resets the error and expression @@ -220,7 +225,7 @@ func (v *pgDSLParser) buildQuery(chainID *big.Int, expressions []query.Expressio v.err = nil // build the query string - clauses := []string{"SELECT evm.logs.* FROM evm.logs"} + clauses := []string{logsQuery("")} where, err := v.whereClause(expressions, limiter) if err != nil { diff --git a/core/chains/evm/logpoller/parser_test.go b/core/chains/evm/logpoller/parser_test.go index f37497600bd..8446b3c93ef 100644 --- a/core/chains/evm/logpoller/parser_test.go +++ b/core/chains/evm/logpoller/parser_test.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -34,7 
+35,7 @@ func TestDSLParser(t *testing.T) { result, args, err := parser.buildQuery(chainID, expressions, limiter) require.NoError(t, err) - assert.Equal(t, "SELECT evm.logs.* FROM evm.logs WHERE evm_chain_id = :evm_chain_id ORDER BY "+defaultSort, result) + assert.Equal(t, logsQuery(" WHERE evm_chain_id = :evm_chain_id ORDER BY "+defaultSort), result) assertArgs(t, args, 1) }) @@ -52,15 +53,14 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CursorLimit("10-5-0x42", query.CursorFollowing, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0 " + - "AND block_number <= " + - "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + - "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " + - "ORDER BY block_number ASC, log_index ASC, tx_hash ASC " + - "LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (address = :address_0 AND event_sig = :event_sig_0 " + + "AND block_number <= " + + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " + + "ORDER BY block_number ASC, log_index ASC, tx_hash ASC " + + "LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -80,12 +80,11 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CountLimit(20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (address = :address_0 AND event_sig = 
:event_sig_0) " + - "ORDER BY " + defaultSort + " " + - "LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (address = :address_0 AND event_sig = :event_sig_0) " + + "ORDER BY " + defaultSort + " " + + "LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -102,10 +101,9 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Desc)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "ORDER BY block_number DESC, log_index DESC, tx_hash DESC" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC") require.NoError(t, err) assert.Equal(t, expected, result) @@ -122,10 +120,9 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortByBlock(query.Asc), query.NewSortByTimestamp(query.Desc)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "ORDER BY block_number ASC, block_timestamp DESC" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "ORDER BY block_number ASC, block_timestamp DESC") require.NoError(t, err) assert.Equal(t, expected, result) @@ -147,16 +144,15 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CursorLimit("10-20-0x42", query.CursorPrevious, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp = :block_timestamp_0 " + - "AND tx_hash = :tx_hash_0 " + - "AND block_number != :block_number_0 " + - "AND block_number <= " + - "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = 
:evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + - "AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " + - "ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp = :block_timestamp_0 " + + "AND tx_hash = :tx_hash_0 " + + "AND block_number != :block_number_0 " + + "AND block_number <= " + + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " + + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -175,10 +171,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -194,10 +189,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY 
block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -213,10 +207,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -243,10 +236,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -268,10 +260,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND topics[:topic_index_0] 
> :topic_value_0 AND topics[:topic_index_0] < :topic_value_1 ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND topics[:topic_index_0] > :topic_value_0 AND topics[:topic_index_0] < :topic_value_1 ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -304,12 +295,11 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp >= :block_timestamp_0 " + - "AND (tx_hash = :tx_hash_0 " + - "OR block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1))) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp >= :block_timestamp_0 " + + "AND (tx_hash = :tx_hash_0 " + + "OR block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1))) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -353,14 +343,13 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp = :block_timestamp_0 " + - "AND (tx_hash = :tx_hash_0 " + - "OR (block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " + - "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 " + - "AND substring(data from 32*:word_index_0+1 for 32) <= :word_value_1))) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = 
:evm_chain_id " + + "AND (block_timestamp = :block_timestamp_0 " + + "AND (tx_hash = :tx_hash_0 " + + "OR (block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " + + "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 " + + "AND substring(data from 32*:word_index_0+1 for 32) <= :word_value_1))) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) diff --git a/core/scripts/go.mod b/core/scripts/go.mod index b32ae445022..73a8605e008 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -1,6 +1,6 @@ module github.com/smartcontractkit/chainlink/core/scripts -go 1.22.5 +go 1.22.7 // Make sure we're working with the latest chainlink libs replace github.com/smartcontractkit/chainlink/v2 => ../../ diff --git a/core/store/migrate/migrations/0254_log_poller_primary_keys.sql b/core/store/migrate/migrations/0254_log_poller_primary_keys.sql new file mode 100644 index 00000000000..39bee02656c --- /dev/null +++ b/core/store/migrate/migrations/0254_log_poller_primary_keys.sql @@ -0,0 +1,42 @@ +-- +goose Up + +-- Replace (block_hash, log_index, evm_chain_id) primary key on evm.logs with new id column +ALTER TABLE evm.logs DROP CONSTRAINT logs_pkey; +ALTER TABLE evm.logs ADD COLUMN id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY; +-- Replaces previous primary key index, but also useful for accelerating DeleteLogsAndBlocksAfter +-- This also strengthens the uniqueness requirement, ensuring that we can't insert logs for two different blocks +-- with the same block number but different block hashes--something which would corrupt the db +CREATE UNIQUE INDEX idx_logs_chain_block_logindex ON evm.logs (evm_chain_id, block_number, log_index); + +-- Previously used for WHERE evm_chain_id = $1 AND address = $2 AND event_sig = $3 ... 
ORDER BY block_number, created_at +DROP INDEX IF EXISTS evm.idx_evm_logs_ordered_by_block_and_created_at; +-- Useful for the current form of those queries: WHERE evm_chain_id = $1 AND address = $2 AND event_sig = $3 ... ORDER BY block_number, log_index +-- log_index is not included in this index because it increases the index size by ~ 10% for a likely negligible performance benefit +CREATE INDEX idx_logs_chain_address_event_block_logindex ON evm.logs (evm_chain_id, address, event_sig, block_number); + +-- Replace (block_number, evm_chain_id) primary key on evm.log_poller_blocks with new id column +ALTER TABLE evm.log_poller_blocks DROP CONSTRAINT log_poller_blocks_pkey; +ALTER TABLE evm.log_poller_blocks ADD COLUMN id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY; + +-- Add UNIQUE keyword to (evm_chain_id, block_number DESC) index (and rename for consistency) +-- This index is useful for any query for logs which includes a "confs" param, and for DeleteBlocksBefore, DeleteLogsAndBlocksAfter, etc. +-- Prior to this we also had a separate UNIQUE INDEX for (block_number, evm_chain_id) generated by the primary key definition. 
+-- Adding the UNIQUE keyword to this allows us to combine the two indices into a single index, saving on disk space--with the +-- added benefit of making insertions into this index slightly faster (since it can rely on the keys being unique) +DROP INDEX IF EXISTS evm.idx_evm_log_poller_blocks_order_by_block; +CREATE UNIQUE INDEX idx_log_poller_blocks_chain_block ON evm.log_poller_blocks (evm_chain_id, block_number DESC); + +-- +goose Down + +-- revert evm.log_poller_blocks +DROP INDEX IF EXISTS evm.idx_log_poller_blocks_chain_block; +CREATE INDEX idx_evm_log_poller_blocks_order_by_block ON evm.log_poller_blocks (evm_chain_id, block_number DESC); +ALTER TABLE evm.log_poller_blocks DROP COLUMN id; +ALTER TABLE evm.log_poller_blocks ADD PRIMARY KEY (block_number, evm_chain_id); + +-- revert evm.logs +DROP INDEX IF EXISTS evm.idx_logs_chain_address_event_block_logindex; +CREATE INDEX idx_evm_logs_ordered_by_block_and_created_at ON evm.logs (evm_chain_id, address, event_sig, block_number, created_at); +DROP INDEX IF EXISTS evm.idx_logs_chain_block_logindex; +ALTER TABLE evm.logs DROP COLUMN id; +ALTER TABLE evm.logs ADD PRIMARY KEY (block_hash, log_index, evm_chain_id); diff --git a/dashboard-lib/go.mod b/dashboard-lib/go.mod index 0ab2b696dbc..a26c6ab454f 100644 --- a/dashboard-lib/go.mod +++ b/dashboard-lib/go.mod @@ -1,6 +1,6 @@ module github.com/smartcontractkit/chainlink/dashboard-lib -go 1.22.5 +go 1.22.7 require ( github.com/K-Phoen/grabana v0.22.1 diff --git a/go.mod b/go.mod index 73c38b8c2db..f0640e9706f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/smartcontractkit/chainlink/v2 -go 1.22.5 +go 1.22.7 require ( github.com/Depado/ginprom v1.8.0 diff --git a/integration-tests/go.mod b/integration-tests/go.mod index b6dab3a01f9..b6aaaa5cbc0 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -1,6 +1,6 @@ module github.com/smartcontractkit/chainlink/integration-tests -go 1.22.5 +go 1.22.7 // Make sure we're working with the 
latest chainlink libs replace github.com/smartcontractkit/chainlink/v2 => ../ diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 975b1a13d35..1b2398a80f3 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -1,6 +1,6 @@ module github.com/smartcontractkit/chainlink/load-tests -go 1.22.5 +go 1.22.7 // Make sure we're working with the latest chainlink libs replace github.com/smartcontractkit/chainlink/v2 => ../../