-
Notifications
You must be signed in to change notification settings - Fork 183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce a cache for Publishers that tracks subscriptions to manage the cache #2861
base: main
Are you sure you want to change the base?
Conversation
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java
Show resolved
Hide resolved
}); | ||
} | ||
|
||
private static final class Holder<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is here so that we can add it to the hashmap and avoid some jumping through hoops to manage the reference equality stuff for HashMap
. A quick code comment might be helpful for future readers.
}); | ||
|
||
item2.publisher = multicastStrategy.apply(newPublisher) | ||
.liftSync(subscriber -> new Subscriber<T>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this second liftSync
? I think it applies to the individual streams. In my minds eye that means if a single stream ends it removes the underlying stream from the cache. I don't know how that would happen unless the parent publisher completed and we remove it at that level as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's there to handle subscriber errors and clean up if the subscriber is put into a bad state via a throw. In fact it looks like I missed a case with onNext.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a test case that would demonstrate their purpose? If I delete the second liftsync and move the syncrhronized to the first everything still works as expected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mgodave - can you add a comment along the lines of:
Motivation for this being "after" the multicast is bcz multicast doesn't propagate cancellation upstream unless there are no subscribers (e.g. they all cancel) ... so we acquire the lock in cancel here, there are no async boundaries in multi-cast, and then we remove from the map in cancel "above" multicast. This prevents race conditions where someone does a get
and we return a Publisher
that has been cancelled (because there are no subscriber).
Also consider breaking this out into a named (e.g. not anonymous, private/final) class which is easier to look at when debugging larger operator chains.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also consider breaking this out into a named (e.g. not anonymous, private/final) class which is easier to look at when debugging larger operator chains.
These are inline because they access four different pieces of local state and properties. I played around with breaking this out and it doesn't feel cohesive. I'm choosing to leave it as is for now but I am happy to revisit if you have strong feelings.
}); | ||
|
||
item2.publisher = multicastStrategy.apply(newPublisher) | ||
.liftSync(subscriber -> new Subscriber<T>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mgodave - can you add a comment along the lines of:
Motivation for this being "after" the multicast is bcz multicast doesn't propagate cancellation upstream unless there are no subscribers (e.g. they all cancel) ... so we acquire the lock in cancel here, there are no async boundaries in multi-cast, and then we remove from the map in cancel "above" multicast. This prevents race conditions where someone does a get
and we return a Publisher
that has been cancelled (because there are no subscriber).
Also consider breaking this out into a named (e.g. not anonymous, private/final) class which is easier to look at when debugging larger operator chains.
@Override | ||
public void cancel() { | ||
try { | ||
assert Thread.holdsLock(publisherCache); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider adding a comment here too (lock was acquired after the multi-cast, we need to be holding the lock here to interact with the map and prevent returning a cancelled Publisher)
*/ | ||
public Publisher<T> get(final K key, final Function<K, Publisher<T>> publisherSupplier) { | ||
return Publisher.defer(() -> { | ||
synchronized (publisherCache) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synchronized
has been used in the past bcz it doesn't require additional allocations. however loom fibers don't support synchronized
, should we use Lock
objects instead?
} | ||
|
||
private void lockRemoveFromMap() { | ||
synchronized (publisherCache) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a comment here to the effect:
- completion of the first Subscriber after multicast means the the multicast operator is in a terminal state and we therefore remove it from the map. There are cases where folks may want to re-subscribe to the Publisher (e.g. get the cached value, trigger another event) however that currently isn't supported and we favor bounding the size of the map which has scope outside the operator chain.
* @param <T> the type of the {@link Publisher} contained in the cache. | ||
* @return a new PublisherCache that will wrap cached values with multicast operator. | ||
*/ | ||
public static <K, T> PublisherCache<K, T> multicast() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by exposing these static method we may need to add a new factory method here for each operator overload. did you consider instead exposing just a Function<>
(or similar) so folks can apply the variant they want? some risks maybe:
- folks could apply operators that don't obey the assumptions (we could clarify the constraints the operator must abide by ... allows for multiple subscribers, cancels upstream only after no subscribers present, no async cancel processing as the synchronization here depends upon it, ..)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did consider this. I'm open to removing these, the create()
method just above is essentially what you describe, the user would specify the Publisher
configured however they desire in the function on get
.
I had initially built this without the static methods and a constructor that took a function which would be used to supply a new Publisher on a cache miss. I decided to move this function to the get
method as it emulated how I might expect to use a cache, ex: I might not want a function from name -> Publisher but rather I would prefer a closure that would allow me to use the context at hand to instantiate the new object.
For an initial API I don't have any problem deferring to your suggestion as we learn how this ultimately ends up being used.
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherCache.java
Show resolved
Hide resolved
Will mark it as draft for now, mark as ready when comments are addressed or close if no plans to continue on this one. |
Motivation:
Handling the caching of Publishers comes up often and correctly managing the cache can be tricky and error prone to implement correctly. Scenarios where caching of Publishers can be useful include those similar to the multicast and replay operators but have the added dimension of asynchronous access, for instance multiple requests which need to consume the same data.
Modifications:
Add a PublisherCache utility that manages the lifecycle of a cached Publisher. A publisher is removed from the cache when it no longer has any subscriptions.