Skip to content

Commit

Permalink
Finally found the bug!
Browse files Browse the repository at this point in the history
There's a case where Keepers registers a filter with one name, then
unregisters it, then re-registers a filter with the same name. There was
an optimization where instead of computing the hash of the events &
topics for a filter in two different for loops, it saves it the first time
and looks it up in a map the second time. But the keys for the map were
the filter names. The first loop deals with removed filters and the
second with new filters. They both had the same filter name, but
different addresses & topics.  This might be a bug in Keepers, but I
can at least make this code more robust since it should be possible to
update a filter's topics & addresses but keep the name the same.
  • Loading branch information
reductionista committed Mar 25, 2024
1 parent a0c3219 commit ff161eb
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,22 +1487,20 @@ func (lp *logPoller) ethGetLogsReqs(fromBlock, toBlock *big.Int, blockHash *comm
lp.filtersMu.Lock()

if len(lp.removedFilters) != 0 || len(lp.newFilters) != 0 {
eventsTopicsKeys := map[string]string{}
deletedAddresses := map[common.Address]struct{}{}
deletedEventsTopicsKeys := map[string]struct{}{}

lp.lggr.Debugf("removedFilters = %v, lp.newFilters = %v", lp.removedFilters, lp.newFilters)

// First, remove any reqs corresponding to removed filters
// Some of them we may still need, they will be rebuilt on the next pass
for _, filter := range lp.removedFilters {
lp.lggr.Debugf("handling removedFilter %v", filter.Name)
eventsTopicsKeys[filter.Name] = makeEventsTopicsKey(filter)
deletedEventsTopicsKeys[eventsTopicsKeys[filter.Name]] = struct{}{}
delete(lp.cachedReqsByEventsTopicsKey, eventsTopicsKeys[filter.Name])
lp.lggr.Debugf("Deleted eventsTopicsKey from cache for filter with address %v event sig %v", filter.Addresses[0], filter.EventSigs[0])
eventsTopicsKey := makeEventsTopicsKey(filter)
deletedEventsTopicsKeys[eventsTopicsKey] = struct{}{}
delete(lp.cachedReqsByEventsTopicsKey, eventsTopicsKey)
for _, address := range filter.Addresses {
deletedAddresses[address] = struct{}{}
delete(lp.cachedReqsByAddress, address)
lp.lggr.Debugf("Deleted address %v from cache for filter with event sig %v", address, filter.EventSigs[0])
}
}
lp.removedFilters = nil
Expand All @@ -1513,11 +1511,7 @@ func (lp *logPoller) ethGetLogsReqs(fromBlock, toBlock *big.Int, blockHash *comm
for _, filter := range lp.filters {
var newReq *GetLogsBatchElem

eventsTopicsKey, ok := eventsTopicsKeys[filter.Name]
if !ok {
eventsTopicsKey = makeEventsTopicsKey(filter)
}

eventsTopicsKey := makeEventsTopicsKey(filter)
_, isNew := lp.newFilters[filter.Name]

if isNew {
Expand All @@ -1532,14 +1526,22 @@ func (lp *logPoller) ethGetLogsReqs(fromBlock, toBlock *big.Int, blockHash *comm
}
}

if hasDeletedTopics {
lp.lggr.Debugf("ethGetLogsReqs: processing filter with deleted topics: %v", filter.Name)
}
if hasDeletedAddress {
lp.lggr.Debugf("ethGetLogsReqs: processing filter with deleted addresses %v", filter.Name)
}

if !(isNew || hasDeletedTopics || hasDeletedAddress) {
continue // only rebuild reqs associated with new filters or those sharing topics or addresses with a removed filter
}

if req, ok2 := lp.cachedReqsByEventsTopicsKey[eventsTopicsKey]; ok2 {
// merge this filter with other filters with the same events and topics lists
lp.lggr.Debugf("Merging addressees %v into rec %v", filter.Addresses, req)
lp.lggr.Debugf("Merging addressees %v into rec %v because eventTopicsKey for both = %v", filter.Addresses, req, eventsTopicsKey)
mergeAddressesIntoGetLogsReq(req, filter.Addresses)
lp.lggr.Debugf("Skipping filter with address %v, eventsig %v", filter.Addresses[0], filter.EventSigs[0])
continue
}

Expand Down

0 comments on commit ff161eb

Please sign in to comment.