Skip to content

Commit

Permalink
Address remaining PR comments
Browse files Browse the repository at this point in the history
- Remove topics from SelectExcessLogs query
- Early exit from loadFilters
- upper >= end
  • Loading branch information
reductionista committed Oct 11, 2024
1 parent 883e635 commit 41ec2e3
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 12 deletions.
7 changes: 4 additions & 3 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i
// loadFilters loads the filters from db, and activates count-based Log Pruning
// if required by any of the filters
func (lp *logPoller) loadFilters(ctx context.Context) error {
filters, err := lp._loadFilters(ctx)
filters, err := lp.lockAndLoadFilters(ctx)
if err != nil {
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
}
Expand All @@ -565,13 +565,14 @@ func (lp *logPoller) loadFilters(ctx context.Context) error {
for _, filter := range filters {
if filter.MaxLogsKept != 0 {
lp.countBasedLogPruningActive.Store(true)
return nil
}
}
return nil
}

// _loadFilters is the part of loadFilters() requiring a filterMu lock
func (lp *logPoller) _loadFilters(ctx context.Context) (filters map[string]Filter, err error) {
// lockAndLoadFilters is the part of loadFilters() requiring a filterMu lock
func (lp *logPoller) lockAndLoadFilters(ctx context.Context) (filters map[string]Filter, err error) {
lp.filterMu.Lock()
defer lp.filterMu.Unlock()

Expand Down
11 changes: 2 additions & 9 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (r *RangeQueryer[T]) ExecPagedQuery(ctx context.Context, limit, end int64)
}
rowsAffected += rows

if upper == end {
if upper >= end {
break
}
}
Expand Down Expand Up @@ -457,9 +457,6 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results []
withSubQuery := `
SELECT name,
ARRAY_AGG(address) AS addresses, ARRAY_AGG(event) AS events,
(ARRAY_AGG(topic2) FILTER(WHERE topic2 IS NOT NULL)) AS topic2,
(ARRAY_AGG(topic3) FILTER(WHERE topic3 IS NOT NULL)) AS topic3,
(ARRAY_AGG(topic4) FILTER(WHERE topic4 IS NOT NULL)) AS topic4,
MAX(max_logs_kept) AS max_logs_kept -- Should all be the same, just need MAX for GROUP BY
FROM evm.log_poller_filters WHERE evm_chain_id=$1
GROUP BY name`
Expand All @@ -469,11 +466,7 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results []
SELECT l.id, block_number, log_index, max_logs_kept != 0 AND
ROW_NUMBER() OVER(PARTITION BY f.name ORDER BY block_number, log_index DESC) > max_logs_kept AS old
FROM filters f JOIN evm.logs l ON
l.address = ANY(f.addresses) AND
l.event_sig = ANY(f.events) AND
(f.topic2 IS NULL OR l.topics[1] = ANY(f.topic2)) AND
(f.topic3 IS NULL OR l.topics[2] = ANY(f.topic3)) AND
(f.topic4 IS NULL OR l.topics[3] = ANY(f.topic4))
l.address = ANY(f.addresses) AND l.event_sig = ANY(f.events)
WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3
`

Expand Down

0 comments on commit 41ec2e3

Please sign in to comment.