From a15ee3fcc436c2ce520920f3c0d92f02aaf8221c Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Tue, 24 Oct 2023 19:10:18 -0700 Subject: [PATCH] * add tests --- .npmignore | 12 ++ .prettierrc | 8 ++ package.json | 3 +- pnpm-lock.yaml | 42 ++++++ src/application/index.ts | 160 +++++++++++----------- src/broadcast/index.ts | 36 +++-- src/channel/index.ts | 78 +++++------ src/index.ts | 24 ++-- src/jwt/index.ts | 42 +++--- src/rpc/index.ts | 78 +++++------ src/service/index.ts | 83 ++++++------ src/test/application.test.ts | 11 -- tests/application.test.ts | 254 +++++++++++++++++++++++++++++++++++ tests/broadcast.test.ts | 82 +++++++++++ tests/jwt.test.ts | 91 +++++++++++++ tests/service.test.ts | 174 ++++++++++++++++++++++++ 16 files changed, 920 insertions(+), 258 deletions(-) create mode 100644 .npmignore create mode 100644 .prettierrc delete mode 100644 src/test/application.test.ts create mode 100644 tests/application.test.ts create mode 100644 tests/broadcast.test.ts create mode 100644 tests/jwt.test.ts create mode 100644 tests/service.test.ts diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..7e443a9 --- /dev/null +++ b/.npmignore @@ -0,0 +1,12 @@ +yarn-error.log +yarn.lock +pnpm-lock.yaml + +node_modules/ +tests/ +coverage/ +tsconfig.json +**/*.test.ts +**/types.ts +**/errors.ts +.github/ diff --git a/.prettierrc b/.prettierrc new file mode 100644 index 0000000..82ad122 --- /dev/null +++ b/.prettierrc @@ -0,0 +1,8 @@ +{ + "arrowParens": "avoid", + "jsxSingleQuote": false, + "quoteProps": "consistent", + "semi": false, + "singleQuote": true, + "trailingComma": "none" +} diff --git a/package.json b/package.json index 1f80bff..a2a967c 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ }, "scripts": { "prepublishOnly": "pnpm build", - "test": "uvu -r @esbuild-kit/cjs-loader src/test", + "test": "uvu -r @esbuild-kit/cjs-loader tests/", "lint": "eslint src/", "typecheck": "tsc --noEmit", "build": "tsc" @@ -49,6 +49,7 @@ "eslint": "^8.49.0", "eslint-config-prettier": "^9.0.0", "eslint-plugin-prettier": "^5.0.0", + "node-fetch": "^3.3.2", "prettier": "^3.0.3", "typescript": "^5.2.2", "uvu": "^0.5.6" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d07660f..c7c0811 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -31,6 +31,9 @@ devDependencies: eslint-plugin-prettier: specifier: ^5.0.0 version: 5.0.0(eslint-config-prettier@9.0.0)(eslint@8.49.0)(prettier@3.0.3) + node-fetch: + specifier: ^3.3.2 + version: 3.3.2 prettier: specifier: ^3.0.3 version: 3.0.3 @@ -615,6 +618,11 @@ packages: which: 2.0.2 dev: true + /data-uri-to-buffer@4.0.1: + resolution: {integrity: sha512-0R9ikRb668HB7QDxT1vkpuUBtqc53YyAwMwGeUFKRojY/NWKvdZ+9UYtRfGmhqNbRkTSVpMbmyhXipFFv2cb/A==} + engines: {node: '>= 12'} + dev: true + /debug@4.3.4: resolution: {integrity: sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==} engines: {node: '>=6.0'} @@ -898,6 +906,14 @@ packages: reusify: 1.0.4 dev: true + /fetch-blob@3.2.0: + resolution: {integrity: sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ==} + engines: {node: ^12.20 || >= 14.13} + dependencies: + node-domexception: 1.0.0 + web-streams-polyfill: 3.2.1 + dev: true + /file-entry-cache@6.0.1: resolution: {integrity: sha512-7Gps/XWymbLk2QLYK4NzpMOrYjMhdIxXuIvy2QBsLE6ljuodKvdkWs/cpyJJ3CVIVpH0Oi1Hvg1ovbMzLdFBBg==} engines: {node: ^10.12.0 || >=12.0.0} @@ -933,6 +949,13 @@ packages: resolution: {integrity: sha512-36yxDn5H7OFZQla0/jFJmbIKTdZAQHngCedGxiMmpNfEZM0sdEeT+WczLQrjK6D7o2aiyLYDnkw0R3JK0Qv1RQ==} dev: true + /formdata-polyfill@4.0.10: + resolution: {integrity: sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==} + engines: {node: '>=12.20.0'} + dependencies: + fetch-blob: 3.2.0 + dev: true + /fs.realpath@1.0.0: resolution: {integrity: sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==} dev: true @@ -1209,6 +1232,20 @@ packages: resolution: {integrity: sha512-OWND8ei3VtNC9h7V60qff3SVobHr996CTwgxubgyQYEpg290h9J0buyECNNJexkFm5sOajh5G116RYA1c8ZMSw==} dev: true + /node-domexception@1.0.0: + resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==} + engines: {node: '>=10.5.0'} + dev: true + + /node-fetch@3.3.2: + resolution: {integrity: sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + dependencies: + data-uri-to-buffer: 4.0.1 + fetch-blob: 3.2.0 + formdata-polyfill: 4.0.10 + dev: true + /npm-run-path@4.0.1: resolution: {integrity: sha512-S48WzZW777zhNIrn7gxOlISNAqi9ZC/uQFnRdbeIHhZhCA6UqpkOT8T1G7BvfdgP4Er8gF4sUbaS0i7QvIfCWw==} engines: {node: '>=8'} @@ -1535,6 +1572,11 @@ packages: sade: 1.8.1 dev: true + /web-streams-polyfill@3.2.1: + resolution: {integrity: sha512-e0MO3wdXWKrLbL0DgGnUV7WHVuw9OUvL4hjgnPkIeEvESk74gAITi5G606JtZPp39cd8HA9VQzCIvA49LpPN5Q==} + engines: {node: '>= 8'} + dev: true + /which@2.0.2: resolution: {integrity: sha512-BLI3Tl1TW3Pvl70l3yq3Y64i+awpwXqsGBYWkkqMtnbXgrMD+yj7rhW0kuEDxzJaYXGjEW5ogapKNMEKNMjibA==} engines: {node: '>= 8'} diff --git a/src/application/index.ts b/src/application/index.ts index b46df6c..d67d41b 100644 --- a/src/application/index.ts +++ b/src/application/index.ts @@ -1,128 +1,128 @@ -import { Channel, ChannelHandle } from "../channel"; -import { Env, EnvResponse } from "../rpc"; +import { Channel, ChannelHandle } from '../channel' +import { Env, EnvResponse } from '../rpc' -export type IdentifiersMap = { [id: string]: any }; +export type IdentifiersMap = { [id: string]: unknown } export class ConnectionHandle { - readonly id: string | null; + readonly id: string | null - rejected: boolean = false; - closed: boolean = false; - transmissions: string[]; - streams: string[] = []; - stoppedStreams: string[] = []; - stopStreams: boolean = false; - env: Env; - identifiers: IdentifiersType | null = null; + rejected: boolean = false + closed: boolean = false + transmissions: string[] + streams: string[] = [] + stoppedStreams: string[] = [] + stopStreams: boolean = false + env: Env + identifiers: IdentifiersType | null = null constructor(id: string | null, env: Env) { - this.id = id; - this.env = env; - this.transmissions = []; + this.id = id + this.env = env + this.transmissions = [] } reject() { - this.rejected = true; - return this; + this.rejected = true + return this } - transmit(data: any) { - if (typeof data !== "string") { - data = JSON.stringify(data); + transmit(data: unknown) { + if (typeof data !== 'string') { + data = JSON.stringify(data) } - this.transmissions.push(data); - return this; + this.transmissions.push(data as string) + return this } streamFrom(name: string) { - this.streams.push(name); - return this; + this.streams.push(name) + return this } stopStreamFrom(name: string) { - this.stoppedStreams.push(name); - return this; + this.stoppedStreams.push(name) + return this } stopAllStreams() { - this.stopStreams = true; - return this; + this.stopStreams = true + return this } close() { - this.closed = true; - return this; + this.closed = true + return this } identifiedBy(identifiers: IdentifiersType) { - this.identifiers = identifiers; - return this; + this.identifiers = identifiers + return this } buildChannelHandle(identifier: string): ChannelHandle { - const rawState = this.env.istate ? this.env.istate[identifier] : null; + const rawState = this.env.istate ? this.env.istate[identifier] : null - let istate = null; + let istate = null if (rawState) { - istate = JSON.parse(rawState); + istate = JSON.parse(rawState) } - return new ChannelHandle(this, identifier, istate); + return new ChannelHandle(this, identifier, istate) } mergeChannelHandle(handle: ChannelHandle) { if (handle.rejected) { - this.reject(); + this.reject() } for (const transmission of handle.transmissions) { - this.transmit({ identifier: handle.identifier, message: transmission }); + this.transmit({ identifier: handle.identifier, message: transmission }) } if (handle.state) { - this.env.istate = this.env.istate || {}; - this.env.istate[handle.identifier] = JSON.stringify(handle.state); + this.env.istate = this.env.istate || {} + this.env.istate[handle.identifier] = JSON.stringify(handle.state) } - this.streams = this.streams.concat(handle.streams); - this.stoppedStreams = this.stoppedStreams.concat(handle.stoppedStreams); - this.stopStreams = this.stopStreams || handle.stopStreams; + this.streams = this.streams.concat(handle.streams) + this.stoppedStreams = this.stoppedStreams.concat(handle.stoppedStreams) + this.stopStreams = this.stopStreams || handle.stopStreams - return this; + return this } get envChanges(): EnvResponse { return { cstate: this.env.cstate, - istate: this.env.istate, - }; + istate: this.env.istate + } } } export class Application { - private channels: Record> = {}; + private channels: Record> = {} constructor() { - this.channels = {}; + this.channels = {} } registerChannel(channelName: string, channelClass: Channel) { - this.channels[channelName] = channelClass; + this.channels[channelName] = channelClass } buildHandle(id: string | null, env: Env): ConnectionHandle { - return new ConnectionHandle(id, env); + return new ConnectionHandle(id, env) } async handleOpen(handle: ConnectionHandle) { - await this.connect(handle); + await this.connect(handle) if (handle.rejected) { - handle.transmit({ type: "disconnect", reason: "unauthorized" }); + handle.transmit({ type: 'disconnect', reason: 'unauthorized' }) } else { - handle.transmit({ type: "welcome", sid: handle.id }); + handle.transmit({ type: 'welcome', sid: handle.id }) } } @@ -135,46 +135,46 @@ export class Application { handle: ConnectionHandle, command: string, identifier: string, - data: string | null, + data: string | null ) { - const { channel, params } = this.findChannel(identifier); + const { channel, params } = this.findChannel(identifier) - const channelHandle = handle.buildChannelHandle(identifier); + const channelHandle = handle.buildChannelHandle(identifier) - if (command === "subscribe") { - await channel.subscribed(channelHandle, params); + if (command === 'subscribe') { + await channel.subscribed(channelHandle, params) if (channelHandle.rejected) { - handle.transmit({ identifier, type: "reject_subscription" }); + handle.transmit({ identifier, type: 'reject_subscription' }) } else { - handle.transmit({ identifier, type: "confirm_subscription" }); + handle.transmit({ identifier, type: 'confirm_subscription' }) } - } else if (command === "unsubscribe") { - await channel.unsubscribed(channelHandle, params); - } else if (command === "message") { - const { action, ...payload } = JSON.parse(data!); - await channel.handleAction(channelHandle, params, action, payload); + } else if (command === 'unsubscribe') { + await channel.unsubscribed(channelHandle, params) + } else if (command === 'message') { + const { action, ...payload } = JSON.parse(data!) + await channel.handleAction(channelHandle, params, action, payload) } else { - throw new Error(`Unknown command: ${command}`); + throw new Error(`Unknown command: ${command}`) } - handle.mergeChannelHandle(channelHandle); + handle.mergeChannelHandle(channelHandle) } async handleClose( handle: ConnectionHandle, - subscriptions: string[] | null, + subscriptions: string[] | null ) { if (subscriptions) { for (const identifier of subscriptions) { - const { channel, params } = this.findChannel(identifier); + const { channel, params } = this.findChannel(identifier) - const channelHandle = handle.buildChannelHandle(identifier); + const channelHandle = handle.buildChannelHandle(identifier) - await channel.unsubscribed(channelHandle, params); + await channel.unsubscribed(channelHandle, params) } } - await this.disconnect(handle); + await this.disconnect(handle) } async disconnect(_handle: ConnectionHandle) { @@ -182,26 +182,26 @@ export class Application { } encodeIdentifiers(identifiers: IdentifiersType): string { - return JSON.stringify(identifiers); + return JSON.stringify(identifiers) } decodeIdentifiers(identifiers: string): IdentifiersType { - return JSON.parse(identifiers); + return JSON.parse(identifiers) } // Identifier is a JSON string with the channel name and params findChannel(identifier: string): { - channel: Channel; - params: any; + channel: Channel + params: any } { - const { channel, ...params } = JSON.parse(identifier); + const { channel, ...params } = JSON.parse(identifier) - const channelInstance = this.channels[channel]; + const channelInstance = this.channels[channel] if (!channelInstance) { - throw new Error(`Channel ${channel} is not registered`); + throw new Error(`Channel ${channel} is not registered`) } - return { channel: channelInstance, params }; + return { channel: channelInstance, params } } } diff --git a/src/broadcast/index.ts b/src/broadcast/index.ts index 8c265d8..e860fb7 100644 --- a/src/broadcast/index.ts +++ b/src/broadcast/index.ts @@ -1,23 +1,31 @@ -export type IBroadcast = (stream: string, data: any) => Promise; +export type IBroadcast = (stream: string, data: any) => Promise + +export const broadcaster = ( + url: string, + secret: string | undefined +): IBroadcast => { + const broadcastHeaders: Record = { + 'Content-Type': 'application/json' + } + + if (secret) { + broadcastHeaders['Authorization'] = `Bearer ${secret}` + } -export const broadcaster = (url: string, secret: string): IBroadcast => { const broadcast = async (stream: string, data: any) => { const res = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${secret}`, - }, + method: 'POST', + headers: broadcastHeaders, body: JSON.stringify({ stream, - data: JSON.stringify(data), - }), - }); + data: JSON.stringify(data) + }) + }) if (!res.ok) { - throw new Error(`Error broadcasting to ${stream}: ${res.statusText}`); + throw new Error(`Error broadcasting to ${stream}: ${res.statusText}`) } - }; + } - return broadcast; -}; + return broadcast +} diff --git a/src/channel/index.ts b/src/channel/index.ts index 282e04b..b362667 100644 --- a/src/channel/index.ts +++ b/src/channel/index.ts @@ -1,111 +1,111 @@ -import { Env } from "../rpc"; +import { Env } from '../rpc' interface ConnectionDelegate { - get env(): Env; - get identifiers(): I | null; + get env(): Env + get identifiers(): I | null } -export type ChannelState = { [key: string]: any }; +export type ChannelState = { [key: string]: any } export class ChannelHandle { - readonly identifier: string; - private delegate: ConnectionDelegate; - state: Partial = {}; + readonly identifier: string + private delegate: ConnectionDelegate + state: Partial = {} - rejected: boolean = false; - transmissions: T[] = []; - streams: string[] = []; - stoppedStreams: string[] = []; - stopStreams: boolean = false; + rejected: boolean = false + transmissions: T[] = [] + streams: string[] = [] + stoppedStreams: string[] = [] + stopStreams: boolean = false constructor( delegate: ConnectionDelegate, identifier: string, - state: Partial, + state: Partial ) { - this.delegate = delegate; - this.identifier = identifier; - this.state = state; + this.delegate = delegate + this.identifier = identifier + this.state = state } reject() { - this.rejected = true; - return this; + this.rejected = true + return this } streamFrom(name: string) { - this.streams.push(name); - return this; + this.streams.push(name) + return this } stopStreamFrom(name: string) { - this.stoppedStreams.push(name); - return this; + this.stoppedStreams.push(name) + return this } stopAllStreams() { - this.stopStreams = true; - return this; + this.stopStreams = true + return this } transmit(data: T) { - this.transmissions.push(data); - return this; + this.transmissions.push(data) + return this } get env(): Env { - return this.delegate.env; + return this.delegate.env } get identifiers(): I | null { - return this.delegate.identifiers; + return this.delegate.identifiers } } -export type ChannelParamsMap = { [token: string]: boolean | number | string }; +export type ChannelParamsMap = { [token: string]: boolean | number | string } export type ServerAction< ClientActions extends (...args: any[]) => void, I, S extends ChannelState = {}, T = any, - P extends ChannelParamsMap = {}, + P extends ChannelParamsMap = {} > = ( handle: ChannelHandle, params: P, ...args: Parameters -) => ReturnType; +) => ReturnType export class Channel< IdentifiersType, ParamsType extends ChannelParamsMap = {}, TransmissionsType = any, - StateType extends ChannelState = {}, + StateType extends ChannelState = {} > { async subscribed( _handle: ChannelHandle, - _params: ParamsType | null, + _params: ParamsType | null ): Promise {} async unsubscribed( _handle: ChannelHandle, - _params: ParamsType | null, + _params: ParamsType | null ): Promise { - return; + return } async handleAction( handle: ChannelHandle, params: ParamsType | null, action: string, - payload: any, + payload: any ) { - const self = this as any; + const self = this as any if (!self[action]) { - throw new Error(`Unknown action: ${action}`); + throw new Error(`Unknown action: ${action}`) } - await self[action](handle, params, payload); + await self[action](handle, params, payload) } } diff --git a/src/index.ts b/src/index.ts index cbb6cda..ff4f269 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ -export { Status } from "./rpc"; +export { Status } from './rpc' export type { Env, EnvResponse, @@ -7,15 +7,15 @@ export type { CommandMessage, CommandResponse, DisconnectRequest, - DisconnectResponse, -} from "./rpc"; + DisconnectResponse +} from './rpc' -export { connectHandler, commandHandler, disconnectHandler } from "./service"; -export { Application, ConnectionHandle } from "./application"; -export type { IdentifiersMap } from "./application"; -export { Channel, ChannelHandle } from "./channel"; -export type { ChannelParamsMap, ChannelState, ServerAction } from "./channel"; -export type { IBroadcast } from "./broadcast"; -export { broadcaster } from "./broadcast"; -export { identificator } from "./jwt"; -export type { IIdentificator } from "./jwt"; +export { connectHandler, commandHandler, disconnectHandler } from './service' +export { Application, ConnectionHandle } from './application' +export type { IdentifiersMap } from './application' +export { Channel, ChannelHandle } from './channel' +export type { ChannelParamsMap, ChannelState, ServerAction } from './channel' +export type { IBroadcast } from './broadcast' +export { broadcaster } from './broadcast' +export { identificator } from './jwt' +export type { IIdentificator } from './jwt' diff --git a/src/jwt/index.ts b/src/jwt/index.ts index 8df7a31..ab82fcf 100644 --- a/src/jwt/index.ts +++ b/src/jwt/index.ts @@ -1,47 +1,47 @@ -import * as jose from "jose"; +import * as jose from 'jose' -export type IdentifiersMap = Record; +export type IdentifiersMap = Record export interface IIdentificator { - verify: (token: string) => Promise; - verifyAndFetch(token: string): Promise; - generateToken: (identifiers: I) => Promise; + verify: (token: string) => Promise + verifyAndFetch(token: string): Promise + generateToken: (identifiers: I) => Promise } export const identificator = ( secret: string, - exp: string | number, + exp: string | number ) => { - const secretEncoder = new TextEncoder().encode(secret); - const alg = "HS256"; + const secretEncoder = new TextEncoder().encode(secret) + const alg = 'HS256' return { verify: async (token: string) => { await jose.jwtVerify(token, secretEncoder, { - maxTokenAge: exp, - }); + maxTokenAge: exp + }) - return true; + return true }, verifyAndFetch: async (token: string) => { const { payload } = await jose.jwtVerify(token, secretEncoder, { - algorithms: [alg], - }); + algorithms: [alg] + }) - if (typeof payload.ext == "string") { - return JSON.parse(payload.ext) as I; + if (typeof payload.ext == 'string') { + return JSON.parse(payload.ext) as I } - return null; + return null }, generateToken: async (identifiers: I) => { const token = await new jose.SignJWT({ ext: JSON.stringify(identifiers) }) .setProtectedHeader({ alg }) .setExpirationTime(exp) .setIssuedAt() - .sign(secretEncoder); + .sign(secretEncoder) - return token; - }, - }; -}; + return token + } + } +} diff --git a/src/rpc/index.ts b/src/rpc/index.ts index d4c84f2..27b9724 100644 --- a/src/rpc/index.ts +++ b/src/rpc/index.ts @@ -2,60 +2,60 @@ export enum Status { ERROR = 0, SUCCESS = 1, - FAILURE = 2, + FAILURE = 2 } // Message definitions export type Env = { - url: string; - headers: Record | null; - cstate: Record | null; - istate: Record | null; -}; + url: string + headers?: Record | null + cstate?: Record | null + istate?: Record | null +} export type EnvResponse = { - cstate: Record | null; - istate: Record | null; -}; + cstate?: Record | null + istate?: Record | null +} export type ConnectionRequest = { - env: Env; -}; + env: Env +} export type ConnectionResponse = { - status: Status; - identifiers: string | null; - transmissions: string[] | null; - error_msg: string | null; - env: EnvResponse; -}; + status: Status + identifiers: string | null + transmissions: string[] | null + error_msg: string | null + env: EnvResponse +} export type CommandMessage = { - command: string; - identifier: string; - connection_identifiers: string; - data: string | null; - env: Env; -}; + command: string + identifier: string + connection_identifiers: string + data: string | null + env: Env +} export type CommandResponse = { - status: Status; - disconnect: boolean; - stop_streams: boolean; - streams: string[] | null; - transmissions: string[] | null; - error_msg: string | null; - env: EnvResponse; - stopped_streams: string[] | null; -}; + status: Status + disconnect: boolean + stop_streams: boolean + streams: string[] | null + transmissions: string[] | null + error_msg: string | null + env: EnvResponse + stopped_streams: string[] | null +} export type DisconnectRequest = { - identifiers: string; - subscriptions: string[]; - env: Env; -}; + identifiers: string + subscriptions: string[] + env: Env +} export type DisconnectResponse = { - status: Status; - error_msg: string | null; -}; + status: Status + error_msg: string | null +} diff --git a/src/service/index.ts b/src/service/index.ts index 94de511..94be7ac 100644 --- a/src/service/index.ts +++ b/src/service/index.ts @@ -1,4 +1,4 @@ -import type { Readable } from "node:stream"; +import type { Readable } from 'node:stream' import { ConnectionResponse, @@ -7,67 +7,68 @@ import { CommandResponse, CommandMessage, DisconnectRequest, - DisconnectResponse, -} from "../rpc"; -import { Application } from "../application"; + DisconnectResponse +} from '../rpc' + +import { Application } from '../application' async function buffer(readable: Readable) { - const chunks = []; + const chunks = [] for await (const chunk of readable) { - chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk); + chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk) } - return Buffer.concat(chunks); + return Buffer.concat(chunks) } async function parsePayload(req: Request) { - const stream = req.body as unknown; - const buf = await buffer(stream as Readable); - const rawBody = buf.toString("utf8"); + const stream = req.body as unknown + const buf = await buffer(stream as Readable) + const rawBody = buf.toString('utf8') - return JSON.parse(rawBody); + return JSON.parse(rawBody) } // AnyCable passes the session ID in the `x-anycable-meta-sid` header function extractSessionId(req: Request) { - return req.headers.get("x-anycable-meta-sid"); + return req.headers.get('x-anycable-meta-sid') } export const connectHandler = async ( request: Request, - app: Application, + app: Application ): Promise => { - const payload = (await parsePayload(request)) as ConnectionRequest; - const sid = extractSessionId(request); - const handle = app.buildHandle(sid, payload.env); + const payload = (await parsePayload(request)) as ConnectionRequest + const sid = extractSessionId(request) + const handle = app.buildHandle(sid, payload.env) - await app.handleOpen(handle); + await app.handleOpen(handle) return { status: handle.rejected ? Status.FAILURE : Status.SUCCESS, identifiers: handle.identifiers ? app.encodeIdentifiers(handle.identifiers) - : "", + : '', transmissions: handle.transmissions, - error_msg: handle.rejected ? "Auth failed" : "", - env: handle.envChanges, - }; -}; + error_msg: handle.rejected ? 'Auth failed' : '', + env: handle.envChanges + } +} export const commandHandler = async ( request: Request, - app: Application, + app: Application ): Promise => { - const payload = (await parsePayload(request)) as CommandMessage; - const sid = extractSessionId(request); - const handle = app.buildHandle(sid, payload.env); - handle.identifiedBy(app.decodeIdentifiers(payload.connection_identifiers)); + const payload = (await parsePayload(request)) as CommandMessage + const sid = extractSessionId(request) + const handle = app.buildHandle(sid, payload.env) + handle.identifiedBy(app.decodeIdentifiers(payload.connection_identifiers)) await app.handleCommand( handle, payload.command, payload.identifier, - payload.data, - ); + payload.data + ) return { status: handle.rejected ? Status.FAILURE : Status.SUCCESS, @@ -77,23 +78,23 @@ export const commandHandler = async ( streams: handle.streams, stopped_streams: handle.stoppedStreams, env: handle.envChanges, - error_msg: "", - }; -}; + error_msg: '' + } +} export const disconnectHandler = async ( request: Request, - app: Application, + app: Application ): Promise => { - const payload = (await parsePayload(request)) as DisconnectRequest; - const sid = extractSessionId(request); - const handle = app.buildHandle(sid, payload.env); - handle.identifiedBy(app.decodeIdentifiers(payload.identifiers)); + const payload = (await parsePayload(request)) as DisconnectRequest + const sid = extractSessionId(request) + const handle = app.buildHandle(sid, payload.env) + handle.identifiedBy(app.decodeIdentifiers(payload.identifiers)) - await app.handleClose(handle, payload.subscriptions); + await app.handleClose(handle, payload.subscriptions) return { status: Status.SUCCESS, - error_msg: "", - }; -}; + error_msg: '' + } +} diff --git a/src/test/application.test.ts b/src/test/application.test.ts deleted file mode 100644 index 7e7191c..0000000 --- a/src/test/application.test.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { Application } from "@/application"; -import { test } from "uvu"; -import * as assert from "uvu/assert"; - -test("application is created", () => { - const app = new Application(); - - assert.ok(app); -}); - -test.run(); diff --git a/tests/application.test.ts b/tests/application.test.ts new file mode 100644 index 0000000..2ec80ce --- /dev/null +++ b/tests/application.test.ts @@ -0,0 +1,254 @@ +import { + Application, + ConnectionHandle, + Channel, + ChannelHandle +} from '../src/index' +import { suite } from 'uvu' +import * as assert from 'uvu/assert' + +const ApplicationTest = suite('Application') + +type TestIdentifiers = { + userId: string +} + +class TestApplication extends Application { + async connect(handle: ConnectionHandle) { + const url = handle.env.url + const params = new URL(url).searchParams + + if (params.has('user_id')) { + handle.identifiedBy({ userId: params.get('user_id')! }) + } else { + handle.reject() + } + } + + async disconnect(handle: ConnectionHandle) {} +} + +ApplicationTest('handleOpen when authenticated', async () => { + const app = new TestApplication() + + const handle = new ConnectionHandle('123', { + url: 'http://localhost?user_id=1' + }) + + await app.handleOpen(handle) + + assert.is(handle.rejected, false) + assert.is(handle.identifiers?.userId, '1') + assert.equal(handle.transmissions, [ + JSON.stringify({ type: 'welcome', sid: '123' }) + ]) +}) + +ApplicationTest('handleOpen when unauthorized', async () => { + const app = new TestApplication() + + const handle = new ConnectionHandle('123', { + url: 'http://localhost' + }) + + await app.handleOpen(handle) + + assert.is(handle.rejected, true) + assert.equal(handle.transmissions, [ + JSON.stringify({ type: 'disconnect', reason: 'unauthorized' }) + ]) +}) + +ApplicationTest('handleClose', async () => { + const app = new TestApplication() + + let disconnectedId: string = '' + + app.disconnect = async (handle: ConnectionHandle) => { + disconnectedId = handle.identifiers!.userId + } + + const handle = new ConnectionHandle('123', { + url: 'http://localhost' + }) + handle.identifiedBy({ userId: '1' }) + + await app.handleClose(handle, []) + + assert.is(disconnectedId, '1') + assert.equal(handle.transmissions, []) +}) + +type TestChannelParams = { + roomId: string +} + +type TestMessage = { + id: string + username: string + body: string +} + +class ChatChannel extends Channel< + TestIdentifiers, + TestChannelParams, + TestMessage +> { + async subscribed( + handle: ChannelHandle, + params: TestChannelParams | null + ) { + if (!params) { + handle.reject() + return + } + + if (!params.roomId) { + handle.reject() + return + } + + handle.streamFrom(`room:${params.roomId}`) + } + + async sendMessage( + handle: ChannelHandle, + params: TestChannelParams, + data: TestMessage + ) { + const { body } = data + + if (!body) { + throw new Error('Body is required') + } + + console.log(`User ${handle.identifiers!.userId} sent message: ${data.body}`) + + const message: TestMessage = { + id: Math.random().toString(36).substr(2, 9), + username: `User ${handle.identifiers!.userId}`, + body + } + + handle.transmit(message) + } +} + +ApplicationTest('handleCommand with unknown channel', async () => { + const app = new TestApplication() + const handle = new ConnectionHandle('123', { + url: 'http://localhost' + }) + + try { + await app.handleCommand(handle, 'subscribe', `{"channel":"chat"}`, null) + assert.unreachable('Should throw if channel is not registered') + } catch (e) { + assert.is(e.message, 'Channel chat is not registered') + } +}) + +ApplicationTest('handleCommand + subscribe + confirmed', async () => { + const app = new TestApplication() + app.registerChannel('chat', new ChatChannel()) + const handle = new ConnectionHandle('123', { + url: 'http://localhost' + }) + handle.identifiedBy({ userId: '42' }) + + const identifier = `{"channel":"chat","roomId":2023}` + + await app.handleCommand(handle, 'subscribe', identifier, null) + + assert.is(handle.rejected, false) + assert.equal(handle.transmissions, [ + JSON.stringify({ + identifier, + type: 'confirm_subscription' + }) + ]) + assert.is(handle.streams.length, 1) + assert.is(handle.streams[0], 'room:2023') +}) + +ApplicationTest('handleCommand + subscribe + rejected', async () => { + const app = new TestApplication() + app.registerChannel('chat', new ChatChannel()) + const handle = new ConnectionHandle('123', { + url: 'http://localhost' + }) + handle.identifiedBy({ userId: '42' }) + + const identifier = `{"channel":"chat","room_id":2023}` + + await app.handleCommand(handle, 'subscribe', identifier, null) + + assert.is(handle.rejected, true) + assert.equal(handle.transmissions, [ + JSON.stringify({ + identifier, + type: 'reject_subscription' + }) + ]) + assert.is(handle.streams.length, 0) +}) + +ApplicationTest('handleCommand + perform', async () => { + const app = new TestApplication() + app.registerChannel('chat', new ChatChannel()) + const handle = new ConnectionHandle('123', { + url: 'http://localhost' + }) + handle.identifiedBy({ userId: '42' }) + + const identifier = `{"channel":"chat","room_id":2023}` + + await app.handleCommand( + handle, + 'message', + identifier, + JSON.stringify({ + action: 'sendMessage', + body: '2023 - Linea' + }) + ) + + assert.is(handle.streams.length, 0) + assert.is(handle.rejected, false) + assert.is(handle.transmissions.length, 1) + + const transmission = JSON.parse(handle.transmissions[0]) + assert.is(transmission['identifier'], identifier) + const message = transmission['message'] as TestMessage + + assert.is(message.username, 'User 42') + assert.is(message.body, '2023 - Linea') +}) + +ApplicationTest('handleCommand + perform + unknown action', async () => { + const app = new TestApplication() + app.registerChannel('chat', new ChatChannel()) + const handle = new ConnectionHandle('123', { + url: 'http://localhost' + }) + handle.identifiedBy({ userId: '42' }) + + const identifier = `{"channel":"chat","room_id":2023}` + + try { + await app.handleCommand( + handle, + 'message', + identifier, + JSON.stringify({ + action: 'send_message', + body: '2023 - Linea' + }) + ) + assert.unreachable('Should throw if action is not defined') + } catch (e) { + assert.is(e.message, 'Unknown action: send_message') + } +}) + +ApplicationTest.run() diff --git a/tests/broadcast.test.ts b/tests/broadcast.test.ts new file mode 100644 index 0000000..db60816 --- /dev/null +++ b/tests/broadcast.test.ts @@ -0,0 +1,82 @@ +import { broadcaster } from '../src/index' +import { suite } from 'uvu' +import * as assert from 'uvu/assert' +import fetch from 'node-fetch' + +// Mock global fetch +;(global as any).fetch = fetch + +const BroadcastTest = suite('Broadcaster') + +BroadcastTest.before.each(context => { + context.originalFetch = global.fetch +}) + +BroadcastTest.after.each(context => { + global.fetch = context.originalFetch +}) + +BroadcastTest('broadcasts with secret', async () => { + const mockUrl = 'http://mock-url.com' + const mockSecret = 'secret-token' + const mockStream = 'stream' + const mockData = { key: 'value' } + + let requestBody: any = null + + global.fetch = async ( + url: RequestInfo | URL, + options: any + ): Promise => { + assert.is(url, mockUrl) + assert.is(options.headers['Authorization'], `Bearer ${mockSecret}`) + requestBody = JSON.parse(options.body) + return new Response(null, { status: 200 }) + } + + const broadcast = broadcaster(mockUrl, mockSecret) + await broadcast(mockStream, mockData) + + assert.is(requestBody.stream, mockStream) + assert.is(requestBody.data, JSON.stringify(mockData)) +}) + +BroadcastTest('broadcasts without secret', async () => { + const mockUrl = 'http://mock-url.com' + const mockStream = 'stream' + const mockData = { key: 'value' } + + global.fetch = async ( + url: RequestInfo | URL, + options: any + ): Promise => { + assert.is(options.headers['Authorization'], undefined) + return new Response(null, { status: 200 }) + } + + const broadcast = broadcaster(mockUrl, undefined) + await broadcast(mockStream, mockData) +}) + +BroadcastTest('handles non-OK response', async () => { + const mockUrl = 'http://mock-url.com' + const mockStream = 'error-stream' + const mockData = { key: 'error' } + + global.fetch = async ( + url: RequestInfo | URL, + options: any + ): Promise => { + return new Response('Not Found', { status: 404, statusText: 'Not Found' }) + } + + const broadcast = broadcaster(mockUrl, undefined) + + try { + await broadcast(mockStream, mockData) + } catch (error) { + assert.is(error.message, `Error broadcasting to ${mockStream}: Not Found`) + } +}) + +BroadcastTest.run() diff --git a/tests/jwt.test.ts b/tests/jwt.test.ts new file mode 100644 index 0000000..6e214a9 --- /dev/null +++ b/tests/jwt.test.ts @@ -0,0 +1,91 @@ +import { suite } from 'uvu' +import * as assert from 'uvu/assert' +import * as jose from 'jose' +import { identificator } from '../src/index' + +const IdentificatorTest = suite('Identificator') + +const generateToken = async ( + secret: string, + payload: object, + options?: Partial<{ + exp: string | number + iat: number + }> +) => { + const secretEncoder = new TextEncoder().encode(secret) + const alg = 'HS256' + + options = options || {} + + const token = await new jose.SignJWT({ ext: JSON.stringify(payload) }) + .setProtectedHeader({ alg }) + .setExpirationTime(options.exp || '1h') + .setIssuedAt(options.iat) + .sign(secretEncoder) + + return token +} + +IdentificatorTest('verifies token', async () => { + const secret = 'super-secret' + const token = await generateToken(secret, {}) + + const id = identificator(secret, '1h') + const result = await id.verify(token) + assert.is(result, true) +}) + +IdentificatorTest('verify throws when expired token', async () => { + const secret = 'super-secret' + const token = await generateToken(secret, {}, { exp: '1s' }) + + await new Promise(resolve => setTimeout(resolve, 1100)) + + const id = identificator(secret, '1s') + + try { + await id.verify(token) + assert.unreachable('Should have thrown an error for expired token') + } catch (error) { + assert.is(error.code, 'ERR_JWT_EXPIRED') + } +}) + +IdentificatorTest('verifies and fetches token', async () => { + const secret = 'super-secret' + const mockData = { key: 'value' } + const token = await generateToken(secret, mockData) + + const id = identificator(secret, '1h') + const result = await id.verifyAndFetch(token) + assert.equal(result, mockData) +}) + +IdentificatorTest('verifyAndFetch throws when expired token', async () => { + const secret = 'super-secret' + const token = await generateToken(secret, {}, { exp: '1s' }) + + await new Promise(resolve => setTimeout(resolve, 1100)) + + const id = identificator(secret, '1s') + + try { + await id.verifyAndFetch(token) + assert.unreachable('Should have thrown an error for expired token') + } catch (error) { + assert.is(error.code, 'ERR_JWT_EXPIRED') + } +}) + +IdentificatorTest('generates token', async () => { + const secret = 'super-secret' + const mockData = { key: 'value' } + const mockToken = await generateToken(secret, mockData) + + const id = identificator(secret, '1h') + const result = await id.generateToken(mockData) + assert.is(result, mockToken) +}) + +IdentificatorTest.run() diff --git a/tests/service.test.ts b/tests/service.test.ts new file mode 100644 index 0000000..2a34489 --- /dev/null +++ b/tests/service.test.ts @@ -0,0 +1,174 @@ +import { suite } from 'uvu' +import * as assert from 'uvu/assert' +import { + Application, + Channel, + ConnectionHandle, + ChannelHandle, + connectHandler, + commandHandler, + disconnectHandler, + Status +} from '../src/index' + +const HandlersTest = suite('Handlers') +function createMockRequest( + url: string, + options?: Partial<{ body: any; sid: string }> +): Request { + const { body, sid } = options || {} + const payload = body || {} + + payload['env'] = payload['env'] || {} + payload['env']['url'] = payload['env']['url'] || url + + const bodyBuffer = Buffer.from(JSON.stringify(payload)) + + const headers = new Map() + if (sid) { + headers.set('x-anycable-meta-sid', sid) + } + + return new Request('http://cable.test/api', { + method: 'POST', + body: bodyBuffer, + headers: headers as any + }) +} + +type TestIdentifiers = { + userId: string +} + +class TestApplication extends Application { + async connect(handle: ConnectionHandle) { + const url = handle.env.url + const params = new URL(url).searchParams + + if (params.has('user_id')) { + handle.identifiedBy({ userId: params.get('user_id')! }) + } else { + handle.reject() + } + } +} + +class TestChannel extends Channel { + async subscribed( + handle: ChannelHandle, + params: { roomId: string } | null + ) { + if (!params) { + handle.reject() + return + } + + if (!params.roomId) { + handle.reject() + return + } + + handle.streamFrom('room:' + params.roomId) + handle.streamFrom( + 'room:' + params.roomId + ':user:' + handle.identifiers?.userId + ) + } +} + +HandlersTest('connect + welcomed', async () => { + const app = new TestApplication() + const request = createMockRequest('http://localhost?user_id=13', { + sid: 's42' + }) + const response = await connectHandler(request, app) + + assert.is(response.status, Status.SUCCESS) + assert.is(response.error_msg, '') + assert.is(response.identifiers, '{"userId":"13"}') + assert.equal(response.transmissions, [ + JSON.stringify({ type: 'welcome', sid: 's42' }) + ]) +}) + +HandlersTest('connect + rejected', async () => { + const app = new TestApplication() + const request = createMockRequest('http://localhost', { + sid: 's42' + }) + const response = await connectHandler(request, app) + + assert.is(response.status, Status.FAILURE) + assert.is(response.error_msg, 'Auth failed') + assert.is(response.identifiers, '') + assert.equal(response.transmissions, [ + JSON.stringify({ type: 'disconnect', reason: 'unauthorized' }) + ]) +}) + +HandlersTest('command + subscribed + confirmed', async () => { + const app = new TestApplication() + app.registerChannel('TestChannel', new TestChannel()) + + const identifier = `{"channel":"TestChannel","roomId":"42"}` + + const request = createMockRequest('http://localhost', { + body: { + connection_identifiers: '{"userId":"13"}', + command: 'subscribe', + identifier + } + }) + + const response = await commandHandler(request, app) + + assert.is(response.status, Status.SUCCESS) + assert.is(response.disconnect, false) + assert.is(response.error_msg, '') + assert.is(response.stop_streams, false) + assert.is(response.transmissions?.length, 1) + + const transmission = JSON.parse(response.transmissions![0]) + assert.is(transmission.identifier, identifier) + assert.is(transmission.type, 'confirm_subscription') + + assert.equal(response.streams, ['room:42', 'room:42:user:13']) +}) + +HandlersTest('disconnect', async () => { + const app = new TestApplication() + const channel = new TestChannel() + + let disconnectedId: string = '' + let unsubscribedId: string = '' + + app.disconnect = async (handle: ConnectionHandle) => { + disconnectedId = handle.identifiers!.userId + } + + channel.unsubscribed = async ( + handle: ChannelHandle, + params: {} | null + ) => { + unsubscribedId = + (handle.identifiers?.userId || '') + '-' + (params || {})['roomId'] + } + + app.registerChannel('TestChannel', channel) + + const identifier = `{"channel":"TestChannel","roomId":"42"}` + + const request = createMockRequest('http://localhost', { + body: { + identifiers: '{"userId":"13"}', + subscriptions: [identifier] + } + }) + + const response = await disconnectHandler(request, app) + + assert.is(response.status, Status.SUCCESS) + assert.is(disconnectedId, '13') + assert.is(unsubscribedId, '13-42') +}) + +HandlersTest.run()