Skip to content

feat(common): use addEventListener instead of onabort in async utils onAbortPromise #645

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
5 changes: 5 additions & 0 deletions .changeset/moody-hats-sleep.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Use addEventListener instead of overwriting the onabort property, preventing interference with outside users also setting the property on the same signal. Remove event listener when race settles to avoid memory leak.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/cru
import * as sync_status from '../../../db/crud/SyncStatus.js';
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
import { resolveEarlyOnAbort, throttleLeadingTrailing } from '../../../utils/async.js';
import {
BucketChecksum,
BucketDescription,
Expand Down Expand Up @@ -1062,7 +1062,7 @@ The next upload iteration will be delayed.`);
});
}

private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) {
private async applyCheckpoint(checkpoint: Checkpoint, signal: AbortSignal) {
let result = await this.options.adapter.syncLocalDatabase(checkpoint);
const pending = this.pendingCrudUpload;

Expand All @@ -1079,9 +1079,9 @@ The next upload iteration will be delayed.`);
this.logger.debug(
'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.'
);
await Promise.race([pending, onAbortPromise(abort)]);
await resolveEarlyOnAbort(pending, signal);

if (abort.aborted) {
if (signal.aborted) {
return { applied: false, endIteration: true };
}

Expand Down
51 changes: 46 additions & 5 deletions packages/common/src/utils/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,53 @@ export function throttleLeadingTrailing(func: () => void, wait: number) {
};
}

export function onAbortPromise(signal: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
/**
* Race a promise against an abort signal.
* Returns a promise that resolves early if the signal is aborted before the
* original promise resolves.
*
* Note: The signal does not cancel the promise. To cancel the promise then
* its logic needs to explicitly check the signal.
*/
export function resolveEarlyOnAbort<T>(
promise: Promise<T>,
signal: AbortSignal
): Promise<ResolveEarlyOnAbortResult<T>> {
return new Promise((resolve, reject) => {
const resolveWith = (result: ResolveEarlyOnAbortResult<T>) => {
removeAbortHandler();
resolve(result);
};

const rejectWith = (error: Error) => {
removeAbortHandler();
reject(error);
};

const abortHandler = () => {
resolveWith({ aborted: true });
};

const addAbortHandler = () => {
// Use an event listener to avoid interfering with the onabort
// property where other code may have registered a handler.
signal.addEventListener('abort', abortHandler);
};

const removeAbortHandler = () => {
// Remove the abort handler to avoid memory leaks.
signal.removeEventListener('abort', abortHandler);
};

addAbortHandler();

if (signal.aborted) {
resolve();
} else {
signal.onabort = () => resolve();
abortHandler();
return;
}

promise.then((result) => resolveWith({ result, aborted: false })).catch((error) => rejectWith(error));
});
}

type ResolveEarlyOnAbortResult<T> = { result: T; aborted: false } | { aborted: true };
74 changes: 74 additions & 0 deletions packages/common/tests/utils/async.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { describe, expect, it, vi } from 'vitest';
import { resolveEarlyOnAbort } from '../../src/utils/async';

describe('resolveEarlyOnAbort', () => {
it('should resolve early when signal is aborted', async () => {
const controller = new AbortController();

const slowPromise = new Promise<string>((resolve) => {
setTimeout(() => resolve('completed'), 100);
});

const racePromise = resolveEarlyOnAbort(slowPromise, controller.signal);

// Abort after a short delay
setTimeout(() => controller.abort(), 10);

const result = await racePromise;
expect(result).toEqual({ aborted: true });
});

it('should resolve immediately if signal is already aborted', async () => {
const controller = new AbortController();
controller.abort(); // Abort before creating the race

const slowPromise = new Promise<string>((resolve) => {
setTimeout(() => resolve('completed'), 100);
});

const result = await resolveEarlyOnAbort(slowPromise, controller.signal);
expect(result).toEqual({ aborted: true });
});

it('should resolve with the result if the promise resolves before the signal is aborted', async () => {
const controller = new AbortController();

const slowPromise = new Promise<string>((resolve) => {
setTimeout(() => resolve('completed'), 100);
});

const result = await resolveEarlyOnAbort(slowPromise, controller.signal);
expect(result).toEqual({ result: 'completed', aborted: false });
});

it('should show that resolveEarlyOnAbort does not interfere with onabort property or other event listeners', async () => {
const controller = new AbortController();
let onabortCalled = false;
let eventListenerCalled = false;

// Set onabort property
controller.signal.onabort = () => {
onabortCalled = true;
};

// Add another event listener
controller.signal.addEventListener('abort', () => {
eventListenerCalled = true;
});

const slowPromise = new Promise<string>((resolve) => {
setTimeout(() => resolve('completed'), 100);
});

const racePromise = resolveEarlyOnAbort(slowPromise, controller.signal);

// Abort after a short delay
setTimeout(() => controller.abort(), 10);

const result = await racePromise;

expect(result).toEqual({ aborted: true });
expect(onabortCalled).toBe(true);
expect(eventListenerCalled).toBe(true);
});
});