Skip to content

Commit

Permalink
wip: jests for #4
Browse files Browse the repository at this point in the history
removed, matrixai/id
  • Loading branch information
addievo committed Sep 20, 2023
1 parent 1893e1e commit 88a9af9
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 27 deletions.
17 changes: 0 additions & 17 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -448,21 +448,6 @@ class RPCServer extends EventTarget {
},
});

// `RPCStream.cancel(reason)`.
const handleAbort = () => {
const timer = new Timer({
delay: this.handlerTimeoutTime,
handler: () => {
rpcStream.cancel(abortController.signal.reason);
},
});
void timer
.catch(() => {}) // Ignore cancellation error
.finally(() => {
abortController.signal.removeEventListener('abort', handleAbort);
});
};
abortController.signal.addEventListener('abort', handleAbort);
const prom = (async () => {
const id = await this.idGen();
const headTransformStream = rpcUtilsMiddleware.binaryToJsonMessageStream(
Expand Down Expand Up @@ -606,7 +591,6 @@ class RPCServer extends EventTarget {
await headerWriter.close();
// Clean up and return
timer.cancel(cleanupReason);
abortController.signal.removeEventListener('abort', handleAbort);
rpcStream.cancel(Error('TMP header message was an error'));
return;
}
Expand All @@ -629,7 +613,6 @@ class RPCServer extends EventTarget {
this.logger.info(`Handled stream with method (${method})`);
// Cleaning up abort and timer
timer.cancel(cleanupReason);
abortController.signal.removeEventListener('abort', handleAbort);
abortController.abort(new rpcErrors.ErrorRPCStreamEnded());
})();
const handlerProm = PromiseCancellable.from(prom, abortController).finally(
Expand Down
214 changes: 204 additions & 10 deletions tests/RPC.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ import ServerCaller from '@/callers/ServerCaller';
import ClientCaller from '@/callers/ClientCaller';
import UnaryCaller from '@/callers/UnaryCaller';
import * as rpcUtilsMiddleware from '@/utils/middleware';
import { ErrorRPC, ErrorRPCRemote } from '@/errors';
import {
ErrorRPC,
ErrorRPCHandlerFailed,
ErrorRPCRemote,
ErrorRPCTimedOut,
} from '@/errors';
import * as rpcErrors from '@/errors';
import RPCClient from '@/RPCClient';
import RPCServer from '@/RPCServer';
Expand Down Expand Up @@ -593,18 +598,207 @@ describe('RPC', () => {
await expect(rpcServer.destroy(false)).toResolve();
await rpcClient.destroy();
});
testProp(
'RPC Client and server timeout concurrently',
[fc.array(rpcTestUtils.safeJsonValueArb, { minLength: 1 })],
async (values) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Uint8Array,
Uint8Array
>();
const timeout = 600;
class TestMethod extends DuplexHandler {
public handle = async function* (
input: AsyncIterableIterator<JSONValue>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncIterableIterator<JSONValue> {
// Check for abort event
ctx.signal.throwIfAborted();
const abortProm = utils.promise<never>();
ctx.signal.addEventListener('abort', () => {
abortProm.rejectP(ctx.signal.reason);
});
await abortProm.p;
};
}
const testMethodInstance = new TestMethod({});
// Set up a client and server with matching timeout settings
const rpcServer = await RPCServer.createRPCServer({
manifest: {
testMethod: testMethodInstance,
},
logger,
idGen,
handlerTimeoutTime: timeout,
});
rpcServer.handleStream({
...serverPair,
cancel: () => {},
});

const rpcClient = await RPCClient.createRPCClient({
manifest: {
testMethod: new DuplexCaller(),
},
streamFactory: async () => {
return {
...clientPair,
cancel: () => {},
};
},
logger,
idGen,
streamKeepAliveTimeoutTime: timeout,
});
const callerInterface = await rpcClient.methods.testMethod();
const writer = callerInterface.writable.getWriter();
const reader = callerInterface.readable.getReader();
await expect(reader.read()).rejects.toThrow(
'Timed out waiting for header',
);
await expect(writer.write()).rejects.toThrow(
'Timed out waiting for header',
);
await rpcServer.destroy();
await rpcClient.destroy();
},
{ numRuns: 1 },
);
testProp(
'RPC server times out before client',
[fc.array(rpcTestUtils.safeJsonValueArb, { minLength: 1 })],
async (values) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Uint8Array,
Uint8Array
>();
class TestMethod extends DuplexHandler {
public handle = async function* (
input: AsyncIterableIterator<JSONValue>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncIterableIterator<JSONValue> {
await utils.sleep(7); // Longer than server's timeout, shorter than client'
yield* input;
};
}
// Set up a client and server, server has a shorter timeout than client`
const rpcServer = await RPCServer.createRPCServer({
manifest: {
testMethod: new TestMethod({}),
},
logger,
idGen,
handlerTimeoutTime: 5,
});
rpcServer.handleStream({
...serverPair,
cancel: () => {},
});

/* test('RPC Client and server timeout concurrentrly'), async () => {};
test('RPC client times out and attempts to reconnect'), async () => {};
const rpcClient = await RPCClient.createRPCClient({
manifest: {
testMethod: new DuplexCaller(),
},
streamFactory: async () => {
return {
...clientPair,
cancel: () => {},
};
},
logger,
idGen,
streamKeepAliveTimeoutTime: 10,
});
const callerInterface = await rpcClient.methods.testMethod();
const writer = callerInterface.writable.getWriter();
const reader = callerInterface.readable.getReader();

// Expect the server to time out first
await expect(writer.closed).rejects.toThrow(ErrorRPCHandlerFailed);
await expect(reader.closed).rejects.toThrow(ErrorRPCHandlerFailed);

await rpcServer.destroy();
await rpcClient.destroy();
},
);
testProp(
'Client times out before server',
[fc.array(rpcTestUtils.safeJsonValueArb, { minLength: 1 })],
async (values) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Uint8Array,
Uint8Array
>();
class TestMethod extends DuplexHandler {
public handle = async function* (
input: AsyncIterableIterator<JSONValue>,
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): AsyncIterableIterator<JSONValue> {
await utils.sleep(7); // Longer than client's timeout, shorter than server's
yield* input;
};
}
// Set up a client and server with matching timeout settings
const rpcServer = await RPCServer.createRPCServer({
manifest: {
testMethod: new TestMethod({}),
},
logger,
idGen,
handlerTimeoutTime: 10,
});
rpcServer.handleStream({
...serverPair,
cancel: () => {},
});

const rpcClient = await RPCClient.createRPCClient({
manifest: {
testMethod: new DuplexCaller(),
},
streamFactory: async () => {
return {
...clientPair,
cancel: () => {},
};
},
logger,
idGen,
streamKeepAliveTimeoutTime: 5,
});
const callerInterface = await rpcClient.methods.testMethod();
const writer = callerInterface.writable.getWriter();
const reader = callerInterface.readable.getReader();

// Expect the client to time out first
await expect(writer.closed).rejects.toThrow(ErrorRPCHandlerFailed);
await expect(reader.closed).rejects.toThrow(ErrorRPCHandlerFailed);

test('RPC server times out before client'), async () => {};
/!**
await rpcServer.destroy();
await rpcClient.destroy();
},
);
/**
* Hard timeout is absolute time limit, cannot be extended or bypassed. When reached,
* operation is immediately cancelled.
* Soft timeout has a threshold, but can be extended or bypassed. For e.g if server sends
* partial response, client can extend the timeout to receive the rest of the response.
*!/
test('RPC server and client hard and soft timeout limits'), async () => {};
* Soft timeout has a threshold, but can be extended or bypassed. For e.g if a server sends a
* partial response, a client can extend the timeout to receive the rest of the response.
*/
test('RPC server and client hard and soft timeout limits', async () => {
// Set up a client and server with both hard and soft timeout limits
// Trigger various scenarios where one or both could hit soft or hard limits
// Expect the system to behave according to the specified limits
});

test('RPC client and server with infinite timeout'), async () => {};*/
test('RPC client and server with infinite timeout', async () => {
// Set up a client and server with infinite timeout settings
// Trigger a call that will hang indefinitely or for a long time
// Expect neither to time out and verify that they can still handle other operations
});
});

0 comments on commit 88a9af9

Please sign in to comment.