From 9bff87831bccd04800662d26133e630211d9c5c3 Mon Sep 17 00:00:00 2001 From: stevensJourney <51082125+stevensJourney@users.noreply.github.com> Date: Thu, 18 Jul 2024 14:57:54 +0200 Subject: [PATCH] emit close on websocket (#44) --- .changeset/nasty-mugs-battle.md | 5 +++++ .../transport/WebSocketServerTransport.ts | 3 +++ .../rsocket-router/tests/src/socket.test.ts | 20 ++++++++++++++++--- 3 files changed, 25 insertions(+), 3 deletions(-) create mode 100644 .changeset/nasty-mugs-battle.md diff --git a/.changeset/nasty-mugs-battle.md b/.changeset/nasty-mugs-battle.md new file mode 100644 index 0000000..6f83eb2 --- /dev/null +++ b/.changeset/nasty-mugs-battle.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-rsocket-router': patch +--- + +Fix issue where WebSocket close events would not immediately propagate to router handlers. diff --git a/packages/rsocket-router/src/router/transport/WebSocketServerTransport.ts b/packages/rsocket-router/src/router/transport/WebSocketServerTransport.ts index 53e802b..39dd110 100644 --- a/packages/rsocket-router/src/router/transport/WebSocketServerTransport.ts +++ b/packages/rsocket-router/src/router/transport/WebSocketServerTransport.ts @@ -74,6 +74,9 @@ export class WebsocketServerTransport implements ServerTransport { try { websocket.binaryType = 'nodebuffer'; const duplex = WebSocket.createWebSocketStream(websocket); + websocket.on('close', (e) => { + duplex.emit('close', e); + }); WebsocketDuplexConnection.create(duplex, connectionAcceptor, multiplexerDemultiplexerFactory, websocket); } catch (ex) { logger.error(`Could not create duplex connection`, ex); diff --git a/packages/rsocket-router/tests/src/socket.test.ts b/packages/rsocket-router/tests/src/socket.test.ts index fb87482..c921478 100644 --- a/packages/rsocket-router/tests/src/socket.test.ts +++ b/packages/rsocket-router/tests/src/socket.test.ts @@ -124,6 +124,9 @@ describe('Sockets', () => { wsCreator: () => server }); + const onCancelWrapper = (callback: () => void) => callback(); + const serverCancelSpy = vi.fn(onCancelWrapper); + // Create a simple server which will spam a lot of data to any connection const rSocketServer = new RSocketServer({ transport, @@ -143,7 +146,9 @@ describe('Sockets', () => { request: () => {}, onExtension: () => {}, cancel: () => { - stop = true; + serverCancelSpy(() => { + stop = true; + }); } }; } @@ -154,7 +159,8 @@ describe('Sockets', () => { rSocketServer.bind(); // Try and connect 100 times, closing the socket as soon as it is available - for (let i = 0; i < 100; i++) { + const testCount = 100; + const promises = new Array(testCount).fill(null).map(async () => { const testSocket = new WebSocket.WebSocket(WS_ADDRESS); const connector = new RSocketConnector({ @@ -166,6 +172,10 @@ describe('Sockets', () => { setup: { dataMimeType: 'application/bson', metadataMimeType: 'application/bson', + + keepAlive: 20000, + lifetime: 30000, + payload: { data: null } @@ -173,6 +183,7 @@ describe('Sockets', () => { }); const connection = await connector.connect(); + connection.requestStream({ data: null }, 1, { onNext() {}, onComplete: () => {}, @@ -182,6 +193,9 @@ describe('Sockets', () => { // The socket closing here should not throw any unhandled errors testSocket.close(); - } + }); + + await Promise.all(promises); + await vi.waitFor(() => expect(serverCancelSpy.mock.calls.length).equals(testCount), { timeout: 2000 }); }); });