Skip to content

Commit

Permalink
fix: retry socket when hanging for too long
Browse files Browse the repository at this point in the history
  • Loading branch information
b-ma committed Sep 5, 2024
1 parent 838fb06 commit 00bab85
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 48 deletions.
6 changes: 5 additions & 1 deletion src/client/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ class Client {

this.#role = config.role;
this.#target = isBrowser() ? 'browser' : 'node';
this.#socket = new ClientSocket(this.#role, this.#config, { path: 'socket' });
this.#socket = new ClientSocket(this.#role, this.#config, {
path: 'socket',
retryConnectionRefusedTimeout: 1000,
retryHangingTimeout: 5 * 1000,
});
this.#contextManager = new ClientContextManager();
this.#pluginManager = new ClientPluginManager(this);
this.#stateManager = new ClientStateManager();
Expand Down
57 changes: 22 additions & 35 deletions src/client/ClientSocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,73 +118,60 @@ class ClientSocket {
const trySocket = async () => {
const ws = new WebSocket(url, webSocketOptions);

// If after a given delay, we receive neither an 'open' nor an 'error'
// message (e.g. hardware is not ready), let's just drop and recreate a new socket
// cf. https://github.com/collective-soundworks/soundworks/issues/97
const hangingTimeoutId = setTimeout(() => {
ws.terminate ? ws.terminate() : ws.close();
trySocket();
}, this.#socketOptions.retryHangingTimeout);

ws.addEventListener('open', openEvent => {
clearTimeout(hangingTimeoutId);
// parse incoming messages for pubsub
this.#socket = ws;

this.#socket.addEventListener('message', e => {
if (e.data === PING_MESSAGE) {
// heartbeat();
this.#socket.send(PONG_MESSAGE);
// do not propagate ping / pong messages
return;
return; // do not propagate ping pong messages
}

const [channel, args] = unpackStringMessage(e.data);
this.#dispatchEvent(channel, ...args);
});

// propagate "native" events
// `close`:
// Fired when a connection with a websocket is closed.
// Also available via the onclose property
// `error`:
// Fired when a connection with a websocket has been closed because of an error,
// such as whensome data couldn't be sent.
// Also available via the onerror property.
// `message`:
// Fired when data is received through a websocket.
// Also available via the onmessage property.
// `open`:
// Fired when a connection with a websocket is opened.
// Also available via the onopen property.
[
'close',
'error',
'upgrade',
'message',
].forEach(eventName => {
// - `close`: Fired when a connection with a websocket is closed.
// - `error`: Fired when a connection with a websocket has been closed
// because of an error, such as whensome data couldn't be sent.
// - `message`: Fired when data is received through a websocket.
// - `open`: Fired when a connection with a websocket is opened.
['close', 'error', 'upgrade', 'message'].forEach(eventName => {
this.#socket.addEventListener(eventName, e => {
this.#dispatchEvent(eventName, e);
});
})

// forward open event
// Forward open event and continue initialization
this.#dispatchEvent('open', openEvent);
// continue with raw socket
resolve();
});

// cf. https://github.com/collective-soundworks/soundworks/issues/17
ws.addEventListener('error', e => {
if (e.type === 'error') {
if (ws.terminate) {
ws.terminate();
} else {
ws.close();
}
clearTimeout(hangingTimeoutId);

// for node clients, retry connection
if (e.type === 'error') {
ws.terminate ? ws.terminate() : ws.close();
// Try to establish new connection in 1 second
if (e.error && e.error.code === 'ECONNREFUSED') {
// we want to log the warning just once
if (!connectionRefusedLogged) {
logger.log('[soundworks.Socket] Connection refused, waiting for the server to start');
// console.log(e.error);
connectionRefusedLogged = true;
}

// retry in 1 second
setTimeout(trySocket, 1000);
setTimeout(trySocket, this.#socketOptions.retryConnectionRefusedTimeout);
}
}
});
Expand Down
47 changes: 35 additions & 12 deletions tests/essentials/Socket.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ import { kSocketTerminate } from '../../src/client/ClientSocket.js';
import config from '../utils/config.js';

describe('# client::Socket', () => {
let server;
describe(`## "close" event`, () => {
let server;

beforeEach(async () => {
server = new Server(config);
await server.start();
});
beforeEach(async () => {
server = new Server(config);
await server.start();
});

afterEach(async () => {
if (server.status !== 'stopped') {
await server.stop();
}
});
afterEach(async () => {
if (server.status !== 'stopped') {
await server.stop();
}
});

describe(`## "close" event`, () => {
it('should be triggered when calling socket[kSocketTerminate]()', async () => {
const client = new Client({ role: 'test', ...config });
await client.start();
Expand Down Expand Up @@ -49,7 +49,7 @@ describe('# client::Socket', () => {
assert.equal(closeCalled, true);
});

it.skip('[long 12 seconds test, unskip manually] should be called when heartbeat is not received from server', async function() {
it.skip('[DEPRECATED] [long 12 seconds test, unskip manually] should be called when heartbeat is not received from server', async function() {
this.timeout(20 * 1000);
// server will send haertbeat ping to clients
server.sockets._DEBUG_PREVENT_HEARTBEAT = true;
Expand All @@ -65,6 +65,29 @@ describe('# client::Socket', () => {
assert.equal(closeCalled, 1);
});
});

describe('connect retry', () => {
it(`should connect when server is started later than client`, async function() {
this.timeout(4 * 1000);

let connected = false;
const client = new Client({ role: 'test', ...config });
client.start();
client.socket.addListener('open', () => {
connected = true;
});

await delay(2 * 1000);

const server = new Server(config);
await server.start();
// socket connection retry timeout is 1 second
await delay(1 * 1000);
await server.stop();

assert.isTrue(connected);
});
});
});


0 comments on commit 00bab85

Please sign in to comment.