Skip to content

Commit

Permalink
feat: support streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
cmorten committed Jan 28, 2024
1 parent b36e0b4 commit 2d1630c
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 23 deletions.
17 changes: 13 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Proxy middleware for Deno Opine HTTP servers.
</p>

```ts
import { proxy } from "https://deno.land/x/opineHttpProxy@3.1.0/mod.ts";
import { proxy } from "https://deno.land/x/opineHttpProxy@3.2.0/mod.ts";
import { opine } from "https://deno.land/x/[email protected]/mod.ts";

const app = opine();
Expand All @@ -41,7 +41,7 @@ Before importing, [download and install Deno](https://deno.land/#installation).
You can then import opine-http-proxy straight into your project:

```ts
import { proxy } from "https://deno.land/x/opineHttpProxy@3.1.0/mod.ts";
import { proxy } from "https://deno.land/x/opineHttpProxy@3.2.0/mod.ts";
```

## Docs
Expand Down Expand Up @@ -71,6 +71,15 @@ app.get(
);
```

### Streaming

Proxy requests and user responses are piped/streamed/chunked by default.

If you define a response modifier (`srcResDecorator`, `srcResHeaderDecorator`),
or need to inspect the response before continuing (`filterRes`), streaming is
disabled, and the request and response are buffered. This can cause performance
issues with large payloads.

### Proxy Options

You can also provide several options which allow you to filter, customize and
Expand Down Expand Up @@ -294,7 +303,7 @@ You can also use Promises:
app.use(
"/proxy",
proxy("localhost:3000", {
proxyReqOptDecorator(url, req) {
proxyReqUrlDecorator(url, req) {
return new Promise((resolve, reject) => {
if (url.pathname === "/login") {
url.port = 8080;
Expand Down Expand Up @@ -336,7 +345,7 @@ You can also use Promises:
app.use(
"/proxy",
proxy("www.google.com", {
proxyReqOptDecorator(proxyReqOpts, srcReq) {
proxyReqInitDecorator(proxyReqOpts, srcReq) {
return new Promise((resolve, reject) => {
proxyReqOpts.headers.set("Content-Type", "text/html");

Expand Down
2 changes: 1 addition & 1 deletion egg.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "opine-http-proxy",
"description": "Proxy middleware for Deno Opine HTTP servers.",
"version": "3.1.0",
"version": "3.2.0",
"repository": "https://github.com/cmorten/opine-http-proxy",
"stable": true,
"files": [
Expand Down
13 changes: 12 additions & 1 deletion src/resolveOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ export interface ProxyOptions
* @public
*/
method?: string;

/**
* @private
*/
stream?: boolean;
}

/**
Expand All @@ -181,7 +186,7 @@ function resolveBodyEncoding(reqBodyEncoding: "utf-8" | null | undefined) {
* @private
*/
export function resolveOptions(options: ProxyOptions = {}): ProxyOptions {
return {
const resolved: ProxyOptions = {
filterReq: options.filterReq,
proxyErrorHandler: options.proxyErrorHandler,
proxyReqUrlDecorator: options.proxyReqUrlDecorator,
Expand All @@ -197,4 +202,10 @@ export function resolveOptions(options: ProxyOptions = {}): ProxyOptions {
headers: options.headers,
timeout: options.timeout,
};

resolved.stream = !resolved.filterRes &&
!resolved.srcResDecorator &&
!resolved.srcResHeaderDecorator;

return resolved;
}
14 changes: 5 additions & 9 deletions src/steps/decorateSrcRes.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
// deno-lint-ignore-file no-explicit-any
import type { ProxyState } from "../createState.ts";
import { asBuffer } from "../requestOptions.ts";

const defaultDecorator = (
_req: any,
_res: any,
_proxyRes: Response,
proxyResData: any,
) => proxyResData;

export function decorateSrcRes(state: ProxyState) {
const resolverFn = state.options.srcResDecorator || defaultDecorator;
const resolverFn = state.options.srcResDecorator;

if (!resolverFn) {
return Promise.resolve(state);
}

const req = state.src.req;
const res = state.src.res;
Expand Down
8 changes: 5 additions & 3 deletions src/steps/decorateSrcResHeaders.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { ProxyState } from "../createState.ts";

const defaultDecorator = (headers: Headers): Headers => headers;

export function decorateSrcResHeaders(state: ProxyState) {
const resolverFn = state.options.srcResHeaderDecorator || defaultDecorator;
const resolverFn = state.options.srcResHeaderDecorator;

if (!resolverFn) {
return Promise.resolve(state);
}

const headers = state.src.res.headers || new Headers();

Expand Down
5 changes: 5 additions & 0 deletions src/steps/sendProxyReq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ export function sendProxyReq(state: ProxyState) {
: []),
]).then(async ([res]: Response[]) => {
state.proxy.res = res;

if (state.options.stream) {
return state;
}

const bufferedResponse = await res.arrayBuffer();
state.proxy.resData = bufferedResponse === null
? null
Expand Down
6 changes: 5 additions & 1 deletion src/steps/sendSrcRes.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import type { ProxyState } from "../createState.ts";

export function sendSrcRes(state: ProxyState) {
state.src.res.send(state.proxy.resData);
if (state.options.stream) {
state.src.res.send(state.proxy.res?.body);
} else {
state.src.res.send(state.proxy.resData);
}

return Promise.resolve(state);
}
152 changes: 152 additions & 0 deletions test/streaming.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// deno-lint-ignore-file no-explicit-any
import { describe, it } from "./support/utils.ts";
import { proxyTarget } from "./support/proxyTarget.ts";
import { expect, opine } from "./deps.ts";
import { proxy, ProxyOptions } from "../mod.ts";

function chunkingProxyServer() {
const proxyRouteFn = [{
method: "get",
path: "/stream",
fn: function (_req: any, res: any) {
let timer: number | undefined = undefined;
let counter = 0;

const body = new ReadableStream({
start(controller) {
timer = setInterval(() => {
if (counter > 3) {
clearInterval(timer);
controller.close();

return;
}

const message = `${counter}`;
controller.enqueue(new TextEncoder().encode(message));
counter++;
}, 50);
},

cancel() {
if (timer !== undefined) {
clearInterval(timer);
}
},
});

res.end(body);
},
}];

return proxyTarget({ port: 8309, timeout: 1000, handlers: proxyRouteFn });
}

const decoder = new TextDecoder();

async function simulateUserRequest() {
const response = await fetch("http://localhost:8308/stream");
const chunks = [];

for await (const chunk of response.body!) {
const decodedChunk = decoder.decode(chunk);
chunks.push(decodedChunk);
}

return chunks;
}

function startLocalServer(proxyOptions: ProxyOptions) {
const app = opine();

app.get("/stream", proxy("http://localhost:8309", proxyOptions));

return app.listen(8308);
}

describe("streams / piped requests", function () {
describe("when streaming options are truthy", function () {
const TEST_CASES = [{
name: "vanilla, no options defined",
options: {},
}, {
name: "proxyReqOptDecorator is defined",
options: {
proxyReqInitDecorator: function (reqInit: any) {
return reqInit;
},
},
}, {
name: "proxyReqOptDecorator is a Promise",
options: {
proxyReqInitDecorator: function (reqInit: any) {
return Promise.resolve(reqInit);
},
},
}];

TEST_CASES.forEach(function (testCase) {
describe(testCase.name, function () {
it(
testCase.name +
": chunks are received without any buffering, e.g. before request end",
function (done) {
const targetServer = chunkingProxyServer();
const server = startLocalServer(testCase.options);

simulateUserRequest()
.then(function (res) {
expect(res instanceof Array).toBeTruthy();
expect(res).toHaveLength(4);

server.close();
targetServer.close();
done();
})
.catch((error) => {
server.close();
targetServer.close();

done(error);
});
},
);
});
});
});

describe("when streaming options are falsy", function () {
const TEST_CASES = [{
name: "filterRes is defined",
options: {
filterRes: function () {
return false;
},
},
}];

TEST_CASES.forEach(function (testCase) {
describe(testCase.name, function () {
it("response arrives in one large chunk", function (done) {
const targetServer = chunkingProxyServer();
const server = startLocalServer(testCase.options);

simulateUserRequest()
.then(function (res) {
expect(res instanceof Array).toBeTruthy();
expect(res).toHaveLength(1);

server.close();
targetServer.close();
done();
})
.catch((error) => {
server.close();
targetServer.close();
done(error);
});
});
});
});
});
});
32 changes: 29 additions & 3 deletions test/support/proxyTarget.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,48 @@
// deno-lint-ignore-file no-explicit-any
import { json, opine, urlencoded } from "../deps.ts";
import { ErrorRequestHandler, json, opine, urlencoded } from "../deps.ts";

export function proxyTarget(
{ port = 0, handlers }: {
{ port = 0, timeout = 100, handlers }: {
port?: number;
timeout?: number;
handlers?: any;
} = { port: 0 },
} = { port: 0, timeout: 100 },
) {
const target = opine();

target.use(urlencoded());
target.use(json());

target.use(function (_req, _res, next) {
setTimeout(function () {
next();
}, timeout);
});

if (handlers) {
handlers.forEach((handler: any) => {
(target as any)[handler.method](handler.path, handler.fn);
});
}

target.get("/get", function (_req, res) {
res.send("OK");
});

target.use("/headers", function (req, res) {
res.json({ headers: req.headers });
});

target.use(
(function (err, _req, res, next) {
res.send(err);
next();
}) as ErrorRequestHandler,
);

target.use(function (_req, res) {
res.sendStatus(404);
});

return target.listen(port);
}
2 changes: 1 addition & 1 deletion version.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* Version of opine-http-proxy.
*/
export const VERSION = "3.1.0";
export const VERSION = "3.2.0";

/**
* Supported versions of Deno.
Expand Down

0 comments on commit 2d1630c

Please sign in to comment.