diff --git a/.changeset/long-rocks-jog.md b/.changeset/long-rocks-jog.md
new file mode 100644
index 00000000000..9c01be199d4
--- /dev/null
+++ b/.changeset/long-rocks-jog.md
@@ -0,0 +1,5 @@
+---
+"chainlink": minor
+---
+
+LogPoller refactored to use fine-grained topic filtering when requesting logs from the RPC server
diff --git a/core/chains/evm/client/simulated_backend_client.go b/core/chains/evm/client/simulated_backend_client.go
index 9fe2ff88ba7..4a4b3d4097b 100644
--- a/core/chains/evm/client/simulated_backend_client.go
+++ b/core/chains/evm/client/simulated_backend_client.go
@@ -186,16 +186,8 @@ func (c *SimulatedBackendClient) blockNumber(number interface{}) (blockNumber *b
 			}
 			return blockNumber, nil
 		}
-	case *big.Int:
-		if n == nil {
-			return nil, nil
-		}
-		if n.Sign() < 0 {
-			return nil, fmt.Errorf("block number must be non-negative")
-		}
-		return n, nil
 	default:
-		return nil, fmt.Errorf("invalid type %T for block number, must be string or *big.Int", n)
+		return nil, fmt.Errorf("invalid type %T for block number, must be string", n)
 	}
 }
@@ -613,7 +605,7 @@ func (c *SimulatedBackendClient) ethEstimateGas(ctx context.Context, result inte
 	_, err := c.blockNumber(args[1])
 	if err != nil {
-		return fmt.Errorf("SimulatedBackendClient expected second arg to be the string 'latest' or a *big.Int for eth_call, got: %T", args[1])
+		return fmt.Errorf("SimulatedBackendClient expected second arg to be the string 'latest' or a hexadecimal string for eth_estimateGas, got: %T", args[1])
 	}
 
 	resp, err := c.b.EstimateGas(ctx, toCallMsg(params))
@@ -644,7 +636,7 @@ func (c *SimulatedBackendClient) ethCall(ctx context.Context, result interface{}
 	}
 
 	if _, err := c.blockNumber(args[1]); err != nil {
-		return fmt.Errorf("SimulatedBackendClient expected second arg to be the string 'latest' or a *big.Int for eth_call, got: %T", args[1])
+		return fmt.Errorf("SimulatedBackendClient expected second arg to be the string 'latest' or a hexadecimal string for eth_call, got: %T", args[1])
 	}
 
 	resp, err := c.b.CallContract(ctx, toCallMsg(params), nil /* always latest block on simulated backend */)
@@ -733,25 +725,12 @@ func (c *SimulatedBackendClient) ethGetLogs(ctx context.Context, result interfac
 		}
 	}
 
-	if a, ok := params["addresses"]; ok {
+	if a, ok := params["address"]; ok {
 		addresses = a.([]common.Address)
 	}
 	if t, ok := params["topics"]; ok {
-		tt := t.([][]common.Hash)
-		lastTopic := len(tt) - 1
-		for lastTopic >= 0 {
-			if tt[lastTopic] != nil {
-				break
-			}
-			lastTopic--
-		}
-		// lastTopic is the topic index of the last non-nil topic slice
-		// We have to drop any nil values in the topics slice after that due to a quirk in FilterLogs(),
-		// which will only use nil as a wildcard if there are non-nil values after it in the slice
-		for i := 0; i < lastTopic; i++ {
-			topics = append(topics, tt[i])
-		}
+		topics = t.([][]common.Hash)
 	}
 
 	query := ethereum.FilterQuery{
diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go
index de2a182bbce..07018b6267b 100644
--- a/core/chains/evm/logpoller/log_poller.go
+++ b/core/chains/evm/logpoller/log_poller.go
@@ -5,10 +5,11 @@ import (
 	"context"
 	"database/sql"
 	"encoding/binary"
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"math/big"
-	"sort"
+	"slices"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -19,7 +20,6 @@ import (
 	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/rpc"
-	pkgerrors "github.com/pkg/errors"
 	"golang.org/x/exp/maps"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -66,6 +66,72 @@ type LogPoller interface {
 	LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations) ([]Log, error)
 }
 
+// GetLogsBatchElem wraps rpc.BatchElem, hiding the interface casting so that the fields of an
+// eth_getLogs request can be accessed more easily and with type safety
+type GetLogsBatchElem rpc.BatchElem
+
+func NewGetLogsReq(filter Filter) *GetLogsBatchElem {
+	topics := make2DTopics(filter.EventSigs, filter.Topic2, filter.Topic3, filter.Topic4)
+
+	params := map[string]interface{}{
+		"address": []common.Address(filter.Addresses),
+		"topics":  topics,
+	}
+
+	return &GetLogsBatchElem{
+		Method: "eth_getLogs",
+		Args:   []interface{}{params},
+		Result: new([]types.Log),
+	}
+}
+
+func (e GetLogsBatchElem) params() map[string]interface{} {
+	return e.Args[0].(map[string]interface{})
+}
+
+func (e GetLogsBatchElem) Addresses() []common.Address {
+	return e.params()["address"].([]common.Address)
+}
+
+func (e GetLogsBatchElem) SetAddresses(addresses []common.Address) {
+	e.params()["address"] = addresses
+}
+
+func (e GetLogsBatchElem) Topics() [][]common.Hash {
+	return e.params()["topics"].([][]common.Hash)
+}
+
+func (e GetLogsBatchElem) SetTopics(topics [][]common.Hash) {
+	e.params()["topics"] = topics
+}
+
+func (e GetLogsBatchElem) FromBlock() *big.Int {
+	fromBlock, ok := e.params()["fromBlock"].(*big.Int)
+	if !ok {
+		return nil
+	}
+	return fromBlock
+}
+
+func (e GetLogsBatchElem) ToBlock() *big.Int {
+	toBlock, ok := e.params()["toBlock"].(*big.Int)
+	if !ok {
+		return nil
+	}
+	return toBlock
+}
+
+func (e GetLogsBatchElem) BlockHash() *common.Hash {
+	blockHash, ok := e.params()["blockHash"].(*common.Hash)
+	if !ok {
+		return nil
+	}
+	return blockHash
+}
+
+func (e GetLogsBatchElem) SetFromBlock(fromBlock *big.Int) {
+	e.params()["fromBlock"] = fromBlock
+}
+
 type Confirmations int
 
 const (
@@ -77,7 +143,7 @@ type LogPollerTest interface {
 	LogPoller
 	PollAndSaveLogs(ctx context.Context, currentBlockNumber int64)
 	BackupPollAndSaveLogs(ctx context.Context)
-	Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
+	EthGetLogsReqs(fromBlock, toBlock *big.Int, blockHash *common.Hash) []GetLogsBatchElem
 	GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
 	PruneOldBlocks(ctx context.Context) (bool, error)
 }
@@ -92,32 +158,32 @@ type Client interface {
 
 var (
 	_ LogPollerTest = &logPoller{}
 
-	ErrReplayRequestAborted = pkgerrors.New("aborted, replay request cancelled")
-	ErrReplayInProgress     = pkgerrors.New("replay request cancelled, but replay is already in progress")
-	ErrLogPollerShutdown    = pkgerrors.New("replay aborted due to log poller shutdown")
-	ErrFinalityViolated     = pkgerrors.New("finality violated")
+	ErrReplayRequestAborted = errors.New("aborted, replay request cancelled")
+	ErrReplayInProgress     = errors.New("replay request cancelled, but replay is already in progress")
+	ErrLogPollerShutdown    = errors.New("replay aborted due to log poller shutdown")
+	ErrFinalityViolated     = errors.New("finality violated")
 )
 
 type logPoller struct {
 	services.StateMachine
-	ec                       Client
-	orm                      ORM
-	lggr                     logger.SugaredLogger
-	pollPeriod               time.Duration // poll period set by block production rate
-	useFinalityTag           bool          // indicates whether logPoller should use chain's finality or pick a fixed depth for finality
-	finalityDepth            int64         // finality depth is taken to mean that block (head - finality) is finalized.
If `useFinalityTag` is set to true, this value is ignored, because finalityDepth is fetched from chain - keepFinalizedBlocksDepth int64 // the number of blocks behind the last finalized block we keep in database - backfillBatchSize int64 // batch size to use when backfilling finalized logs - rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks - logPrunePageSize int64 - backupPollerNextBlock int64 // next block to be processed by Backup LogPoller - backupPollerBlockDelay int64 // how far behind regular LogPoller should BackupLogPoller run. 0 = disabled - - filterMu sync.RWMutex - filters map[string]Filter - filterDirty bool - cachedAddresses []common.Address - cachedEventSigs []common.Hash + ec Client + orm ORM + lggr logger.SugaredLogger + pollPeriod time.Duration // poll period set by block production rate + useFinalityTag bool // indicates whether logPoller should use chain's finality or pick a fixed depth for finality + finalityDepth int64 // finality depth is taken to mean that block (head - finality) is finalized. If `useFinalityTag` is set to true, this value is ignored, because finalityDepth is fetched from chain + keepFinalizedBlocksDepth int64 // the number of blocks behind the last finalized block we keep in database + backfillBatchSize int64 // batch size to use when backfilling finalized logs + rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks + logPrunePageSize int64 + backupPollerNextBlock int64 // next block to be processed by Backup LogPoller + backupPollerBlockDelay int64 // how far behind regular LogPoller should BackupLogPoller run. 0 = disabled + filtersMu sync.RWMutex + filters map[string]Filter + newFilters map[string]struct{} // Set of filter names which have been added since cached reqs indices were last rebuilt + removedFilters []Filter // Slice of filters which have been removed or replaced since cached reqs indices were last rebuilt + cachedReqsByAddress map[common.Address][]*GetLogsBatchElem // Index of cached GetLogs requests, by contract address + cachedReqsByEventsTopicsKey map[string]*GetLogsBatchElem // Index of cached GetLogs requests, by eventTopicsKey replayStart chan int64 replayComplete chan error @@ -156,24 +222,26 @@ type Opts struct { func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, opts Opts) *logPoller { ctx, cancel := context.WithCancel(context.Background()) return &logPoller{ - ctx: ctx, - cancel: cancel, - ec: ec, - orm: orm, - lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), - replayStart: make(chan int64), - replayComplete: make(chan error), - pollPeriod: opts.PollPeriod, - backupPollerBlockDelay: opts.BackupPollerBlockDelay, - finalityDepth: opts.FinalityDepth, - useFinalityTag: opts.UseFinalityTag, - backfillBatchSize: opts.BackfillBatchSize, - rpcBatchSize: opts.RpcBatchSize, - keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth, - logPrunePageSize: opts.LogPrunePageSize, - filters: make(map[string]Filter), - filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. 
-		finalityViolated:         new(atomic.Bool),
+		ctx:                         ctx,
+		cancel:                      cancel,
+		ec:                          ec,
+		orm:                         orm,
+		lggr:                        logger.Sugared(logger.Named(lggr, "LogPoller")),
+		replayStart:                 make(chan int64),
+		replayComplete:              make(chan error),
+		pollPeriod:                  opts.PollPeriod,
+		backupPollerBlockDelay:      opts.BackupPollerBlockDelay,
+		finalityDepth:               opts.FinalityDepth,
+		useFinalityTag:              opts.UseFinalityTag,
+		backfillBatchSize:           opts.BackfillBatchSize,
+		rpcBatchSize:                opts.RpcBatchSize,
+		keepFinalizedBlocksDepth:    opts.KeepFinalizedBlocksDepth,
+		logPrunePageSize:            opts.LogPrunePageSize,
+		cachedReqsByAddress:         make(map[common.Address][]*GetLogsBatchElem),
+		cachedReqsByEventsTopicsKey: make(map[string]*GetLogsBatchElem),
+		filters:                     make(map[string]Filter),
+		newFilters:                  make(map[string]struct{}),
+		finalityViolated:            new(atomic.Bool),
+	}
 }
@@ -211,26 +279,49 @@ func (filter *Filter) Contains(other *Filter) bool {
 	if other == nil {
 		return true
 	}
+
 	addresses := make(map[common.Address]interface{})
 	for _, addr := range filter.Addresses {
 		addresses[addr] = struct{}{}
 	}
-	events := make(map[common.Hash]interface{})
-	for _, ev := range filter.EventSigs {
-		events[ev] = struct{}{}
-	}
-
 	for _, addr := range other.Addresses {
 		if _, ok := addresses[addr]; !ok {
 			return false
 		}
 	}
-	for _, ev := range other.EventSigs {
-		if _, ok := events[ev]; !ok {
-			return false
+
+	return isTopicsSubset(
+		make2DTopics(other.EventSigs, other.Topic2, other.Topic3, other.Topic4),
+		make2DTopics(filter.EventSigs, filter.Topic2, filter.Topic3, filter.Topic4),
+	)
+}
+
+type BytesRepresentable interface {
+	Bytes() []byte
+}
+
+// sortDeDupByteArrays sorts a slice of byte arrays (eg common.Address or common.Hash)
+// by comparing bytes. It will also remove any duplicate entries found, and
+// ensure that what's returned is a copy rather than the original
+func sortDeDupByteArrays[T BytesRepresentable](vals []T) (sorted []T) {
+	sorted = make([]T, len(vals))
+	copy(sorted, vals)
+	if len(sorted) <= 1 {
+		return sorted
+	}
+
+	slices.SortStableFunc(sorted, func(b1, b2 T) int {
+		return bytes.Compare(
+			b1.Bytes(),
+			b2.Bytes())
+	})
+
+	res := []T{sorted[0]}
+	for _, val := range sorted { // de-dupe
+		if !bytes.Equal(val.Bytes(), res[len(res)-1].Bytes()) {
+			res = append(res, val)
 		}
 	}
-	return true
+	return res
 }
 
 // RegisterFilter adds the provided EventSigs and Addresses to the log poller's log filter query.
@@ -250,25 +341,32 @@ func (filter *Filter) Contains(other *Filter) bool {
 // Warnings/debug information is keyed by filter name.
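+//
+// As an illustrative sketch (the address, event signature, and topic values below are hypothetical),
+// a filter matching only MyEvent logs from a single contract whose second indexed argument is one of
+// two values can be registered as follows; it is translated into an eth_getLogs request with matching
+// "address" and "topics" params rather than being filtered only on the client side:
+//
+//	err := lp.RegisterFilter(ctx, Filter{
+//		Name:      "my job filter",
+//		Addresses: evmtypes.AddressArray{myContractAddress},
+//		EventSigs: evmtypes.HashArray{myEventSig},
+//		Topic2:    evmtypes.HashArray{topicValueA, topicValueB},
+//	})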
func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error { if len(filter.Addresses) == 0 { - return pkgerrors.Errorf("at least one address must be specified") + return fmt.Errorf("at least one address must be specified") } if len(filter.EventSigs) == 0 { - return pkgerrors.Errorf("at least one event must be specified") + return fmt.Errorf("at least one event must be specified") } for _, eventSig := range filter.EventSigs { if eventSig == [common.HashLength]byte{} { - return pkgerrors.Errorf("empty event sig") + return fmt.Errorf("empty event sig") } } for _, addr := range filter.Addresses { if addr == [common.AddressLength]byte{} { - return pkgerrors.Errorf("empty address") + return fmt.Errorf("empty address") } } - lp.filterMu.Lock() - defer lp.filterMu.Unlock() + // Sort all of these, to speed up comparisons between topics & addresses of different filters + filter.Addresses = sortDeDupByteArrays(filter.Addresses) + filter.EventSigs = sortDeDupByteArrays(filter.EventSigs) + filter.Topic2 = sortDeDupByteArrays(filter.Topic2) + filter.Topic3 = sortDeDupByteArrays(filter.Topic3) + filter.Topic4 = sortDeDupByteArrays(filter.Topic4) + + lp.filtersMu.Lock() + defer lp.filtersMu.Unlock() if existingFilter, ok := lp.filters[filter.Name]; ok { if existingFilter.Contains(&filter) { @@ -277,13 +375,16 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error { return nil } lp.lggr.Warnw("Updating existing filter with more events or addresses", "name", filter.Name, "filter", filter) + lp.removedFilters = append(lp.removedFilters, existingFilter) } if err := lp.orm.InsertFilter(ctx, filter); err != nil { - return pkgerrors.Wrap(err, "error inserting filter") + return fmt.Errorf("error inserting filter: %w", err) } lp.filters[filter.Name] = filter - lp.filterDirty = true + lp.newFilters[filter.Name] = struct{}{} + + lp.lggr.Debugw("RegisterFilter: registered new filter", "name", filter.Name, "addresses", filter.Addresses, "eventSigs", filter.EventSigs) return nil } @@ -291,8 +392,8 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error { // If the name does not exist, it will log an error but not return an error. // Warnings/debug information is keyed by filter name. func (lp *logPoller) UnregisterFilter(ctx context.Context, name string) error { - lp.filterMu.Lock() - defer lp.filterMu.Unlock() + lp.filtersMu.Lock() + defer lp.filtersMu.Unlock() _, ok := lp.filters[name] if !ok { @@ -301,17 +402,19 @@ func (lp *logPoller) UnregisterFilter(ctx context.Context, name string) error { } if err := lp.orm.DeleteFilter(ctx, name); err != nil { - return pkgerrors.Wrap(err, "error deleting filter") + return fmt.Errorf("error deleting filter: %w", err) } + + lp.removedFilters = append(lp.removedFilters, lp.filters[name]) delete(lp.filters, name) - lp.filterDirty = true + return nil } // HasFilter returns true if the log poller has an active filter with the given name. 
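+//
+// A typical guard before (re-)registering a filter, as an illustrative sketch (the filter name and
+// myFilter value are hypothetical):
+//
+//	if !lp.HasFilter("my job filter") {
+//		err = lp.RegisterFilter(ctx, myFilter)
+//	}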
 func (lp *logPoller) HasFilter(name string) bool {
-	lp.filterMu.RLock()
-	defer lp.filterMu.RUnlock()
+	lp.filtersMu.RLock()
+	defer lp.filtersMu.RUnlock()
 
 	_, ok := lp.filters[name]
 	return ok
@@ -346,52 +449,6 @@ func (lp *logPoller) GetFilters() map[string]Filter {
 	return filters
 }
 
-func (lp *logPoller) Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery {
-	lp.filterMu.Lock()
-	defer lp.filterMu.Unlock()
-	if !lp.filterDirty {
-		return ethereum.FilterQuery{FromBlock: from, ToBlock: to, BlockHash: bh, Topics: [][]common.Hash{lp.cachedEventSigs}, Addresses: lp.cachedAddresses}
-	}
-	var (
-		addresses  []common.Address
-		eventSigs  []common.Hash
-		addressMp  = make(map[common.Address]struct{})
-		eventSigMp = make(map[common.Hash]struct{})
-	)
-	// Merge filters.
-	for _, filter := range lp.filters {
-		for _, addr := range filter.Addresses {
-			addressMp[addr] = struct{}{}
-		}
-		for _, eventSig := range filter.EventSigs {
-			eventSigMp[eventSig] = struct{}{}
-		}
-	}
-	for addr := range addressMp {
-		addresses = append(addresses, addr)
-	}
-	sort.Slice(addresses, func(i, j int) bool {
-		return bytes.Compare(addresses[i][:], addresses[j][:]) < 0
-	})
-	for eventSig := range eventSigMp {
-		eventSigs = append(eventSigs, eventSig)
-	}
-	sort.Slice(eventSigs, func(i, j int) bool {
-		return bytes.Compare(eventSigs[i][:], eventSigs[j][:]) < 0
-	})
-	if len(eventSigs) == 0 && len(addresses) == 0 {
-		// If no filter specified, ignore everything.
-		// This allows us to keep the log poller up and running with no filters present (e.g. no jobs on the node),
-		// then as jobs are added dynamically start using their filters.
-		addresses = []common.Address{common.HexToAddress("0x0000000000000000000000000000000000000000")}
-		eventSigs = []common.Hash{}
-	}
-	lp.cachedAddresses = addresses
-	lp.cachedEventSigs = eventSigs
-	lp.filterDirty = false
-	return ethereum.FilterQuery{FromBlock: from, ToBlock: to, BlockHash: bh, Topics: [][]common.Hash{eventSigs}, Addresses: addresses}
-}
-
 // Replay signals that the poller should resume from a new block.
 // Blocks until the replay is complete.
 // Replay can be used to ensure that filter modification has been applied for all blocks from "fromBlock" up to latest.
@@ -405,13 +462,13 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) {
 		}
 	}()
 
-	lp.lggr.Debugf("Replaying from block %d", fromBlock)
+	lp.lggr.Debugw("Replaying from block", "fromBlock", fromBlock)
 	latest, err := lp.ec.HeadByNumber(ctx, nil)
 	if err != nil {
 		return err
 	}
 	if fromBlock < 1 || fromBlock > latest.Number {
-		return pkgerrors.Errorf("Invalid replay block number %v, acceptable range [1, %v]", fromBlock, latest.Number)
+		return fmt.Errorf("Invalid replay block number %v, acceptable range [1, %v]", fromBlock, latest.Number)
 	}
 
 	// Backfill all logs up to the latest saved finalized block outside the LogPoller's main loop.
@@ -438,7 +495,7 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) {
 	select {
 	case lp.replayStart <- fromBlock:
 	case <-ctx.Done():
-		return pkgerrors.Wrap(ErrReplayRequestAborted, ctx.Err().Error())
+		return fmt.Errorf("%w: %w", ErrReplayRequestAborted, ctx.Err())
 	}
 	// Block until replay complete or cancelled.
select { @@ -524,7 +581,7 @@ func (lp *logPoller) HealthReport() map[string]error { func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (int64, error) { lastProcessed, err := lp.orm.SelectLatestBlock(ctx) if err != nil { - if !pkgerrors.Is(err, sql.ErrNoRows) { + if !errors.Is(err, sql.ErrNoRows) { // Real DB error return 0, err } @@ -538,16 +595,19 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i } func (lp *logPoller) loadFilters() error { - lp.filterMu.Lock() - defer lp.filterMu.Unlock() + lp.filtersMu.Lock() + defer lp.filtersMu.Unlock() filters, err := lp.orm.LoadFilters(lp.ctx) if err != nil { - return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") + return fmt.Errorf("Failed to load initial filters from db, retrying: %w", err) + } + + for name, filter := range filters { + lp.filters[name] = filter + lp.newFilters[name] = struct{}{} } - lp.filters = filters - lp.filterDirty = true return nil } @@ -578,7 +638,7 @@ func (lp *logPoller) run() { var start int64 lastProcessed, err := lp.orm.SelectLatestBlock(lp.ctx) if err != nil { - if !pkgerrors.Is(err, sql.ErrNoRows) { + if !errors.Is(err, sql.ErrNoRows) { // Assume transient db reading issue, retry forever. lp.lggr.Errorw("unable to get starting block", "err", err) continue @@ -692,7 +752,7 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) { if lp.backupPollerNextBlock == 0 { lastProcessed, err := lp.orm.SelectLatestBlock(ctx) if err != nil { - if pkgerrors.Is(err, sql.ErrNoRows) { + if errors.Is(err, sql.ErrNoRows) { lp.lggr.Warnw("Backup log poller ran before first successful log poller run, skipping") } else { lp.lggr.Errorw("Backup log poller unable to get starting block", "err", err) @@ -789,10 +849,10 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { batchSize := lp.backfillBatchSize for from := start; from <= end; from += batchSize { to := mathutil.Min(from+batchSize-1, end) - gethLogs, err := lp.ec.FilterLogs(ctx, lp.Filter(big.NewInt(from), big.NewInt(to), nil)) + gethLogs, err := lp.batchFetchLogs(ctx, big.NewInt(from), big.NewInt(to), nil) if err != nil { var rpcErr client.JsonError - if pkgerrors.As(err, &rpcErr) { + if errors.As(err, &rpcErr) { if rpcErr.Code != jsonRpcLimitExceeded { lp.lggr.Errorw("Unable to query for logs", "err", err, "from", from, "to", to) return err @@ -851,20 +911,21 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren // Additional sanity checks, don't necessarily trust the RPC. if currentBlock == nil { lp.lggr.Errorf("Unexpected nil block from RPC", "currentBlockNumber", currentBlockNumber) - return nil, pkgerrors.Errorf("Got nil block for %d", currentBlockNumber) + return nil, fmt.Errorf("Got nil block for %d", currentBlockNumber) } if currentBlock.Number != currentBlockNumber { lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "currentBlockNumber", currentBlockNumber, "got", currentBlock.Number) - return nil, pkgerrors.Errorf("Block mismatch have %d want %d", currentBlock.Number, currentBlockNumber) + return nil, fmt.Errorf("Block mismatch have %d want %d", currentBlock.Number, currentBlockNumber) } } // Does this currentBlock point to the same parent that we have saved? // If not, there was a reorg, so we need to rewind. 
+ expectedParent, err1 := lp.orm.SelectBlockByNumber(ctx, currentBlockNumber-1) - if err1 != nil && !pkgerrors.Is(err1, sql.ErrNoRows) { + if err1 != nil && !errors.Is(err1, sql.ErrNoRows) { // If err is not a 'no rows' error, assume transient db issue and retry lp.lggr.Warnw("Unable to read latestBlockNumber currentBlock saved", "err", err1, "currentBlockNumber", currentBlockNumber) - return nil, pkgerrors.New("Unable to read latestBlockNumber currentBlock saved") + return nil, errors.New("Unable to read latestBlockNumber currentBlock saved") } // We will not have the previous currentBlock on initial poll. havePreviousBlock := err1 == nil @@ -880,7 +941,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, expectedParent.FinalizedBlockNumber) if err2 != nil { lp.lggr.Warnw("Unable to find LCA after reorg, retrying", "err", err2) - return nil, pkgerrors.New("Unable to find LCA after reorg, retrying") + return nil, errors.New("Unable to find LCA after reorg, retrying") } lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlockNumber) @@ -970,7 +1031,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int for { h := currentBlock.Hash var logs []types.Log - logs, err = lp.ec.FilterLogs(ctx, lp.Filter(nil, nil, &h)) + logs, err = lp.batchFetchLogs(ctx, nil, nil, &h) if err != nil { lp.lggr.Warnw("Unable to query for logs, retrying", "err", err, "block", currentBlockNumber) return @@ -1063,7 +1124,7 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He } } lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber) - rerr := pkgerrors.New("Reorg greater than finality depth") + rerr := errors.New("Reorg greater than finality depth") lp.SvcErrBuffer.Append(rerr) lp.finalityViolated.Store(true) return nil, rerr @@ -1246,7 +1307,7 @@ func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64) ([]Lo } if len(blocksNotFound) > 0 { - return nil, pkgerrors.Errorf("blocks were not found in db or RPC call: %v", blocksNotFound) + return nil, fmt.Errorf("blocks were not found in db or RPC call: %v", blocksNotFound) } return blocks, nil @@ -1287,17 +1348,325 @@ func (lp *logPoller) fillRemainingBlocksFromRPC( return logPollerBlocks, nil } -func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) { - reqs := make([]rpc.BatchElem, 0, len(blocksRequested)) - for _, num := range blocksRequested { - req := rpc.BatchElem{ - Method: "eth_getBlockByNumber", - Args: []interface{}{num, false}, - Result: &evmtypes.Head{}, +// mergeAddressesIntoGetLogsReq merges a new list of addresses into a GetLogs req, +// while preserving sort order and removing duplicates +func mergeAddressesIntoGetLogsReq(req *GetLogsBatchElem, newAddresses []common.Address) { + var merged []common.Address + var i, j int + addresses := req.Addresses() + + for i < len(addresses) && j < len(newAddresses) { + cmp := bytes.Compare(newAddresses[j].Bytes(), addresses[i].Bytes()) + if cmp < 0 { + merged = append(merged, newAddresses[j]) + j++ + } else if cmp > 0 { + merged = append(merged, addresses[i]) + i++ + } else { + merged = append(merged, addresses[i]) + i++ + j++ // only keep original, skip duplicate + continue } - reqs = append(reqs, req) } + // 
Append remaining elements, if any + merged = append(merged, newAddresses[j:]...) + merged = append(merged, addresses[i:]...) + + req.SetAddresses(merged) +} + +func make2DTopics(eventSigs, topics2, topics3, topics4 []common.Hash) (res [][]common.Hash) { + topics := [][]common.Hash{eventSigs, topics2, topics3, topics4} + lastTopic := len(topics) - 1 + for lastTopic >= 0 && topics[lastTopic] == nil { + lastTopic-- + } + + res = make([][]common.Hash, lastTopic+1) + copy(res, topics) + + return res +} + +// isTopicsSubset returns true if all of the sets in the list topicsA are subsets of or equal to the sets in topicsB. +// topicsA and topicsB each contain 4 (or any equal number of) sets of slices of topic values. +// +// Interpreting A & B as filters on the same contract address, "true" means that anything matching A will match B +// +// Assumptions: +// - every element of topicsA & topicsB are sorted lists containing no duplicates +func isTopicsSubset(topicsA [][]common.Hash, topicsB [][]common.Hash) bool { + if len(topicsB) > len(topicsA) { + return false // If topicsB requires a larger number of topics to be emitted, then B is a narrower filter than A + } + for i := range topicsB { // doesn't matter what topics[j] for j > len(topicsB) is, as that can only narrows filter A further + if len(topicsB[i]) == 0 { + continue // nil/empty list of topics matches all values, so topicsA[n] automatically a subset + } + if len(topicsA[i]) == 0 { + return false // topicsA[n] matches all values, but not topicsB[n], so topicsA is not a subset + } + topicsMapB := make(map[common.Hash]interface{}) + for _, b := range topicsB[i] { + topicsMapB[b] = struct{}{} + } + for _, a := range topicsA[i] { + if _, ok := topicsMapB[a]; !ok { + return false + } + } + } + return true +} + +func makeEventsTopicsKey(filter Filter) string { + // eventsTopicsKey is constructed to uniquely identify the particular combination of + // eventSigs and topics sets a filter has. Because we don't want the key to depend on + // the order of eventSigs, or the order of topic values for a specific topic index, we + // must make sure these 4 lists are sorted in the same way + + size := len(filter.EventSigs[0])*(len(filter.EventSigs)+len(filter.Topic2)+len(filter.Topic3)+len(filter.Topic4)) + 4 + var eventsTopicsKey = make([]byte, 0, size) + + appendHashes := func(hashes []common.Hash) { + for _, h := range hashes { + eventsTopicsKey = append(eventsTopicsKey, h[:]...) 
+ } + eventsTopicsKey = append(eventsTopicsKey, 0xFF) // separator + } + appendHashes(filter.EventSigs) + appendHashes(filter.Topic2) + appendHashes(filter.Topic3) + appendHashes(filter.Topic4) + return hex.EncodeToString(eventsTopicsKey) +} + +func compareBlockNumbers(n, m *big.Int) (cmp int) { + if n != nil && m != nil { + return int(m.Uint64() - n.Uint64()) + } + if n == nil { + cmp-- + } + if m == nil { + cmp++ + } + return cmp +} + +// Exposes ethGetLogsReqs to tests, casting the results to []GetLogBatchElem and sorting them to make +// the output more predictable and convenient for assertions +func (lp *logPoller) EthGetLogsReqs(fromBlock, toBlock *big.Int, blockHash *common.Hash) []GetLogsBatchElem { + rawReqs := lp.ethGetLogsReqs(fromBlock, toBlock, blockHash) + reqs := make([]GetLogsBatchElem, len(rawReqs)) + for i := range rawReqs { + reqs[i] = GetLogsBatchElem(rawReqs[i]) + } + + slices.SortStableFunc(reqs, func(a, b GetLogsBatchElem) int { + nilA, nilB := a.BlockHash() == nil, b.BlockHash() == nil + if nilA && !nilB { + return -1 + } else if !nilA && nilB { + return 1 + } + if !nilB && !nilA { + if cmp := bytes.Compare(a.BlockHash()[:], b.BlockHash()[:]); cmp != 0 { + return cmp + } + } + + if cmp := compareBlockNumbers(a.FromBlock(), b.FromBlock()); cmp != 0 { + return cmp + } + + if cmp := compareBlockNumbers(a.ToBlock(), b.ToBlock()); cmp != 0 { + return cmp + } + + addressesA, addressesB := a.Addresses(), b.Addresses() + if len(addressesA) != len(addressesB) { + return len(addressesA) - len(addressesB) + } + for i := range addressesA { + if cmp := bytes.Compare(addressesA[i][:], addressesB[i][:]); cmp != 0 { + return cmp + } + } + + topicsA, topicsB := a.Topics(), b.Topics() + if len(topicsA) != len(topicsB) { // should both be 4, but may as well handle more general case + return len(topicsA) - len(topicsB) + } + for i := range topicsA { + if len(topicsA[i]) != len(topicsB[i]) { + return len(topicsA[i]) - len(topicsB[i]) + } + for j := range topicsA[i] { + if cmp := bytes.Compare(topicsA[i][j][:], topicsB[i][j][:]); cmp != 0 { + return cmp + } + } + } + return 0 // a and b are identical + }) + return reqs +} + +// The topics passed into RegisterFilter are 2D slices backed by arrays that the +// caller could change at any time. We must have our own deep copy that's +// immutable and thread safe. 
Similarly, we don't want to pass our mutex-protected
+// copy down the stack while sending batch requests and waiting for responses
+func copyTopics(topics [][]common.Hash) (clone [][]common.Hash) {
+	clone = make([][]common.Hash, len(topics))
+	for i, topic := range topics {
+		clone[i] = make([]common.Hash, len(topic))
+		copy(clone[i], topics[i])
+	}
+	return clone
+}
+
+// ethGetLogsReqs generates batched rpc reqs for all logs matching registered filters,
+// copying cached reqs and filling in block range/hash if none of the registered filters have changed
+func (lp *logPoller) ethGetLogsReqs(fromBlock, toBlock *big.Int, blockHash *common.Hash) []rpc.BatchElem {
+	lp.filtersMu.Lock()
+
+	if len(lp.removedFilters) != 0 || len(lp.newFilters) != 0 {
+		deletedAddresses := map[common.Address]struct{}{}
+		deletedEventsTopicsKeys := map[string]struct{}{}
+
+		lp.lggr.Debugw("ethGetLogsReqs: dirty cache, rebuilding reqs indices", "removedFilters", lp.removedFilters, "newFilters", lp.newFilters)
+
+		// First, remove any reqs corresponding to removed filters
+		// Some of them we may still need, they will be rebuilt on the next pass
+		for _, filter := range lp.removedFilters {
+			eventsTopicsKey := makeEventsTopicsKey(filter)
+			deletedEventsTopicsKeys[eventsTopicsKey] = struct{}{}
+			delete(lp.cachedReqsByEventsTopicsKey, eventsTopicsKey)
+			for _, address := range filter.Addresses {
+				deletedAddresses[address] = struct{}{}
+				delete(lp.cachedReqsByAddress, address)
+			}
+		}
+		lp.removedFilters = nil
+
+		// Merge/add any new filters.
+		for _, filter := range lp.filters {
+			var newReq *GetLogsBatchElem
+
+			eventsTopicsKey := makeEventsTopicsKey(filter)
+			_, isNew := lp.newFilters[filter.Name]
+
+			_, hasDeletedTopics := deletedEventsTopicsKeys[eventsTopicsKey]
+			var hasDeletedAddress bool
+			for _, addr := range filter.Addresses {
+				if _, hasDeletedAddress = deletedAddresses[addr]; hasDeletedAddress {
+					break
+				}
+			}
+
+			if !(isNew || hasDeletedTopics || hasDeletedAddress) {
+				continue // only rebuild reqs associated with new filters or those sharing topics or addresses with a removed filter
+			}
+
+			if req, ok2 := lp.cachedReqsByEventsTopicsKey[eventsTopicsKey]; ok2 {
+				// merge this filter with other filters with the same events and topics lists
+				mergeAddressesIntoGetLogsReq(req, filter.Addresses)
+				continue
+			}
+
+			for _, addr := range filter.Addresses {
+				if reqsForAddress, ok2 := lp.cachedReqsByAddress[addr]; !ok2 {
+					newReq = NewGetLogsReq(filter)
+					lp.cachedReqsByEventsTopicsKey[eventsTopicsKey] = newReq
+					lp.cachedReqsByAddress[addr] = []*GetLogsBatchElem{newReq}
+				} else {
+					newTopics := make2DTopics(filter.EventSigs, filter.Topic2, filter.Topic3, filter.Topic4)
+
+					merged := false
+					for i, req := range reqsForAddress {
+						topics := req.Topics()
+						if isTopicsSubset(newTopics, topics) {
+							// Already covered by existing req
+							merged = true
+							break
+						} else if isTopicsSubset(topics, newTopics) {
+							// Replace existing req by new req which includes it
+							reqsForAddress[i] = NewGetLogsReq(filter)
+							lp.cachedReqsByAddress[addr] = reqsForAddress
+							merged = true
+							break
+						}
+					}
+					if !merged {
+						// Nothing similar enough found for this address, add a new req
+						lp.cachedReqsByEventsTopicsKey[eventsTopicsKey] = NewGetLogsReq(filter)
+						lp.cachedReqsByAddress[addr] = append(reqsForAddress, lp.cachedReqsByEventsTopicsKey[eventsTopicsKey])
+					}
+				}
+			}
+		}
+		lp.newFilters = make(map[string]struct{})
+	}
+	lp.filtersMu.Unlock()
+
+	blockParams := map[string]interface{}{}
+	if blockHash != nil {
+		blockParams["blockHash"] = blockHash
+	}
+	if fromBlock != nil {
blockParams["fromBlock"] = rpc.BlockNumber(fromBlock.Uint64()).String() + } + if toBlock != nil { + blockParams["toBlock"] = rpc.BlockNumber(toBlock.Uint64()).String() + } + + // Fill fromBlock, toBlock, & blockHash while deep-copying cached reqs into a result array + reqs := make([]rpc.BatchElem, 0, len(lp.cachedReqsByEventsTopicsKey)) + for _, req := range lp.cachedReqsByEventsTopicsKey { + addresses := make([]common.Address, len(req.Addresses())) + copy(addresses, req.Addresses()) + topics := copyTopics(req.Topics()) + + params := maps.Clone(blockParams) + params["address"] = addresses + params["topics"] = topics + + reqs = append(reqs, rpc.BatchElem{ + Method: req.Method, + Args: []interface{}{params}, + Result: new([]types.Log), + }) + } + + return reqs +} + +// batchFetchLogs fetches logs for either a single block by block hash, or by block range, +// rebuilding the cached reqs if necessary, sending them to the rpc server, and parsing the results +// Requests for different filters are sent in parallel batches. For block range requests, the +// block range is also broken up into serial batches +func (lp *logPoller) batchFetchLogs(ctx context.Context, fromBlock *big.Int, toBlock *big.Int, blockHash *common.Hash) ([]types.Log, error) { + reqs := lp.ethGetLogsReqs(fromBlock, toBlock, blockHash) + + lp.lggr.Debugw("batchFetchLogs: sending batched requests", "rpcBatchSize", lp.rpcBatchSize, "numReqs", len(reqs)) + if err := lp.sendBatchedRequests(ctx, lp.rpcBatchSize, reqs); err != nil { + return nil, err + } + + var logs []types.Log + for _, req := range reqs { + if req.Error != nil { + return nil, req.Error + } + res, ok := req.Result.(*[]types.Log) + if !ok { + return nil, fmt.Errorf("expected result type %T from eth_getLogs request, got %T", res, req.Result) + } + logs = append(logs, *res...) 
+ } + return logs, nil +} + +func (lp *logPoller) sendBatchedRequests(ctx context.Context, batchSize int64, reqs []rpc.BatchElem) error { for i := 0; i < len(reqs); i += int(batchSize) { j := i + int(batchSize) if j > len(reqs) { @@ -1306,9 +1675,27 @@ func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []str err := lp.ec.BatchCallContext(ctx, reqs[i:j]) if err != nil { - return nil, err + return err } } + return nil +} + +func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) { + reqs := make([]rpc.BatchElem, 0, len(blocksRequested)) + for _, num := range blocksRequested { + req := rpc.BatchElem{ + Method: "eth_getBlockByNumber", + Args: []interface{}{num, false}, + Result: &evmtypes.Head{}, + } + reqs = append(reqs, req) + } + + err := lp.sendBatchedRequests(ctx, batchSize, reqs) + if err != nil { + return nil, err + } var blocks = make([]*evmtypes.Head, 0, len(reqs)) for _, r := range reqs { @@ -1318,16 +1705,16 @@ func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []str block, is := r.Result.(*evmtypes.Head) if !is { - return nil, pkgerrors.Errorf("expected result to be a %T, got %T", &evmtypes.Head{}, r.Result) + return nil, fmt.Errorf("expected result to be a %T, got %T", &evmtypes.Head{}, r.Result) } if block == nil { - return nil, pkgerrors.New("invariant violation: got nil block") + return nil, errors.New("invariant violation: got nil block") } if block.Hash == (common.Hash{}) { - return nil, pkgerrors.Errorf("missing block hash for block number: %d", block.Number) + return nil, fmt.Errorf("missing block hash for block number: %d", block.Number) } if block.Number < 0 { - return nil, pkgerrors.Errorf("expected block number to be >= to 0, got %d", block.Number) + return nil, fmt.Errorf("expected block number to be >= to 0, got %d", block.Number) } blocks = append(blocks, block) } diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index b6af0f7de5c..5aa85de7cbf 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -54,8 +54,8 @@ func validateFiltersTable(t *testing.T, lp *logPoller, orm ORM) { func TestLogPoller_RegisterFilter(t *testing.T) { t.Parallel() - a1 := common.HexToAddress("0x2ab9a2dc53736b361b72d900cdf9f78f9406fbbb") - a2 := common.HexToAddress("0x2ab9a2dc53736b361b72d900cdf9f78f9406fbbc") + a1 := common.HexToAddress("0x2Ab9A2Dc53736b361b72D900Cdf9f78F9406FBBb") + a2 := common.HexToAddress("0x2Ab9A2Dc53736b361b72D900Cdf9f78F9406FBBc") lggr, observedLogs := logger.TestObserved(t, zapcore.WarnLevel) chainID := testutils.NewRandomEVMChainID() @@ -73,29 +73,64 @@ func TestLogPoller_RegisterFilter(t *testing.T) { } lp := NewLogPoller(orm, nil, lggr, lpOpts) - // We expect a zero Filter if nothing registered yet. - f := lp.Filter(nil, nil, nil) - require.Equal(t, 1, len(f.Addresses)) - assert.Equal(t, common.HexToAddress("0x0000000000000000000000000000000000000000"), f.Addresses[0]) + // We expect empty list of reqs if nothing registered yet. 
+ reqs := lp.EthGetLogsReqs(nil, nil, nil) + require.Len(t, reqs, 0) + topics1 := [][]common.Hash{{EmitterABI.Events["Log1"].ID}} + topics2 := [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}} + err := lp.RegisterFilter(ctx, Filter{Name: "Emitter Log 1, address 1", EventSigs: topics1[0], Addresses: []common.Address{a1}}) + require.NoError(t, err) + reqs = lp.EthGetLogsReqs(nil, nil, nil) + require.Len(t, reqs, 1) + assert.Equal(t, []common.Address{a1}, reqs[0].Addresses()) + assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID}}, reqs[0].Topics()) + validateFiltersTable(t, lp, orm) + + err = lp.RegisterFilter(ctx, Filter{Name: "Emitter Log 1 + 2, address 2", EventSigs: topics2[0], Addresses: []common.Address{a2}}) + require.NoError(t, err) + reqs = lp.EthGetLogsReqs(nil, nil, nil) + require.Len(t, reqs, 2) // should have 2 separate reqs, since these cannot be merged + getLogsReq1 := reqs[0] + getLogsReq2 := reqs[1] + addresses1 := reqs[0].Addresses() + require.Len(t, addresses1, 1) + addresses2 := reqs[1].Addresses() + require.Len(t, addresses2, 1) + assert.Equal(t, a1, addresses1[0]) + assert.Equal(t, a2, addresses2[0]) + assert.Equal(t, topics1, getLogsReq1.Topics()) + assert.Equal(t, topics2, getLogsReq2.Topics()) + validateFiltersTable(t, lp, orm) - err := lp.RegisterFilter(ctx, Filter{Name: "Emitter Log 1", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{a1}}) + // Reqs should not change when a duplicate filter is added + err = lp.RegisterFilter(ctx, Filter{Name: "Emitter Log 1 + 2 dupe", EventSigs: topics2[0], Addresses: []common.Address{a2}}) require.NoError(t, err) - assert.Equal(t, []common.Address{a1}, lp.Filter(nil, nil, nil).Addresses) - assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID}}, lp.Filter(nil, nil, nil).Topics) + reqs = lp.EthGetLogsReqs(nil, nil, nil) + require.Len(t, reqs, 2) + assert.Equal(t, []common.Address{a1}, reqs[0].Addresses()) + assert.Equal(t, topics1, reqs[0].Topics()) + assert.Equal(t, []common.Address{a2}, reqs[1].Addresses()) + assert.Equal(t, topics2, reqs[1].Topics()) validateFiltersTable(t, lp, orm) - // Should de-dupe EventSigs - err = lp.RegisterFilter(ctx, Filter{Name: "Emitter Log 1 + 2", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}}) + // Same address as "Emitter Log 1 + 2, address 2", but only looking for Log1 (subset of topics). Recs should not change + err = lp.RegisterFilter(ctx, Filter{Name: "Emitter Log 2, address 2", EventSigs: evmtypes.HashArray{topics2[0][1]}, Addresses: []common.Address{a2}}) require.NoError(t, err) - assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses) - assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics) + reqs = lp.EthGetLogsReqs(nil, nil, nil) + require.Len(t, reqs, 2) + assert.Equal(t, []common.Address{a1}, reqs[0].Addresses()) + assert.Equal(t, topics1, reqs[0].Topics()) + assert.Equal(t, []common.Address{a2}, reqs[1].Addresses()) + assert.Equal(t, topics2, reqs[1].Topics()) validateFiltersTable(t, lp, orm) - // Should de-dupe Addresses - err = lp.RegisterFilter(ctx, Filter{Name: "Emitter Log 1 + 2 dupe", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, Addresses: []common.Address{a2}}) + // Same address as "Emitter Log 1, address 1", but different topics lists. 
Should create new rec + err = lp.RegisterFilter(ctx, Filter{Name: "Emitter Log 2, address 1", EventSigs: evmtypes.HashArray{topics2[0][1]}, Addresses: []common.Address{a1}}) require.NoError(t, err) - assert.Equal(t, []common.Address{a1, a2}, lp.Filter(nil, nil, nil).Addresses) - assert.Equal(t, [][]common.Hash{{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}}, lp.Filter(nil, nil, nil).Topics) + reqs = lp.EthGetLogsReqs(nil, nil, nil) + require.Len(t, reqs, 3) + assert.Equal(t, []common.Address{a1}, reqs[1].Addresses()) + assert.Equal(t, [][]common.Hash{{topics2[0][1]}}, reqs[1].Topics()) validateFiltersTable(t, lp, orm) // Address required. @@ -113,35 +148,46 @@ func TestLogPoller_RegisterFilter(t *testing.T) { require.Contains(t, observedLogs.TakeAll()[0].Entry.Message, "not found") // Check that all filters are still there - _, ok := lp.filters["Emitter Log 1"] - require.True(t, ok, "'Emitter Log 1 Filter' missing") - _, ok = lp.filters["Emitter Log 1 + 2"] - require.True(t, ok, "'Emitter Log 1 + 2' Filter missing") + _, ok := lp.filters["Emitter Log 1, address 1"] + require.True(t, ok, "'Emitter Log 1, address 1' Filter missing") + _, ok = lp.filters["Emitter Log 1 + 2, address 2"] + require.True(t, ok, "'Emitter Log 1 + 2, address 2' Filter missing") _, ok = lp.filters["Emitter Log 1 + 2 dupe"] require.True(t, ok, "'Emitter Log 1 + 2 dupe' Filter missing") + _, ok = lp.filters["Emitter Log 2, address 2"] + require.True(t, ok, "'Emitter Log 2, address 2' Filter missing") + _, ok = lp.filters["Emitter Log 2, address 1"] + require.True(t, ok, "'Emitter Log 2, address 1' Filter missing") // Removing an existing Filter should remove it from both memory and db - err = lp.UnregisterFilter(ctx, "Emitter Log 1 + 2") + err = lp.UnregisterFilter(ctx, "Emitter Log 1 + 2, address 2") require.NoError(t, err) - _, ok = lp.filters["Emitter Log 1 + 2"] + _, ok = lp.filters["Emitter Log 1 + 2, address 2"] require.False(t, ok, "'Emitter Log 1 Filter' should have been removed by UnregisterFilter()") - require.Len(t, lp.filters, 2) + require.Len(t, lp.filters, 4) validateFiltersTable(t, lp, orm) err = lp.UnregisterFilter(ctx, "Emitter Log 1 + 2 dupe") require.NoError(t, err) - err = lp.UnregisterFilter(ctx, "Emitter Log 1") + err = lp.UnregisterFilter(ctx, "Emitter Log 1, address 1") + require.NoError(t, err) + err = lp.UnregisterFilter(ctx, "Emitter Log 2, address 1") + require.NoError(t, err) + err = lp.UnregisterFilter(ctx, "Emitter Log 2, address 2") require.NoError(t, err) assert.Len(t, lp.filters, 0) filters, err := lp.orm.LoadFilters(ctx) require.NoError(t, err) assert.Len(t, filters, 0) + require.Len(t, lp.newFilters, 0) + require.Len(t, lp.removedFilters, 5) + + lp.EthGetLogsReqs(nil, nil, nil) + // Make sure cache was invalidated - assert.Len(t, lp.Filter(nil, nil, nil).Addresses, 1) - assert.Equal(t, lp.Filter(nil, nil, nil).Addresses[0], common.HexToAddress("0x0000000000000000000000000000000000000000")) - assert.Len(t, lp.Filter(nil, nil, nil).Topics, 1) - assert.Len(t, lp.Filter(nil, nil, nil).Topics[0], 0) + assert.Len(t, lp.cachedReqsByEventsTopicsKey, 0) + assert.Len(t, lp.cachedReqsByAddress, 0) } func TestLogPoller_ConvertLogs(t *testing.T) { @@ -200,27 +246,15 @@ func TestFilterName(t *testing.T) { } func TestLogPoller_BackupPollerStartup(t *testing.T) { - addr := common.HexToAddress("0x2ab9a2dc53736b361b72d900cdf9f78f9406fbbc") lggr, observedLogs := logger.TestObserved(t, zapcore.WarnLevel) chainID := testutils.FixtureChainID db := pgtest.NewSqlxDB(t) orm := NewORM(chainID, 
db, lggr) head := evmtypes.Head{Number: 3} - events := []common.Hash{EmitterABI.Events["Log1"].ID} - log1 := types.Log{ - Index: 0, - BlockHash: common.Hash{}, - BlockNumber: uint64(3), - Topics: events, - Address: addr, - TxHash: common.HexToHash("0x1234"), - Data: EvmWord(uint64(300)).Bytes(), - } ec := evmclimocks.NewClient(t) ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil) - ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil) ec.On("ConfiguredChainID").Return(chainID, nil) ctx := testutils.Context(t) @@ -258,19 +292,10 @@ func TestLogPoller_Replay(t *testing.T) { head := evmtypes.Head{Number: 4} events := []common.Hash{EmitterABI.Events["Log1"].ID} - log1 := types.Log{ - Index: 0, - BlockHash: common.Hash{}, - BlockNumber: uint64(head.Number), - Topics: events, - Address: addr, - TxHash: common.HexToHash("0x1234"), - Data: EvmWord(uint64(300)).Bytes(), - } ec := evmclimocks.NewClient(t) ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil) - ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Twice() + ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Twice() ec.On("ConfiguredChainID").Return(chainID, nil) lpOpts := Opts{ PollPeriod: time.Hour, @@ -282,6 +307,9 @@ func TestLogPoller_Replay(t *testing.T) { } lp := NewLogPoller(orm, ec, lggr, lpOpts) + err := lp.RegisterFilter(ctx, Filter{Name: "Emitter Log 1", EventSigs: events, Addresses: []common.Address{addr}}) + require.NoError(t, err) + // process 1 log in block 3 lp.PollAndSaveLogs(ctx, 4) latest, err := lp.LatestBlock(ctx) @@ -344,7 +372,7 @@ func TestLogPoller_Replay(t *testing.T) { rctx, rcancel := context.WithCancel(testutils.Context(t)) var wg sync.WaitGroup defer func() { wg.Wait() }() - ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) { + ec.On("BatchCallContext", mock.Anything, mock.Anything).Once().Return(nil).Run(func(args mock.Arguments) { wg.Add(1) go func() { defer wg.Done() @@ -352,7 +380,7 @@ func TestLogPoller_Replay(t *testing.T) { close(cancelled) }() }) - ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) { + ec.On("BatchCallContext", mock.Anything, mock.Anything).Once().Return(nil).Run(func(args mock.Arguments) { rcancel() wg.Add(1) go func() { @@ -369,7 +397,7 @@ func TestLogPoller_Replay(t *testing.T) { <-cancelled }) - ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Maybe() // in case task gets delayed by >= 100ms + ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Maybe() // in case task gets delayed by >= 100ms t.Cleanup(lp.reset) servicetest.Run(t, lp) @@ -382,7 +410,7 @@ func TestLogPoller_Replay(t *testing.T) { }) // remove Maybe expectation from prior subtest, as it will override all expected calls in future subtests - ec.On("FilterLogs", mock.Anything, mock.Anything).Unset() + ec.On("BatchCallContext", mock.Anything, mock.Anything).Unset() // run() should abort if log poller shuts down while replay is in progress t.Run("shutdown during replay", func(t *testing.T) { @@ -392,7 +420,7 @@ func TestLogPoller_Replay(t *testing.T) { done := make(chan struct{}) defer func() { <-done }() - ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) { + ec.On("BatchCallContext", mock.Anything, mock.Anything).Once().Return(nil).Run(func(args mock.Arguments) 
{ go func() { defer close(done) select { @@ -401,11 +429,11 @@ func TestLogPoller_Replay(t *testing.T) { } }() }) - ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) { + ec.On("BatchCallContext", mock.Anything, mock.Anything).Once().Return(nil).Run(func(args mock.Arguments) { lp.cancel() close(pass) }) - ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Maybe() // in case task gets delayed by >= 100ms + ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Maybe() // in case task gets delayed by >= 100ms t.Cleanup(lp.reset) servicetest.Run(t, lp) @@ -554,8 +582,55 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { }) } -func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) { +func Test_GetLogsReqHelpers(t *testing.T) { + t.Parallel() + + filter1 := Filter{ + Name: "filter1", + Addresses: evmtypes.AddressArray{testutils.NewAddress()}, + EventSigs: evmtypes.HashArray{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, + Topic2: []common.Hash{EmitterABI.Events["Log3"].ID, EmitterABI.Events["Log4"].ID}, + } + + blockHash := common.HexToHash("0x1234") + + testCases := []struct { + name string + filter Filter + fromBlock *big.Int + toBlock *big.Int + blockHash *common.Hash + }{ + {"block range", filter1, big.NewInt(5), big.NewInt(10), nil}, + {"block hash", filter1, nil, nil, &blockHash}, + } + + for _, c := range testCases { + var req *GetLogsBatchElem + t.Run("createGetLogsRec", func(t *testing.T) { + req = NewGetLogsReq(filter1) + assert.Equal(t, req.Method, "eth_getLogs") + require.Len(t, req.Args, 1) + assert.Equal(t, c.filter.Addresses, evmtypes.AddressArray(req.Addresses())) + assert.Equal(t, c.filter.EventSigs, evmtypes.HashArray(req.Topics()[0])) + assert.Equal(t, c.filter.Topic2, evmtypes.HashArray(req.Topics()[1])) + }) + t.Run("mergeAddressesIntoGetLogsReq", func(t *testing.T) { + newAddresses := []common.Address{testutils.NewAddress(), testutils.NewAddress()} + mergeAddressesIntoGetLogsReq(req, newAddresses) + assert.Len(t, req.Addresses(), len(c.filter.Addresses)+2) + assert.Contains(t, req.Addresses(), newAddresses[0]) + assert.Contains(t, req.Addresses(), newAddresses[1]) + }) + } +} + +func benchmarkEthGetLogsReqs(b *testing.B, nFilters, nAddresses, nEvents int) { lggr := logger.Test(b) + chainID := testutils.NewRandomEVMChainID() + db := pgtest.NewSqlxDB(b) + + orm := NewORM(chainID, db, lggr) lpOpts := Opts{ PollPeriod: time.Hour, FinalityDepth: 2, @@ -563,7 +638,7 @@ func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) { RpcBatchSize: 2, KeepFinalizedBlocksDepth: 1000, } - lp := NewLogPoller(nil, nil, lggr, lpOpts) + lp := NewLogPoller(orm, nil, lggr, lpOpts) for i := 0; i < nFilters; i++ { var addresses []common.Address var events []common.Hash @@ -578,16 +653,16 @@ func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) { } b.ResetTimer() for n := 0; n < b.N; n++ { - lp.Filter(nil, nil, nil) + lp.EthGetLogsReqs(nil, nil, nil) } } -func BenchmarkFilter10_1(b *testing.B) { - benchmarkFilter(b, 10, 1, 1) +func BenchmarkEthGetLogsReqs10_1(b *testing.B) { + benchmarkEthGetLogsReqs(b, 10, 1, 1) } -func BenchmarkFilter100_10(b *testing.B) { - benchmarkFilter(b, 100, 10, 10) +func BenchmarkEthGetLogsReqs100_10(b *testing.B) { + benchmarkEthGetLogsReqs(b, 100, 10, 10) } -func BenchmarkFilter1000_100(b *testing.B) { - benchmarkFilter(b, 1000, 100, 100) +func BenchmarkEthGetLogsReqs1000_100(b *testing.B) { + 
benchmarkEthGetLogsReqs(b, 1000, 100, 100) } diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 2096ccf3cf4..d929e4c4584 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" @@ -150,6 +151,7 @@ func TestPopulateLoadedDB(t *testing.T) { func TestLogPoller_Integration(t *testing.T) { lpOpts := logpoller.Opts{ + PollPeriod: 1 * time.Hour, FinalityDepth: 2, BackfillBatchSize: 3, RpcBatchSize: 2, @@ -161,11 +163,13 @@ func TestLogPoller_Integration(t *testing.T) { ctx := testutils.Context(t) require.NoError(t, th.LogPoller.RegisterFilter(ctx, logpoller.Filter{Name: "Integration test", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{th.EmitterAddress1}})) - require.Len(t, th.LogPoller.Filter(nil, nil, nil).Addresses, 1) - require.Len(t, th.LogPoller.Filter(nil, nil, nil).Topics, 1) - - require.Len(t, th.LogPoller.Filter(nil, nil, nil).Addresses, 1) - require.Len(t, th.LogPoller.Filter(nil, nil, nil).Topics, 1) + reqs := th.LogPoller.EthGetLogsReqs(nil, nil, nil) + require.Len(t, reqs, 1) + req := reqs[0] + require.Len(t, req.Addresses(), 1) + topics := req.Topics() + require.Len(t, topics, 1) + require.Len(t, topics[0], 1) // Emit some logs in blocks 3->7. for i := 0; i < 5; i++ { @@ -189,7 +193,7 @@ func TestLogPoller_Integration(t *testing.T) { require.Equal(t, 4, len(logs)) // Once the backup poller runs we should also have the log from block 3 - testutils.AssertEventually(t, func() bool { + testutils.RequireEventually(t, func() bool { l, err2 := th.LogPoller.Logs(ctx, 3, 3, EmitterABI.Events["Log1"].ID, th.EmitterAddress1) require.NoError(t, err2) return len(l) == 1 @@ -672,8 +676,10 @@ func TestLogPoller_BlockTimestamps(t *testing.T) { th.PollAndSaveLogs(ctx, lb.BlockNumber+1) lg1, err := th.LogPoller.Logs(ctx, 0, 20, EmitterABI.Events["Log1"].ID, th.EmitterAddress1) require.NoError(t, err) + require.NotNil(t, lg1) lg2, err := th.LogPoller.Logs(ctx, 0, 20, EmitterABI.Events["Log2"].ID, th.EmitterAddress2) require.NoError(t, err) + require.NotNil(t, lg2) // Logs should have correct timestamps b, _ := th.Client.BlockByHash(ctx, lg1[0].BlockHash) @@ -1181,6 +1187,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { // Check that L1_1 has a proper data payload lgs, err := th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2) require.NoError(t, err) + require.NotNil(t, lgs) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000001`), lgs[0].Data) // Single block reorg and log poller not working for a while, mine blocks and progress with finalization @@ -1564,16 +1571,36 @@ func TestTooManyLogResults(t *testing.T) { return &evmtypes.Head{Number: blockNumber.Int64()}, nil }) - call2 := ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { - if fq.BlockHash != nil { - return []types.Log{}, nil // succeed when single block requested + // If the # of blocks requested exceeds sizeLimit, simulate a "too many results" error from rpc server + handleBatchCall := func(b []rpc.BatchElem, sizeLimit uint64) error { + 
require.Len(t, b, 1) + require.Equal(t, "eth_getLogs", b[0].Method) + require.Len(t, b[0].Args, 1) + params := b[0].Args[0].(map[string]interface{}) + blockHash, ok := params["blockHash"] + if ok && blockHash.(*common.Hash) != nil { + return nil // succeed when single block requested } - from := fq.FromBlock.Uint64() - to := fq.ToBlock.Uint64() - if to-from >= 4 { - return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks + fromBlock, ok := params["fromBlock"] + require.True(t, ok) + toBlock, ok := params["toBlock"] + require.True(t, ok) + + from, err := hexutil.DecodeBig(fromBlock.(string)) + require.NoError(t, err) + require.NotNil(t, from) + to, err := hexutil.DecodeBig(toBlock.(string)) + require.NoError(t, err) + require.NotNil(t, to) + + if to.Uint64()-from.Uint64() >= sizeLimit { + return &clientErr // return "too many results" error if block range spans more than sizeLimit # of blocks } - return logs, err + return nil + } + + call2 := ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(func(ctx context.Context, b []rpc.BatchElem) error { + return handleBatchCall(b, 4) // error out if requesting more than 4 blocks at once }) addr := testutils.NewAddress() @@ -1607,11 +1634,8 @@ func TestTooManyLogResults(t *testing.T) { } return &evmtypes.Head{Number: blockNumber.Int64()}, nil }) - call2.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { - if fq.BlockHash != nil { - return []types.Log{}, nil // succeed when single block requested - } - return []types.Log{}, &clientErr // return "too many results" error if block range spans 4 or more blocks + call2.On("BatchCallContext", mock.Anything, mock.Anything).Return(func(ctx context.Context, b []rpc.BatchElem) error { + return handleBatchCall(b, 0) // error out for any range request }) lp.PollAndSaveLogs(ctx, 298) @@ -1920,3 +1944,122 @@ func markBlockAsFinalizedByHash(t *testing.T, th TestHarness, blockHash common.H require.NoError(t, err) th.Client.Blockchain().SetFinalized(b.Header()) } + +func Test_EthGetLogsRecs(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) + lp := th.LogPoller + + addr1 := common.HexToAddress("0x0000000000000000000000000000001111111111") + addr2 := common.HexToAddress("0x0000000000000000000000000000002222222222") + addr3 := common.HexToAddress("0x0000000000000000000000000000003333333333") + + event1 := common.HexToHash("0x112233440000000000000000111111111111111111111111111111111111101") + event2 := common.HexToHash("0x112233440000000000000000222222222222222222222222222222222222202") + event3 := common.HexToHash("0x112233440000000000000000333333333333333333333333333333333333303") + event4 := common.HexToHash("0x112233440000000000000000444444444444444444444444444444444444404") + + topicA := common.HexToHash("0x00000000000000000000000000000000000000000000000000000000000aaaa") + topicB := common.HexToHash("0x00000000000000000000000000000000000000000000000000000000000bbbb") + topicC := common.HexToHash("0x00000000000000000000000000000000000000000000000000000000000cccc") + topicD := common.HexToHash("0x00000000000000000000000000000000000000000000000000000000000dddd") + + filters := map[string]*logpoller.Filter{ + "1 event sig, 2 addresses": {EventSigs: evmtypes.HashArray{event1}, Addresses: 
+		"3 event sigs, 2 addresses":    {EventSigs: evmtypes.HashArray{event2, event3, event4}, Addresses: evmtypes.AddressArray{addr2, addr3}},
+		"1 event sig, 2 topic2 values": {EventSigs: evmtypes.HashArray{event1}, Addresses: evmtypes.AddressArray{addr2}, Topic2: evmtypes.HashArray{topicA, topicB}},
+		"2 addresses, 2 event sigs, topics 2, 3, and 4 filtered": {
+			EventSigs: evmtypes.HashArray{event1},
+			Addresses: evmtypes.AddressArray{addr3},
+			Topic2:    evmtypes.HashArray{topicA},
+			Topic3:    evmtypes.HashArray{topicB},
+			Topic4:    evmtypes.HashArray{topicC, topicD},
+		},
+	}
+	for name, filter := range filters {
+		filter.Name = name
+	}
+
+	otherFilter := *filters["1 event sig, 2 topic2 values"] // update Topic2, but keep name of filter the same
+	otherFilter.Topic2 = evmtypes.HashArray{topicD, topicA}
+	filters["1 event sig, 2 other topic2 values"] = &otherFilter
+
+	req1 := *logpoller.NewGetLogsReq(*filters["1 event sig, 2 addresses"])
+	req2 := *logpoller.NewGetLogsReq(*filters["3 event sigs, 2 addresses"])
+	req3 := *logpoller.NewGetLogsReq(*filters["1 event sig, 2 topic2 values"])
+	req4 := *logpoller.NewGetLogsReq(*filters["2 addresses, 2 event sigs, topics 2, 3, and 4 filtered"])
+
+	tests := []struct {
+		name           string
+		newFilters     []string
+		removedFilters []string
+		expected       []logpoller.GetLogsBatchElem
+	}{
+		{
+			name:           "add filter with 2 addresses",
+			newFilters:     []string{"1 event sig, 2 addresses"},
+			removedFilters: []string{},
+			expected:       []logpoller.GetLogsBatchElem{req1},
+		},
+		{
+			name:           "add filter with 3 event sigs",
+			newFilters:     []string{"3 event sigs, 2 addresses"},
+			removedFilters: []string{},
+			expected:       []logpoller.GetLogsBatchElem{req1, req2},
+		},
+		{
+			name:           "add filter with 2 topic values",
+			newFilters:     []string{"1 event sig, 2 topic2 values"},
+			removedFilters: []string{},
+			expected:       []logpoller.GetLogsBatchElem{req1, req2}, // 3rd filter is already covered by existing filters
+		},
+		{
+			name:           "add filter with complex topic filtering",
+			newFilters:     []string{"2 addresses, 2 event sigs, topics 2, 3, and 4 filtered"},
+			removedFilters: []string{},
+			expected:       []logpoller.GetLogsBatchElem{req4, req1, req2}, // 3rd req for 4th filter added
+		},
+		{
+			name:           "update topic values in filter",
+			newFilters:     []string{"1 event sig, 2 topic2 values"},
+			removedFilters: []string{"1 event sig, 2 other topic2 values"},
+			expected:       []logpoller.GetLogsBatchElem{req4, req1, req2}, // shouldn't change reqs
+		},
+		{
+			name:           "remove first two filters",
+			newFilters:     []string{},
+			removedFilters: []string{"1 event sig, 2 addresses", "3 event sigs, 2 addresses"},
+			expected:       []logpoller.GetLogsBatchElem{req3, req4}, // when req1 & req2 are removed, narrower req3 should be added
+		},
+		{
+			name:           "add a filter and remove one",
+			newFilters:     []string{"1 event sig, 2 addresses"},
+			removedFilters: []string{"1 event sig, 2 topic2 values"},
+			expected:       []logpoller.GetLogsBatchElem{req4, req1}, // req1 added for 1st filter, req3 removed
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			for _, name := range tt.newFilters {
+				err := lp.RegisterFilter(ctx, *filters[name])
+				require.NoError(t, err)
+			}
+
+			for _, name := range tt.removedFilters {
+				err := lp.UnregisterFilter(ctx, name)
+				require.NoError(t, err)
+			}
+			reqs := lp.EthGetLogsReqs(nil, nil, nil)
+			assert.Equal(t, tt.expected, reqs)
+		})
+	}
+}
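An illustrative aside on the request shape the mocked BatchCallContext receives in the tests above: each batch element is a raw eth_getLogs call whose single argument is a filter map with hex-encoded fromBlock/toBlock (or a blockHash for single-block requests), plus the standard address/topics fields of an eth_getLogs filter object. The standalone sketch below is hypothetical; newGetLogsElem, the literal values, and the in-memory value types in the map are for illustration only and are not part of this change.

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rpc"
)

// newGetLogsElem (hypothetical helper) builds one eth_getLogs batch element in the
// shape the tests above decode: a single param map with hex-encoded block numbers.
func newGetLogsElem(from, to *big.Int, addrs []common.Address, topics [][]common.Hash) rpc.BatchElem {
	return rpc.BatchElem{
		Method: "eth_getLogs",
		Args: []interface{}{map[string]interface{}{
			"fromBlock": hexutil.EncodeBig(from), // e.g. "0x12a"
			"toBlock":   hexutil.EncodeBig(to),
			"address":   addrs,
			"topics":    topics,
		}},
		Result: new([]types.Log), // the rpc client decodes matching logs into this
	}
}

func main() {
	elem := newGetLogsElem(big.NewInt(298), big.NewInt(302),
		[]common.Address{common.HexToAddress("0x1111")},
		[][]common.Hash{{common.HexToHash("0xaaaa")}})

	// Decode the params the same way handleBatchCall does in TestTooManyLogResults.
	params := elem.Args[0].(map[string]interface{})
	from, _ := hexutil.DecodeBig(params["fromBlock"].(string))
	to, _ := hexutil.DecodeBig(params["toBlock"].(string))
	fmt.Println(to.Uint64()-from.Uint64() >= 4) // true: a mock with sizeLimit 4 would answer with "too many results"
}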
diff --git a/core/internal/testutils/testutils.go b/core/internal/testutils/testutils.go
index f4867eda69a..96c4f2cbc1e 100644
--- a/core/internal/testutils/testutils.go
+++ b/core/internal/testutils/testutils.go
@@ -353,11 +353,16 @@ func IntToHex(n int) string {
 // risk of spamming
 const TestInterval = 100 * time.Millisecond
 
-// AssertEventually waits for f to return true
+// AssertEventually waits for f to return true; test will continue even if condition is never satisfied
 func AssertEventually(t *testing.T, f func() bool) {
 	assert.Eventually(t, f, WaitTimeout(t), TestInterval/2)
 }
 
+// RequireEventually waits for f to return true; test fails immediately if timeout is reached
+func RequireEventually(t *testing.T, f func() bool) {
+	require.Eventually(t, f, WaitTimeout(t), TestInterval/2)
+}
+
 // RequireLogMessage fails the test if emitted logs don't contain the given message
 func RequireLogMessage(t *testing.T, observedLogs *observer.ObservedLogs, msg string) {
 	for _, l := range observedLogs.All() {
diff --git a/core/services/vrf/v2/listener_v2_log_listener_test.go b/core/services/vrf/v2/listener_v2_log_listener_test.go
index a393aec3ee3..2fc0257f81d 100644
--- a/core/services/vrf/v2/listener_v2_log_listener_test.go
+++ b/core/services/vrf/v2/listener_v2_log_listener_test.go
@@ -1,6 +1,7 @@
 package v2
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"math/big"
@@ -155,9 +156,19 @@ func setupVRFLogPollerListenerTH(t *testing.T,
 		Addresses: []common.Address{emitterAddress1},
 		Retention: 0}))
 	require.Nil(t, err)
-	require.Len(t, lp.Filter(nil, nil, nil).Addresses, 2)
-	require.Len(t, lp.Filter(nil, nil, nil).Topics, 1)
-	require.Len(t, lp.Filter(nil, nil, nil).Topics[0], 3)
+	reqs := lp.EthGetLogsReqs(nil, nil, nil)
+	require.Len(t, reqs, 2)
+	req1, req2 := reqs[0], reqs[1]
+	if bytes.Compare(vrfLogEmitterAddress[:], emitterAddress1[:]) < 0 {
+		// reqs will come back sorted by address; swap if necessary, so that req1 is emitterAddress1
+		req1, req2 = req2, req1
+	}
+	require.Len(t, req1.Addresses(), 1)
+	require.Len(t, req1.Topics(), 1)
+	require.Len(t, req1.Topics()[0], 1)
+	require.Len(t, req2.Addresses(), 1)
+	require.Len(t, req2.Topics(), 1)
+	require.Len(t, req2.Topics()[0], 2)
 
 	th := &vrfLogPollerListenerTH{
 		Lggr: lggr,