diff --git a/src/execution/AbortSignalListener.ts b/src/execution/AbortSignalListener.ts new file mode 100644 index 0000000000..3d707ce615 --- /dev/null +++ b/src/execution/AbortSignalListener.ts @@ -0,0 +1,91 @@ +import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; + +/** + * A AbortSignalListener object can be used to trigger multiple responses + * in response to a single AbortSignal. + * + * @internal + */ +export class AbortSignalListener { + abortSignal: AbortSignal; + abort: () => void; + + private _onAborts: Set<() => void>; + + constructor(abortSignal: AbortSignal) { + this.abortSignal = abortSignal; + this._onAborts = new Set<() => void>(); + this.abort = () => { + for (const abort of this._onAborts) { + abort(); + } + }; + + abortSignal.addEventListener('abort', this.abort); + } + + add(onAbort: () => void): void { + this._onAborts.add(onAbort); + } + + delete(onAbort: () => void): void { + this._onAborts.delete(onAbort); + } + + disconnect(): void { + this.abortSignal.removeEventListener('abort', this.abort); + } +} + +export function cancellablePromise( + originalPromise: Promise, + abortSignalListener: AbortSignalListener, +): Promise { + const abortSignal = abortSignalListener.abortSignal; + if (abortSignal.aborted) { + // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors + return Promise.reject(abortSignal.reason); + } + + const { promise, resolve, reject } = promiseWithResolvers(); + const onAbort = () => reject(abortSignal.reason); + abortSignalListener.add(onAbort); + originalPromise.then( + (resolved) => { + abortSignalListener.delete(onAbort); + resolve(resolved); + }, + (error: unknown) => { + abortSignalListener.delete(onAbort); + reject(error); + }, + ); + + return promise; +} + +export function cancellableIterable( + iterable: AsyncIterable, + abortSignalListener: AbortSignalListener, +): AsyncIterable { + const iterator = iterable[Symbol.asyncIterator](); + + const _next = iterator.next.bind(iterator); + + if (iterator.return) { + const _return = iterator.return.bind(iterator); + + return { + [Symbol.asyncIterator]: () => ({ + next: () => cancellablePromise(_next(), abortSignalListener), + return: () => cancellablePromise(_return(), abortSignalListener), + }), + }; + } + + return { + [Symbol.asyncIterator]: () => ({ + next: () => cancellablePromise(_next(), abortSignalListener), + }), + }; +} diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index c5f9c4c2ca..b826a9b9c5 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -4,8 +4,8 @@ import { pathToArray } from '../jsutils/Path.js'; import type { GraphQLError } from '../error/GraphQLError.js'; +import type { AbortSignalListener } from './AbortSignalListener.js'; import { IncrementalGraph } from './IncrementalGraph.js'; -import type { PromiseCanceller } from './PromiseCanceller.js'; import type { CancellableStreamRecord, CompletedExecutionGroup, @@ -44,7 +44,7 @@ export function buildIncrementalResponse( } interface IncrementalPublisherContext { - promiseCanceller: PromiseCanceller | undefined; + abortSignalListener: AbortSignalListener | undefined; cancellableStreams: Set | undefined; } @@ -127,7 +127,7 @@ class IncrementalPublisher { IteratorResult > => { if (isDone) { - this._context.promiseCanceller?.disconnect(); + this._context.abortSignalListener?.disconnect(); await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; } @@ -176,7 +176,7 @@ class IncrementalPublisher { // TODO: add test for this case /* c8 ignore next */ - this._context.promiseCanceller?.disconnect(); + this._context.abortSignalListener?.disconnect(); await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; }; diff --git a/src/execution/PromiseCanceller.ts b/src/execution/PromiseCanceller.ts deleted file mode 100644 index 16b438e17f..0000000000 --- a/src/execution/PromiseCanceller.ts +++ /dev/null @@ -1,76 +0,0 @@ -import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; - -/** - * A PromiseCanceller object can be used to trigger multiple responses - * in response to a single AbortSignal. - * - * @internal - */ -export class PromiseCanceller { - abortSignal: AbortSignal; - abort: () => void; - - private _aborts: Set<() => void>; - - constructor(abortSignal: AbortSignal) { - this.abortSignal = abortSignal; - this._aborts = new Set<() => void>(); - this.abort = () => { - for (const abort of this._aborts) { - abort(); - } - }; - - abortSignal.addEventListener('abort', this.abort); - } - - disconnect(): void { - this.abortSignal.removeEventListener('abort', this.abort); - } - - cancellablePromise(originalPromise: Promise): Promise { - if (this.abortSignal.aborted) { - // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors - return Promise.reject(this.abortSignal.reason); - } - - const { promise, resolve, reject } = promiseWithResolvers(); - const abort = () => reject(this.abortSignal.reason); - this._aborts.add(abort); - originalPromise.then( - (resolved) => { - this._aborts.delete(abort); - resolve(resolved); - }, - (error: unknown) => { - this._aborts.delete(abort); - reject(error); - }, - ); - - return promise; - } - - cancellableIterable(iterable: AsyncIterable): AsyncIterable { - const iterator = iterable[Symbol.asyncIterator](); - - const _next = iterator.next.bind(iterator); - - if (iterator.return) { - const _return = iterator.return.bind(iterator); - - return { - [Symbol.asyncIterator]: () => ({ - next: () => this.cancellablePromise(_next()), - return: () => this.cancellablePromise(_return()), - }), - }; - } - - return { - [Symbol.asyncIterator]: () => ({ - next: () => this.cancellablePromise(_next()), - }), - }; - } -} diff --git a/src/execution/__tests__/AbortSignalListener-test.ts b/src/execution/__tests__/AbortSignalListener-test.ts new file mode 100644 index 0000000000..2b2a0e6273 --- /dev/null +++ b/src/execution/__tests__/AbortSignalListener-test.ts @@ -0,0 +1,170 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { expectPromise } from '../../__testUtils__/expectPromise.js'; + +import { + AbortSignalListener, + cancellableIterable, + cancellablePromise, +} from '../AbortSignalListener.js'; + +describe('AbortSignalListener', () => { + it('works to add a listener', () => { + const abortController = new AbortController(); + + const abortSignalListener = new AbortSignalListener(abortController.signal); + + let called = false; + const onAbort = () => { + called = true; + }; + abortSignalListener.add(onAbort); + + abortController.abort(); + + expect(called).to.equal(true); + }); + + it('works to delete a listener', () => { + const abortController = new AbortController(); + + const abortSignalListener = new AbortSignalListener(abortController.signal); + + let called = false; + /* c8 ignore next 3 */ + const onAbort = () => { + called = true; + }; + abortSignalListener.add(onAbort); + abortSignalListener.delete(onAbort); + + abortController.abort(); + + expect(called).to.equal(false); + }); + + it('works to disconnect a listener from the abortSignal', () => { + const abortController = new AbortController(); + + const abortSignalListener = new AbortSignalListener(abortController.signal); + + let called = false; + /* c8 ignore next 3 */ + const onAbort = () => { + called = true; + }; + abortSignalListener.add(onAbort); + + abortSignalListener.disconnect(); + + abortController.abort(); + + expect(called).to.equal(false); + }); +}); + +describe('cancellablePromise', () => { + it('works to cancel an already resolved promise', async () => { + const abortController = new AbortController(); + + const abortSignalListener = new AbortSignalListener(abortController.signal); + + const promise = Promise.resolve(1); + + const withCancellation = cancellablePromise(promise, abortSignalListener); + + abortController.abort(new Error('Cancelled!')); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to cancel an already resolved promise after abort signal triggered', async () => { + const abortController = new AbortController(); + const abortSignalListener = new AbortSignalListener(abortController.signal); + + abortController.abort(new Error('Cancelled!')); + + const promise = Promise.resolve(1); + + const withCancellation = cancellablePromise(promise, abortSignalListener); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to cancel a hanging promise', async () => { + const abortController = new AbortController(); + const abortSignalListener = new AbortSignalListener(abortController.signal); + + const promise = new Promise(() => { + /* never resolves */ + }); + + const withCancellation = cancellablePromise(promise, abortSignalListener); + + abortController.abort(new Error('Cancelled!')); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to cancel a hanging promise created after abort signal triggered', async () => { + const abortController = new AbortController(); + const abortSignalListener = new AbortSignalListener(abortController.signal); + + abortController.abort(new Error('Cancelled!')); + + const promise = new Promise(() => { + /* never resolves */ + }); + + const withCancellation = cancellablePromise(promise, abortSignalListener); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); +}); + +describe('cancellableAsyncIterable', () => { + it('works to abort a next call', async () => { + const abortController = new AbortController(); + const abortSignalListener = new AbortSignalListener(abortController.signal); + + const asyncIterable = { + [Symbol.asyncIterator]: () => ({ + next: () => Promise.resolve({ value: 1, done: false }), + }), + }; + + const withCancellation = cancellableIterable( + asyncIterable, + abortSignalListener, + ); + + const nextPromise = withCancellation[Symbol.asyncIterator]().next(); + + abortController.abort(new Error('Cancelled!')); + + await expectPromise(nextPromise).toRejectWith('Cancelled!'); + }); + + it('works to abort a next call when already aborted', async () => { + const abortController = new AbortController(); + const abortSignalListener = new AbortSignalListener(abortController.signal); + + abortController.abort(new Error('Cancelled!')); + + const asyncIterable = { + [Symbol.asyncIterator]: () => ({ + next: () => Promise.resolve({ value: 1, done: false }), + }), + }; + + const withCancellation = cancellableIterable( + asyncIterable, + abortSignalListener, + ); + + const nextPromise = withCancellation[Symbol.asyncIterator]().next(); + + await expectPromise(nextPromise).toRejectWith('Cancelled!'); + }); +}); diff --git a/src/execution/__tests__/PromiseCanceller-test.ts b/src/execution/__tests__/PromiseCanceller-test.ts deleted file mode 100644 index 5800c4ceac..0000000000 --- a/src/execution/__tests__/PromiseCanceller-test.ts +++ /dev/null @@ -1,121 +0,0 @@ -import { describe, it } from 'mocha'; - -import { expectPromise } from '../../__testUtils__/expectPromise.js'; - -import { PromiseCanceller } from '../PromiseCanceller.js'; - -describe('PromiseCanceller', () => { - describe('cancellablePromise', () => { - it('works to cancel an already resolved promise', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - const promiseCanceller = new PromiseCanceller(abortSignal); - - const promise = Promise.resolve(1); - - const withCancellation = promiseCanceller.cancellablePromise(promise); - - abortController.abort(new Error('Cancelled!')); - - await expectPromise(withCancellation).toRejectWith('Cancelled!'); - }); - - it('works to cancel an already resolved promise after abort signal triggered', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - abortController.abort(new Error('Cancelled!')); - - const promiseCanceller = new PromiseCanceller(abortSignal); - - const promise = Promise.resolve(1); - - const withCancellation = promiseCanceller.cancellablePromise(promise); - - await expectPromise(withCancellation).toRejectWith('Cancelled!'); - }); - - it('works to cancel a hanging promise', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - const promiseCanceller = new PromiseCanceller(abortSignal); - - const promise = new Promise(() => { - /* never resolves */ - }); - - const withCancellation = promiseCanceller.cancellablePromise(promise); - - abortController.abort(new Error('Cancelled!')); - - await expectPromise(withCancellation).toRejectWith('Cancelled!'); - }); - - it('works to cancel a hanging promise created after abort signal triggered', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - abortController.abort(new Error('Cancelled!')); - - const promiseCanceller = new PromiseCanceller(abortSignal); - - const promise = new Promise(() => { - /* never resolves */ - }); - - const withCancellation = promiseCanceller.cancellablePromise(promise); - - await expectPromise(withCancellation).toRejectWith('Cancelled!'); - }); - }); - - describe('cancellableAsyncIterable', () => { - it('works to abort a next call', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - const promiseCanceller = new PromiseCanceller(abortSignal); - - const asyncIterable = { - [Symbol.asyncIterator]: () => ({ - next: () => Promise.resolve({ value: 1, done: false }), - }), - }; - - const cancellableAsyncIterable = - promiseCanceller.cancellableIterable(asyncIterable); - - const nextPromise = - cancellableAsyncIterable[Symbol.asyncIterator]().next(); - - abortController.abort(new Error('Cancelled!')); - - await expectPromise(nextPromise).toRejectWith('Cancelled!'); - }); - - it('works to abort a next call when already aborted', async () => { - const abortController = new AbortController(); - const abortSignal = abortController.signal; - - abortController.abort(new Error('Cancelled!')); - - const promiseCanceller = new PromiseCanceller(abortSignal); - - const asyncIterable = { - [Symbol.asyncIterator]: () => ({ - next: () => Promise.resolve({ value: 1, done: false }), - }), - }; - - const cancellableAsyncIterable = - promiseCanceller.cancellableIterable(asyncIterable); - - const nextPromise = - cancellableAsyncIterable[Symbol.asyncIterator]().next(); - - await expectPromise(nextPromise).toRejectWith('Cancelled!'); - }); - }); -}); diff --git a/src/execution/__tests__/abort-signal-test.ts b/src/execution/__tests__/cancellation-test.ts similarity index 100% rename from src/execution/__tests__/abort-signal-test.ts rename to src/execution/__tests__/cancellation-test.ts diff --git a/src/execution/execute.ts b/src/execution/execute.ts index d847835d98..00f0e7b1ae 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -48,6 +48,11 @@ import { GraphQLStreamDirective } from '../type/directives.js'; import type { GraphQLSchema } from '../type/schema.js'; import { assertValidSchema } from '../type/validate.js'; +import { + AbortSignalListener, + cancellableIterable, + cancellablePromise, +} from './AbortSignalListener.js'; import type { DeferUsageSet, ExecutionPlan } from './buildExecutionPlan.js'; import { buildExecutionPlan } from './buildExecutionPlan.js'; import type { @@ -63,7 +68,6 @@ import { import { getVariableSignature } from './getVariableSignature.js'; import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; -import { PromiseCanceller } from './PromiseCanceller.js'; import type { CancellableStreamRecord, CompletedExecutionGroup, @@ -164,7 +168,7 @@ export interface ValidatedExecutionArgs { export interface ExecutionContext { validatedExecutionArgs: ValidatedExecutionArgs; errors: Array | undefined; - promiseCanceller: PromiseCanceller | undefined; + abortSignalListener: AbortSignalListener | undefined; completed: boolean; cancellableStreams: Set | undefined; } @@ -318,8 +322,8 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( const exeContext: ExecutionContext = { validatedExecutionArgs, errors: undefined, - promiseCanceller: abortSignal - ? new PromiseCanceller(abortSignal) + abortSignalListener: abortSignal + ? new AbortSignalListener(abortSignal) : undefined, completed: false, cancellableStreams: undefined, @@ -368,7 +372,7 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( }, (error: unknown) => { exeContext.completed = true; - exeContext.promiseCanceller?.disconnect(); + exeContext.abortSignalListener?.disconnect(); return { data: null, errors: withError(exeContext.errors, error as GraphQLError), @@ -382,7 +386,7 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( exeContext.completed = true; // TODO: add test case for synchronous null bubbling to root with cancellation /* c8 ignore next */ - exeContext.promiseCanceller?.disconnect(); + exeContext.abortSignalListener?.disconnect(); return { data: null, errors: withError(exeContext.errors, error) }; } } @@ -401,7 +405,7 @@ function buildDataResponse( const { rawResult: data, incrementalDataRecords } = graphqlWrappedResult; const errors = exeContext.errors; if (incrementalDataRecords === undefined) { - exeContext.promiseCanceller?.disconnect(); + exeContext.abortSignalListener?.disconnect(); return errors !== undefined ? { errors, data } : { data }; } @@ -833,7 +837,7 @@ function executeField( incrementalContext: IncrementalContext | undefined, deferMap: ReadonlyMap | undefined, ): PromiseOrValue> | undefined { - const { validatedExecutionArgs, promiseCanceller } = exeContext; + const { validatedExecutionArgs, abortSignalListener } = exeContext; const { schema, contextValue, variableValues, hideSuggestions, abortSignal } = validatedExecutionArgs; const fieldName = fieldDetailsList[0].node.name.value; @@ -878,7 +882,9 @@ function executeField( fieldDetailsList, info, path, - promiseCanceller?.cancellablePromise(result) ?? result, + abortSignalListener + ? cancellablePromise(result, abortSignalListener) + : result, incrementalContext, deferMap, ); @@ -1386,8 +1392,10 @@ function completeListValue( const itemType = returnType.ofType; if (isAsyncIterable(result)) { - const maybeCancellableIterable = - exeContext.promiseCanceller?.cancellableIterable(result) ?? result; + const abortSignalListener = exeContext.abortSignalListener; + const maybeCancellableIterable = abortSignalListener + ? cancellableIterable(result, abortSignalListener) + : result; const asyncIterator = maybeCancellableIterable[Symbol.asyncIterator](); return completeAsyncIteratorValue( @@ -1599,9 +1607,11 @@ async function completePromisedListItemValue( deferMap: ReadonlyMap | undefined, ): Promise { try { - const resolved = await (exeContext.promiseCanceller?.cancellablePromise( - item, - ) ?? item); + const abortSignalListener = exeContext.abortSignalListener; + const maybeCancellableItem = abortSignalListener + ? cancellablePromise(item, abortSignalListener) + : item; + const resolved = await maybeCancellableItem; let completed = completeValue( exeContext, itemType, @@ -2108,8 +2118,8 @@ function mapSourceToResponse( } const abortSignal = validatedExecutionArgs.abortSignal; - const promiseCanceller = abortSignal - ? new PromiseCanceller(abortSignal) + const abortSignalListener = abortSignal + ? new AbortSignalListener(abortSignal) : undefined; // For each payload yielded from a subscription, map it over the normal @@ -2117,7 +2127,9 @@ function mapSourceToResponse( // This implements the "MapSourceToResponseEvent" algorithm described in // the GraphQL specification.. return mapAsyncIterable( - promiseCanceller?.cancellableIterable(resultOrStream) ?? resultOrStream, + abortSignalListener + ? cancellableIterable(resultOrStream, abortSignalListener) + : resultOrStream, (payload: unknown) => { const perEventExecutionArgs: ValidatedExecutionArgs = { ...validatedExecutionArgs, @@ -2125,7 +2137,7 @@ function mapSourceToResponse( }; return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs); }, - () => promiseCanceller?.disconnect(), + () => abortSignalListener?.disconnect(), ); } @@ -2275,18 +2287,20 @@ function executeSubscription( const result = resolveFn(rootValue, args, contextValue, info, abortSignal); if (isPromise(result)) { - const promiseCanceller = abortSignal - ? new PromiseCanceller(abortSignal) + const abortSignalListener = abortSignal + ? new AbortSignalListener(abortSignal) : undefined; - const promise = promiseCanceller?.cancellablePromise(result) ?? result; + const promise = abortSignalListener + ? cancellablePromise(result, abortSignalListener) + : result; return promise.then(assertEventStream).then( (resolved) => { - promiseCanceller?.disconnect(); + abortSignalListener?.disconnect(); return resolved; }, (error: unknown) => { - promiseCanceller?.disconnect(); + abortSignalListener?.disconnect(); throw locatedError(error, fieldNodes, pathToArray(path)); }, ); @@ -2652,13 +2666,17 @@ function completeStreamItem( itemType: GraphQLOutputType, ): PromiseOrValue { if (isPromise(item)) { + const abortSignalListener = exeContext.abortSignalListener; + const maybeCancellableItem = abortSignalListener + ? cancellablePromise(item, abortSignalListener) + : item; return completePromisedValue( exeContext, itemType, fieldDetailsList, info, itemPath, - exeContext.promiseCanceller?.cancellablePromise(item) ?? item, + maybeCancellableItem, incrementalContext, new Map(), ).then(