Skip to content

Commit

Permalink
emit close on websocket (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney authored Jul 18, 2024
1 parent 8e9a316 commit 9bff878
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
5 changes: 5 additions & 0 deletions .changeset/nasty-mugs-battle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-rsocket-router': patch
---

Fix issue where WebSocket close events would not immediately propagate to router handlers.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ export class WebsocketServerTransport implements ServerTransport {
try {
websocket.binaryType = 'nodebuffer';
const duplex = WebSocket.createWebSocketStream(websocket);
websocket.on('close', (e) => {
duplex.emit('close', e);
});
WebsocketDuplexConnection.create(duplex, connectionAcceptor, multiplexerDemultiplexerFactory, websocket);
} catch (ex) {
logger.error(`Could not create duplex connection`, ex);
Expand Down
20 changes: 17 additions & 3 deletions packages/rsocket-router/tests/src/socket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ describe('Sockets', () => {
wsCreator: () => server
});

const onCancelWrapper = (callback: () => void) => callback();
const serverCancelSpy = vi.fn(onCancelWrapper);

// Create a simple server which will spam a lot of data to any connection
const rSocketServer = new RSocketServer({
transport,
Expand All @@ -143,7 +146,9 @@ describe('Sockets', () => {
request: () => {},
onExtension: () => {},
cancel: () => {
stop = true;
serverCancelSpy(() => {
stop = true;
});
}
};
}
Expand All @@ -154,7 +159,8 @@ describe('Sockets', () => {
rSocketServer.bind();

// Try and connect 100 times, closing the socket as soon as it is available
for (let i = 0; i < 100; i++) {
const testCount = 100;
const promises = new Array(testCount).fill(null).map(async () => {
const testSocket = new WebSocket.WebSocket(WS_ADDRESS);

const connector = new RSocketConnector({
Expand All @@ -166,13 +172,18 @@ describe('Sockets', () => {
setup: {
dataMimeType: 'application/bson',
metadataMimeType: 'application/bson',

keepAlive: 20000,
lifetime: 30000,

payload: {
data: null
}
}
});

const connection = await connector.connect();

connection.requestStream({ data: null }, 1, {
onNext() {},
onComplete: () => {},
Expand All @@ -182,6 +193,9 @@ describe('Sockets', () => {

// The socket closing here should not throw any unhandled errors
testSocket.close();
}
});

await Promise.all(promises);
await vi.waitFor(() => expect(serverCancelSpy.mock.calls.length).equals(testCount), { timeout: 2000 });
});
});

0 comments on commit 9bff878

Please sign in to comment.