Replies: 24 comments
-
cc @benjamingr |
Beta Was this translation helpful? Give feedback.
-
One of my first questions would be: what guarantees does |
Beta Was this translation helpful? Give feedback.
-
I am generally in favor of this and think it's a good way towards a unified API and this makes converting promise APIs to Rx (where appropriate) much easier which is great. |
Beta Was this translation helpful? Give feedback.
-
Also, let me know if there is anything we (Node) can do to help with this transition.
AbortSignal is an EventTarget, event listeners are guaranteed to be called in order and without duplicates + throwing an error has uncaught exception semantics (mostly) so it's important that internal listeners inside Rx don't throw. |
Beta Was this translation helpful? Give feedback.
-
This is quite interesting and there could be lot, major changes and discussion would be involved. Just few initial random thoughts unordered.
|
Beta Was this translation helpful? Give feedback.
-
Right now it's supported in every major browser, and node support is in experimental stages. There are already multiple AbortSignal polyfills.
Idiomatic patterns with controller/signal or other token-based paradigms are that you create a controller, then pass its signal to whatever API you might want to cancel. (See use in |
Beta Was this translation helpful? Give feedback.
-
yes, this is what I meant about |
Beta Was this translation helpful? Give feedback.
-
Note that it might be obvious but there is a polyfill for old Node versions. |
Beta Was this translation helpful? Give feedback.
-
Yes, it's my personal take is would like to avoid deprecating existing implementation over new implementation requires polyfill. For stable runtime support I'd choose to support existing implementation for suffecient time - for one side, we took long time to support IE11 and making breaking changes over widely using runtime seems bit opposite move. |
Beta Was this translation helpful? Give feedback.
-
I'd like to suggest an alternative for using the AbortController: I'm not too happy with how this API looks like for Observable. I think it makes total sense for const controller = new AbortController();
fetch('resource', { signal: controller.signal });
// later on...
controller.abort(); But in the API proposed above, I think it's inconvenient for both producers and consumers when we compare it to the current API:
Wouldn't it be better if we made an API that keeps the pros we have with the existing API? What I'd like to suggest is: const onePerSecond = new Observable((subscriber) => {
let n = 0;
const id = setInterval(() => subscriber.next(n++), 1000);
return () => {
clearInterval(id);
}
});
const abortController = onePerSecond.subscribe(...);
// later on
abortController.abort(); To let consumers reuse AbortControllers, maybe it could be done with an operator? const abortController = onePerSecond.subscribe(...);
range1To100.pipe(
takeUntilSignal(abortController.signal)
).subscribe(...);
// later on
abortController.abort(); // Will cancel both subscriptions. If there's more than one Or as an alternative, I think that keeping the signal in That's just my suggestion. I think that although this is better from a "rxjs library" consumer point of view, we still need to weigh in some possible cons. I'm sure I'm missing lots of internal details on both rxjs and AbortController, so it's quite hard for me to identify them. |
Beta Was this translation helpful? Give feedback.
-
FWIW the reason promises do this is that the language folks at TC39 wanted an invariant for promises where once you have a promise (a subscription in this case) you don't also have the capability of "action at a distance" where you can "abort" the subscription. In C#, Rx observables (IIRC it's been like 5 years) take a cancel token: // copied from SO after googling
Observable.Interval(TimeSpan.FromSeconds(0.5), instance)
.Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cts.Token); So it's not like there is no prior art :] This is actually pretty convenient since in most cases the token is just passed along - the inconvenient part (separating observable subscriptions from the ability to cancel them) causes an extra argument to be passed around - I think that's quite intentional. Personally I would have preferred promise cancellation (with a promise type Task that enables that) + observable cancellation but I think that in this ecosystem Rx using AbortController/AbortSignal makes a lot of sense. Just to address some of the concerns:
Rx can provide sugar for this (allow the user to return a function and make that a listener to abort on the provided signal. But I think it's still pretty short to write and the producer side isn't the hard one for this (usually).
In most cases I suspect they will just forward abort signals they got from elsewhere rather than create their own - but yes. const abortController = onePerSecond.subscribe(...);
// later on
abortController.abort(); That's explicitly making observables stronger (but composition harder) since it loses the possibility of knowing the source won't be cancelled (which may be fine). |
Beta Was this translation helpful? Give feedback.
-
Yep - I also think it's nice that we're moving towards a unified way of semantically representing cancelling "tasks". However, with my suggestion I'm presenting a way that I feel is more developer-friendly, which is what I think abstractions like Observable should aim for. I agree that the way that Observable interops with the rest of the ecosystem should be by using standard conventions, but that shouldn't stop it from making it easier for developers to use it. In terms of composability of abort signals, I added a possible example of how can that be addressed by using an API that feels natural to Observables. I’m sure that we can find an API that internally would use AbortController, without having to sacrifice the ergonomics of Observable. |
Beta Was this translation helpful? Give feedback.
-
I don't think I explained myself well:
I think a lot of code looks like this: (people already using async functions and want to mix cancellation between Rx): async function doSomething(param, token) {
const val = await somePromiseOpMaybeFetch(param, { token });
await getSomeStreamBasedOn(val).forEach((value) => {
handle(value);
}, token); // adding the token here is super easy and intuitive, if I get a controller I'm mostly stuck
} So I guess what I'm saying is that the API Ben is proposing is making it easier for users and harder for library authors |
Beta Was this translation helpful? Give feedback.
-
Sorry if I didn't explain myself. I'm also talking from a user point of view, not from I library author point of view. In my original suggestion, I said:
With that I meant that I already liked the proposal of passing in a signal to subscribe and forEach. So my suggestion already supports your example (because it's one of the points I agree that it doesn't look bad from a user point of view). But at the same time I also propose an additional way of doing it: async function doSomething(param, token) {
const val = await somePromiseOpMaybeFetch(param, { token });
await getSomeStreamBasedOn(val).pipe(
takeUntilSignal(token)
).forEach((value) => {
handle(value);
});
} Which is something that also feels natural within the streams mindset. But again, I think that both ways are interesting, and they are compatible with each other. I'd like to point out that the advantage on the operator route though, is that you can theoretically compose more signals and it will allow you to represent more complex interactions between streams. |
Beta Was this translation helpful? Give feedback.
-
I assumed (I might be wrong) that passing the signal to In the above API - how does cancellation propagate across operators? (Assuming operators need the capability to be aware of this) |
Beta Was this translation helpful? Give feedback.
-
Thanks for pointing this out, now I realise I was missing important examples. With my suggestion, we would keep the existing user-oriented API. As an example, let's implement const map =
<T, R>(mapFn: (value: T) => R) =>
(source: Observable<T>) =>
new Observable<R>(obs => source.subscribe({
next: value => obs.next(mapFn(value)),
error: obs.error,
complete: obs.complete
})) Notice how this example would work as-it-is with the current API as well. Which it also implies that migration of old code wouldn't be as dramatic. How does propagation of signals work here with my suggestion? The Let's break it down:
It's consistent with how This way, in general the only pieces that need to deal with AbortController are internal bits of Let's compare implementing an operator a bit more complex dealing with multiple subscriptions, const switchMap =
<T, R>(mapFn: (value: T) => Observable<R>) =>
(source: Observable<T>) =>
new Observable<R>((obs, signal) => {
let innerController = null;
source.subscribe({
next: value => {
innerController?.abort();
innerController = new AbortController();
mapFn(value).subscribe({
next: obs.next,
error: obs.error,
}, innerController.signal);
},
error: obs.error,
complete: obs.complete
}), signal);
const cleanup = () => {
signal.removeEventListener('abort', cleanup);
obs.complete();
innerController?.abort();
}
signal.addEventListener('abort', cleanup);
/* Q: if `source` completes or errors, does `signal` emit abort?
If it doesn't, then the listener to `signal` also needs to be cleaned
up in those cases */
} With my suggestion, where cleanup is handled by const switchMap =
<T, R>(mapFn: (value: T) => Observable<R>) =>
(source: Observable<T>) =>
new Observable<R>(obs => {
let innerController = null;
const outerController = source.subscribe({
next: value => {
innerController?.abort();
innerController = mapFn(value).subscribe({
next: obs.next,
error: obs.error,
}, outerController.signal)
},
error: obs.error,
complete: obs.complete
});
return outerController;
) And I'm aware that these are not a perfect example, because it's something that doesn't directly apply to users (as it's an internal operator of rxjs), but in my projects I've had to build my custom operators, some of them simple and others more complex, and I think it gives a picture how it simplifies most of the work for producers, as they don't have to work with event listeners. |
Beta Was this translation helpful? Give feedback.
-
I am aware that not returning an AbortController makes creating new operators slightly more ergonomic but the fundamental tidbit is whether or not you want to couple RxJS observables with the capability to cancel/abort them - or you want the two to be two separate capabilities. I think there is a third way (in case RxJS doesn't want to separate the two capabilities like promises did) which is:
(another random note is RxJS probably doesn't want to create an AbortController for each observable subscription) |
Beta Was this translation helpful? Give feedback.
-
@benlesh Question!
How would it be possible to trigger the abort signal? The loop will 100% saturate the main event loop right? |
Beta Was this translation helpful? Give feedback.
-
@SanderElias The subscriber can signal the abort in the |
Beta Was this translation helpful? Give feedback.
-
I've been reading this and the related issues, and just wanted to make sure that somebody considers interactions between the proposed designs and function observableToPromise<T>(obs: Observable<T>, signal?: AbortSignal): Promise<T> {
if (!signal) { return obs.toPromise(); }
// Reject immediately if the signal has already fired. Use `EmptyError`
// because that's what `first` will fail with per the note below
if (signal.aborted) { return Promise.reject(new EmptyError()); }
const stop = fromEvent(signal, "abort").pipe(take(1));
// Note that `takeUntil` will cause the observable to complete when the
// Signal fires, but `first` will fail with EmptyError if there wasn't a
// value, which will reject out of the returned Promise.
return obs.pipe(takeUntil(stop), first()).toPromise();
} I think it does what it says on the tin, and as the comments point out, the returned promise rejects when aborted. I believe the current |
Beta Was this translation helpful? Give feedback.
-
I've been reading over this and #6347. It sounds like maybe the discussion is moving away from replacing Is that right? It wouldn't have the benefit of shrinking the runtime (per the OP here), but it seems to make the most sense to me. |
Beta Was this translation helpful? Give feedback.
-
This might be a bit of an old thread by now, but the way I think about it, I'm less concerned about changing RxJS to use the AbortController spec, and more concerned with making RxJS interface well with the AbortController. There are two categories that I'm concerned about, really: I'm trying to take an existing function that takes an AbortSignal as an argument and coerce it into an Observable.My concern here is making sure that the Observable triggers the abort on finalize so that the existing function performs any necessary cleanup of its own. For that, I made a function that I tend to copy-paste into new projects: function deferAbort<T>(factoryWithAbort:(signal:AbortSignal) => ObservableInput<T>):Observable<T>{
return new Observable<T>(subscriber => {
const abortController = new AbortController();
const subscription = from(factoryWithAbort(abortController.signal)).subscribe(subscriber);
return () => {
subscription.unsubscribe();
abortController.abort();
};
});
} For instance, a paginated query: async function getPageFromSomeAPI(id:string, from:number, to:number, signal:AbortSignal):Promise<APIRecord[]>{
// return a promise of pages
}
const PAGE_SIZE = 1000;
async function *streamPagesFromSomeAPI(id:string, signal:AbortSignal):AsyncIterable<APIRecord[]>{
for(let i = 0; signal.aborted; i += PAGE_SIZE){
yield getPageFromSomeAPI(id, i, i + PAGE_SIZE, signal);
}
}
function pagesFromSomeAPI$(id:string):Observable<APIRecord[]>{
return deferAbort((signal:AbortSignal) => streamPagesFromSomeAPI(id, signal)).pipe(
takeWhile(page => page.length >= PAGE_SIZE, true)
);
} I'm trying to coerce an Observable into a function that takes an AbortSignal and returns a promise or async iterableIn this case, I end up using IxJS for the conversion. import {from as ixFrom, withAbort} from 'ix/asynciterables`
const source$ = /* Some observable */
export function myAppLogic(signal:AbortSignal):AsyncIterable<Message>{
return ixFrom(source$).pipe(withAbort(signal));
} I know that there is discussion of letting Observables create an AsyncIterator, so we may want to add |
Beta Was this translation helpful? Give feedback.
-
AbortSignal was a mistake, do not move to it, an observable is a resource and not an action, I was wrong. Instead just implement Symbol.dispose on subscriptions, that's a thing now. |
Beta Was this translation helpful? Give feedback.
-
@benjamingr I had already come to that conclusion myself... but it was in large part because AbortSignal/Controller was simply slow and unergonomic for RxJS's use case. |
Beta Was this translation helpful? Give feedback.
-
Now that it looks like Node will be getting AbortController/AbortSignal I think it's time we seriously start planning moving RxJS toward using
AbortSignal
.I have done some research and work on a naive implementation here: https://github.com/benlesh/trex
CONS:
AbortSignal
usesEventTarget
(addEventListener
andremoveEventListener
), which is really gross.PROS:
Proposed (Future) API:
Here is where I'd like to go with this:
Observable
subscribe
andforEach
both accept an optionalAbortSignal
argument.Observable
's initialization function will have a second argument to it that is theAbortSignal
.Subscription
.Observable initialization function changes:
The idea here would be to pass the
signal
into the initialization function to allow users to check it directly for whether or notit had been aborted, and also to allow them to register teardown for the producer.
In the asynchronous case:
In the synchronous case:
Observable subscribe and forEach functions changes:
Here we can finally move to a subscribe signature that only accepts an observer, and a
forEach
that accepts the function. Both will be cancellable by virtue of the fact that anAbortSignal
can be passed. The only downside I see to this is most people still like to just pass a function, and in the case of an error withforEach(fn)
there will be an unhandled rejection if the user does not handle possible errors coming off of it. (honestly, this might be an upside, now that I think about it, forcing people to actually handle errors).forEach
might become the more popular method of subscribing toObservable
, finally.Old API:
New API:
Note that aborting a
forEach
will result in the returned promise being rejected with anAbortError
. That is important so we don't leave promises hanging.Other thoughts
Most, if not all of these changes could be non-breaking new features. Which I see as great. Both
Subscription
andAbortSignal
could live alongside of each other, and could be converted back and forth. It's not until we decided to move away fromSubscription
that these would become breaking changes.What I want in this thread:
Your thoughts, questions, and concerns. I think it would be good to see what we can plan out as far as moving this direction.
Beta Was this translation helpful? Give feedback.
All reactions