diff --git a/.changeset/moody-hats-sleep.md b/.changeset/moody-hats-sleep.md new file mode 100644 index 000000000..269fe0731 --- /dev/null +++ b/.changeset/moody-hats-sleep.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Use addEventListener instead of overwriting the onabort property, preventing interference with outside users also setting the property on the same signal. Remove event listener when race settles to avoid memory leak. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 85d493ef5..f66f02c60 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -6,7 +6,7 @@ import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/cru import * as sync_status from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js'; -import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js'; +import { resolveEarlyOnAbort, throttleLeadingTrailing } from '../../../utils/async.js'; import { BucketChecksum, BucketDescription, @@ -1062,7 +1062,7 @@ The next upload iteration will be delayed.`); }); } - private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) { + private async applyCheckpoint(checkpoint: Checkpoint, signal: AbortSignal) { let result = await this.options.adapter.syncLocalDatabase(checkpoint); const pending = this.pendingCrudUpload; @@ -1079,9 +1079,9 @@ The next upload iteration will be delayed.`); this.logger.debug( 'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.' ); - await Promise.race([pending, onAbortPromise(abort)]); + await resolveEarlyOnAbort(pending, signal); - if (abort.aborted) { + if (signal.aborted) { return { applied: false, endIteration: true }; } diff --git a/packages/common/src/utils/async.ts b/packages/common/src/utils/async.ts index c6fe822d8..72e1ced20 100644 --- a/packages/common/src/utils/async.ts +++ b/packages/common/src/utils/async.ts @@ -49,12 +49,53 @@ export function throttleLeadingTrailing(func: () => void, wait: number) { }; } -export function onAbortPromise(signal: AbortSignal): Promise { - return new Promise((resolve) => { +/** + * Race a promise against an abort signal. + * Returns a promise that resolves early if the signal is aborted before the + * original promise resolves. + * + * Note: The signal does not cancel the promise. To cancel the promise then + * its logic needs to explicitly check the signal. + */ +export function resolveEarlyOnAbort( + promise: Promise, + signal: AbortSignal +): Promise> { + return new Promise((resolve, reject) => { + const resolveWith = (result: ResolveEarlyOnAbortResult) => { + removeAbortHandler(); + resolve(result); + }; + + const rejectWith = (error: Error) => { + removeAbortHandler(); + reject(error); + }; + + const abortHandler = () => { + resolveWith({ aborted: true }); + }; + + const addAbortHandler = () => { + // Use an event listener to avoid interfering with the onabort + // property where other code may have registered a handler. + signal.addEventListener('abort', abortHandler); + }; + + const removeAbortHandler = () => { + // Remove the abort handler to avoid memory leaks. + signal.removeEventListener('abort', abortHandler); + }; + + addAbortHandler(); + if (signal.aborted) { - resolve(); - } else { - signal.onabort = () => resolve(); + abortHandler(); + return; } + + promise.then((result) => resolveWith({ result, aborted: false })).catch((error) => rejectWith(error)); }); } + +type ResolveEarlyOnAbortResult = { result: T; aborted: false } | { aborted: true }; diff --git a/packages/common/tests/utils/async.test.ts b/packages/common/tests/utils/async.test.ts new file mode 100644 index 000000000..4df8b9868 --- /dev/null +++ b/packages/common/tests/utils/async.test.ts @@ -0,0 +1,74 @@ +import { describe, expect, it, vi } from 'vitest'; +import { resolveEarlyOnAbort } from '../../src/utils/async'; + +describe('resolveEarlyOnAbort', () => { + it('should resolve early when signal is aborted', async () => { + const controller = new AbortController(); + + const slowPromise = new Promise((resolve) => { + setTimeout(() => resolve('completed'), 100); + }); + + const racePromise = resolveEarlyOnAbort(slowPromise, controller.signal); + + // Abort after a short delay + setTimeout(() => controller.abort(), 10); + + const result = await racePromise; + expect(result).toEqual({ aborted: true }); + }); + + it('should resolve immediately if signal is already aborted', async () => { + const controller = new AbortController(); + controller.abort(); // Abort before creating the race + + const slowPromise = new Promise((resolve) => { + setTimeout(() => resolve('completed'), 100); + }); + + const result = await resolveEarlyOnAbort(slowPromise, controller.signal); + expect(result).toEqual({ aborted: true }); + }); + + it('should resolve with the result if the promise resolves before the signal is aborted', async () => { + const controller = new AbortController(); + + const slowPromise = new Promise((resolve) => { + setTimeout(() => resolve('completed'), 100); + }); + + const result = await resolveEarlyOnAbort(slowPromise, controller.signal); + expect(result).toEqual({ result: 'completed', aborted: false }); + }); + + it('should show that resolveEarlyOnAbort does not interfere with onabort property or other event listeners', async () => { + const controller = new AbortController(); + let onabortCalled = false; + let eventListenerCalled = false; + + // Set onabort property + controller.signal.onabort = () => { + onabortCalled = true; + }; + + // Add another event listener + controller.signal.addEventListener('abort', () => { + eventListenerCalled = true; + }); + + const slowPromise = new Promise((resolve) => { + setTimeout(() => resolve('completed'), 100); + }); + + const racePromise = resolveEarlyOnAbort(slowPromise, controller.signal); + + // Abort after a short delay + setTimeout(() => controller.abort(), 10); + + const result = await racePromise; + + expect(result).toEqual({ aborted: true }); + expect(onabortCalled).toBe(true); + expect(eventListenerCalled).toBe(true); + }); +});