Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add websocket transport #31

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 6 additions & 15 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
name: Test

on:
push:
branches:
- main
pull_request:
branches:
- main

permissions:
checks: write
contents: write
on: [push, pull_request]

jobs:
test:
Expand All @@ -23,11 +12,13 @@ jobs:
- name: Set up Node
uses: actions/setup-node@v4
with:
node-version: 18
node-version: 20
- name: Install dependencies
run: npm install
- name: Check types
run: npm run typecheck
- name: Run tests
run: npm test
- name: Run typecheck
run: npm run typecheck
- name: Publish Preview Release
run: npx pkg-pr-new publish

1 change: 1 addition & 0 deletions .taprc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
allow-incomplete-coverage: true
25 changes: 20 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ Lightweight [JSON-RPC](https://www.jsonrpc.org/specification) solution for TypeS
- 📜 JSON-RPC 2.0 protocol
- 🕵️ Full IDE autocompletion
- 🪶 Tiny footprint (< 1kB)
- 🚚 Support for custom transports
- 🏝️ Optional support for non-JSON types
- 🚚 Support for custom transports
- 🔌 Optional websocket support
- 🌎 Support for Deno and edge runtimes
- 🚫 No code generation step
- 🚫 No dependencies
Expand Down Expand Up @@ -169,6 +170,21 @@ const client = rpcClient<MyService>({
});
```

### Websockets

Typed-rpc comes with an alternative transport that uses websockets:

```ts
import { websocketTransport } from "typed-rpc/ws";

import
const client = rpcClient<MyService>({
transport: websocketTransport({
url: "wss://websocket.example.org"
})
});
```

## Support for Other Runtimes

`typed-rpc/server` can be used with any server framework or edge runtime.
Expand Down Expand Up @@ -252,10 +268,9 @@ Pair `typed-rpc` with [react-api-query](https://www.npmjs.com/package/react-api-

## What's new in v6

* Services can now expose APIs with non-JSON types like Dates, Maps, Sets, etc. by plugging in a [transcoder](#support-for-non-json-types) like superjson.
* Previously, typed-rpc only shipped a CommonJS build in `/lib` and Deno users would directily consume the TypeScript code in `/src`. We now use [pkgroll](https://github.com/privatenumber/pkgroll) to create a hybrid module in `/dist` with both `.mjs` and `.cjs` files.
* We removed the previously included express adapter to align with the core philosopy of keeping things as simple as possible.

- Services can now expose APIs with non-JSON types like Dates, Maps, Sets, etc. by plugging in a [transcoder](#support-for-non-json-types) like superjson.
- Previously, typed-rpc only shipped a CommonJS build in `/lib` and Deno users would directily consume the TypeScript code in `/src`. We now use [pkgroll](https://github.com/privatenumber/pkgroll) to create a hybrid module in `/dist` with both `.mjs` and `.cjs` files.
- We removed the previously included express adapter to align with the core philosopy of keeping things as simple as possible.

## License

Expand Down
15 changes: 13 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,23 @@
"types": "./dist/server.d.cts",
"default": "./dist/server.cjs"
}
},
"./ws": {
"import": {
"types": "./dist/ws.d.mts",
"default": "./dist/ws.mjs"
},
"require": {
"types": "./dist/ws.d.cts",
"default": "./dist/ws.cjs"
}
}
},
"scripts": {
"dev": "tsx --watch src/e2e/server.ts",
"build": "pkgroll --clean-dist --sourcemap",
"start": "tsx --watch src/test/server.ts",
"test": "with-server tap src/test/client.ts",
"start": "tsx src/e2e/server.ts",
"test": "with-server tap",
"prepare": "npm run build",
"typecheck": "tsc --noEmit"
},
Expand Down
198 changes: 64 additions & 134 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,45 @@
import type {
JsonRpcRequest,
JsonRpcResponse,
RpcTranscoder,
import {
type JsonRpcErrorResponse,
type JsonRpcRequest,
type JsonRpcResponse,
type RpcTranscoder,
} from "./types.js";

export * from "./types.js";

/**
* Type guard to check if a given object is a valid JSON-RPC response.
*/
export function isJsonRpcResponse(res: unknown): res is JsonRpcResponse {
if (typeof res !== "object" || res === null) return false;
if (!("jsonrpc" in res) || res.jsonrpc !== "2.0") return false;
if (
!("id" in res) ||
(typeof res.id !== "string" &&
typeof res.id !== "number" &&
res.id !== null)
)
return false;

if ("result" in res) {
// Check for JsonRpcSuccessResponse
return !("error" in res);
} else if ("error" in res) {
// Check for JsonRpcErrorResponse
const error = (res as JsonRpcErrorResponse).error;
return (
typeof error === "object" &&
error !== null &&
"code" in error &&
typeof error.code === "number" &&
"message" in error &&
typeof error.message === "string"
);
}

return false;
}

/**
* Error class that is thrown if a remote method returns an error.
*/
Expand All @@ -32,15 +66,18 @@ export type RpcTransport = (
abortSignal: AbortSignal
) => Promise<JsonRpcResponse>;

export type RpcUuid = () => number | string;

type RpcClientOptions =
| string
| (FetchOptions & {
transport?: RpcTransport;
| ((FetchOptions | { transport: RpcTransport }) & {
transcoder?: RpcTranscoder<any>;
uuid?: RpcUuid;
});

type FetchOptions = {
url: string;
transport?: never;
credentials?: RequestCredentials;
getHeaders?():
| Record<string, string>
Expand All @@ -64,11 +101,23 @@ const identityTranscoder: RpcTranscoder<any> = {
};

export function rpcClient<T extends object>(options: RpcClientOptions) {
let transport: RpcTransport;
let transcoder: RpcTranscoder<any> = identityTranscoder;
let uuid: RpcUuid | undefined;

if (typeof options === "string") {
options = { url: options };
transport = fetchTransport({ url: options });
} else if ("transport" in options && options.transport) {
transport = options.transport;
transcoder = options.transcoder || identityTranscoder;
uuid = options.uuid;
} else {
transport = fetchTransport(options);
transcoder = options.transcoder || identityTranscoder;
uuid = options.uuid;
}
const transport = options.transport || fetchTransport(options);
const { serialize, deserialize } = options.transcoder || identityTranscoder;

const { serialize, deserialize } = transcoder;

/**
* Send a request using the configured transport and handle the result.
Expand All @@ -78,23 +127,18 @@ export function rpcClient<T extends object>(options: RpcClientOptions) {
args: any[],
signal: AbortSignal
) => {
const req = createRequest(method, args);
const req = createRequest(method, args, uuid);
const raw = await transport(serialize(req as any), signal);
const res = deserialize(raw);
if (res?.jsonrpc !== "2.0") {
const res: unknown = deserialize(raw);
if (!isJsonRpcResponse(res)) {
throw new TypeError("Not a JSON-RPC 2.0 response");
}

if ("error" in res) {
const { code, message, data } = res.error;
throw new RpcError(message, code, data);
}

if ("result" in res) {
} else {
return res.result;
}

throw new TypeError("Invalid response");
};

// Map of AbortControllers to abort pending requests
Expand Down Expand Up @@ -137,10 +181,10 @@ export function rpcClient<T extends object>(options: RpcClientOptions) {
/**
* Create a JsonRpcRequest for the given method.
*/
export function createRequest(method: string, params?: any[]): JsonRpcRequest {
export function createRequest(method: string, params?: any[], uuid?: RpcUuid): JsonRpcRequest {
const req: JsonRpcRequest = {
jsonrpc: "2.0",
id: Date.now(),
id: uuid ? uuid() : Date.now().toString(36) + Math.random().toString(36).substring(2),
method,
};

Expand Down Expand Up @@ -184,117 +228,3 @@ export function fetchTransport(options: FetchOptions): RpcTransport {
return await res.json();
};
}

export type WebSocketTransportOptions = {
/**
* The URL to connect to.
*/
url: string;
/**
* Reconnection timeout in milliseconds. Default is 1000ms.
* Set to 0 to disable reconnection.
*/
reconnectTimeout?: number;
/**
* The timeout in milliseconds for requests.
* Default is 60_000ms.
*/
timeout?: number;
/**
* Error handler for incoming messages.
*/
onMessageError?: (err: unknown) => void;
/**
* WebSocket open handler.
* Use to access the WebSocket instance.
*/
onOpen?: (ev: Event, ws: WebSocket) => void;
};

export function websocketTransport(options: WebSocketTransportOptions): RpcTransport {
type Request = { resolve: Function, reject: Function, timeoutId?: ReturnType<typeof setTimeout> };
const requests = new Map<string | number, Request>();
const timeout = options.timeout ?? 60_000;

let ws: WebSocket;
function connect() {
ws = new WebSocket(options.url.replace("http", "ws"));

ws.addEventListener('open', (e) => {
options.onOpen?.(e, ws);
});

ws.addEventListener('message', (e) => {
const raw = e.data.toString();
const res = JSON.parse(raw) as JsonRpcResponse;

if (typeof res.id !== "string" && typeof res.id !== "number") {
options.onMessageError?.(new TypeError("Invalid response (missing id)"));
return;
}

const request = requests.get(res.id);
if (!request) {
options.onMessageError?.(new Error("Request not found for id: " + res.id));
return;
}

requests.delete(res.id);
if (request.timeoutId) {
clearTimeout(request.timeoutId);
}

request.resolve(raw);
});

ws.addEventListener('close', (e) => {
const reconnectTimeout = options.reconnectTimeout ?? 1000;
if (reconnectTimeout !== 0 && !e.wasClean) {
setTimeout(connect, reconnectTimeout);
}
});

ws.addEventListener('error', () => {
ws.close();
});
}

connect();

return async (req, signal): Promise<any> => {
const _req: JsonRpcRequest = typeof req === 'string' ? JSON.parse(req) : req;

if (typeof _req.id !== 'string' && typeof _req.id !== 'number') { // skip notifications
return;
}

const requestId = _req.id;

if (requests.has(requestId)) {
throw new RpcError("Request already exists", -32000);
}

const res = await new Promise((resolve, reject) => {
const request: Request = { resolve, reject };

if (timeout > 0) {
request.timeoutId = setTimeout(() => {
reject(new RpcError("Request timed out", -32000));
}, timeout);
}

signal.onabort = () => {
if (request.timeoutId) {
clearTimeout(request.timeoutId);
}
reject(new RpcError("Request aborted", -32000));
};

requests.set(requestId, request);

ws.send(req);
});

return res;
};
}
6 changes: 6 additions & 0 deletions src/e2e/BrokenService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface BrokenService {
sendInvalidVersion(): Promise<void>;
sendInvalidJSON(): Promise<void>;
sendUnknownID(): Promise<void>;
sendServerError(): Promise<void>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ export class RequestAwareService implements Service {
throw err;
}

async sleep(ms: number) {
await new Promise((resolve) => setTimeout(resolve, ms));
return "Operation completed";
}

echoHeader(name: string) {
return this.headers?.[name.toLowerCase()] ?? null;
}
Expand Down
File renamed without changes.
Loading