-
Notifications
You must be signed in to change notification settings - Fork 270
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sdk): Dropping an UpdatesSubscriber
release its reader token for the garbage collector
#4102
feat(sdk): Dropping an UpdatesSubscriber
release its reader token for the garbage collector
#4102
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #4102 +/- ##
==========================================
- Coverage 84.70% 84.59% -0.11%
==========================================
Files 269 269
Lines 28766 28759 -7
==========================================
- Hits 24366 24329 -37
- Misses 4400 4430 +30 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks. Need a few clarifications in the test IMO, but good to merge after comments have been addressed 👍
linked_chunk.push_items_back(['b']); | ||
linked_chunk.push_items_back(['c']); | ||
|
||
// The waker must have been called only once for the two updates. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I understand why, but can you elaborate in the comment? (It's because we only add a waker whenever we explicitly start polling, which we did once before for each stream.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A waker is consumed after it's been called. A waker is registered when a Future
or a Stream
is polled. I'm updating the comment.
// For the sake of this test, we also need to advance the main reader token. | ||
let _ = linked_chunk.updates().unwrap().take(); | ||
let _ = linked_chunk.updates().unwrap().take(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uh? Why do we need to do it twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's something I want to change. The garbage collector is run before take_with_token
is called (which is called by take
). I think it's better to run the garbage collector after take_with_token
is called: “I read new items, I clean read items” vs. “I clean previously read items, I read new ones”. Right now, we are in the second pattern, but I believe the first is preferable. I'll address that in the next pull request.
…r the GC. The event cache stores its events in a linked chunk. The linked chunk supports updates (`ObservableUpdates`) via `LinkedChunk::updates()`. This `ObservableUpdates` receives all updates that are happening inside the `LinkedChunk`. An `ObservableUpdates` wraps `UpdatesInner`, which is the real logic to handle multiple update readers. Each reader has a unique `ReaderToken`. `UpdatesInner` has a garbage collector that drops all updates that are read by all readers. And here comes the problem. A category of readers are `UpdatesSubscriber`, returned by `ObservableUpdates::subscribe()`. When an `UpdatesSubscriber` is dropped, its reader token was still alive, thus preventing the garbage collector to clear all its pending updates: they were kept in memory for the eternity. This patch implements `Drop` for `UpdatesSubscriber` to correctly remove its `ReaderToken` from `UpdatesInner`. This patch also adds a test that runs multiple subscribers, and when one is dropped, its pending updates are collected by the garbage collector.
d2d5240
to
02e0e9c
Compare
The event cache stores its events in a linked chunk. The linked chunk
supports updates (
ObservableUpdates
) viaLinkedChunk::updates()
.This
ObservableUpdates
receives all updates that are happening insidethe
LinkedChunk
. AnObservableUpdates
wrapsUpdatesInner
, whichis the real logic to handle multiple update readers. Each reader has a
unique
ReaderToken
.UpdatesInner
has a garbage collector that dropsall updates that are read by all readers. And here comes the problem.
A category of readers are
UpdatesSubscriber
, returned byObservableUpdates::subscribe()
. When anUpdatesSubscriber
isdropped, its reader token was still alive, thus preventing the garbage
collector to clear all its pending updates: they were kept in memory
for the eternity.
This patch implements
Drop
forUpdatesSubscriber
to correctly removeits
ReaderToken
fromUpdatesInner
. This patch also adds a test thatruns multiple subscribers, and when one is dropped, its pending updates
are collected by the garbage collector.
EventCache
storage #3280