From e45d6b1eb916ee8a8bece4f8648e181de9a54a32 Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Mon, 12 Feb 2024 18:17:14 +0100 Subject: [PATCH] fix(server): Dispose of subscriptions on close even if added late to the subscriptions list (#534) Closes #532 --- src/__tests__/server.ts | 49 ++++++++++++++++++++++++++++++++++++++++- src/server.ts | 3 ++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/src/__tests__/server.ts b/src/__tests__/server.ts index 74758e7b..a6a9fb52 100644 --- a/src/__tests__/server.ts +++ b/src/__tests__/server.ts @@ -1841,9 +1841,56 @@ describe('Disconnect/close', () => { client.ws.close(4321, 'Byebye'); }); + + it('should dispose of subscriptions on close even if added late to the subscriptions list', async () => { + let resolveOnOperation: () => void = () => { + throw new Error('On operation resolved early'); + }; + const waitForOnOperation = new Promise( + (resolve) => (resolveOnOperation = resolve), + ); + let resolveOperation: () => void = () => { + throw new Error('Operation resolved early'); + }; + const { url, waitForConnect, waitForComplete, waitForClientClose } = + await startTServer({ + onOperation: () => { + resolveOnOperation(); + return new Promise((resolve) => (resolveOperation = resolve)); + }, + }); + + const client = await createTClient(url); + + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + await waitForConnect(); + + client.ws.send( + stringifyMessage({ + type: MessageType.Subscribe, + id: '1', + payload: { + query: 'subscription { ping }', + }, + }), + ); + + await waitForOnOperation; + + client.ws.close(4321, 'Byebye'); + await waitForClientClose(); + + resolveOperation(); + + await waitForComplete(); + }); }); -it('should only accept a Set, Array or string in handleProtocols', () => { +it('should only accept a Set, Array or string in handleProtocol', () => { for (const test of [ { in: new Set(['not', 'me']), diff --git a/src/server.ts b/src/server.ts index 3b630f8c..ff116a7f 100644 --- a/src/server.ts +++ b/src/server.ts @@ -848,8 +848,9 @@ export function makeServer< // wait for close, cleanup and the disconnect callback return async (code, reason) => { if (connectionInitWait) clearTimeout(connectionInitWait); - for (const sub of Object.values(ctx.subscriptions)) { + for (const [id, sub] of Object.entries(ctx.subscriptions)) { if (isAsyncGenerator(sub)) await sub.return(undefined); + delete ctx.subscriptions[id]; // deleting the subscription means no further activity should take place } if (ctx.acknowledged) await onDisconnect?.(ctx, code, reason); await onClose?.(ctx, code, reason);