From d33efcdcaaa4b274a38865da8744d9acc1341d7b Mon Sep 17 00:00:00 2001 From: Tim Perry Date: Mon, 24 Feb 2025 17:14:44 +0100 Subject: [PATCH] Fire passthrough-abort events for failing upstream requests --- src/rules/requests/request-handlers.ts | 27 ++++++- .../subscriptions/rule-events.spec.ts | 78 ++++++++++++++++++- 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/src/rules/requests/request-handlers.ts b/src/rules/requests/request-handlers.ts index 8ee6ee1b4..bdefb1b60 100644 --- a/src/rules/requests/request-handlers.ts +++ b/src/rules/requests/request-handlers.ts @@ -766,7 +766,7 @@ export class PassThroughHandler extends PassThroughHandlerDefinition { ...caConfig }, (serverRes) => (async () => { serverRes.on('error', (e: any) => { - e.causedByUpstreamError = true; + reportUpstreamAbort(e) reject(e); }); @@ -1042,7 +1042,7 @@ export class PassThroughHandler extends PassThroughHandlerDefinition { overridden: true, rawBody: upstreamBody }); - }); + }).catch((e) => reportUpstreamAbort(e)); } else { options.emitEventCallback('passthrough-response-body', { overridden: false @@ -1110,6 +1110,27 @@ export class PassThroughHandler extends PassThroughHandlerDefinition { serverReq.abort(); } + // If the upstream fails, for any reason, we need to fire an event to any rule + // listeners who might be present (although only the first time) + let reportedUpstreamError = false; + function reportUpstreamAbort(e: ErrorLike & { causedByUpstreamError?: true }) { + e.causedByUpstreamError = true; + + if (!options.emitEventCallback) return; + + if (reportedUpstreamError) return; + reportedUpstreamError = true; + + options.emitEventCallback('passthrough-abort', { + error: { + name: e.name, + code: e.code, + message: e.message, + stack: e.stack + } + }); + } + // Handle the case where the downstream connection is prematurely closed before // fully sending the request or receiving the response. clientReq.on('aborted', abortUpstream); @@ -1122,7 +1143,7 @@ export class PassThroughHandler extends PassThroughHandlerDefinition { }); serverReq.on('error', (e: any) => { - e.causedByUpstreamError = true; + reportUpstreamAbort(e); reject(e); }); diff --git a/test/integration/subscriptions/rule-events.spec.ts b/test/integration/subscriptions/rule-events.spec.ts index 5868997f4..c173d68f4 100644 --- a/test/integration/subscriptions/rule-events.spec.ts +++ b/test/integration/subscriptions/rule-events.spec.ts @@ -1,5 +1,6 @@ import * as _ from 'lodash'; import * as WebSocket from 'isomorphic-ws'; +import { PassThrough } from 'stream'; import { getLocal, @@ -10,7 +11,8 @@ import { delay, expect, fetch, - isNode + isNode, + nodeOnly } from "../../test-utils"; describe("Rule event susbcriptions", () => { @@ -227,6 +229,80 @@ describe("Rule event susbcriptions", () => { expect(responseBodyEvent.rawBody.toString('utf8')).to.equal('Original response body'); }); + + it("should fire abort event if upstream body response fails", async () => { + await remoteServer.forAnyRequest().thenCloseConnection(); + const forwardingRule = await server.forAnyRequest().thenForwardTo(remoteServer.url); + + const ruleEvents: RuleEvent[] = []; + await server.on('rule-event', (e) => ruleEvents.push(e)); + + await fetch(server.url).catch(() => {}); + + await delay(100); + expect(ruleEvents.length).to.equal(3); + + const requestId = (await forwardingRule.getSeenRequests())[0].id; + ruleEvents.forEach((event) => { + expect(event.ruleId).to.equal(forwardingRule.id); + expect(event.requestId).to.equal(requestId); + }); + + expect(ruleEvents.map(e => e.eventType)).to.deep.equal([ + 'passthrough-request-head', + 'passthrough-request-body', + 'passthrough-abort' + ]); + + const responseAbortEvent = ruleEvents[2].eventData; + expect(responseAbortEvent.error.name).to.equal('Error'); + expect(responseAbortEvent.error.message).to.equal('socket hang up'); + }); + + nodeOnly(() => { + it("should fire abort event if upstream body response fails", async () => { + const stream = new PassThrough(); + await remoteServer.forAnyRequest().thenStream(200, stream); + const forwardingRule = await server.forAnyRequest().thenForwardTo(remoteServer.url, { + transformResponse: { + replaceBody: 'replaced body' + } + }); + + const ruleEvents: RuleEvent[] = []; + await server.on('rule-event', (e) => ruleEvents.push(e)); + + const response = await fetch(server.url); + expect(response.status).to.equal(200); + + stream.emit('error', new Error()); // Hard-fail part way through the body response + await delay(10); + + expect(ruleEvents.length).to.equal(4); + + const requestId = (await forwardingRule.getSeenRequests())[0].id; + ruleEvents.forEach((event) => { + expect(event.ruleId).to.equal(forwardingRule.id); + expect(event.requestId).to.equal(requestId); + }); + + expect(ruleEvents.map(e => e.eventType)).to.deep.equal([ + 'passthrough-request-head', + 'passthrough-request-body', + 'passthrough-response-head', + 'passthrough-abort' + ]); + + const responseHeadEvent = ruleEvents[2].eventData; + expect(responseHeadEvent.statusCode).to.equal(200); // <-- Original status + + const responseAbortEvent = ruleEvents[3].eventData; + expect(responseAbortEvent.error.name).to.equal('Error'); + expect(responseAbortEvent.error.code).to.equal('ECONNRESET'); + expect(responseAbortEvent.error.message).to.equal('aborted'); + }); + }); + it("should fire for proxied websockets", async () => { await remoteServer.forAnyWebSocket().thenPassivelyListen(); const forwardingRule = await server.forAnyWebSocket().thenForwardTo(remoteServer.url);