Skip to content

Commit

Permalink
Fire passthrough-abort events for failing upstream requests
Browse files Browse the repository at this point in the history
  • Loading branch information
pimterry committed Feb 24, 2025
1 parent 27b48bb commit d33efcd
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 4 deletions.
27 changes: 24 additions & 3 deletions src/rules/requests/request-handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
...caConfig
}, (serverRes) => (async () => {
serverRes.on('error', (e: any) => {
e.causedByUpstreamError = true;
reportUpstreamAbort(e)
reject(e);
});

Expand Down Expand Up @@ -1042,7 +1042,7 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
overridden: true,
rawBody: upstreamBody
});
});
}).catch((e) => reportUpstreamAbort(e));
} else {
options.emitEventCallback('passthrough-response-body', {
overridden: false
Expand Down Expand Up @@ -1110,6 +1110,27 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
serverReq.abort();
}

// If the upstream fails, for any reason, we need to fire an event to any rule
// listeners who might be present (although only the first time)
let reportedUpstreamError = false;
function reportUpstreamAbort(e: ErrorLike & { causedByUpstreamError?: true }) {
e.causedByUpstreamError = true;

if (!options.emitEventCallback) return;

if (reportedUpstreamError) return;
reportedUpstreamError = true;

options.emitEventCallback('passthrough-abort', {
error: {
name: e.name,
code: e.code,
message: e.message,
stack: e.stack
}
});
}

// Handle the case where the downstream connection is prematurely closed before
// fully sending the request or receiving the response.
clientReq.on('aborted', abortUpstream);
Expand All @@ -1122,7 +1143,7 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
});

serverReq.on('error', (e: any) => {
e.causedByUpstreamError = true;
reportUpstreamAbort(e);
reject(e);
});

Expand Down
78 changes: 77 additions & 1 deletion test/integration/subscriptions/rule-events.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as _ from 'lodash';
import * as WebSocket from 'isomorphic-ws';
import { PassThrough } from 'stream';

import {
getLocal,
Expand All @@ -10,7 +11,8 @@ import {
delay,
expect,
fetch,
isNode
isNode,
nodeOnly
} from "../../test-utils";

describe("Rule event susbcriptions", () => {
Expand Down Expand Up @@ -227,6 +229,80 @@ describe("Rule event susbcriptions", () => {
expect(responseBodyEvent.rawBody.toString('utf8')).to.equal('Original response body');
});


it("should fire abort event if upstream body response fails", async () => {
await remoteServer.forAnyRequest().thenCloseConnection();
const forwardingRule = await server.forAnyRequest().thenForwardTo(remoteServer.url);

const ruleEvents: RuleEvent<any>[] = [];
await server.on('rule-event', (e) => ruleEvents.push(e));

await fetch(server.url).catch(() => {});

await delay(100);
expect(ruleEvents.length).to.equal(3);

const requestId = (await forwardingRule.getSeenRequests())[0].id;
ruleEvents.forEach((event) => {
expect(event.ruleId).to.equal(forwardingRule.id);
expect(event.requestId).to.equal(requestId);
});

expect(ruleEvents.map(e => e.eventType)).to.deep.equal([
'passthrough-request-head',
'passthrough-request-body',
'passthrough-abort'
]);

const responseAbortEvent = ruleEvents[2].eventData;
expect(responseAbortEvent.error.name).to.equal('Error');
expect(responseAbortEvent.error.message).to.equal('socket hang up');
});

nodeOnly(() => {
it("should fire abort event if upstream body response fails", async () => {
const stream = new PassThrough();
await remoteServer.forAnyRequest().thenStream(200, stream);
const forwardingRule = await server.forAnyRequest().thenForwardTo(remoteServer.url, {
transformResponse: {
replaceBody: 'replaced body'
}
});

const ruleEvents: RuleEvent<any>[] = [];
await server.on('rule-event', (e) => ruleEvents.push(e));

const response = await fetch(server.url);
expect(response.status).to.equal(200);

stream.emit('error', new Error()); // Hard-fail part way through the body response
await delay(10);

expect(ruleEvents.length).to.equal(4);

const requestId = (await forwardingRule.getSeenRequests())[0].id;
ruleEvents.forEach((event) => {
expect(event.ruleId).to.equal(forwardingRule.id);
expect(event.requestId).to.equal(requestId);
});

expect(ruleEvents.map(e => e.eventType)).to.deep.equal([
'passthrough-request-head',
'passthrough-request-body',
'passthrough-response-head',
'passthrough-abort'
]);

const responseHeadEvent = ruleEvents[2].eventData;
expect(responseHeadEvent.statusCode).to.equal(200); // <-- Original status

const responseAbortEvent = ruleEvents[3].eventData;
expect(responseAbortEvent.error.name).to.equal('Error');
expect(responseAbortEvent.error.code).to.equal('ECONNRESET');
expect(responseAbortEvent.error.message).to.equal('aborted');
});
});

it("should fire for proxied websockets", async () => {
await remoteServer.forAnyWebSocket().thenPassivelyListen();
const forwardingRule = await server.forAnyWebSocket().thenForwardTo(remoteServer.url);
Expand Down

0 comments on commit d33efcd

Please sign in to comment.