From 4987c60d969b147e3ece80a47a49d0e5325203ee Mon Sep 17 00:00:00 2001 From: Ingvar Stepanyan Date: Wed, 27 Mar 2024 15:16:47 +0000 Subject: [PATCH] Add Brotli decoding via brotli-dec-wasm --- package.json | 10 +- src/spacetimedb.ts | 307 +++++++++++++++++----------------- src/websocket_test_adapter.ts | 4 +- tsconfig.json | 2 +- 4 files changed, 162 insertions(+), 161 deletions(-) diff --git a/package.json b/package.json index 5e24c53..b910f30 100644 --- a/package.json +++ b/package.json @@ -25,27 +25,27 @@ "devDependencies": { "@types/jest": "^29.4.0", "@types/node": "^18.14.6", - "@types/websocket": "^1.0.5", + "@types/ws": "^8.5.10", "husky": ">=6", "jest": "^29.5.0", "lint-staged": ">=10", "prettier": "2.8.7", + "prettier-eslint": "^15.0.1", "pretty-quick": "^3.1.3", "rimraf": "^3.0.2", "ts-jest": "^29.0.5", "ts-node": "^10.9.1", - "typescript": "^4.9.3", "ts-proto": "^1.150.1", - "prettier-eslint": "^15.0.1" + "typescript": "^4.9.3" }, "dependencies": { + "brotli-dec-wasm": "^2.1.0", "buffer": "^6.0.3", "events": "^3.3.0", - "ws": "^8.13.0", "isomorphic-ws": "^5.0.0", "long": "^5.2.3", "protobufjs": "^7.2.4", - "websocket": "^1.0.34" + "ws": "^8.13.0" }, "files": [ "/dist" diff --git a/src/spacetimedb.ts b/src/spacetimedb.ts index 72f199d..03c384d 100644 --- a/src/spacetimedb.ts +++ b/src/spacetimedb.ts @@ -43,6 +43,7 @@ import { } from "./message_types"; import { SpacetimeDBGlobals } from "./global"; import { stdbLogger } from "./logger"; +import brotliPromise from "brotli-dec-wasm"; export { ProductValue, @@ -68,7 +69,10 @@ const g = (typeof window === "undefined" ? global : window)!; type CreateWSFnType = ( url: string, protocol: string -) => WebSocket | WebsocketTestAdapter; +) => + | WebSocket + | WebsocketTestAdapter + | Promise; /** * The database client connection to a SpacetimeDB server. @@ -217,8 +221,6 @@ export class SpacetimeDBClient { if (typeof window === "undefined" || !this.runtime.auth_token) { // NodeJS environment const ws = new WebSocket(url, protocol, { - maxReceivedFrameSize: 100000000, - maxReceivedMessageSize: 100000000, headers, }); return ws; @@ -272,101 +274,96 @@ export class SpacetimeDBClient { * Handles WebSocket onMessage event. * @param wsMessage MessageEvent object. */ - private handleOnMessage(wsMessage: any) { + private async handleOnMessage(wsMessage: any) { this.emitter.emit("receiveWSMessage", wsMessage); - this.processMessage(wsMessage, (message: Message) => { - if (message instanceof SubscriptionUpdateMessage) { - for (let tableUpdate of message.tableUpdates) { - const tableName = tableUpdate.tableName; - const entityClass = SpacetimeDBClient.getTableClass(tableName); - const table = this.db.getOrCreateTable( - tableUpdate.tableName, - undefined, - entityClass - ); + const message = await this.processMessage(wsMessage); + if (message instanceof SubscriptionUpdateMessage) { + for (let tableUpdate of message.tableUpdates) { + const tableName = tableUpdate.tableName; + const entityClass = SpacetimeDBClient.getTableClass(tableName); + const table = this.db.getOrCreateTable( + tableUpdate.tableName, + undefined, + entityClass + ); + + table.applyOperations(this.protocol, tableUpdate.operations, undefined); + } - table.applyOperations( - this.protocol, - tableUpdate.operations, - undefined + if (this.emitter) { + this.emitter.emit("initialStateSync"); + } + } else if (message instanceof TransactionUpdateMessage) { + const reducerName = message.event.reducerName; + const reducer: any | undefined = reducerName + ? SpacetimeDBClient.getReducerClass(reducerName) + : undefined; + + let reducerEvent: ReducerEvent | undefined; + let reducerArgs: any; + if (reducer && message.event.status === "committed") { + let adapter: ReducerArgsAdapter; + if (this.protocol === "binary") { + adapter = new BinaryReducerArgsAdapter( + new BinaryAdapter( + new BinaryReader(message.event.args as Uint8Array) + ) ); + } else { + adapter = new JSONReducerArgsAdapter(message.event.args as any[]); } - if (this.emitter) { - this.emitter.emit("initialStateSync"); - } - } else if (message instanceof TransactionUpdateMessage) { - const reducerName = message.event.reducerName; - const reducer: any | undefined = reducerName - ? SpacetimeDBClient.getReducerClass(reducerName) - : undefined; - - let reducerEvent: ReducerEvent | undefined; - let reducerArgs: any; - if (reducer && message.event.status === "committed") { - let adapter: ReducerArgsAdapter; - if (this.protocol === "binary") { - adapter = new BinaryReducerArgsAdapter( - new BinaryAdapter( - new BinaryReader(message.event.args as Uint8Array) - ) - ); - } else { - adapter = new JSONReducerArgsAdapter(message.event.args as any[]); - } - - reducerArgs = reducer.deserializeArgs(adapter); - } + reducerArgs = reducer.deserializeArgs(adapter); + } - reducerEvent = new ReducerEvent( - message.event.identity, - message.event.address, - message.event.originalReducerName, - message.event.status, - message.event.message, - reducerArgs - ); + reducerEvent = new ReducerEvent( + message.event.identity, + message.event.address, + message.event.originalReducerName, + message.event.status, + message.event.message, + reducerArgs + ); - for (let tableUpdate of message.tableUpdates) { - const tableName = tableUpdate.tableName; - const entityClass = SpacetimeDBClient.getTableClass(tableName); - const table = this.db.getOrCreateTable( - tableUpdate.tableName, - undefined, - entityClass - ); + for (let tableUpdate of message.tableUpdates) { + const tableName = tableUpdate.tableName; + const entityClass = SpacetimeDBClient.getTableClass(tableName); + const table = this.db.getOrCreateTable( + tableUpdate.tableName, + undefined, + entityClass + ); - table.applyOperations( - this.protocol, - tableUpdate.operations, - reducerEvent - ); - } + table.applyOperations( + this.protocol, + tableUpdate.operations, + reducerEvent + ); + } - if (reducer) { - this.emitter.emit( - "reducer:" + reducerName, - reducerEvent, - ...(reducerArgs || []) - ); - } - } else if (message instanceof IdentityTokenMessage) { - this.identity = message.identity; - if (this.runtime.auth_token) { - this.token = this.runtime.auth_token; - } else { - this.token = message.token; - } - this.clientAddress = message.address; + if (reducer) { this.emitter.emit( - "connected", - this.token, - this.identity, - this.clientAddress + "reducer:" + reducerName, + reducerEvent, + ...(reducerArgs || []) ); } - }); + } else if (message instanceof IdentityTokenMessage) { + this.identity = message.identity; + if (this.runtime.auth_token) { + this.token = this.runtime.auth_token; + } else { + this.token = message.token; + } + this.clientAddress = message.address; + this.emitter.emit( + "connected", + this.token, + this.identity, + this.clientAddress + ); + } } /** @@ -492,13 +489,15 @@ export class SpacetimeDBClient { const stdbProtocol = this.protocol === "binary" ? "bin" : "text"; this.ws = await this.createWSFn(url, `v1.${stdbProtocol}.spacetimedb`); + this.ws.binaryType = "arraybuffer"; + this.ws.onclose = this.handleOnClose.bind(this); this.ws.onerror = this.handleOnError.bind(this); this.ws.onopen = this.handleOnOpen.bind(this); this.ws.onmessage = this.handleOnMessage.bind(this); } - private processMessage(wsMessage: any, callback: (message: Message) => void) { + private async processMessage(wsMessage: MessageEvent): Promise { if (this.protocol === "binary") { // Helpers for parsing message components which appear in multiple messages. const parseTableRowOperation = ( @@ -536,76 +535,73 @@ export class SpacetimeDBClient { return new SubscriptionUpdateMessage(tableUpdates); }; - let data = wsMessage.data; - if (typeof data.arrayBuffer === "undefined") { - data = new Blob([data]); - } - data.arrayBuffer().then((data: any) => { - const message: Proto.Message = Proto.Message.decode( - new Uint8Array(data) + const compressedData = wsMessage.data as ArrayBuffer; + const brotli = await brotliPromise; + const decompressedData = brotli.decompress( + new Uint8Array(compressedData) + ); + const message: Proto.Message = Proto.Message.decode(decompressedData); + if (message["subscriptionUpdate"]) { + const rawSubscriptionUpdate = message.subscriptionUpdate; + const subscriptionUpdate = parseSubscriptionUpdate( + rawSubscriptionUpdate ); - if (message["subscriptionUpdate"]) { - const rawSubscriptionUpdate = message.subscriptionUpdate; - const subscriptionUpdate = parseSubscriptionUpdate( - rawSubscriptionUpdate - ); - callback(subscriptionUpdate); - } else if (message["transactionUpdate"]) { - const txUpdate = message.transactionUpdate; - const rawSubscriptionUpdate = txUpdate.subscriptionUpdate; - if (!rawSubscriptionUpdate) { - throw new Error( - "Received TransactionUpdate without SubscriptionUpdate" - ); - } - const subscriptionUpdate = parseSubscriptionUpdate( - rawSubscriptionUpdate + return subscriptionUpdate; + } else if (message["transactionUpdate"]) { + const txUpdate = message.transactionUpdate; + const rawSubscriptionUpdate = txUpdate.subscriptionUpdate; + if (!rawSubscriptionUpdate) { + throw new Error( + "Received TransactionUpdate without SubscriptionUpdate" ); + } + const subscriptionUpdate = parseSubscriptionUpdate( + rawSubscriptionUpdate + ); - const event = txUpdate.event; - if (!event) { - throw new Error("Received TransactionUpdate without Event"); - } - const functionCall = event.functionCall; - if (!functionCall) { - throw new Error( - "Received TransactionUpdate with Event but no FunctionCall" - ); - } - const identity: Identity = new Identity(event.callerIdentity); - const address = Address.nullIfZero(event.callerAddress); - const originalReducerName: string = functionCall.reducer; - const reducerName: string = toPascalCase(originalReducerName); - const args = functionCall.argBytes; - const status: string = Proto.event_StatusToJSON(event.status); - const messageStr = event.message; - - const transactionUpdateEvent: TransactionUpdateEvent = - new TransactionUpdateEvent( - identity, - address, - originalReducerName, - reducerName, - args, - status, - messageStr - ); - - const transactionUpdate = new TransactionUpdateMessage( - subscriptionUpdate.tableUpdates, - transactionUpdateEvent + const event = txUpdate.event; + if (!event) { + throw new Error("Received TransactionUpdate without Event"); + } + const functionCall = event.functionCall; + if (!functionCall) { + throw new Error( + "Received TransactionUpdate with Event but no FunctionCall" ); - callback(transactionUpdate); - } else if (message["identityToken"]) { - const identityToken = message.identityToken; - const identity = new Identity(identityToken.identity); - const token = identityToken.token; - const address = new Address(identityToken.address); - const identityTokenMessage: IdentityTokenMessage = - new IdentityTokenMessage(identity, token, address); - callback(identityTokenMessage); } - }); + const identity: Identity = new Identity(event.callerIdentity); + const address = Address.nullIfZero(event.callerAddress); + const originalReducerName: string = functionCall.reducer; + const reducerName: string = toPascalCase(originalReducerName); + const args = functionCall.argBytes; + const status: string = Proto.event_StatusToJSON(event.status); + const messageStr = event.message; + + const transactionUpdateEvent: TransactionUpdateEvent = + new TransactionUpdateEvent( + identity, + address, + originalReducerName, + reducerName, + args, + status, + messageStr + ); + + const transactionUpdate = new TransactionUpdateMessage( + subscriptionUpdate.tableUpdates, + transactionUpdateEvent + ); + return transactionUpdate; + } else if (message["identityToken"]) { + const identityToken = message.identityToken; + const identity = new Identity(identityToken.identity); + const token = identityToken.token; + const address = new Address(identityToken.address); + const identityTokenMessage: IdentityTokenMessage = + new IdentityTokenMessage(identity, token, address); + return identityTokenMessage; + } } else { const parseTableRowOperation = ( rawTableOperation: JsonApi.TableRowOperation @@ -647,7 +643,7 @@ export class SpacetimeDBClient { const subscriptionUpdate = parseSubscriptionUpdate( data.SubscriptionUpdate ); - callback(subscriptionUpdate); + return subscriptionUpdate; } else if (data["TransactionUpdate"]) { const txUpdate = data.TransactionUpdate; const subscriptionUpdate = parseSubscriptionUpdate( @@ -679,7 +675,7 @@ export class SpacetimeDBClient { subscriptionUpdate.tableUpdates, transactionUpdateEvent ); - callback(transactionUpdate); + return transactionUpdate; } else if (data["IdentityToken"]) { const identityToken = data.IdentityToken; const identity = new Identity(identityToken.identity); @@ -687,9 +683,12 @@ export class SpacetimeDBClient { const address = Address.fromString(identityToken.address); const identityTokenMessage: IdentityTokenMessage = new IdentityTokenMessage(identity, token, address); - callback(identityTokenMessage); + return identityTokenMessage; } } + // If we reached here, we encountered a message we don't have a handler for. + // This shouldn't happen. + throw new Error("Unhandled message type"); } /** diff --git a/src/websocket_test_adapter.ts b/src/websocket_test_adapter.ts index 5d474ff..63ae50c 100644 --- a/src/websocket_test_adapter.ts +++ b/src/websocket_test_adapter.ts @@ -7,6 +7,8 @@ class WebsocketTestAdapter { public messageQueue: any[]; public closed: boolean; + public binaryType: BinaryType = "blob"; + constructor() { this.messageQueue = []; this.closed = false; @@ -25,7 +27,7 @@ class WebsocketTestAdapter { } public sendToClient(message: any) { - if (typeof message.data !== 'string') { + if (typeof message.data !== "string") { message.data = JSON.stringify(message.data); } this.onmessage(message); diff --git a/tsconfig.json b/tsconfig.json index e9c5f0d..9b8c3a5 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -10,7 +10,7 @@ "forceConsistentCasingInFileNames": true, "strict": true, "noImplicitAny": false, - "moduleResolution": "nodenext", + "moduleResolution": "Node", "allowSyntheticDefaultImports": true }, "include": ["src"],