Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce typescript #68

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ node_modules/
coverage/
~$*
docs/logo.psd
dist
11 changes: 7 additions & 4 deletions index.js → index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
const RPCClient = require('./lib/client');
const RPCServer = require('./lib/server');
//@ts-ignore
import RPCClient from './lib/client';
import RPCServer from './lib/server';
const errors = require('./lib/errors');
const symbols = require('./lib/symbols');
const { createRPCError } = require('./lib/util');
const { createValidator } = require('./lib/validator');

module.exports = {
export { RPCClient, RPCServer };

export default {
RPCServer,
RPCClient,
createRPCError,
createValidator,
...errors,
...symbols,
};
};
126 changes: 84 additions & 42 deletions lib/server.js → lib/server.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,57 @@
const {EventEmitter, once} = require('events');
const {WebSocketServer, OPEN, CLOSING, CLOSED} = require('ws');
const {createServer} = require('http');
import { EventEmitter, once } from 'events';
const { WebSocketServer, OPEN, CLOSING, CLOSED } = require('ws');
const { createServer } = require('http');
const RPCServerClient = require('./server-client');
const { abortHandshake, parseSubprotocols } = require('./ws-util');
const standardValidators = require('./standard-validators');
const { getPackageIdent } = require('./util');
const { WebsocketUpgradeError } = require('./errors');

interface Options {
wssOptions: Object,
protocols: string[],
callTimeoutMs: number,
pingIntervalMs: number,
deferPingsOnActivity: boolean,
respondWithDetailedErrors: boolean,
callConcurrency: number,
maxBadMessages: number,
strictMode: boolean,
strictModeValidators: any[],
}


interface ListenOptions {
signal?: AbortSignal,
}

interface CloseOptions {
code?: number,
reason?: string,
awaitPending?: boolean,
force?: boolean,
}

type AuthCallback = (
accept: (session?: any, protocol?: string | false) => void,
reject: (code?: number, message?: string) => void,
handshake: any,
signal: AbortSignal
) => void;

class RPCServer extends EventEmitter {
constructor(options) {
_state: number;
_clients: Set<any>;
_pendingUpgrades: WeakMap<any, any>;
_options: Options;
_wss: any;
_httpServerAbortControllers: Set<AbortController>;
_strictValidators?: Map<string, any>;
authCallback?: AuthCallback;

constructor(options: Partial<Options>) {
super();

this._httpServerAbortControllers = new Set();
this._state = OPEN;
this._clients = new Set();
Expand All @@ -20,8 +61,8 @@ class RPCServer extends EventEmitter {
// defaults
wssOptions: {},
protocols: [],
callTimeoutMs: 1000*30,
pingIntervalMs: 1000*30,
callTimeoutMs: 1000 * 30,
pingIntervalMs: 1000 * 30,
deferPingsOnActivity: false,
respondWithDetailedErrors: false,
callConcurrency: 1,
Expand All @@ -35,18 +76,18 @@ class RPCServer extends EventEmitter {
this._wss = new WebSocketServer({
...this._options.wssOptions,
noServer: true,
handleProtocols: (protocols, request) => {
const {protocol} = this._pendingUpgrades.get(request);
handleProtocols: (protocols: any, request: any) => {
const { protocol } = this._pendingUpgrades.get(request);
return protocol;
},
});

this._wss.on('headers', h => h.push(`Server: ${getPackageIdent()}`));
this._wss.on('error', err => this.emit('error', err));
this._wss.on('headers', (h: any) => h.push(`Server: ${getPackageIdent()}`));
this._wss.on('error', (err: any) => this.emit('error', err));
this._wss.on('connection', this._onConnection.bind(this));
}
reconfigure(options) {

reconfigure(options: Partial<Options>) {
const newOpts = Object.assign({}, this._options, options);

if (newOpts.strictMode && !newOpts.protocols?.length) {
Expand All @@ -62,15 +103,15 @@ class RPCServer extends EventEmitter {
svs.set(v.subprotocol, v);
return svs;
}, new Map());
let strictProtocols = [];

let strictProtocols: any[] = [];
if (Array.isArray(newOpts.strictMode)) {
strictProtocols = newOpts.strictMode;
} else if (newOpts.strictMode) {
strictProtocols = newOpts.protocols;
}

const missingValidator = strictProtocols.find(protocol => !this._strictValidators.has(protocol));
const missingValidator = strictProtocols.find(protocol => !this._strictValidators?.has(protocol));
if (missingValidator) {
throw Error(`Missing strictMode validator for subprotocol '${missingValidator}'`);
}
Expand All @@ -79,18 +120,18 @@ class RPCServer extends EventEmitter {
}

get handleUpgrade() {
return async (request, socket, head) => {
return async (request: any, socket: any, head: any) => {

let resolved = false;

const ac = new AbortController();
const {signal} = ac;
const { signal } = ac;

const url = new URL('http://localhost' + (request.url || '/'));
const pathParts = url.pathname.split('/');
const identity = decodeURIComponent(pathParts.pop());
const identity = decodeURIComponent(pathParts.pop() || '');

const abortUpgrade = (error) => {
const abortUpgrade = (error: any) => {
resolved = true;

if (error && error instanceof WebsocketUpgradeError) {
Expand All @@ -110,25 +151,25 @@ class RPCServer extends EventEmitter {
}
};

socket.on('error', (err) => {
socket.on('error', (err: any) => {
abortUpgrade(err);
});

try {
if (this._state !== OPEN) {
throw new WebsocketUpgradeError(500, "Server not open");
}

if (socket.readyState !== 'open') {
throw new WebsocketUpgradeError(400, `Client readyState = '${socket.readyState}'`);
}

const headers = request.headers;

if (headers.upgrade.toLowerCase() !== 'websocket') {
throw new WebsocketUpgradeError(400, "Can only upgrade websocket upgrade requests");
}

const endpoint = pathParts.join('/') || '/';
const remoteAddress = request.socket.remoteAddress;
const protocols = ('sec-websocket-protocol' in request.headers)
Expand Down Expand Up @@ -174,10 +215,10 @@ class RPCServer extends EventEmitter {
password,
};

const accept = (session, protocol) => {
const accept = (session: any, protocol: any) => {
if (resolved) return;
resolved = true;

try {
if (socket.readyState !== 'open') {
throw new WebsocketUpgradeError(400, `Client readyState = '${socket.readyState}'`);
Expand All @@ -197,7 +238,7 @@ class RPCServer extends EventEmitter {
handshake
});

this._wss.handleUpgrade(request, socket, head, ws => {
this._wss.handleUpgrade(request, socket, head, (ws: any) => {
this._wss.emit('connection', ws, request);
});
} catch (err) {
Expand Down Expand Up @@ -227,7 +268,7 @@ class RPCServer extends EventEmitter {
signal
);
} else {
accept();
(accept as any)();
}

} catch (err) {
Expand All @@ -236,13 +277,13 @@ class RPCServer extends EventEmitter {
};
}

async _onConnection(websocket, request) {
async _onConnection(websocket: any, request: any) {
try {
if (this._state !== OPEN) {
throw Error("Server is no longer open");
}

const {handshake, session} = this._pendingUpgrades.get(request);
const { handshake, session } = this._pendingUpgrades.get(request);

const client = new RPCServerClient({
identity: handshake.identity,
Expand All @@ -266,50 +307,51 @@ class RPCServer extends EventEmitter {
client.once('close', () => this._clients.delete(client));
this.emit('client', client);

} catch (err) {
} catch (error) {
const err = error as { statusCode: number, message: string };
websocket.close(err.statusCode || 1000, err.message);
}
}

auth(cb) {
auth(cb: AuthCallback) {
this.authCallback = cb;
}

async listen(port, host, options = {}) {
async listen(port: number, host?: string, options: ListenOptions = {}) {
const ac = new AbortController();
this._httpServerAbortControllers.add(ac);
if (options.signal) {
once(options.signal, 'abort').then(() => {
ac.abort(options.signal.reason);
ac.abort(options.signal!.reason);
});
}
const httpServer = createServer({
noDelay: true,
}, (req, res) => {
}, (req: any, res: any) => {
res.setHeader('Server', getPackageIdent());
res.statusCode = 404;
res.end();
});
httpServer.on('upgrade', this.handleUpgrade);
httpServer.once('close', () => this._httpServerAbortControllers.delete(ac));
await new Promise((resolve, reject) => {
await new Promise<void>((resolve, reject) => {
httpServer.listen({
port,
host,
signal: ac.signal,
}, err => err ? reject(err) : resolve());
}, (err: any) => err ? reject(err) : resolve());
});
return httpServer;
}

async close({code, reason, awaitPending, force} = {}) {
async close({ code, reason, awaitPending, force }: CloseOptions = {}) {
if (this._state === OPEN) {
this._state = CLOSING;
this.emit('closing');
code = code ?? 1001;
await Array.from(this._clients).map(cli => cli.close({code, reason, awaitPending, force}));
await new Promise((resolve, reject) => {
this._wss.close(err => err ? reject(err) : resolve());
await Array.from(this._clients).map((client: any) => client.close({ code, reason, awaitPending, force }));
await new Promise<void>((resolve, reject) => {
this._wss.close((err: any) => err ? reject(err) : resolve());
this._httpServerAbortControllers.forEach(ac => ac.abort("Closing"));
});
this._state = CLOSED;
Expand All @@ -318,4 +360,4 @@ class RPCServer extends EventEmitter {
}
}

module.exports = RPCServer;
export default RPCServer;
32 changes: 16 additions & 16 deletions lib/util.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
const errors = require('./errors');
const package = require('../package.json');
const packageJson = require('../package.json');

const rpcErrorLUT = {
'GenericError' : errors.RPCGenericError,
'NotImplemented' : errors.RPCNotImplementedError,
'NotSupported' : errors.RPCNotSupportedError,
'InternalError' : errors.RPCInternalError,
'ProtocolError' : errors.RPCProtocolError,
'SecurityError' : errors.RPCSecurityError,
'FormationViolation' : errors.RPCFormationViolationError,
'FormatViolation' : errors.RPCFormatViolationError,
'PropertyConstraintViolation' : errors.RPCPropertyConstraintViolationError,
'OccurenceConstraintViolation' : errors.RPCOccurenceConstraintViolationError,
'OccurrenceConstraintViolation' : errors.RPCOccurrenceConstraintViolationError,
'TypeConstraintViolation' : errors.RPCTypeConstraintViolationError,
'MessageTypeNotSupported' : errors.RPCMessageTypeNotSupportedError,
'RpcFrameworkError' : errors.RPCFrameworkError,
'GenericError': errors.RPCGenericError,
'NotImplemented': errors.RPCNotImplementedError,
'NotSupported': errors.RPCNotSupportedError,
'InternalError': errors.RPCInternalError,
'ProtocolError': errors.RPCProtocolError,
'SecurityError': errors.RPCSecurityError,
'FormationViolation': errors.RPCFormationViolationError,
'FormatViolation': errors.RPCFormatViolationError,
'PropertyConstraintViolation': errors.RPCPropertyConstraintViolationError,
'OccurenceConstraintViolation': errors.RPCOccurenceConstraintViolationError,
'OccurrenceConstraintViolation': errors.RPCOccurrenceConstraintViolationError,
'TypeConstraintViolation': errors.RPCTypeConstraintViolationError,
'MessageTypeNotSupported': errors.RPCMessageTypeNotSupportedError,
'RpcFrameworkError': errors.RPCFrameworkError,
};

function getPackageIdent() {
return `${package.name}/${package.version} (${process.platform})`;
return `${packageJson.name}/${packageJson.version} (${process.platform})`;
}

function getErrorPlainObject(err) {
Expand Down
Loading