From 1293f2c1cb7e262e46b3dd5755271f44a42c4264 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Sun, 29 Sep 2024 11:32:31 -0700 Subject: [PATCH] Use ExecPagedQuery for SelectUnmatchedLogIDs Also: add deduping of ids for logs matching multiple filters --- core/chains/evm/logpoller/orm.go | 40 +++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 69070e46f1e..70b4f7774dc 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "math" "math/big" "strings" "time" @@ -325,6 +326,7 @@ func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, quer if err2 != nil { return numResults, err } + numResults += rows if upper == end { @@ -390,21 +392,32 @@ type Exp struct { } func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) { - query := ` - SELECT l.id FROM evm.logs l LEFT JOIN ( + batchLogsSubQuery := `SELECT id, evm_chain_id, address, event_sig FROM evm.logs + WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3` + + query := fmt.Sprintf(` + SELECT l.id FROM (%s) l LEFT JOIN ( SELECT evm_chain_id, address, event FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event WHERE l.evm_chain_id = $1 AND r.evm_chain_id IS NULL - ` + `, batchLogsSubQuery) - if limit == 0 { - err = o.ds.SelectContext(ctx, &ids, query, ubig.New(o.chainID)) - return ids, err - } - err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit), ubig.New(o.chainID)) + o.ExecPagedQuery(ctx, limit, math.MaxInt64, func(lower, upper int64) (numResults int64, err2 error) { + var rowIDs []uint64 + err2 = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), lower, upper) + // dedupe rowIDs before appending them to results + m := make(map[uint64]bool) + for _, id := range rowIDs { + if _, val := m[id]; !val { + m[id] = true + ids = append(ids, id) + } + } + return int64(len(rowIDs)), err2 + }) return ids, err } @@ -448,7 +461,16 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results [] if err != nil { return 0, err } - results = append(results, rowIDs...) + + // dedupe rowIDs before appending them to results + m := make(map[uint64]bool) + for _, id := range rowIDs { + if _, val := m[id]; !val { + m[id] = true + results = append(results, id) + } + } + return int64(len(rowIDs)), err })