From 4007a96ce181fed32154eb1d3ddd0e7d04c49abf Mon Sep 17 00:00:00 2001 From: Embbnux Ji Date: Sat, 16 Apr 2022 00:31:07 +0800 Subject: [PATCH] feat: support to stream websocket request (#4) * feat: support to proxy ws request * bugfix and refactor * rename SocketResponse * chore: update readme * chore: update readme --- README.md | 55 +++++++++++-------- lib.js | 49 ++++++++++++++--- package.json | 2 +- server.js | 150 +++++++++++++++++++++++++++++++++++++++------------ 4 files changed, 194 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index bcf44d6..ec6a198 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,12 @@ # Lite HTTP Tunnel -A HTTP Tunnel tool to help you expose local HTTP server behind a NAT or firewall to the internet. Inspired by [Ngrok](https://github.com/inconshreveable/ngrok). +A HTTP Tunnel tool to help you expose local HTTP/WebSocket server behind a NAT or firewall to the internet. Inspired by [Ngrok](https://github.com/inconshreveable/ngrok) and [node-http-proxy](https://github.com/http-party/node-http-proxy). ![http tunnel](https://user-images.githubusercontent.com/7036536/155876708-f30f4921-c8c8-463d-8917-c4f932d3b2e6.png) - ## How it work -The tunnel is based on `WebSocket`. We have a `WebSocket` connection between the client and server to stream HTTP request from public server to your local server. +The tunnel is based on `WebSocket`. We have a `WebSocket` connection between the client and server to stream HTTP/WebSocket requests from public server to your local server. ## Usage @@ -29,40 +28,53 @@ You can also generate JWT Token in your local by following code [here](https://g ### Setup Client -Please install `lite-http-tunnel` client in your local computer where it can access your local HTTP server. - -``` -$ npm i -g lite-http-tunnel -$ lite-http-tunnel -h -``` - -Config remote public server address: +#### Config remote public server address: -``` +```shell $ lite-http-tunnel config server https://your_web_host_domain ``` -Config jwt token that you got from server: +#### Config jwt token that you got from server: -``` +```shell $ lite-http-tunnel config jwt your_jwt_token ``` -Start client +#### Or With specified profile +```shell +$ lite-http-tunnel config server https://your_web_host_domain -p profile1 +$ lite-http-tunnel config jwt your_jwt_token -p profile1 ``` + +#### Start client + +```shell $ lite-http-tunnel start your_local_server_port ``` -Please replace your_local_server_port with your local HTTP server port, eg: `3000`. + +Please replace your_local_server_port with your local HTTP server port, eg: `8080`. After that you can access your local HTTP server by access `your_public_server_domain`. -Change origin +#### Start with specified profile: +```shell +$ lite-http-tunnel start your_local_server_port -p profile1 ``` + +#### Change origin to local server: + +```shell $ lite-http-tunnel start your_local_server_port -o localhost:5000 ``` +#### Change local server host: + +```shell +$ lite-http-tunnel start your_local_server_port -h localhost1 +``` + ## Multiple Clients The server steams HTTP request to WebSocket connection which has same host value in request headers. @@ -74,15 +86,15 @@ For example, you have `https://app1.test.com` and `https://app2.test.com` for th In client 1: ``` -$ lite-http-tunnel config server https://app1.test.com -p app1 -$ lite-http-tunnel start your_local_server_port -p app1 +$ lite-http-tunnel config server https://app1.test.com -p profile1 +$ lite-http-tunnel start your_local_server_port -p profile1 ``` In client 2: ``` -$ lite-http-tunnel config server https://app2.test.com -p app2 -$ lite-http-tunnel start your_local_server_port -p app2 +$ lite-http-tunnel config server https://app2.test.com -p profile2 +$ lite-http-tunnel start your_local_server_port -p profile2 ``` ## Related @@ -93,4 +105,3 @@ A introduce article: [Building a HTTP Tunnel with WebSocket and Node.JS](https:/ - [ ] Add tests - [ ] Support multiple clients based on request path prefix -- [ ] Support to stream WebSocket request diff --git a/lib.js b/lib.js index 91d54f1..865c52d 100644 --- a/lib.js +++ b/lib.js @@ -1,7 +1,7 @@ -const { Writable, Readable } = require('stream'); +const { Writable, Duplex } = require('stream'); -class SocketRequest extends Writable { +class TunnelRequest extends Writable { constructor({ socket, requestId, request }) { super(); this._socket = socket; @@ -42,7 +42,7 @@ class SocketRequest extends Writable { } } -class SocketResponse extends Readable { +class TunnelResponse extends Duplex { constructor({ socket, responseId }) { super(); this._socket = socket; @@ -51,7 +51,12 @@ class SocketResponse extends Readable { if (this._responseId === responseId) { this._socket.off('response', onResponse); this._socket.off('request-error', onRequestError); - this.emit('response', data.statusCode, data.statusMessage, data.headers); + this.emit('response', { + statusCode: data.statusCode, + statusMessage: data.statusMessage, + headers: data.headers, + httpVersion: data.httpVersion, + }); } } const onResponsePipe = (responseId, data) => { @@ -109,7 +114,39 @@ class SocketResponse extends Readable { } _read(size) {} + + _write(chunk, encoding, callback) { + this._socket.emit('response-pipe', this._responseId, chunk); + this._socket.conn.once('drain', () => { + callback(); + }); + } + + _writev(chunks, callback) { + this._socket.emit('response-pipes', this._responseId, chunks); + this._socket.conn.once('drain', () => { + callback(); + }); + } + + _final(callback) { + this._socket.emit('response-pipe-end', this._responseId); + this._socket.conn.once('drain', () => { + callback(); + }); + } + + _destroy(e, callback) { + if (e) { + this._socket.emit('response-pipe-error', this._responseId, e && e.message); + this._socket.conn.once('drain', () => { + callback(); + }); + return; + } + callback(); + } } -exports.SocketRequest = SocketRequest; -exports.SocketResponse = SocketResponse; +exports.TunnelRequest = TunnelRequest; +exports.TunnelResponse = TunnelResponse; diff --git a/package.json b/package.json index e951e08..2a70d63 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "lite-http-tunnel-server", - "version": "0.0.1", + "version": "0.1.0", "main": "server.js", "engines": { "node": ">=14.0" diff --git a/server.js b/server.js index b7c78bd..59f439f 100644 --- a/server.js +++ b/server.js @@ -7,17 +7,20 @@ const jwt = require('jsonwebtoken'); require('dotenv').config(); -const { SocketRequest, SocketResponse } = require('./lib'); +const { TunnelRequest, TunnelResponse } = require('./lib'); const app = express(); const httpServer = http.createServer(app); -const io = new Server(httpServer); +const webTunnelPath = '/$web_tunnel'; +const io = new Server(httpServer, { + path: webTunnelPath, +}); -let connectedSockets = {}; +let tunnelSockets = {}; io.use((socket, next) => { const connectHost = socket.handshake.headers.host; - if (connectedSockets[connectHost]) { + if (tunnelSockets[connectHost]) { return next(new Error(`${connectHost} has a existing connection`)); } if (!socket.handshake.auth || !socket.handshake.auth.token){ @@ -36,7 +39,7 @@ io.use((socket, next) => { io.on('connection', (socket) => { const connectHost = socket.handshake.headers.host; - connectedSockets[connectHost] = socket; + tunnelSockets[connectHost] = socket; console.log(`client connected at ${connectHost}`); const onMessage = (message) => { if (message === 'ping') { @@ -45,18 +48,11 @@ io.on('connection', (socket) => { } const onDisconnect = (reason) => { console.log('client disconnected: ', reason); - delete connectedSockets[connectHost]; - socket.off('message', onMessage); - socket.off('error', onError); - }; - const onError = (e) => { - delete connectedSockets[connectHost]; + delete tunnelSockets[connectHost]; socket.off('message', onMessage); - socket.off('disconnect', onDisconnect); }; socket.on('message', onMessage); socket.once('disconnect', onDisconnect); - socket.once('error', onError); }); app.use(morgan('tiny')); @@ -80,15 +76,15 @@ app.get('/tunnel_jwt_generator', (req, res) => { }); app.use('/', (req, res) => { - const connectedSocket = connectedSockets[req.headers.host]; - if (!connectedSocket) { + const tunnelSocket = tunnelSockets[req.headers.host]; + if (!tunnelSocket) { res.status(404); res.send('Not Found'); return; } const requestId = uuidV4(); - const socketRequest = new SocketRequest({ - socket: connectedSocket, + const tunnelRequest = new TunnelRequest({ + socket: tunnelSocket, requestId, request: { method: req.method, @@ -97,41 +93,129 @@ app.use('/', (req, res) => { }, }); const onReqError = (e) => { - socketRequest.destroy(new Error(e || 'Aborted')); + tunnelRequest.destroy(new Error(e || 'Aborted')); } req.once('aborted', onReqError); req.once('error', onReqError); - req.pipe(socketRequest); + req.pipe(tunnelRequest); req.once('finish', () => { req.off('aborted', onReqError); req.off('error', onReqError); }); - const socketResponse = new SocketResponse({ - socket: connectedSocket, + const tunnelResponse = new TunnelResponse({ + socket: tunnelSocket, responseId: requestId, }); const onRequestError = () => { - socketResponse.off('response', onResponse); - socketResponse.destroy(); + tunnelResponse.off('response', onResponse); + tunnelResponse.destroy(); res.status(502); res.end('Request error'); }; - const onResponse = (statusCode, statusMessage, headers) => { - socketRequest.off('requestError', onRequestError) + const onResponse = ({ statusCode, statusMessage, headers }) => { + tunnelRequest.off('requestError', onRequestError) res.writeHead(statusCode, statusMessage, headers); }; - socketResponse.once('requestError', onRequestError) - socketResponse.once('response', onResponse); - socketResponse.pipe(res); + tunnelResponse.once('requestError', onRequestError) + tunnelResponse.once('response', onResponse); + tunnelResponse.pipe(res); const onSocketError = () => { + res.off('close', onResClose); res.end(500); }; - socketResponse.once('error', onSocketError); - connectedSocket.once('close', onSocketError) - res.once('close', () => { - connectedSocket.off('close', onSocketError); - socketResponse.off('error', onSocketError); + const onResClose = () => { + tunnelSocket.off('disconnect', onSocketError); + }; + tunnelSocket.once('disconnect', onSocketError) + res.once('close', onResClose); +}); + +function createSocketHttpHeader(line, headers) { + return Object.keys(headers).reduce(function (head, key) { + var value = headers[key]; + + if (!Array.isArray(value)) { + head.push(key + ': ' + value); + return head; + } + + for (var i = 0; i < value.length; i++) { + head.push(key + ': ' + value[i]); + } + return head; + }, [line]) + .join('\r\n') + '\r\n\r\n'; +} + +httpServer.on('upgrade', (req, socket, head) => { + if (req.url.indexOf(webTunnelPath) === 0) { + return; + } + console.log(`WS ${req.url}`); + // proxy websocket request + const tunnelSocket = tunnelSockets[req.headers.host]; + if (!tunnelSocket) { + return; + } + if (head && head.length) socket.unshift(head); + const requestId = uuidV4(); + const tunnelRequest = new TunnelRequest({ + socket: tunnelSocket, + requestId, + request: { + method: req.method, + headers: { ...req.headers }, + path: req.url, + }, + }); + req.pipe(tunnelRequest); + const tunnelResponse = new TunnelResponse({ + socket: tunnelSocket, + responseId: requestId, }); + const onRequestError = () => { + tunnelResponse.off('response', onResponse); + tunnelResponse.destroy(); + socket.end(); + }; + const onResponse = ({ statusCode, statusMessage, headers, httpVersion }) => { + tunnelResponse.off('requestError', onRequestError); + if (statusCode) { + socket.once('error', (err) => { + console.log(`WS ${req.url} ERROR`); + // ignore error + }); + // not upgrade event + socket.write(createSocketHttpHeader(`HTTP/${httpVersion} ${statusCode} ${statusMessage}`, headers)); + tunnelResponse.pipe(socket); + return; + } + const onSocketError = (err) => { + console.log(`WS ${req.url} ERROR`); + socket.off('end', onSocketEnd); + tunnelSocket.off('disconnect', onTunnelError); + tunnelResponse.destroy(err); + }; + const onSocketEnd = () => { + console.log(`WS ${req.url} END`); + socket.off('error', onSocketError); + tunnelSocket.off('disconnect', onTunnelError); + tunnelResponse.destroy(); + }; + const onTunnelError = () => { + socket.off('error', onSocketError); + socket.off('end', onSocketEnd); + socket.end(); + tunnelResponse.destroy(); + }; + socket.once('error', onSocketError); + socket.once('end', onSocketEnd); + tunnelSocket.once('disconnect', onTunnelError); + socket.write(createSocketHttpHeader('HTTP/1.1 101 Switching Protocols', headers)); + tunnelResponse.pipe(socket).pipe(tunnelResponse); + } + tunnelResponse.once('requestError', onRequestError) + tunnelResponse.once('response', onResponse); }); httpServer.listen(process.env.PORT);