Skip to content

Commit

Permalink
fix: Makes concurrent jests non deterministic
Browse files Browse the repository at this point in the history
* Related #4

fix: parameterize replacer toError and fromError, change fromError to return JSONValue, stringify fromError usages

* Related #10
  • Loading branch information
addievo committed Sep 22, 2023
1 parent eaee21a commit 2d92d6d
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 28 deletions.
10 changes: 8 additions & 2 deletions src/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class RPCClient<M extends ClientManifest> {
logger.info(`Created ${this.name}`);
return rpcClient;
}
protected onTimeoutCallback?: () => void;
protected idGen: IdGen;
protected logger: Logger;
protected streamFactory: StreamFactory;
Expand All @@ -110,7 +111,9 @@ class RPCClient<M extends ClientManifest> {
errorData,
metadata?: JSONValue,
) => ErrorRPCRemote<unknown>;

public registerOnTimeoutCallback(callback: () => void) {
this.onTimeoutCallback = callback;
}
// Method proxies
public readonly streamKeepAliveTimeoutTime: number;
public readonly methodsProxy = new Proxy(
Expand Down Expand Up @@ -353,6 +356,9 @@ class RPCClient<M extends ClientManifest> {
void timer.then(
() => {
abortController.abort(timeoutError);
if (this.onTimeoutCallback) {
this.onTimeoutCallback();
}
},
() => {}, // Ignore cancellation error
);
Expand Down Expand Up @@ -547,7 +553,7 @@ class RPCClient<M extends ClientManifest> {
...(rpcStream.meta ?? {}),
command: method,
};
throw rpcUtils.toError(messageValue.error.data, metadata);
throw this.toError(messageValue.error.data, metadata);
}
leadingMessage = messageValue;
} catch (e) {
Expand Down
13 changes: 10 additions & 3 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class RPCServer extends EventTarget {
logger.info(`Created ${this.name}`);
return rpcServer;
}
protected onTimeoutCallback?: () => void;
protected idGen: IdGen;
protected logger: Logger;
protected handlerMap: Map<string, RawHandlerImplementation> = new Map();
Expand All @@ -131,7 +132,10 @@ class RPCServer extends EventTarget {
Uint8Array,
JSONRPCResponseResult
>;

// Function to register a callback for timeout
public registerOnTimeoutCallback(callback: () => void) {
this.onTimeoutCallback = callback;
}
public constructor({
manifest,
middlewareFactory,
Expand Down Expand Up @@ -347,7 +351,7 @@ class RPCServer extends EventTarget {
const rpcError: JSONRPCError = {
code: e.exitCode ?? JSONRPCErrorCode.InternalError,
message: e.description ?? '',
data: rpcUtils.fromError(e, this.sensitive),
data: JSON.stringify(this.fromError(e), this.replacer),
};
const rpcErrorMessage: JSONRPCResponseError = {
jsonrpc: '2.0',
Expand Down Expand Up @@ -468,6 +472,9 @@ class RPCServer extends EventTarget {
delay: this.handlerTimeoutTime,
handler: () => {
abortController.abort(new rpcErrors.ErrorRPCTimedOut());
if (this.onTimeoutCallback) {
this.onTimeoutCallback();
}
},
});

Expand Down Expand Up @@ -608,7 +615,7 @@ class RPCServer extends EventTarget {
const rpcError: JSONRPCError = {
code: e.exitCode ?? JSONRPCErrorCode.InternalError,
message: e.description ?? '',
data: rpcUtils.fromError(e, this.sensitive),
data: JSON.stringify(this.fromError(e), this.replacer),
};
const rpcErrorMessage: JSONRPCResponseError = {
jsonrpc: '2.0',
Expand Down
5 changes: 3 additions & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,13 @@ declare const brand: unique symbol;
type Opaque<K, T> = T & { readonly [brand]: K };

type JSONValue =
| { [key: string]: JSONValue }
| { [key: string]: JSONValue | undefined }
| Array<JSONValue>
| string
| number
| boolean
| null;
| null
| undefined;

type POJO = { [key: string]: any };
type PromiseDeconstructed<T> = {
Expand Down
15 changes: 9 additions & 6 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,16 @@ const replacer = (key: string, value: any) => {
* prevent sensitive information from being sent over the network
*/
function fromError(error: ErrorRPC<any>): JSONValue {
const data: { [key: string]: JSONValue } = {
message: error.message,
description: error.description,
};
if (error.code !== undefined) {
data.code = error.code;
}
return {
type: error.name,
data: {
message: error.message,
code: error.code,
description: error.description,
},
data,
};
}

Expand Down Expand Up @@ -506,9 +509,9 @@ export {
parseJSONRPCResponseError,
parseJSONRPCResponse,
parseJSONRPCMessage,
replacer,
fromError,
toError,
replacer,
clientInputTransformStream,
clientOutputTransformStream,
getHandlerTypes,
Expand Down
50 changes: 40 additions & 10 deletions tests/RPC.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,10 @@ describe('RPC', () => {

// The promise should be rejected
const rejection = await callProm;
expect(rejection).toBeInstanceOf(rpcErrors.ErrorRPCRemote);

// The error should have specific properties
expect(rejection).toMatchObject({ code: error.code });
expect(rejection).toBeInstanceOf(rpcErrors.ErrorRPCRemote);
expect(rejection).toMatchObject({ code: -32006 });

// Cleanup
await rpcServer.destroy();
Expand Down Expand Up @@ -602,6 +602,8 @@ describe('RPC', () => {
await rpcClient.destroy();
});
test('RPC client and server timeout concurrently', async () => {
let serverTimedOut = false;
let clientTimedOut = false;
// Generate test data (assuming fc.array generates some mock array)
const values = fc.array(rpcTestUtils.safeJsonValueArb, { minLength: 1 });

Expand Down Expand Up @@ -638,6 +640,10 @@ describe('RPC', () => {
idGen,
handlerTimeoutTime: timeout,
});
// Register callback
rpcServer.registerOnTimeoutCallback(() => {
serverTimedOut = true;
});
rpcServer.handleStream({
...serverPair,
cancel: () => {},
Expand All @@ -659,9 +665,20 @@ describe('RPC', () => {
const callerInterface = await rpcClient.methods.testMethod({
timer: timeout,
});
// Register callback
rpcClient.registerOnTimeoutCallback(() => {
clientTimedOut = true;
});
const writer = callerInterface.writable.getWriter();
const reader = callerInterface.readable.getReader();
await utils.sleep(5);
// Wait for server and client to timeout by checking the flag
await new Promise<void>((resolve) => {
const checkFlag = () => {
if (serverTimedOut && clientTimedOut) resolve();
else setTimeout(() => checkFlag(), 10);
};
checkFlag();
});
// Expect both the client and the server to time out
await expect(writer.write(values[0])).rejects.toThrow(
'Timed out waiting for header',
Expand All @@ -674,6 +691,8 @@ describe('RPC', () => {
});
// Test description
test('RPC server times out before client', async () => {
let serverTimedOut = false;

// Generate test data (assuming fc.array generates some mock array)
const values = fc.array(rpcTestUtils.safeJsonValueArb, { minLength: 1 });

Expand Down Expand Up @@ -707,6 +726,10 @@ describe('RPC', () => {
idGen,
handlerTimeoutTime: 1,
});
// Register callback
rpcServer.registerOnTimeoutCallback(() => {
serverTimedOut = true;
});
rpcServer.handleStream({ ...serverPair, cancel: () => {} });

// Create an instance of the RPC client with a longer timeout
Expand All @@ -723,8 +746,16 @@ describe('RPC', () => {
});
const writer = callerInterface.writable.getWriter();
const reader = callerInterface.readable.getReader();
await utils.sleep(2);
// Actual tests: We expect server to timeout before the client
// Wait for server to timeout by checking the flag
await new Promise<void>((resolve) => {
const checkFlag = () => {
if (serverTimedOut) resolve();
else setTimeout(() => checkFlag(), 10);
};
checkFlag();
});

// We expect server to timeout before the client
await expect(writer.write(values[0])).rejects.toThrow(
'Timed out waiting for header',
);
Expand Down Expand Up @@ -851,11 +882,10 @@ describe('RPC', () => {
// Trigger a read that will hang indefinitely

const readPromise = reader.read();

// Adding a sleep here to check that neither timeout

await utils.sleep(10000);

// Adding a randomized sleep here to check that neither timeout
const randomSleepTime = Math.floor(Math.random() * 1000) + 1;
// Random time between 1 and 1,000 ms
await utils.sleep(randomSleepTime);
// At this point, writePromise and readPromise should neither be resolved nor rejected
// because the server method is hanging.

Expand Down
10 changes: 5 additions & 5 deletions tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { fc } from '@fast-check/jest';
import * as utils from '@/utils';
import { fromError } from '@/utils';
import * as rpcErrors from '@/errors';
import { ErrorRPC } from '@/errors';

/**
* This is used to convert regular chunks into randomly sized chunks based on
Expand Down Expand Up @@ -142,15 +143,14 @@ const jsonRpcResponseResultArb = (
})
.noShrink() as fc.Arbitrary<JSONRPCResponseResult>;
const jsonRpcErrorArb = (
error: fc.Arbitrary<Error> = fc.constant(new Error('test error')),
sensitive: boolean = false,
error: fc.Arbitrary<ErrorRPC<any>> = fc.constant(new ErrorRPC('test error')),
) =>
fc
.record(
{
code: fc.integer(),
message: fc.string(),
data: error.map((e) => fromError(e, sensitive)),
data: error.map((e) => JSON.stringify(fromError(e))),
},
{
requiredKeys: ['code', 'message'],
Expand All @@ -159,13 +159,13 @@ const jsonRpcErrorArb = (
.noShrink() as fc.Arbitrary<JSONRPCError>;

const jsonRpcResponseErrorArb = (
error?: fc.Arbitrary<Error>,
error?: fc.Arbitrary<ErrorRPC<any>>,
sensitive: boolean = false,
) =>
fc
.record({
jsonrpc: fc.constant('2.0'),
error: jsonRpcErrorArb(error, sensitive),
error: jsonRpcErrorArb(error),
id: idArb,
})
.noShrink() as fc.Arbitrary<JSONRPCResponseError>;
Expand Down

0 comments on commit 2d92d6d

Please sign in to comment.