diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index f4ed52e6dd..ff460c7ce9 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "math" "math/big" "strings" "time" @@ -326,6 +327,7 @@ func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, quer if err2 != nil { return numResults, err } + numResults += rows if upper == end { @@ -391,21 +393,32 @@ type Exp struct { } func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) { - query := ` - SELECT l.id FROM evm.logs l LEFT JOIN ( + batchLogsSubQuery := `SELECT id, evm_chain_id, address, event_sig FROM evm.logs + WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3` + + query := fmt.Sprintf(` + SELECT l.id FROM (%s) l LEFT JOIN ( SELECT evm_chain_id, address, event FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event WHERE l.evm_chain_id = $1 AND r.evm_chain_id IS NULL - ` + `, batchLogsSubQuery) - if limit == 0 { - err = o.ds.SelectContext(ctx, &ids, query, ubig.New(o.chainID)) - return ids, err - } - err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit), ubig.New(o.chainID)) + o.ExecPagedQuery(ctx, limit, math.MaxInt64, func(lower, upper int64) (numResults int64, err2 error) { + var rowIDs []uint64 + err2 = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), lower, upper) + // dedupe rowIDs before appending them to results + m := make(map[uint64]bool) + for _, id := range rowIDs { + if _, val := m[id]; !val { + m[id] = true + ids = append(ids, id) + } + } + return int64(len(rowIDs)), err2 + }) return ids, err } @@ -449,7 +462,16 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results [] if err != nil { return 0, err } - results = append(results, rowIDs...) + + // dedupe rowIDs before appending them to results + m := make(map[uint64]bool) + for _, id := range rowIDs { + if _, val := m[id]; !val { + m[id] = true + results = append(results, id) + } + } + return int64(len(rowIDs)), err })