Skip to content

Commit

Permalink
Merge pull request #207 from robtaussig/heartbeat_pr
Browse files Browse the repository at this point in the history
Heartbeat pr
  • Loading branch information
robtaussig authored Sep 28, 2023
2 parents dc320d4 + 3c646ad commit 2407b32
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 2 deletions.
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ A demo of this can be found [here](https://robtaussig.com/socket/). Each compone
- Multiple components can (optionally) use a single WebSocket, which is closed and cleaned up when all subscribed components have unsubscribed/unmounted
- Written in TypeScript
- Socket.io support
- Heartbeat support
- No more waiting for the WebSocket to open before messages can be sent. Pre-connection messages are queued up and sent on connection
- Provides direct access to unshared WebSockets, while proxying shared WebSockets. Proxied WebSockets provide subscribers controlled access to the underlying (shared) WebSocket, without allowing unsafe behavior
- Seamlessly works with server-sent-events and the [EventSource API](https://developer.mozilla.org/en-US/docs/Web/API/EventSource)
Expand Down Expand Up @@ -150,6 +151,12 @@ type UseWebSocket<T = unknown> = (
filter?: (message: WebSocketEventMap['message']) => boolean;
retryOnError?: boolean;
eventSourceOptions?: EventSourceInit;
heartbeat?: boolean | {
message?: "ping" | "pong" | string;
returnMessage?: "ping" | "pong" | string;
timeout?: number;
interval?: number;
};
} = {},
shouldConnect: boolean = true,
): {
Expand Down Expand Up @@ -348,6 +355,12 @@ interface Options {
};
protocols?: string | string[];
eventSourceOptions?: EventSourceInit;
heartbeat?: boolean | {
message?: "ping" | "pong" | string;
returnMessage?: "ping" | "pong" | string;
timeout?: number;
interval?: number;
};
}
```

Expand Down Expand Up @@ -408,6 +421,24 @@ const { sendMessage, lastMessage, readyState } = useSocketIO(

It is important to note that `lastMessage` will not be a `MessageEvent`, but instead an object with two keys: `type` and `payload`.

### heartbeat

If the `heartbeat` option is set to `true` or has additional options, the library will send a 'ping' message to the server every `interval` milliseconds. If no response is received within `timeout` milliseconds, indicating a potential connection issue, the library will close the connection. You can customize the 'ping' message by changing the `message` property in the `heartbeat` object. If a `returnMessage` is defined, it will be ignored so that it won't be set as the `lastMessage`.

```js
const { sendMessage, lastMessage, readyState } = useWebSocket(
'ws://localhost:3000',
{
heartbeat: {
message: 'ping',
returnMessage: 'pong',
timeout: 60000, // 1 minute, if no response is received, the connection will be closed
interval: 25000, // every 25 seconds, a ping message will be sent
},
}
);
```

### filter: Callback

If a function is provided with the key `filter`, incoming messages will be passed through the function, and only if it returns `true` will the hook pass along the `lastMessage` and update your component.
Expand Down
19 changes: 19 additions & 0 deletions src/lib/attach-listener.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { MutableRefObject } from 'react';
import { setUpSocketIOPing } from './socket-io';
import { heartbeat } from './heartbeat';
import {
DEFAULT_RECONNECT_LIMIT,
DEFAULT_RECONNECT_INTERVAL_MS,
Expand All @@ -19,11 +20,29 @@ const bindMessageHandler = (
optionsRef: MutableRefObject<Options>,
setLastMessage: Setters['setLastMessage'],
) => {
let heartbeatCb: () => void;

if (optionsRef.current.heartbeat && webSocketInstance instanceof WebSocket) {
const heartbeatOptions =
typeof optionsRef.current.heartbeat === "boolean"
? undefined
: optionsRef.current.heartbeat;
heartbeatCb = heartbeat(webSocketInstance, heartbeatOptions);
}

webSocketInstance.onmessage = (message: WebSocketEventMap['message']) => {
heartbeatCb?.();
optionsRef.current.onMessage && optionsRef.current.onMessage(message);
if (typeof optionsRef.current.filter === 'function' && optionsRef.current.filter(message) !== true) {
return;
}
if (
optionsRef.current.heartbeat &&
typeof optionsRef.current.heartbeat !== "boolean" &&
optionsRef.current.heartbeat?.returnMessage === message.data
)
return;

setLastMessage(message);
};
};
Expand Down
20 changes: 18 additions & 2 deletions src/lib/attach-shared-listeners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,23 @@ import { sharedWebSockets } from './globals';
import { DEFAULT_RECONNECT_LIMIT, DEFAULT_RECONNECT_INTERVAL_MS, ReadyState, isEventSourceSupported } from './constants';
import { getSubscribers } from './manage-subscribers';
import { MutableRefObject } from 'react';
import { Options, SendMessage, WebSocketLike } from './types';
import { HeartbeatOptions, Options, SendMessage, WebSocketLike } from './types';
import { setUpSocketIOPing } from './socket-io';
import { heartbeat } from './heartbeat';

const bindMessageHandler = (
webSocketInstance: WebSocketLike,
url: string,
heartbeatOptions?: boolean | HeartbeatOptions
) => {
let onMessageCb: () => void;

if (heartbeatOptions && webSocketInstance instanceof WebSocket) {
onMessageCb = heartbeat(webSocketInstance, typeof heartbeatOptions === 'boolean' ? undefined : heartbeatOptions);
}

webSocketInstance.onmessage = (message: WebSocketEventMap['message']) => {
onMessageCb?.();
getSubscribers(url).forEach(subscriber => {
if (subscriber.optionsRef.current.onMessage) {
subscriber.optionsRef.current.onMessage(message);
Expand All @@ -22,6 +31,13 @@ const bindMessageHandler = (
return;
}

if (
heartbeatOptions &&
typeof heartbeatOptions !== "boolean" &&
heartbeatOptions?.returnMessage === message.data
)
return;

subscriber.setLastMessage(message);
});
};
Expand Down Expand Up @@ -122,7 +138,7 @@ export const attachSharedListeners = (
interval = setUpSocketIOPing(sendMessage);
}

bindMessageHandler(webSocketInstance, url);
bindMessageHandler(webSocketInstance, url, optionsRef.current.heartbeat);
bindCloseHandler(webSocketInstance, url);
bindOpenHandler(webSocketInstance, url);
bindErrorHandler(webSocketInstance, url);
Expand Down
5 changes: 5 additions & 0 deletions src/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ export const SOCKET_IO_PING_CODE = '2';
export const DEFAULT_RECONNECT_LIMIT = 20;
export const DEFAULT_RECONNECT_INTERVAL_MS = 5000;
export const UNPARSABLE_JSON_OBJECT = {};
export const DEFAULT_HEARTBEAT = {
message: 'ping',
timeout: 60000,
interval: 25000,
};

export enum ReadyState {
UNINSTANTIATED = -1,
Expand Down
61 changes: 61 additions & 0 deletions src/lib/heartbeat.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { heartbeat } from "./heartbeat";

describe("heartbeat", () => {
let ws: WebSocket;
let sendSpy: jest.Mock;
let closeSpy: jest.Mock;
let addEventListenerSpy: jest.Mock;
jest.useFakeTimers();

beforeEach(() => {
sendSpy = jest.fn();
closeSpy = jest.fn();
addEventListenerSpy = jest.fn();
ws = {
send: sendSpy,
close: closeSpy,
addEventListener: addEventListenerSpy,
} as unknown as WebSocket;
});

afterEach(() => {
jest.clearAllMocks();
});

test("sends a ping message at the specified interval", () => {
heartbeat(ws, { interval: 100 });
expect(sendSpy).not.toHaveBeenCalled();
jest.advanceTimersByTime(99);
expect(sendSpy).not.toHaveBeenCalled();
jest.advanceTimersByTime(1);
expect(sendSpy).toHaveBeenCalledTimes(1);
jest.advanceTimersByTime(100);
expect(sendSpy).toHaveBeenCalledTimes(2);
});

test("closes the WebSocket if onMessageCb is not invoked within the specified timeout", () => {
heartbeat(ws, { timeout: 100 });
expect(closeSpy).not.toHaveBeenCalled();
jest.advanceTimersByTime(99);
expect(closeSpy).not.toHaveBeenCalled();
jest.advanceTimersByTime(1);
expect(closeSpy).toHaveBeenCalledTimes(1);
});

test("does not close the WebSocket if messageCallback is invoked within the specified timeout", () => {
const onMessageCb = heartbeat(ws, { timeout: 100 });
expect(closeSpy).not.toHaveBeenCalled();
jest.advanceTimersByTime(99);
onMessageCb();
expect(closeSpy).not.toHaveBeenCalled();
jest.advanceTimersByTime(1);
expect(closeSpy).not.toHaveBeenCalled();
});

test("sends the custom ping message", () => {
heartbeat(ws, { message: "pong" });
expect(sendSpy).not.toHaveBeenCalled();
jest.advanceTimersByTime(25000);
expect(sendSpy).toHaveBeenCalledWith("pong");
});
});
37 changes: 37 additions & 0 deletions src/lib/heartbeat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { DEFAULT_HEARTBEAT } from "./constants";
import { HeartbeatOptions } from "./types";

export function heartbeat(ws: WebSocket, options?: HeartbeatOptions): () => void {
const {
interval = DEFAULT_HEARTBEAT.interval,
timeout = DEFAULT_HEARTBEAT.timeout,
message = DEFAULT_HEARTBEAT.message,
} = options || {};

let messageAccepted = false;

const pingTimer = setInterval(() => {
try {
ws.send(message);
} catch (error) {
// do nothing
}
}, interval);

const timeoutTimer = setInterval(() => {
if (!messageAccepted) {
ws.close();
} else {
messageAccepted = false;
}
}, timeout);

ws.addEventListener("close", () => {
clearInterval(pingTimer);
clearInterval(timeoutTimer);
});

return () => {
messageAccepted = true;
};
}
8 changes: 8 additions & 0 deletions src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@ export interface Options {
retryOnError?: boolean;
eventSourceOptions?: EventSourceOnly;
skipAssert?: boolean;
heartbeat?: boolean | HeartbeatOptions;
}

export type EventSourceOnly = Omit<Options, 'eventSourceOptions'> & EventSourceInit;

export type HeartbeatOptions = {
message?: "ping" | "pong" | string;
returnMessage?: "ping" | "pong" | string;
timeout?: number;
interval?: number;
};

export interface EventSourceEventHandlers {
[eventName: string]: (message: EventSourceEventMap['message']) => void;
}
Expand Down
90 changes: 90 additions & 0 deletions src/lib/use-websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,4 +438,94 @@ test('Options#eventSourceOptions, if provided, instantiates an EventSource inste
expect(result.current.getWebSocket() instanceof EventSource).toBe(true);
});

test.each([false, true])('Options#heartbeat, if provided, sends a message to the server at the specified interval and works when share is %s', async (shareOption) => {
options.heartbeat = {
message: 'ping',
timeout: 10000,
interval: 500,
};
options.share = shareOption;

renderHook(() => useWebSocket(URL, options));

if (shareOption) {
renderHook(() => useWebSocket(URL, options));
}
await server.connected;
await sleep(1600);
await expect(server).toHaveReceivedMessages(["ping", "ping", "ping"]);
});

test.each([false, true])('Options#heartbeat, if provided, close websocket if no message is received from server within specified timeout and works when share is %s', async (shareOption) => {
options.heartbeat = {
message: 'ping',
timeout: 1000,
interval: 500,
};
options.share = shareOption;

const {
result,
} = renderHook(() => useWebSocket(URL, options));

if (shareOption) {
renderHook(() => useWebSocket(URL, options));
}
await server.connected;
await sleep(1600);
expect(result.current.readyState).toBe(WebSocket.CLOSED);
});

test.each([false, true])('Options#heartbeat, if provided, do not close websocket if a message is received from server within specified timeout and works when share is %s', async (shareOption) => {
options.heartbeat = {
message: 'ping',
timeout: 1000,
interval: 500,
};
options.share = shareOption;

const {
result,
} = renderHook(() => useWebSocket(URL, options));

if (shareOption) {
renderHook(() => useWebSocket(URL, options));
}

await server.connected;
server.send('ping')
await sleep(500);
server.send('ping')
await sleep(500);
server.send('ping')
await sleep(500);
server.send('ping')
await sleep(500);
expect(result.current.readyState).toBe(WebSocket.OPEN);
});

test.each([false, true])('Options#heartbeat, if provided, lastMessage is updated if server message does not matches the returnMessage property of heartbeatOptions and works when share is %s', async (shareOption) => {
options.heartbeat = {
message: 'ping',
returnMessage: 'pong',
timeout: 1000,
interval: 500,
};
options.share = shareOption;

const {
result,
} = renderHook(() => useWebSocket(URL, options));

if (shareOption) {
renderHook(() => useWebSocket(URL, options));
}

await server.connected;
server.send('pong');
expect(result.current.lastMessage?.data).toBe(undefined);
server.send('ping');
expect(result.current.lastMessage?.data).toBe('ping');
});

// //TODO: Write companion tests for useSocketIO

0 comments on commit 2407b32

Please sign in to comment.