Skip to content

Commit

Permalink
refactor sync retrying to properly call onGetError and onSetError, fi…
Browse files Browse the repository at this point in the history
…x retries were not getting cancelled if a new set changed the value
  • Loading branch information
jmeistrich committed Aug 25, 2024
1 parent 41c4568 commit b88957d
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 49 deletions.
49 changes: 34 additions & 15 deletions src/retry.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { isPromise } from './is';
import type { RetryOptions } from './observableInterfaces';
import type { NodeInfo, RetryOptions } from './observableInterfaces';
import type { OnErrorRetryParams, SyncedGetSetBaseParams } from './sync/syncTypes';

function calculateRetryDelay(retryOptions: RetryOptions, retryNum: number): number | null {
const { backoff, delay = 1000, infinite, times = 3, maxDelay = 30000 } = retryOptions;
Expand All @@ -10,39 +11,57 @@ function calculateRetryDelay(retryOptions: RetryOptions, retryNum: number): numb
return null;
}

function createRetryTimeout(retryOptions: RetryOptions, retryNum: number, fn: () => void) {
function createRetryTimeout(retryOptions: RetryOptions, retryNum: number, fn: () => void): number | false {
const delayTime = calculateRetryDelay(retryOptions, retryNum);
if (delayTime) {
return setTimeout(fn, delayTime);
return setTimeout(fn, delayTime) as unknown as number;
} else {
return false;
}
}

const mapRetryTimeouts = new Map<NodeInfo, number>();

export function runWithRetry<T>(
state: { retryNum: number; retry: RetryOptions | undefined },
fn: (e: { retryNum: number; cancelRetry: () => void }) => T | Promise<T>,
state: SyncedGetSetBaseParams<any>,
retryOptions: RetryOptions | undefined,
fn: (params: OnErrorRetryParams) => T | Promise<T>,
onError: (error: Error) => void,
): T | Promise<T> {
const { retry } = state;
const e = Object.assign(state, { cancel: false, cancelRetry: () => (e.cancel = false) });
let value = fn(e);
let value = fn(state);

if (isPromise(value) && retry) {
let timeoutRetry: any;
return new Promise<any>((resolve) => {
if (isPromise(value) && retryOptions) {
let timeoutRetry: number;
if (mapRetryTimeouts.has(state.node)) {
clearTimeout(mapRetryTimeouts.get(state.node));
}
return new Promise<any>((resolve, reject) => {
const run = () => {
(value as Promise<any>)
.then((val: any) => {
resolve(val);
})
.catch(() => {
.catch((error: Error) => {
state.retryNum++;
if (timeoutRetry) {
clearTimeout(timeoutRetry);
}
if (!e.cancel) {
timeoutRetry = createRetryTimeout(retry, state.retryNum, () => {
value = fn(e);
if (onError) {
onError(error);
}
if (!state.cancelRetry) {
const timeout = createRetryTimeout(retryOptions, state.retryNum, () => {
value = fn(state);
run();
});

if (timeout === false) {
state.cancelRetry = true;
reject();
} else {
mapRetryTimeouts.set(state.node, timeout);
timeoutRetry = timeout;
}
}
});
};
Expand Down
8 changes: 4 additions & 4 deletions src/sync-plugins/keel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ export function syncedKeel<
fn: Function,
from: 'create' | 'update' | 'delete',
) => {
const { retryNum, cancelRetry, update } = params;
const { retryNum, update } = params;

if (
from === 'create' &&
Expand All @@ -434,7 +434,7 @@ export function syncedKeel<
if (__DEV__) {
console.log('Creating duplicate data already saved, just ignore.');
}
cancelRetry();
params.cancelRetry = true;
// This has already been saved but didn't update pending changes, so just update with {} to clear the pending state
update({
value: {} as TRemote,
Expand All @@ -445,13 +445,13 @@ export function syncedKeel<
if (__DEV__) {
console.log('Deleting non-existing data, just ignore.');
}
cancelRetry();
params.cancelRetry = true;
}
} else if (error.type === 'bad_request') {
keelConfig.onError?.({ error, params, input, type: from, action: fn.name || fn.toString() });

if (retryNum > 4) {
cancelRetry();
params.cancelRetry = true;
}

throw new Error(error.message);
Expand Down
58 changes: 37 additions & 21 deletions src/sync/syncObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ async function doChangeRemote(changeInfo: PreppedChangeRemote | undefined) {
syncOptions.onSetError?.(error, setParams as SyncedSetParams<any>);
};

const setParams: Omit<SyncedSetParams<any>, 'cancelRetry' | 'retryNum'> = {
const setParams: SyncedSetParams<any> = {
node,
value$: obs$,
changes: changesRemote,
Expand All @@ -659,22 +659,29 @@ async function doChangeRemote(changeInfo: PreppedChangeRemote | undefined) {
}
},
refresh: syncState.sync,
retryNum: 0,
cancelRetry: false,
};

let savedPromise = runWithRetry({ retryNum: 0, retry: syncOptions.retry }, async (retryEvent) => {
const params = setParams as SyncedSetParams<any>;
params.cancelRetry = retryEvent.cancelRetry;
params.retryNum = retryEvent.retryNum;
let savedPromise = runWithRetry(
setParams,
syncOptions.retry,
async () => {
return syncOptions!.set!(setParams);
},
onError,
);
let didError = false;

return syncOptions!.set!(params);
});
if (isPromise(savedPromise)) {
savedPromise = savedPromise.catch(onError);
savedPromise = savedPromise.catch((error) => {
didError = true;
onError(error);
});
await savedPromise;
}

await savedPromise;

if (!state$.error.peek()) {
if (!didError) {
// If this remote save changed anything then update cache and metadata
// Because save happens after a timeout and they're batched together, some calls to save will
// return saved data and others won't, so those can be ignored.
Expand Down Expand Up @@ -907,7 +914,7 @@ export function syncObservable<T>(
allSyncStates.set(syncState$, node);
syncStateValue.getPendingChanges = () => localState.pendingChanges;

const onError = (error: Error, getParams: SyncedGetParams<T> | undefined, source: 'get' | 'subscribe') => {
const onGetError = (error: Error, getParams: SyncedGetParams<T> | undefined, source: 'get' | 'subscribe') => {
syncState$.error.set(error);
syncOptions.onGetError?.(error, getParams, source);
};
Expand Down Expand Up @@ -1090,7 +1097,7 @@ export function syncObservable<T>(
});
},
refresh: () => when(syncState$.isLoaded, sync),
onError: (error) => onError(error, undefined, 'subscribe'),
onError: (error) => onGetError(error, undefined, 'subscribe'),
});
};

Expand All @@ -1102,7 +1109,9 @@ export function syncObservable<T>(
}
const existingValue = getNodeValue(node);

const getParams: Omit<SyncedGetParams<T>, 'cancelRetry' | 'retryNum'> = {
const onError = (error: Error) => onGetError(error, getParams as SyncedGetParams<T>, 'get');

const getParams: SyncedGetParams<T> = {
node,
value$: obs$,
value: isFunction(existingValue) || existingValue?.[symbolLinked] ? undefined : existingValue,
Expand All @@ -1111,7 +1120,9 @@ export function syncObservable<T>(
options: syncOptions,
lastSync,
updateLastSync: (lastSync: number) => (getParams.lastSync = lastSync),
onError: (error) => onError(error, getParams as SyncedGetParams<T>, 'get'),
onError,
retryNum: 0,
cancelRetry: false,
};

let modeBeforeReset: GetMode | undefined = undefined;
Expand All @@ -1137,12 +1148,17 @@ export function syncObservable<T>(
numPendingGets: (syncStateValue.numPendingGets! || 0) + 1,
isGetting: true,
});
const got = runWithRetry({ retryNum: 0, retry: syncOptions.retry }, (retryEvent) => {
const params = getParams as SyncedGetParams<T>;
params.cancelRetry = retryEvent.cancelRetry;
params.retryNum = retryEvent.retryNum;
return get(params);
});
const got = runWithRetry(
getParams,
syncOptions.retry,
(retryEvent) => {
const params = getParams as SyncedGetParams<T>;
params.cancelRetry = retryEvent.cancelRetry;
params.retryNum = retryEvent.retryNum;
return get(params);
},
onError,
);
const numGets = (node.numGets = (node.numGets || 0) + 1);
const handle = (value: any) => {
syncState$.numPendingGets.set((v) => v! - 1);
Expand Down
19 changes: 10 additions & 9 deletions src/sync/syncTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,25 @@ export interface SyncedGetSetSubscribeBaseParams<T = any> {
refresh: () => void;
}

export interface SyncedGetParams<T> extends SyncedGetSetSubscribeBaseParams<T> {
export interface SyncedGetSetBaseParams<T = any> extends SyncedGetSetSubscribeBaseParams<T>, OnErrorRetryParams {}

export interface OnErrorRetryParams {
retryNum: number;
cancelRetry: boolean;
}

export interface SyncedGetParams<T> extends SyncedGetSetBaseParams<T> {
value: any;
lastSync: number | undefined;
updateLastSync: (lastSync: number) => void;
mode: GetMode;
retryNum: number;
cancelRetry: () => void;
onError: (error: Error) => void;
options: SyncedOptions;
}

export interface SyncedSetParams<T>
extends Pick<SetParams<T>, 'changes' | 'value'>,
SyncedGetSetSubscribeBaseParams<T> {
export interface SyncedSetParams<T> extends Pick<SetParams<T>, 'changes' | 'value'>, SyncedGetSetBaseParams<T> {
update: UpdateFn<T>;
cancelRetry: () => void;
retryNum: number;
onError: (error: Error) => void;
onError: (error: Error, retryParams: OnErrorRetryParams) => void;
}

export interface SyncedSubscribeParams<T = any> extends SyncedGetSetSubscribeBaseParams<T> {
Expand Down
6 changes: 6 additions & 0 deletions tests/persist.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,15 @@ describe('Pending', () => {
test('Pending created and updated', async () => {
const persistName = getPersistName();
let isOk = false;
let didSetWrongValue = false;
const obs$ = observable(
synced({
get: () => {
return { test: 'hi' };
},
set: ({ value }) => {
didSetWrongValue = value.test !== obs$.get().test;
expect(value.test).toEqual(obs$.get().test);
if (!isOk) {
throw new Error('Did not save' + value);
}
Expand Down Expand Up @@ -246,6 +249,8 @@ describe('Pending', () => {
// Updates pending
obs$.test.set('hello2');
await promiseTimeout(0);
expect(didSetWrongValue).toEqual(false);

pending = state$.getPendingChanges();
expect(pending).toEqual({ test: { p: 'hi', t: ['object'], v: 'hello2' } });

Expand Down Expand Up @@ -280,6 +285,7 @@ describe('Pending', () => {

pending = state$.getPendingChanges();
expect(pending).toEqual({});
expect(didSetWrongValue).toEqual(false);
});
test('Pending applied if changed', async () => {
const persistName = getPersistName();
Expand Down

0 comments on commit b88957d

Please sign in to comment.