Skip to content

Commit

Permalink
fix(server): Dispose of subscriptions on close even if added late to …
Browse files Browse the repository at this point in the history
…the subscriptions list (#534)

Closes #532
  • Loading branch information
enisdenjo authored Feb 12, 2024
1 parent e2603be commit e45d6b1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
49 changes: 48 additions & 1 deletion src/__tests__/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1841,9 +1841,56 @@ describe('Disconnect/close', () => {

client.ws.close(4321, 'Byebye');
});

it('should dispose of subscriptions on close even if added late to the subscriptions list', async () => {
let resolveOnOperation: () => void = () => {
throw new Error('On operation resolved early');
};
const waitForOnOperation = new Promise<void>(
(resolve) => (resolveOnOperation = resolve),
);
let resolveOperation: () => void = () => {
throw new Error('Operation resolved early');
};
const { url, waitForConnect, waitForComplete, waitForClientClose } =
await startTServer({
onOperation: () => {
resolveOnOperation();
return new Promise((resolve) => (resolveOperation = resolve));
},
});

const client = await createTClient(url);

client.ws.send(
stringifyMessage<MessageType.ConnectionInit>({
type: MessageType.ConnectionInit,
}),
);
await waitForConnect();

client.ws.send(
stringifyMessage<MessageType.Subscribe>({
type: MessageType.Subscribe,
id: '1',
payload: {
query: 'subscription { ping }',
},
}),
);

await waitForOnOperation;

client.ws.close(4321, 'Byebye');
await waitForClientClose();

resolveOperation();

await waitForComplete();
});
});

it('should only accept a Set, Array or string in handleProtocols', () => {
it('should only accept a Set, Array or string in handleProtocol', () => {
for (const test of [
{
in: new Set(['not', 'me']),
Expand Down
3 changes: 2 additions & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,9 @@ export function makeServer<
// wait for close, cleanup and the disconnect callback
return async (code, reason) => {
if (connectionInitWait) clearTimeout(connectionInitWait);
for (const sub of Object.values(ctx.subscriptions)) {
for (const [id, sub] of Object.entries(ctx.subscriptions)) {
if (isAsyncGenerator(sub)) await sub.return(undefined);

This comment has been minimized.

Copy link
@andreasdamm-shure

andreasdamm-shure Feb 13, 2024

While awaiting for sub.return here, other pending subscription requests can complete without them having been removed first. That would still lead to orphan subscriptions. Also, operationResult.return(undefined) should be awaited (line 810).

This comment has been minimized.

Copy link
@enisdenjo

enisdenjo Feb 13, 2024

Author Owner

While awaiting for sub.return here, other pending subscription requests can complete without them having been removed first.

Hmm, how would that happen?

Also, operationResult.return(undefined) should be awaited (line 810).

The return at line 810 was intentionally not awaited because we didn't care about its result and want to proceed handling the subscribe message as soon as possible. However, we might want to await it still because we shouldn't emit the complete event before the subscription's actually completed. 🤔

This comment has been minimized.

Copy link
@andreasdamm-shure

andreasdamm-shure Feb 13, 2024

While awaiting for sub.return here, other pending subscription requests can complete without them having been removed first.

Hmm, how would that happen?

Even though the entries of ctx.subscriptions will be processed in the order they were created, there is no guarantee that they will complete in the same order. Every time execution is surrendered through an await other code can execute including the subscription completion for a subscription that has not yet been removed.

Instead a copy of ctx.subscriptions should be made followed by removing all entries. That copy should then be iterated over in the loop.

Also, operationResult.return(undefined) should be awaited (line 810).

The return at line 810 was intentionally not awaited because we didn't care about its result and want to proceed handling the subscribe message as soon as possible. However, we might want to await it still because we shouldn't emit the complete event before the subscription's actually completed. 🤔

Not worrying about the result is fine, but the promise needs to have a continuation hooked up (e.g. through .catch) as otherwise the entire process will exit on an unhandled promise rejection error (unless nodejs has been configured differently -- but default behavior is to exit).

This comment has been minimized.

Copy link
@enisdenjo

enisdenjo Mar 27, 2024

Author Owner

Thank you for the insights! I've covered this case now with f442288.

delete ctx.subscriptions[id]; // deleting the subscription means no further activity should take place
}
if (ctx.acknowledged) await onDisconnect?.(ctx, code, reason);
await onClose?.(ctx, code, reason);
Expand Down

0 comments on commit e45d6b1

Please sign in to comment.