Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Aug 13, 2024
1 parent 543ff9d commit f63102b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
14 changes: 10 additions & 4 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,12 +658,14 @@ func (lp *logPoller) backgroundWorkerRun() {
}
case <-logPruneTick:
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000
if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
} else if !allRemoved {
allRemoved, err := lp.PruneExpiredLogs(ctx)
if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241))
}
if err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
}
}
}
}
Expand Down Expand Up @@ -1093,7 +1095,11 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
rowsRemoved, err := lp.orm.DeleteExpiredLogs(ctx, lp.logPrunePageSize)
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
if err != nil || rowsRemoved < lp.logPrunePageSize {
return true, err
}
rowsRemoved, err = lp.orm.DeleteExcessLogs(ctx, lp.logPrunePageSize)
return lp.logPrunePageSize == 0 || err != nil || rowsRemoved < lp.logPrunePageSize, err
}

// Logs returns logs matching topics and address (exactly) in the given block range,
Expand Down
15 changes: 8 additions & 7 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/query"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand All @@ -36,6 +37,7 @@ type ORM interface {
DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error)
DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error
DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error)
DeleteExcessLogs(ctx context.Context, limit int64) (int64, error)

GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error)
SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error)
Expand Down Expand Up @@ -351,12 +353,11 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro

// DeleteExcessLogs deletes any logs old enough that MaxLogsKept has been exceeded for every filter they match.
func (o *DSORM) DeleteExcessLogs(ctx context.Context) (int64, error) {
rowIds := struct {
EvmChainId ubig.Big
BlockHash common.Hash
LogIndex uint64
}{}
err := o.ds.GetContext(ctx, &rowIds, `
var rowIds []struct {
BlockNumber uint64
LogIndex uint64
}
err := o.ds.SelectContext(ctx, &rowIds, `
SELECT block_number, log_index FROM (
SELECT max_logs_kept != 0 AND ROW_NUMBER() OVER(PARTITION BY f.id ORDER BY block_number, log_index DESC) > max_logs_kept AS old, block_number, log_index
FROM evm.log_poller_filters f JOIN evm.logs l
Expand All @@ -366,7 +367,7 @@ func (o *DSORM) DeleteExcessLogs(ctx context.Context) (int64, error) {
return 0, err
}

result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE block_hash=:block_hash AND log_index=:log_index AND evm_chain_id=5`, rowIds)
result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE evm_chain_id = :evm_chain_id AND (block_number, log_index) IN rowIds`, rowIds)

if err != nil {
return 0, err
Expand Down

0 comments on commit f63102b

Please sign in to comment.