Skip to content

Commit

Permalink
Use ExecPagedQuery for SelectUnmatchedLogIDs
Browse files Browse the repository at this point in the history
Also: add deduping of ids for logs matching multiple filters
  • Loading branch information
reductionista committed Sep 30, 2024
1 parent aa13051 commit 688c02d
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"math"
"math/big"
"strings"
"time"
Expand Down Expand Up @@ -326,6 +327,7 @@ func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, quer
if err2 != nil {
return numResults, err
}

numResults += rows

if upper == end {
Expand Down Expand Up @@ -391,21 +393,32 @@ type Exp struct {
}

func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) {
query := `
SELECT l.id FROM evm.logs l LEFT JOIN (
batchLogsSubQuery := `SELECT id, evm_chain_id, address, event_sig FROM evm.logs
WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`

query := fmt.Sprintf(`
SELECT l.id FROM (%s) l LEFT JOIN (
SELECT evm_chain_id, address, event
FROM evm.log_poller_filters
WHERE evm_chain_id = $1
GROUP BY evm_chain_id, address, event
) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event
WHERE l.evm_chain_id = $1 AND r.evm_chain_id IS NULL
`
`, batchLogsSubQuery)

if limit == 0 {
err = o.ds.SelectContext(ctx, &ids, query, ubig.New(o.chainID))
return ids, err
}
err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit), ubig.New(o.chainID))
o.ExecPagedQuery(ctx, limit, math.MaxInt64, func(lower, upper int64) (numResults int64, err2 error) {
var rowIDs []uint64
err2 = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), lower, upper)
// dedupe rowIDs before appending them to results
m := make(map[uint64]bool)
for _, id := range rowIDs {
if _, val := m[id]; !val {
m[id] = true
ids = append(ids, id)
}
}
return int64(len(rowIDs)), err2
})

return ids, err
}
Expand Down Expand Up @@ -449,7 +462,16 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results []
if err != nil {
return 0, err
}
results = append(results, rowIDs...)

// dedupe rowIDs before appending them to results
m := make(map[uint64]bool)
for _, id := range rowIDs {
if _, val := m[id]; !val {
m[id] = true
results = append(results, id)
}
}

return int64(len(rowIDs)), err
})

Expand Down

0 comments on commit 688c02d

Please sign in to comment.