diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8f322f0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +# See https://help.github.com/articles/ignoring-files/ for more about ignoring files. + +# dependencies +/node_modules +/.pnp +.pnp.js + +# testing +/coverage + +# next.js +/.next/ +/out/ + +# production +/build + +# misc +.DS_Store +*.pem + +# debug +npm-debug.log* +yarn-debug.log* +yarn-error.log* + +# local env files +.env*.local + +# vercel +.vercel + +# typescript +*.tsbuildinfo +next-env.d.ts diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..5187d36 --- /dev/null +++ b/.npmignore @@ -0,0 +1,9 @@ +yarn-error.log +yarn.lock + +test/ +coverage/ +tsconfig.json +**/*.test.ts +**/types.ts +**/errors.ts diff --git a/README.md b/README.md new file mode 100644 index 0000000..63b1fae --- /dev/null +++ b/README.md @@ -0,0 +1,226 @@ +[![npm version](https://badge.fury.io/js/%40anycable%2serverless-js.svg)](https://badge.fury.io/js/%40anycable%2serverless-js) + +# AnyCable Serverless + +This package provides modules to implement [AnyCable](https://anycable.io) backend APIs to be executed in serverless Node.js environments. + +> See our [demo application](https://github.com/anycable/vercel-anycable-demo) for a working example. + +## Architecture and components + +The package comes with HTTP handlers to handle [AnyCable RPC-over-HTTP](https://docs.anycable.io/architecture) requests and provides **channels** and **application** abstractions to describe real-time features of your application. + +## Usage + +Install the `@anycable/serverless-js` package using your tool of choice, e.g.: + +```sh +npm install @anycable/serverless-js +``` + +### Configuring the application and channels + +An **application** instance is responsible for handling the connection lifecycle and dispatching messages to the appropriate channels. + +```js +// api/cable.ts +import { + Application, + ConnectionHandle, + broadcaster, + identificator, +} from "@anycable/serverless-js"; + +// The identifiers type describe connection identifiers—e.g., user ID, username, etc. +export type CableIdentifiers = { + userId: string; +}; + +// Application instance handles connection lifecycle events +class CableApplication extends Application { + async connect(handle: ConnectionHandle) { + const url = handle.env.url; + const params = new URL(url).searchParams; + + if (params.has("token")) { + const payload = await verifyToken(params.get("token")!); + + if (payload) { + const { userId } = payload; + + handle.identifiedBy({ userId }); + } + return; + } + + // Reject connection if not authenticated + handle.reject(); + } + + async disconnect(handle: ConnectionHandle) { + // Here you can perform any cleanup work + console.log(`User ${handle.identifiers!.userId} disconnected`); + } +} + +// Create and instance of the class to use in HTTP handlers (see the next section) +const app = new CableApplication(); + +// Register channels (see below) + +export default app; +``` + +**Channels** instances reflect particular features (e.g, chat room, notifications, etc.) and are responsible for handling incoming commands and subscription lifecycle events: + +```js +import { Channel, ChannelHandle } from "@/lib/anycable"; +// We re-using the identifiers type from the cable application +import type { CableIdentifiers } from "../cable"; + +// Define the channel params (used by the client according to Action Cable protocol) +type ChatChannelParams = { + roomId: string; +}; + +export type ChatMessage = { + id: string; + username: string; + body: string; + createdAt: string; +}; + +export default class ChatChannel + extends Channel +{ + // The `subscribed` method is called when the client subscribes to the channel + // You can use it to authorize the subscription and setup streaming + async subscribed( + handle: ChannelHandle, + params: ChatChannelParams | null, + ) { + if (!params) { + handle.reject(); + return; + } + + if (!params.roomId) { + handle.reject(); + return; + } + + handle.streamFrom(`room:${params.roomId}`); + } + + // This method is called by the client + async sendMessage( + handle: ChannelHandle, + params: ChatChannelParams, + data: SentMessage, + ) { + const { body } = data; + + if (!body) { + throw new Error("Body is required"); + } + + console.log( + `User ${handle.identifiers!.username} sent message: ${data.body}`, + ); + + const message: ChatMessage = { + id: Math.random().toString(36).substr(2, 9), + username: handle.identifiers!.username, + body, + createdAt: new Date().toISOString(), + }; + + // Broadcast the message to all subscribers (see below) + await broadcastTo(`room:${params.roomId}`, message); + } +} + +// You MUST register a channel instance within the application +// The client MUST use the provided identifier to subscribe to the channel. +app.registerChannel("chat", new ChatChannel()); +``` + +Finally, to **broadcast** messages to connected clients, you must use a broadcaster instance: + +```js +// Broadcasting configuration +const broadcastURL = + process.env.ANYCABLE_BROADCAST_URL || "http://127.0.0.1:8090/_broadcast"; +const broadcastToken = process.env.ANYCABLE_HTTP_BROADCAST_SECRET || ""; + +// Create a broadcasting function to send broadcast messages via HTTP API +export const broadcastTo = broadcaster(broadcastURL, broadcastToken); +``` + +Currently, this package only supports broadcasting over HTTP. However, AnyCable provides different [broadcasting adapters](https://docs.anycable.io/ruby/broadcast_adapters) (e.g., Redis, NATS, etc.) that you can integrate yourself. + +### HTTP handlers + +To glue our HTTP layer with the channels, we need to configure HTTP handlers. Below you can find an example of [Vercel](https://vercel.com) serverless functions: + +```js +// api/anycable/connect/route.ts +import { NextResponse } from "next/server"; +import { connectHandler, Status } from "@/lib/anycable"; +import app from "../../cable"; + +export async function POST(request: Request) { + try { + const response = await connectHandler(request, app); + return NextResponse.json(response, { + status: 200, + }); + } catch (e) { + console.error(e); + return NextResponse.json({ + status: Status.ERROR, + error_msg: "Server error", + }); + } +} + +// api/anycable/command/route.ts +import { NextResponse } from "next/server"; +import { commandHandler, Status } from "@/lib/anycable"; +import app from "../../cable"; + +export async function POST(request: Request) { + try { + const response = await commandHandler(request, app); + return NextResponse.json(response, { + status: 200, + }); + } catch (e) { + console.error(e); + return NextResponse.json({ + status: Status.ERROR, + error_msg: "Server error", + }); + } +} + +// api/anycable/disconnect/route.ts +import { NextResponse } from "next/server"; +import { disconnectHandler, Status } from "@/lib/anycable"; +import app from "../../cable"; + +export async function POST(request: Request) { + try { + const response = await disconnectHandler(request, app); + return NextResponse.json(response, { + status: 200, + }); + } catch (e) { + console.error(e); + return NextResponse.json({ + status: Status.ERROR, + error_msg: "Server error", + }); + } +} +``` diff --git a/application/index.ts b/application/index.ts new file mode 100644 index 0000000..8d8a49e --- /dev/null +++ b/application/index.ts @@ -0,0 +1,207 @@ +import { Channel, ChannelHandle } from "../channel"; +import { Env, EnvResponse } from "../rpc"; + +export type IdentifiersMap = { [id: string]: any }; + +export class ConnectionHandle { + readonly id: string | null; + + rejected: boolean = false; + closed: boolean = false; + transmissions: string[]; + streams: string[] = []; + stoppedStreams: string[] = []; + stopStreams: boolean = false; + env: Env; + identifiers: IdentifiersType | null = null; + + constructor(id: string | null, env: Env) { + this.id = id; + this.env = env; + this.transmissions = []; + } + + reject() { + this.rejected = true; + return this; + } + + transmit(data: any) { + if (typeof data !== "string") { + data = JSON.stringify(data); + } + + this.transmissions.push(data); + return this; + } + + streamFrom(name: string) { + this.streams.push(name); + return this; + } + + stopStreamFrom(name: string) { + this.stoppedStreams.push(name); + return this; + } + + stopAllStreams() { + this.stopStreams = true; + return this; + } + + close() { + this.closed = true; + return this; + } + + identifiedBy(identifiers: IdentifiersType) { + this.identifiers = identifiers; + return this; + } + + buildChannelHandle(identifier: string): ChannelHandle { + const rawState = this.env.istate ? this.env.istate[identifier] : null; + + let istate = null; + + if (rawState) { + istate = JSON.parse(rawState); + } + + return new ChannelHandle(this, identifier, istate); + } + + mergeChannelHandle(handle: ChannelHandle) { + if (handle.rejected) { + this.reject(); + } + + for (const transmission of handle.transmissions) { + this.transmit({ identifier: handle.identifier, message: transmission }); + } + + if (handle.state) { + this.env.istate = this.env.istate || {}; + this.env.istate[handle.identifier] = JSON.stringify(handle.state); + } + + this.streams = this.streams.concat(handle.streams); + this.stoppedStreams = this.stoppedStreams.concat(handle.stoppedStreams); + this.stopStreams = this.stopStreams || handle.stopStreams; + + return this; + } + + get envChanges(): EnvResponse { + return { + cstate: this.env.cstate, + istate: this.env.istate, + }; + } +} + +export class Application { + private channels: Record> = {}; + + constructor() { + this.channels = {}; + } + + registerChannel(channelName: string, channelClass: Channel) { + this.channels[channelName] = channelClass; + } + + buildHandle(id: string | null, env: Env): ConnectionHandle { + return new ConnectionHandle(id, env); + } + + async handleOpen(handle: ConnectionHandle) { + await this.connect(handle); + + if (handle.rejected) { + handle.transmit({ type: "disconnect", reason: "unauthorized" }); + } else { + handle.transmit({ type: "welcome", sid: handle.id }); + } + } + + async connect(handle: ConnectionHandle) { + // Override this method in your application class to perform authentication + // and set up connection identifiers + } + + async handleCommand( + handle: ConnectionHandle, + command: string, + identifier: string, + data: string | null, + ) { + const { channel, params } = this.findChannel(identifier); + + const channelHandle = handle.buildChannelHandle(identifier); + + if (command === "subscribe") { + await channel.subscribed(channelHandle, params); + if (channelHandle.rejected) { + handle.transmit({ identifier, type: "reject_subscription" }); + } else { + handle.transmit({ identifier, type: "confirm_subscription" }); + } + } else if (command === "unsubscribe") { + await channel.unsubscribed(channelHandle, params); + } else if (command === "message") { + const { action, ...payload } = JSON.parse(data!); + await channel.handleAction(channelHandle, params, action, payload); + } else { + throw new Error(`Unknown command: ${command}`); + } + + handle.mergeChannelHandle(channelHandle); + } + + async handleClose( + handle: ConnectionHandle, + subscriptions: string[] | null, + ) { + if (subscriptions) { + for (const identifier of subscriptions) { + const { channel, params } = this.findChannel(identifier); + + const channelHandle = handle.buildChannelHandle(identifier); + + await channel.unsubscribed(channelHandle, params); + } + } + + await this.disconnect(handle); + } + + async disconnect(handle: ConnectionHandle) { + // Override this method in your application class to perform cleanup on disconnect + } + + encodeIdentifiers(identifiers: IdentifiersType): string { + return JSON.stringify(identifiers); + } + + decodeIdentifiers(identifiers: string): IdentifiersType { + return JSON.parse(identifiers); + } + + // Identifier is a JSON string with the channel name and params + findChannel(identifier: string): { + channel: Channel; + params: any; + } { + const { channel, ...params } = JSON.parse(identifier); + + const channelInstance = this.channels[channel]; + + if (!channelInstance) { + throw new Error(`Channel ${channel} is not registered`); + } + + return { channel: channelInstance, params }; + } +} diff --git a/broadcast/index.ts b/broadcast/index.ts new file mode 100644 index 0000000..8c265d8 --- /dev/null +++ b/broadcast/index.ts @@ -0,0 +1,23 @@ +export type IBroadcast = (stream: string, data: any) => Promise; + +export const broadcaster = (url: string, secret: string): IBroadcast => { + const broadcast = async (stream: string, data: any) => { + const res = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${secret}`, + }, + body: JSON.stringify({ + stream, + data: JSON.stringify(data), + }), + }); + + if (!res.ok) { + throw new Error(`Error broadcasting to ${stream}: ${res.statusText}`); + } + }; + + return broadcast; +}; diff --git a/channel/index.ts b/channel/index.ts new file mode 100644 index 0000000..515763d --- /dev/null +++ b/channel/index.ts @@ -0,0 +1,111 @@ +import { Env } from "../rpc"; + +interface ConnectionDelegate { + get env(): Env; + get identifiers(): I | null; +} + +export type ChannelState = { [key: string]: any }; + +export class ChannelHandle { + readonly identifier: string; + private delegate: ConnectionDelegate; + state: Partial = {}; + + rejected: boolean = false; + transmissions: T[] = []; + streams: string[] = []; + stoppedStreams: string[] = []; + stopStreams: boolean = false; + + constructor( + delegate: ConnectionDelegate, + identifier: string, + state: Partial, + ) { + this.delegate = delegate; + this.identifier = identifier; + this.state = state; + } + + reject() { + this.rejected = true; + return this; + } + + streamFrom(name: string) { + this.streams.push(name); + return this; + } + + stopStreamFrom(name: string) { + this.stoppedStreams.push(name); + return this; + } + + stopAllStreams() { + this.stopStreams = true; + return this; + } + + transmit(data: T) { + this.transmissions.push(data); + return this; + } + + get env(): Env { + return this.delegate.env; + } + + get identifiers(): I | null { + return this.delegate.identifiers; + } +} + +export type ChannelParamsMap = { [token: string]: boolean | number | string }; + +export type ServerAction< + ClientActions extends (...args: any[]) => void, + I, + S extends ChannelState = {}, + T = any, + P extends ChannelParamsMap = {}, +> = ( + handle: ChannelHandle, + params: P, + ...args: Parameters +) => ReturnType; + +export class Channel< + IdentifiersType, + ParamsType extends ChannelParamsMap = {}, + TransmissionsType = any, + StateType extends ChannelState = {}, +> { + async subscribed( + handle: ChannelHandle, + params: ParamsType | null, + ): Promise {} + + async unsubscribed( + handle: ChannelHandle, + params: ParamsType | null, + ): Promise { + return; + } + + async handleAction( + handle: ChannelHandle, + params: ParamsType | null, + action: string, + payload: any, + ) { + const self = this as any; + + if (!self[action]) { + throw new Error(`Unknown action: ${action}`); + } + + await self[action](handle, params, payload); + } +} diff --git a/index.ts b/index.ts new file mode 100644 index 0000000..cbb6cda --- /dev/null +++ b/index.ts @@ -0,0 +1,21 @@ +export { Status } from "./rpc"; +export type { + Env, + EnvResponse, + ConnectionRequest, + ConnectionResponse, + CommandMessage, + CommandResponse, + DisconnectRequest, + DisconnectResponse, +} from "./rpc"; + +export { connectHandler, commandHandler, disconnectHandler } from "./service"; +export { Application, ConnectionHandle } from "./application"; +export type { IdentifiersMap } from "./application"; +export { Channel, ChannelHandle } from "./channel"; +export type { ChannelParamsMap, ChannelState, ServerAction } from "./channel"; +export type { IBroadcast } from "./broadcast"; +export { broadcaster } from "./broadcast"; +export { identificator } from "./jwt"; +export type { IIdentificator } from "./jwt"; diff --git a/jwt/index.ts b/jwt/index.ts new file mode 100644 index 0000000..d0e66a2 --- /dev/null +++ b/jwt/index.ts @@ -0,0 +1,44 @@ +import * as jose from "jose"; + +export type IdentifiersMap = Record; + +export interface IIdentificator { + verify: (token: string | undefined) => boolean; + verifyAndFetch(token: string | undefined): Promise; + generateToken: (identifiers: I) => Promise; +} + +export const identificator = ( + secret: string, + exp: string | number, +) => { + const secretEncoder = new TextEncoder().encode(secret); + const alg = "HS256"; + + return { + verify: (token: string | undefined) => { + if (token) return !!jose.jwtVerify(token, secretEncoder); + }, + verifyAndFetch: async (token: string | undefined) => { + if (token) { + const { payload } = await jose.jwtVerify(token, secretEncoder, { + algorithms: [alg], + }); + + if (typeof payload.ext == "string") { + return JSON.parse(payload.ext) as I; + } + } + + return null; + }, + generateToken: async (identifiers: I) => { + const token = await new jose.SignJWT({ ext: JSON.stringify(identifiers) }) + .setProtectedHeader({ alg }) + .setExpirationTime(exp) + .sign(secretEncoder); + + return token; + }, + }; +}; diff --git a/package.json b/package.json new file mode 100644 index 0000000..577b2b4 --- /dev/null +++ b/package.json @@ -0,0 +1,31 @@ +{ + "name": "@anycable/serverless-js", + "version": "0.0.1", + "description": "AnyCable channels for serverless JS apps", + "engines": { + "node": ">=18.0.0" + }, + "keywords": [ + "anycable", + "actioncable", + "serverless" + ], + "homepage": "https://anycable.io/", + "repository": "https://github.com/anycable/anycable-serverless-js", + "author": "Vladimir Dementyev", + "license": "MIT", + "sideEffects": false, + "main": "index.ts", + "type": "module", + "exports": { + ".": "./index.ts", + "./package.json": "./package.json" + }, + "dependencies": { + }, + "peerDependencies": { + "jose": "^4.14.6" + }, + "devDependencies": { + } +} diff --git a/rpc/index.ts b/rpc/index.ts new file mode 100644 index 0000000..d4c84f2 --- /dev/null +++ b/rpc/index.ts @@ -0,0 +1,61 @@ +// Enum definition +export enum Status { + ERROR = 0, + SUCCESS = 1, + FAILURE = 2, +} + +// Message definitions +export type Env = { + url: string; + headers: Record | null; + cstate: Record | null; + istate: Record | null; +}; + +export type EnvResponse = { + cstate: Record | null; + istate: Record | null; +}; + +export type ConnectionRequest = { + env: Env; +}; + +export type ConnectionResponse = { + status: Status; + identifiers: string | null; + transmissions: string[] | null; + error_msg: string | null; + env: EnvResponse; +}; + +export type CommandMessage = { + command: string; + identifier: string; + connection_identifiers: string; + data: string | null; + env: Env; +}; + +export type CommandResponse = { + status: Status; + disconnect: boolean; + stop_streams: boolean; + streams: string[] | null; + transmissions: string[] | null; + error_msg: string | null; + env: EnvResponse; + stopped_streams: string[] | null; +}; + +export type DisconnectRequest = { + identifiers: string; + subscriptions: string[]; + env: Env; +}; + +export type DisconnectResponse = { + status: Status; + error_msg: string | null; +}; diff --git a/service/index.ts b/service/index.ts new file mode 100644 index 0000000..94de511 --- /dev/null +++ b/service/index.ts @@ -0,0 +1,99 @@ +import type { Readable } from "node:stream"; + +import { + ConnectionResponse, + ConnectionRequest, + Status, + CommandResponse, + CommandMessage, + DisconnectRequest, + DisconnectResponse, +} from "../rpc"; +import { Application } from "../application"; + +async function buffer(readable: Readable) { + const chunks = []; + for await (const chunk of readable) { + chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk); + } + return Buffer.concat(chunks); +} + +async function parsePayload(req: Request) { + const stream = req.body as unknown; + const buf = await buffer(stream as Readable); + const rawBody = buf.toString("utf8"); + + return JSON.parse(rawBody); +} + +// AnyCable passes the session ID in the `x-anycable-meta-sid` header +function extractSessionId(req: Request) { + return req.headers.get("x-anycable-meta-sid"); +} + +export const connectHandler = async ( + request: Request, + app: Application, +): Promise => { + const payload = (await parsePayload(request)) as ConnectionRequest; + const sid = extractSessionId(request); + const handle = app.buildHandle(sid, payload.env); + + await app.handleOpen(handle); + + return { + status: handle.rejected ? Status.FAILURE : Status.SUCCESS, + identifiers: handle.identifiers + ? app.encodeIdentifiers(handle.identifiers) + : "", + transmissions: handle.transmissions, + error_msg: handle.rejected ? "Auth failed" : "", + env: handle.envChanges, + }; +}; + +export const commandHandler = async ( + request: Request, + app: Application, +): Promise => { + const payload = (await parsePayload(request)) as CommandMessage; + const sid = extractSessionId(request); + const handle = app.buildHandle(sid, payload.env); + handle.identifiedBy(app.decodeIdentifiers(payload.connection_identifiers)); + + await app.handleCommand( + handle, + payload.command, + payload.identifier, + payload.data, + ); + + return { + status: handle.rejected ? Status.FAILURE : Status.SUCCESS, + disconnect: handle.closed, + stop_streams: handle.stopStreams, + transmissions: handle.transmissions, + streams: handle.streams, + stopped_streams: handle.stoppedStreams, + env: handle.envChanges, + error_msg: "", + }; +}; + +export const disconnectHandler = async ( + request: Request, + app: Application, +): Promise => { + const payload = (await parsePayload(request)) as DisconnectRequest; + const sid = extractSessionId(request); + const handle = app.buildHandle(sid, payload.env); + handle.identifiedBy(app.decodeIdentifiers(payload.identifiers)); + + await app.handleClose(handle, payload.subscriptions); + + return { + status: Status.SUCCESS, + error_msg: "", + }; +}; diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..fbf9918 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "target": "es5", + "lib": ["dom", "dom.iterable", "esnext"], + "allowJs": true, + "skipLibCheck": true, + "strict": true, + "noEmit": true, + "esModuleInterop": true, + "module": "esnext", + "moduleResolution": "bundler", + "resolveJsonModule": true, + "isolatedModules": true, + "jsx": "preserve", + "incremental": true, + "paths": { + "@/*": ["./*"] + } + }, + "include": ["**/*.ts"], + "exclude": ["node_modules"] +}