Skip to content

Commit

Permalink
🐛 Terminate socket reading on timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Tomas Dvorak <[email protected]>
  • Loading branch information
Tomas2D committed May 11, 2024
1 parent 873acdf commit eac5d0e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
17 changes: 13 additions & 4 deletions src/gateway/BaseGateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
extractSocketAddress,
identity,
logException,
raceWithAbort,
safeAsync,
} from "../utils.js";
import util from "node:util";
Expand Down Expand Up @@ -86,7 +87,11 @@ export abstract class BaseGateway extends EventEmitter {
return `[${this.constructor.name}]`;
}

protected async _waitForData(socket: Socket, size: number): Promise<string> {
protected async _waitForData(
socket: Socket,
size: number,
signal: AbortSignal,
): Promise<string> {
const logger = this._getSocketLogger(socket);
if (!socket.readable) {
logger.warn("Socket is not readable!", { socket, size });
Expand All @@ -95,6 +100,7 @@ export abstract class BaseGateway extends EventEmitter {

// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of setInterval(100)) {
signal.throwIfAborted();
const response: Buffer | null = socket.read(size);
if (response && response.length >= size) {
return response.toString();
Expand All @@ -108,9 +114,12 @@ export abstract class BaseGateway extends EventEmitter {
const logger = this._getSocketLogger(socket);
logger.debug(`reading ${size}B from the socket`);

let buffer = await Promise.race([
this._waitForData(socket, size),
setTimeout(this.options.socketFirstDataTimeout * 1000, null),
let buffer = await raceWithAbort([
(signal) => this._waitForData(socket, size, signal),
(signal) =>
setTimeout(this.options.socketFirstDataTimeout * 1000, null, {
signal,
}),
]);
if (!buffer) {
await this._closeSocket(socket);
Expand Down
27 changes: 23 additions & 4 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ export async function closeSocket(socket: Socket, timeout: number) {
socket.destroy();
return;
}
return await Promise.race([
util.promisify(socket.end.bind(socket))(),
setTimeout(timeout * 1000).then(() => {

await raceWithAbort([
util.promisify(socket.end.bind(socket)),
async (signal) => {
await setTimeout(timeout * 1000, null, { signal });
signal.throwIfAborted();
if (!socket.closed) {
socket.destroy();
}
}),
},
]);
};

Expand Down Expand Up @@ -120,3 +123,19 @@ export function extractSocketAddress(socket: Socket) {

return address.replace("::ffff:", "");
}

export class AbortError extends Error {
name = "AbortError";
}

export type RaceWithAbortHandler<T> = (signal: AbortSignal) => Promise<T>;
export async function raceWithAbort<T>(
handlers: readonly RaceWithAbortHandler<T>[],
): Promise<T> {
const controller = new AbortController();
const result = await Promise.race(
handlers.map((handler) => handler(controller.signal)),
);
controller.abort(new AbortError("Action has been cancelled!"));
return result;
}

0 comments on commit eac5d0e

Please sign in to comment.