Replace single coarse-grained filter with batch request of fine-grained topic filters #12361
Conversation
I see that you haven't updated any README files. Would it make sense to do so?
@@ -63,6 +63,72 @@ type LogPoller interface {
	LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations) ([]Log, error)
}

// GetLogsBatchElem hides away all the interface casting, so the fields can be accessed more easily, and with type safety
type GetLogsBatchElem rpc.BatchElem
nit, but log_poller.go is getting to be a very large file, how about extracting this to a separate file?
	return e.params()["address"].([]common.Address)
}

func (e GetLogsBatchElem) SetAddresses(addresses []common.Address) {
I'm not a fan of setter methods in general, because it's very easy to make a mistake in a concurrent environment with mutable objects. What do you think about making it immutable (and thread-safe) by always constructing a new object? Right now, you can accidentally fall into a case in which one routine reads from that struct while another one tries to update the addresses. Errors like this are extremely difficult to spot.

You can achieve that by having an additional constructor that takes an existing GetLogsBatchElem and addresses []common.Address.
I don't follow what you're saying about immutability or concurrency. These objects are only ever accessed by a single thread, so we shouldn't have to worry about either of those issues.
I originally tried to implement this with only the getter methods, but it resulted in a lot of awkward extra casting. The setter is just there to clean up that mess, so the field can be set in a simple way.
I think it's a general principle that immutable structs are thread-safe by definition, because you cannot alter their state. On the other hand, in Go you can access fields directly without a setter method, so maybe my remark doesn't make any sense at all.
I see... I think it could make sense, because in this case you'd have to go out of your way with some casting to directly access the fields, so it wouldn't be easy to mess things up, despite not technically being immutable. On the other hand, I think this is only used in parts of the code which are protected by a mutex.

One of my primary concerns with this PR was to avoid causing performance issues from much more complex structures having to be torn down and rebuilt. So I tried to err on the side of not rebuilding anything that wasn't essential, in favor of updating existing structures. It probably doesn't matter too much in this case though, because it's just a slice. I'll look it over again, thanks for the suggestion.
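For what it's worth, a minimal, self-contained sketch of the copy-based alternative being discussed, using a simplified stand-in type rather than the real GetLogsBatchElem (which wraps rpc.BatchElem); names here are illustrative only:

```go
package main

import "fmt"

// getLogsReq is a simplified stand-in for GetLogsBatchElem.
type getLogsReq struct {
	addresses []string
	topics    [][]string
}

// withAddresses returns a new request carrying its own copy of the address list,
// leaving the original untouched, instead of mutating it in place via a setter.
func (r getLogsReq) withAddresses(addresses []string) getLogsReq {
	cp := r // shallow copy of the struct
	cp.addresses = append([]string(nil), addresses...)
	return cp
}

func main() {
	orig := getLogsReq{addresses: []string{"0x01"}, topics: [][]string{{"Transfer"}}}
	updated := orig.withAddresses([]string{"0x01", "0x02"})
	fmt.Println(orig.addresses, updated.addresses) // [0x01] [0x01 0x02]
}
```

Whether this buys anything here depends on whether the requests are ever shared across goroutines, which is exactly what the rest of this thread discusses.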
	return blockHash
}

func (e GetLogsBatchElem) SetFromBlock(fromBlock *big.Int) {
SetFromBlock and SetTopics are probably not used, maybe not worth adding at this point.
Yeah, I think they were in an earlier draft and I ended up not needing them.
func (lp *logPoller) ethGetLogsReqs(fromBlock, toBlock *big.Int, blockHash *common.Hash) []rpc.BatchElem {
	lp.filtersMu.Lock()

	if len(lp.removedFilters) != 0 || len(lp.newFilters) != 0 {
Why don't we rebuild these on insert/delete?
I think the two main advantages of doing it this way (at least before this PR) were:

1. Optimization: if lots of filters are registered or unregistered in rapid succession (e.g. through CLO adding or removing a batch of jobs, or a single job like LogTriggers registering/unregistering a large number of user-defined filters in a loop), the cache* only needs to be rebuilt once, not once for every filter.
2. Only one thread (the runloop) ever reads or writes to the cache, avoiding the need to worry about any concurrency issues there. Only reading and writing of lp.filters needs to be protected by a mutex.

Also, not sure if this one matters, but:

3. Filter registration can return quickly in most cases, rather than taking a while.

However, because I've also included some improvements in the cache reconstruction process (it no longer rebuilds from scratch when a filter is removed), the first advantage has been reduced, I guess. Removing a single filter can still force a rebuild of the reqs for lots of other filters: it rebuilds any reqs that include filters overlapping in any way with the one being removed. But most of them will presumably be untouched, and for adding new filters it doesn't need to touch anything unrelated to the added filters.

For 2, originally I was hoping we could just unlock lp.filtersDirty soon after entering ethGetLogsReqs(), and have it unlocked for most of the time it spends working on the cache. That turned out to be too difficult because there are so many interactions between the two, so I just left it locked throughout the whole cache reconstruction process. But I think this is still a big advantage in that it is unlocked during the actual rpc requests. If there were other threads which could change the cache, then they could all end up getting blocked on rpc requests, or vice versa.

(*) To be clear, by cache I mean what was called lp.cachedAddresses and lp.cachedEventSigs prior to this PR, which now becomes lp.cachedReqsByAddress and lp.cachedReqsByEventTopicsKey.
Basically, the idea is to have rpc requests sent in the background, and not have that interfere with any of the calls down to log poller... whether it be registering & unregistering filters or querying for things. Changing this part of the architecture of LogPoller so that the two are interconnected and have to wait for each other seems like a big (potentially risky?) change, so I wouldn't make it unless there is a clear advantage of doing so. (If you think there is, I'm open to considering it.)
"Only one thread (the runloop) ever reads or writes to the cache, avoiding the need for worrying about any concurrency issues there. Only reading and writing of lp.filters needs to be protected by a mutex."

Are you sure about that? It seems that runLoop is the one entry point, as you mentioned. However, you can spawn a concurrent routine from AsyncReplay -> Replay -> backfill -> batchFetchLogs. On the other hand, everything is synchronized behind lp.filtersMu, so there should not be any concurrent access, I think.
Ah, good point. I think the two reasons I gave are accurate in that... these were the reasons why the code was that way when I started working on the PR. But I forgot about the AsyncReplay refactor, which happened after I'd finished this PR and moved on to other projects, while it was waiting for review. ;-)
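To make the lazy rebuild discussed in this thread concrete, here is a toy, self-contained sketch of the pattern (strings stand in for the real filter and request types; names and structure are illustrative, not the PR's code). Registrations only mark the cache dirty under the mutex, and the poll path rebuilds at most once per poll, returning a snapshot so the rpc calls can run outside the lock:

```go
package main

import (
	"fmt"
	"sync"
)

// filterCache is a toy stand-in for logPoller's cached request indices. Registrations
// only record what changed; the poll path rebuilds the cached requests at most once,
// no matter how many filters were added or removed since the last poll.
type filterCache struct {
	mu         sync.Mutex
	filters    map[string][]string // filter name -> addresses (simplified)
	newFilters map[string]struct{} // names added since the last rebuild
	removed    []string            // names removed since the last rebuild
	reqs       []string            // cached "requests" (stand-in for []rpc.BatchElem)
}

func (c *filterCache) register(name string, addrs []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.filters[name] = addrs
	c.newFilters[name] = struct{}{} // cheap: no rebuild here
}

func (c *filterCache) unregister(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.filters, name)
	c.removed = append(c.removed, name)
}

// reqsForPoll rebuilds the cache only if filters changed, then returns a copy
// so the actual rpc calls can run without holding the mutex.
func (c *filterCache) reqsForPoll() []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.newFilters) != 0 || len(c.removed) != 0 {
		c.reqs = c.reqs[:0]
		for name, addrs := range c.filters {
			c.reqs = append(c.reqs, fmt.Sprintf("%s:%v", name, addrs))
		}
		c.newFilters = map[string]struct{}{}
		c.removed = nil
	}
	return append([]string(nil), c.reqs...)
}

func main() {
	c := &filterCache{filters: map[string][]string{}, newFilters: map[string]struct{}{}}
	c.register("a", []string{"0x01"})
	c.register("b", []string{"0x02"})
	fmt.Println(c.reqsForPoll()) // both registrations handled by a single rebuild
}
```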
…ed topic filters

This replaces Filter() with EthGetLogsReqs(), adding fetchLogsByBlockHash, sendBatchRequests, and a handful of other helper methods to support this.

It also replaces these fields in the logPoller struct:

lp.filter
lp.filterDirty
lp.cachedEventSigs
lp.cachedAddresses

with new fields, used for implementing caching and indexing of a collection of batch requests, instead of a single filter:

lp.newFilters map[string]struct{}                              // Set of filter names which have been added since cached reqs indices were last rebuilt
lp.removedFilters []Filter                                     // Slice of filters which have been removed or replaced since cached reqs indices were last rebuilt
lp.cachedReqsByAddress map[common.Address][]*GetLogsBatchElem  // Index of cached GetLogs requests, by contract address
lp.cachedReqsByEventsTopicsKey map[string]*GetLogsBatchElem    // Index of cached GetLogs requests, by eventTopicsKey
Trace -> Debug for BatchCallContext()
Production rpc servers will fail if these are passed as *big.Int instead of a string, so our SimulatedBackendClient was giving a false-positive PASS by handling the conversion. They must be converted in the code before calling BatchCallContext() to work on live networks; now this will also be required for simulated geth.
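As a hedged illustration of the conversion being described (not the PR's code), block numbers can be hex-encoded with go-ethereum's hexutil before they are placed into an eth_getLogs batch element:

```go
package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	fromBlock, toBlock := big.NewInt(1_000_000), big.NewInt(1_000_100)

	// eth_getLogs expects hex-quantity strings (e.g. "0xf4240"), not JSON numbers,
	// so *big.Int block numbers must be encoded before the batch request is sent.
	params := map[string]interface{}{
		"fromBlock": hexutil.EncodeBig(fromBlock),
		"toBlock":   hexutil.EncodeBig(toBlock),
		"address":   []common.Address{common.HexToAddress("0x0000000000000000000000000000000000000001")}, // illustrative
	}

	elem := rpc.BatchElem{
		Method: "eth_getLogs",
		Args:   []interface{}{params},
		Result: new([]types.Log),
	}
	fmt.Println(elem.Method, params["fromBlock"], params["toBlock"])
	// A real client would now pass []rpc.BatchElem{elem} to BatchCallContext.
}
```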
Instead of always being length 4, it needs to be the same length as (or shorter than) the number of topics emitted by the logs it matches.
This can cause false positives, since it's not done by production rpc servers
There's a case where Keepers registers a filter with one name, then unregisters it, then re-registers a filter with the same name. There was an optimization where, instead of computing the hash of the events & topics for a filter in two different for loops, it saves it the first time and looks it up in a map the second time. But the keys for the map were the filter names. The first loop deals with removed filters and the second with new filters; they both had the same filter name, but different addresses & topics. This might be a bug in Keepers, but I can at least make this code more robust, since it should be possible to update a filter's topics & addresses while keeping the name the same.
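One way to be robust to that scenario, sketched here as a self-contained toy rather than the PR's actual fix, is to key any cached lookups by the filter's contents instead of its name:

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
	"strings"
)

// filter is a simplified stand-in for the log poller's Filter type.
type filter struct {
	name      string
	addresses []string
	events    []string
}

// contentsKey derives a key from the filter's addresses and events rather than its
// name, so re-registering under the same name with different contents never
// resolves to a stale cache entry.
func contentsKey(f filter) string {
	addrs := append([]string(nil), f.addresses...)
	evts := append([]string(nil), f.events...)
	sort.Strings(addrs)
	sort.Strings(evts)
	sum := sha256.Sum256([]byte(strings.Join(addrs, ",") + "|" + strings.Join(evts, ",")))
	return fmt.Sprintf("%x", sum[:8])
}

func main() {
	old := filter{name: "keepers", addresses: []string{"0x01"}, events: []string{"A"}}
	updated := filter{name: "keepers", addresses: []string{"0x02"}, events: []string{"B"}}
	fmt.Println(contentsKey(old) == contentsKey(updated)) // false: same name, different contents
}
```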
I see you added a changeset file but it does not contain a tag. Please edit the text to include at least one of the following tags:
This PR is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 7 days.
This PR changes the way in which LogPoller requests logs from the rpc server. Instead of maintaining one giant cached filter which includes a list of all contract addresses and topics, and passing that to ec.GetFilter() each time logs are polled, it maintains an index of eth_getLogs requests containing more fine-grained filters, sends these in a single call to ec.BatchCallContext(), and combines the results. The rpc server is still only contacted once, but the data which comes back will be much more tailored to what we actually need, in some cases significantly reducing the number of results returned, the amount of network traffic, and the amount of storage space we use in the db (although the last one could have been accomplished in simpler ways). This is especially important for the new Automation LogTriggers feature, where each custom user filter must have separate rate limiting and accounting enforced.

As an example, with the "single giant filter" approach, if we had 100 filters each with 1 address and 1 event type, we would be requesting 100 x 100 = 10,000 combinations of addresses + events, while with the new approach we will only request the 200 cases we actually need. Most of those combinations would be unlikely to show up, but when considering intentional DoS attacks on LogTriggers, one could imagine a scenario where those 100 x 100 combinations generate an enormous amount of results for each node to save in its db.
Additionally, it allows filtering on all 5 keys (address, event_sig, topic2, topic3, topic4) supported by evm logs, where previously we only filtered on (address, event_sig). So for example, if there is a custom filter registered which only needs to track token transfers between a particular from and to address, the filter can use topic2=from, topic3=to and this will narrow down the results we get back from "all transfers to and from all wallets in the world" to just a single pair of wallets.
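To illustrate (with go-ethereum types, not code from this PR) what such a narrow filter looks like, here is an ERC-20 Transfer query that pins the indexed from and to arguments in the topic positions; all addresses and block numbers are made up:

```go
package main

import (
	"fmt"
	"math/big"

	ethereum "github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	token := common.HexToAddress("0x0000000000000000000000000000000000000001") // illustrative token contract
	from := common.HexToAddress("0x0000000000000000000000000000000000000002")  // illustrative sender
	to := common.HexToAddress("0x0000000000000000000000000000000000000003")    // illustrative recipient

	transferSig := crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)"))

	// Topics[0] is the event signature; Topics[1] and Topics[2] are the indexed
	// `from` and `to` arguments (topic2/topic3 in the PR's naming). Addresses are
	// left-padded to 32 bytes when they appear as topics.
	q := ethereum.FilterQuery{
		FromBlock: big.NewInt(1_000_000),
		ToBlock:   big.NewInt(1_000_100),
		Addresses: []common.Address{token},
		Topics: [][]common.Hash{
			{transferSig},
			{common.BytesToHash(from.Bytes())},
			{common.BytesToHash(to.Bytes())},
		},
	}
	fmt.Printf("eth_getLogs filter: %+v\n", q)
}
```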
The brute-force approach to this would be for each req we send to correspond to a single registered filter. This would work well for most purposes, but there are cases where reqs for 2 or more filters can be merged together into a single request. We have to be careful not to over-optimize here, because doing too much merging can re-open the door to an explosion of combinations. So we take a cautious approach of only merging when it's clear there will be no additional cost in bandwidth. There are only two cases where we merge reqs for similar filters together: (1) if they share exactly the same event sig and topics list, then we can merge their lists of contract addresses together; and (2) if there are two filters for the same contract address where one of them is narrower than the other (matches a subset or the same set of event_sig + topic combinations), then the broader of the two filters is all we need.
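A toy sketch of merge rule (1), with strings standing in for the real address and topic types; rule (2), dropping a filter subsumed by a broader one, is omitted here:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// filter is a simplified stand-in for a registered log filter.
type filter struct {
	address string
	topics  []string // topics[0] is the event sig in this toy model
}

// mergeByTopics groups filters whose topics lists are exactly identical and merges
// their address lists into a single request, mirroring merge rule (1) above.
func mergeByTopics(filters []filter) map[string][]string {
	reqs := map[string][]string{} // topics key -> merged, deduped address list
	for _, f := range filters {
		key := strings.Join(f.topics, "|")
		addrs := reqs[key]
		if !contains(addrs, f.address) {
			addrs = append(addrs, f.address)
		}
		sort.Strings(addrs)
		reqs[key] = addrs
	}
	return reqs
}

func contains(xs []string, x string) bool {
	for _, v := range xs {
		if v == x {
			return true
		}
	}
	return false
}

func main() {
	reqs := mergeByTopics([]filter{
		{address: "0x01", topics: []string{"Transfer", "from=A"}},
		{address: "0x02", topics: []string{"Transfer", "from=A"}}, // same topics: merged with 0x01
		{address: "0x03", topics: []string{"Approval"}},           // different topics: its own request
	})
	fmt.Println(len(reqs), reqs) // 2 requests
}
```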
The lp.Filter() method has been replaced with lp.ethGetLogsReqs(). batchFetchLogs, sendBatchRequests, and a handful of other helper methods have also been added for support.

It also replaces these fields in the logPoller struct:

lp.filter
lp.filterDirty
lp.cachedEventSigs
lp.cachedAddresses

with new fields, used for implementing caching and indexing of a collection of batch requests, instead of a single filter:

lp.newFilters map[string]struct{}
lp.removedFilters []Filter
lp.cachedReqsByAddress map[common.Address][]*GetLogsBatchElem
lp.cachedReqsByEventsTopicsKey map[string]*GetLogsBatchElem
Some convenience methods for parsing fields in an rpc.BatchElem containing an eth_getLogs request have been added in the form of a derived type, GetLogsBatchElem.
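For readers unfamiliar with the pattern, here is a rough, hedged sketch of what such a derived type can look like, assuming (as the diff excerpts above suggest) that the eth_getLogs params live in Args[0] as a map; method and helper names are illustrative rather than the PR's exact code:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/rpc"
)

// GetLogsBatchElem wraps rpc.BatchElem so eth_getLogs params can be read and
// written with type safety instead of repeated interface casting at every call site.
type GetLogsBatchElem rpc.BatchElem

// params returns the eth_getLogs parameter object stored in Args[0].
func (e GetLogsBatchElem) params() map[string]interface{} {
	return e.Args[0].(map[string]interface{})
}

func (e GetLogsBatchElem) Addresses() []common.Address {
	return e.params()["address"].([]common.Address)
}

func (e GetLogsBatchElem) SetAddresses(addresses []common.Address) {
	e.params()["address"] = addresses // mutates the shared params map
}

func main() {
	elem := GetLogsBatchElem{
		Method: "eth_getLogs",
		Args: []interface{}{map[string]interface{}{
			"address": []common.Address{common.HexToAddress("0x0000000000000000000000000000000000000001")},
		}},
	}
	elem.SetAddresses(append(elem.Addresses(), common.HexToAddress("0x0000000000000000000000000000000000000002")))
	fmt.Println(elem.Addresses())
}
```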