From 3e9ca5ec83074c74060c9b6502efc1bcb0c84ed1 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 17 Jul 2024 14:06:13 +0300 Subject: [PATCH] feat: add fast header support --- src/event/event-handler.js | 7 +- src/message/connection.js | 90 ++++++-------- src/message/message-builder.js | 65 ---------- src/message/message-parser.js | 91 -------------- src/message/message.js | 221 +++++++++++++++++++++++++++++++++ src/record/record.js | 4 +- src/rpc/rpc-handler.js | 9 +- src/rpc/rpc-response.js | 4 +- 8 files changed, 273 insertions(+), 218 deletions(-) delete mode 100644 src/message/message-builder.js delete mode 100644 src/message/message-parser.js create mode 100644 src/message/message.js diff --git a/src/event/event-handler.js b/src/event/event-handler.js index 7a327ae6..d1f5c79e 100644 --- a/src/event/event-handler.js +++ b/src/event/event-handler.js @@ -1,9 +1,8 @@ -const messageBuilder = require('../message/message-builder') -const messageParser = require('../message/message-parser') const C = require('../constants/constants') const MulticastListener = require('../utils/multicast-listener') const UnicastListener = require('../utils/unicast-listener') const EventEmitter = require('component-emitter2') +const Message = require('../message/message') const rxjs = require('rxjs') const EventHandler = function (options, connection, client) { @@ -105,7 +104,7 @@ EventHandler.prototype.emit = function (name, data) { throw new Error('invalid argument name') } - this._connection.sendMsg(C.TOPIC.EVENT, C.ACTIONS.EVENT, [name, messageBuilder.typed(data)]) + this._connection.sendMsg(C.TOPIC.EVENT, C.ACTIONS.EVENT, [name, Message.encodeTyped(data)]) this._emitter.emit(name, data) this._stats.emitted += 1 } @@ -143,7 +142,7 @@ EventHandler.prototype._$handle = function (message) { if (message.action === C.ACTIONS.EVENT) { if (message.data && message.data.length === 2) { - this._emitter.emit(name, messageParser.convertTyped(data, this._client)) + this._emitter.emit(name, Message.decodeTyped(data, this._client)) } else { this._emitter.emit(name) } diff --git a/src/message/connection.js b/src/message/connection.js index aa0487ac..25bec43b 100644 --- a/src/message/connection.js +++ b/src/message/connection.js @@ -1,8 +1,7 @@ const BrowserWebSocket = globalThis.WebSocket || globalThis.MozWebSocket const utils = require('../utils/utils') const NodeWebSocket = utils.isNode ? require('ws') : null -const messageParser = require('./message-parser') -const messageBuilder = require('./message-builder') +const Message = require('./message') const C = require('../constants/constants') const pkg = require('../../package.json') const xxhash = require('xxhash-wasm') @@ -79,15 +78,15 @@ Connection.prototype.authenticate = function (authParams, callback) { } Connection.prototype.sendMsg = function (topic, action, data) { - return this.send(messageBuilder.getMsg(topic, action, data)) + return this.send(Message.encode(topic, action, data)) } Connection.prototype.sendMsg1 = function (topic, action, p0) { - return this.send(messageBuilder.getMsg1(topic, action, p0)) + return this.send(Message.encode(topic, action, [p0])) } Connection.prototype.sendMsg2 = function (topic, action, p0, p1) { - return this.send(messageBuilder.getMsg2(topic, action, p0, p1)) + return this.send(Message.encode(topic, action, [p0, p1])) } Connection.prototype.close = function () { @@ -101,19 +100,22 @@ Connection.prototype.close = function () { } Connection.prototype._createEndpoint = function () { - this._endpoint = NodeWebSocket - ? new NodeWebSocket(this._url, { - generateMask() {}, - }) - : new BrowserWebSocket(this._url) + if (NodeWebSocket) { + this._endpoint = new NodeWebSocket(this._url, { + generateMask() {}, + }) + } else { + this._endpoint = new BrowserWebSocket(this._url) + this._endpoint.binaryType = 'arraybuffer' + } this._corked = false this._endpoint.onopen = this._onOpen.bind(this) this._endpoint.onerror = this._onError.bind(this) this._endpoint.onclose = this._onClose.bind(this) this._endpoint.onmessage = BrowserWebSocket - ? ({ data }) => this._onMessage(typeof data === 'string' ? data : Buffer.from(data).toString()) - : ({ data }) => this._onMessage(typeof data === 'string' ? data : data.toString()) + ? ({ data }) => this._onMessage(Buffer.from(data)) + : ({ data }) => this._onMessage(data) } Connection.prototype.send = function (message) { @@ -125,7 +127,10 @@ Connection.prototype.send = function (message) { C.TOPIC.CONNECTION, C.EVENT.CONNECTION_ERROR, err, - message.split(C.MESSAGE_PART_SEPERATOR).map((x) => x.slice(0, 256)) + message + .toString() + .split(C.MESSAGE_PART_SEPERATOR) + .map((x) => x.slice(0, 256)) ) return false } @@ -172,14 +177,15 @@ Connection.prototype._submit = function (message) { Connection.prototype._sendAuthParams = function () { this._setState(C.CONNECTION_STATE.AUTHENTICATING) - const authMessage = messageBuilder.getMsg(C.TOPIC.AUTH, C.ACTIONS.REQUEST, [ - this._authParams, - pkg.version, - utils.isNode - ? `Node/${process.version}` - : globalThis.navigator && globalThis.navigator.userAgent, - ]) - this._submit(authMessage) + this._submit( + Message.encode(C.TOPIC.AUTH, C.ACTIONS.REQUEST, [ + this._authParams, + pkg.version, + utils.isNode + ? `Node/${process.version}` + : globalThis.navigator && globalThis.navigator.userAgent, + ]) + ) } Connection.prototype._onOpen = function () { @@ -219,13 +225,11 @@ Connection.prototype._onClose = function () { } } -Connection.prototype._onMessage = function (data) { - // Remove MESSAGE_SEPERATOR if exists. - if (data.charCodeAt(data.length - 1) === 30) { - data = data.slice(0, -1) +Connection.prototype._onMessage = function (raw) { + if (raw.length <= 2) { + return } - - this._recvQueue.push(data) + this._recvQueue.push(Message.decode(raw)) if (!this._processingRecv) { this._processingRecv = true this._schedule(this._recvMessages) @@ -245,24 +249,14 @@ Connection.prototype._recvMessages = function (deadline) { return } - if (message.length <= 2) { - continue - } - - if (this._logger) { - this._logger.trace(message, 'receive') - } - - messageParser.parseMessage(message, this._client, this._message) + this.emit('recv', message) - this.emit('recv', this._message) - - if (this._message.topic === C.TOPIC.CONNECTION) { - this._handleConnectionResponse(this._message) - } else if (this._message.topic === C.TOPIC.AUTH) { - this._handleAuthResponse(this._message) + if (message.topic === C.TOPIC.CONNECTION) { + this._handleConnectionResponse(message) + } else if (message.topic === C.TOPIC.AUTH) { + this._handleAuthResponse(message) } else { - this._client._$onMessage(this._message) + this._client._$onMessage(message) } } @@ -271,7 +265,7 @@ Connection.prototype._recvMessages = function (deadline) { Connection.prototype._handleConnectionResponse = function (message) { if (message.action === C.ACTIONS.PING) { - this._submit(messageBuilder.getMsg(C.TOPIC.CONNECTION, C.ACTIONS.PONG)) + this._submit(Message.encode(C.TOPIC.CONNECTION, C.ACTIONS.PONG)) } else if (message.action === C.ACTIONS.ACK) { this._setState(C.CONNECTION_STATE.AWAITING_AUTHENTICATION) if (this._authParams) { @@ -279,9 +273,7 @@ Connection.prototype._handleConnectionResponse = function (message) { } } else if (message.action === C.ACTIONS.CHALLENGE) { this._setState(C.CONNECTION_STATE.CHALLENGING) - this._submit( - messageBuilder.getMsg(C.TOPIC.CONNECTION, C.ACTIONS.CHALLENGE_RESPONSE, [this._url]) - ) + this._submit(Message.encode(C.TOPIC.CONNECTION, C.ACTIONS.CHALLENGE_RESPONSE, [this._url])) } else if (message.action === C.ACTIONS.REJECTION) { this._challengeDenied = true this.close() @@ -316,10 +308,10 @@ Connection.prototype._handleAuthResponse = function (message) { } Connection.prototype._getAuthData = function (data) { - if (data === undefined) { + if (!data) { return null } else { - return messageParser.convertTyped(data, this._client) + return Message.decodeTyped(data, this._client) } } diff --git a/src/message/message-builder.js b/src/message/message-builder.js deleted file mode 100644 index 7a477933..00000000 --- a/src/message/message-builder.js +++ /dev/null @@ -1,65 +0,0 @@ -const C = require('../constants/constants') - -const SEP = C.MESSAGE_PART_SEPERATOR - -module.exports.getMsg = function (topic, action, data) { - if (data && !(data instanceof Array)) { - throw new Error('data must be an array') - } - - const sendData = [topic, action] - - if (data) { - for (let i = 0; i < data.length; i++) { - if (typeof data[i] === 'object') { - sendData.push(JSON.stringify(data[i])) - } else { - sendData.push(data[i]) - } - } - } - - return sendData.join(SEP) -} - -module.exports.getMsg1 = function (topic, action, p0) { - return `${topic}${SEP}${action}${SEP}${p0}` -} - -module.exports.getMsg2 = function (topic, action, p0, p1) { - return `${topic}${SEP}${action}${SEP}${p0}${SEP}${p1}` -} - -module.exports.typed = function (value) { - const type = typeof value - - if (type === 'string') { - return C.TYPES.STRING + value - } - - if (value === null) { - return C.TYPES.NULL - } - - if (type === 'object') { - return C.TYPES.OBJECT + JSON.stringify(value) - } - - if (type === 'number') { - return C.TYPES.NUMBER + value.toString() - } - - if (value === true) { - return C.TYPES.TRUE - } - - if (value === false) { - return C.TYPES.FALSE - } - - if (value === undefined) { - return C.TYPES.UNDEFINED - } - - throw new Error(`Can't serialize type ${value}`) -} diff --git a/src/message/message-parser.js b/src/message/message-parser.js deleted file mode 100644 index e8ee48e5..00000000 --- a/src/message/message-parser.js +++ /dev/null @@ -1,91 +0,0 @@ -const C = require('../constants/constants') - -const MessageParser = function () { - this._actions = this._getActions() -} - -MessageParser.prototype.convertTyped = function (value, client) { - const type = value.charAt(0) - - if (type === C.TYPES.STRING) { - return value.substr(1) - } - - if (type === C.TYPES.OBJECT) { - try { - return JSON.parse(value.substr(1)) - } catch (err) { - client._$onError(C.TOPIC.ERROR, C.EVENT.MESSAGE_PARSE_ERROR, err) - return undefined - } - } - - if (type === C.TYPES.NUMBER) { - return parseFloat(value.substr(1)) - } - - if (type === C.TYPES.NULL) { - return null - } - - if (type === C.TYPES.TRUE) { - return true - } - - if (type === C.TYPES.FALSE) { - return false - } - - if (type === C.TYPES.UNDEFINED) { - return undefined - } - - client._$onError(C.TOPIC.ERROR, C.EVENT.MESSAGE_PARSE_ERROR, new Error(`UNKNOWN_TYPE (${value})`)) - - return undefined -} - -MessageParser.prototype._getActions = function () { - const actions = {} - - for (const key in C.ACTIONS) { - actions[C.ACTIONS[key]] = key - } - - return actions -} - -MessageParser.prototype.parseMessage = function (message, client, result) { - const parts = message.split(C.MESSAGE_PART_SEPERATOR) - - if (parts.length < 2) { - client._$onError( - C.TOPIC.ERROR, - C.EVENT.MESSAGE_PARSE_ERROR, - new Error('Insufficiant message parts') - ) - return null - } - - if (parts[0] === C.TOPIC.ERROR) { - client._$onError(C.TOPIC.ERROR, parts[1], new Error('Message error'), message) - return null - } - - if (this._actions[parts[1]] === undefined) { - client._$onError( - C.TOPIC.ERROR, - C.EVENT.MESSAGE_PARSE_ERROR, - new Error('Unknown action'), - message - ) - return null - } - - result.raw = message - result.topic = parts[0] - result.action = parts[1] - result.data = parts.splice(2) -} - -module.exports = new MessageParser() diff --git a/src/message/message.js b/src/message/message.js new file mode 100644 index 00000000..07d301cd --- /dev/null +++ b/src/message/message.js @@ -0,0 +1,221 @@ +const varint = require('varint') +const C = require('../constants/constants') + +const FAST_HEADER_SIZE = 8 +const FAST_HEADER_NUMBER = 128 + +const Uint8ArrayIndexOf = Uint8Array.prototype.indexOf + +function toCode(val) { + return val.split('').reduce((xs, x, index) => (xs << 8) | x.charCodeAt(0), 0) +} + +const TOPIC_MAP = new Map() +for (const val of Object.values(C.TOPIC)) { + TOPIC_MAP.set(toCode(val), val) +} + +const ACTIONS_MAP = new Map() +for (const val of Object.values(C.ACTIONS)) { + ACTIONS_MAP.set(toCode(val), val) +} + +module.exports.decode = function (raw) { + let pos = 0 + + if (raw[0] === FAST_HEADER_NUMBER) { + pos += FAST_HEADER_SIZE + } + + const len = raw.byteLength + const topic = raw[pos++] + pos++ + + let action = 0 + while (pos < len && raw[pos] !== 31) { + action = (action << 8) | raw[pos++] + } + pos++ + + const data = [] + + if (raw[0] === FAST_HEADER_NUMBER) { + let headerPos = 1 + while (headerPos < FAST_HEADER_SIZE) { + const len = varint.decode(raw, headerPos) + headerPos += varint.decode.bytes + if (len === 0) { + break + } + data.push(raw.toString('utf8', pos, pos + len - 1)) + pos += len + } + } + + while (pos < len) { + let end = Uint8ArrayIndexOf.call(raw, 31, pos) + end = end === -1 ? len : end + data.push(raw.toString('utf8', pos, end)) + pos = end + 1 + } + + // TODO (perf): Make constant into codes... + return { raw, topic: TOPIC_MAP.get(topic), action: ACTIONS_MAP.get(action), data } +} + +let poolSize = 1024 * 1024 +let poolBuffer = Buffer.allocUnsafe(poolSize) +let poolOffset = 0 + +module.exports.encode = function (topic, action, data) { + if (poolBuffer.byteLength - poolOffset < poolSize / 16) { + poolBuffer = Buffer.allocUnsafe(poolSize) + poolOffset = 0 + } + + topic = typeof topic === 'number' ? topic : toCode(topic) + action = typeof action === 'number' ? action : toCode(action) + + const buf = poolBuffer + let pos = poolOffset + + const start = pos + + buf[pos++] = FAST_HEADER_NUMBER + for (let n = 1; n < FAST_HEADER_SIZE; n++) { + buf[pos++] = 0 + } + + if (action <= 0xff) { + buf[pos++] = topic + buf[pos++] = 31 + buf[pos++] = (action >> 0) & 0xff + } else if (action <= 0xffff) { + buf[pos++] = topic + buf[pos++] = 31 + buf[pos++] = (action >> 8) & 0xff + buf[pos++] = (action >> 0) & 0xff + } else if (action <= 0xffffff) { + buf[pos++] = topic + buf[pos++] = 31 + buf[pos++] = (action >> 16) & 0xff + buf[pos++] = (action >> 8) & 0xff + buf[pos++] = (action >> 0) & 0xff + } + + if (Array.isArray(data) && data.length > 0) { + let headerPos = start + 1 + for (let n = 0, len = data.length; n < len; ++n) { + let len = 0 + if (data[n] == null) { + buf[pos++] = 31 + len = 0 + } else if (typeof data[n] === 'string') { + buf[pos++] = 31 + len = buf.write(data[n], pos) + } else if (Buffer.isBuffer(data[n])) { + buf[pos++] = 31 + len = data[n].copy(buf, pos) + } else { + buf[pos++] = 31 + len = buf.write(JSON.stringify(data[n]), pos) + } + + { + const varintLen = varint.encodingLength(len) + if (headerPos + varintLen - start < FAST_HEADER_SIZE) { + varint.encode(len + 1, buf, headerPos) + headerPos += varint.encode.bytes + } + } + + pos += len + + if (pos >= poolSize) { + poolSize = start === 0 ? poolSize * 2 : poolSize + poolBuffer = Buffer.allocUnsafe(poolSize) + poolOffset = 0 + return module.exports.encode(topic, action, data) + } + } + } + + poolOffset = pos + + return buf.subarray(start, pos) +} + +exports.decodeTyped = function (value, client) { + const type = value.charAt(0) + + if (type === C.TYPES.STRING) { + return value.substr(1) + } + + if (type === C.TYPES.OBJECT) { + try { + return JSON.parse(value.substr(1)) + } catch (err) { + client._$onError(C.TOPIC.ERROR, C.EVENT.MESSAGE_PARSE_ERROR, err) + return undefined + } + } + + if (type === C.TYPES.NUMBER) { + return parseFloat(value.substr(1)) + } + + if (type === C.TYPES.NULL) { + return null + } + + if (type === C.TYPES.TRUE) { + return true + } + + if (type === C.TYPES.FALSE) { + return false + } + + if (type === C.TYPES.UNDEFINED) { + return undefined + } + + client._$onError(C.TOPIC.ERROR, C.EVENT.MESSAGE_PARSE_ERROR, new Error(`UNKNOWN_TYPE (${value})`)) + + return undefined +} + +module.exports.encodeTyped = function (value) { + const type = typeof value + + if (type === 'string') { + return C.TYPES.STRING + value + } + + if (value === null) { + return C.TYPES.NULL + } + + if (type === 'object') { + return C.TYPES.OBJECT + JSON.stringify(value) + } + + if (type === 'number') { + return C.TYPES.NUMBER + value.toString() + } + + if (value === true) { + return C.TYPES.TRUE + } + + if (value === false) { + return C.TYPES.FALSE + } + + if (value === undefined) { + return C.TYPES.UNDEFINED + } + + throw new Error(`Can't serialize type ${value}`) +} diff --git a/src/record/record.js b/src/record/record.js index 7f170573..d59eaa64 100644 --- a/src/record/record.js +++ b/src/record/record.js @@ -1,7 +1,7 @@ const jsonPath = require('@nxtedition/json-path') const utils = require('../utils/utils') const C = require('../constants/constants') -const messageParser = require('../message/message-parser') +const Message = require('../message/message') const xuid = require('xuid') const invariant = require('invariant') const cloneDeep = require('lodash.clonedeep') @@ -464,7 +464,7 @@ class Record { const prevState = this._state this._state = - hasProvider && messageParser.convertTyped(hasProvider, this._handler._client) + hasProvider && Message.decodeTyped(hasProvider, this._handler._client) ? C.RECORD_STATE.PROVIDER : this._version.charAt(0) === 'I' ? C.RECORD_STATE.STALE diff --git a/src/rpc/rpc-handler.js b/src/rpc/rpc-handler.js index 3eb03007..9b7f399e 100644 --- a/src/rpc/rpc-handler.js +++ b/src/rpc/rpc-handler.js @@ -1,7 +1,6 @@ const C = require('../constants/constants') const RpcResponse = require('./rpc-response') -const messageParser = require('../message/message-parser') -const messageBuilder = require('../message/message-builder') +const Message = require('../message/message') const xuid = require('xuid') const RpcHandler = function (options, connection, client) { @@ -97,7 +96,7 @@ RpcHandler.prototype.make = function (name, data, callback) { data, callback, }) - this._connection.sendMsg(C.TOPIC.RPC, C.ACTIONS.REQUEST, [name, id, messageBuilder.typed(data)]) + this._connection.sendMsg(C.TOPIC.RPC, C.ACTIONS.REQUEST, [name, id, Message.encodeTyped(data)]) return promise } @@ -111,7 +110,7 @@ RpcHandler.prototype._respond = function (message) { if (callback) { let promise try { - promise = Promise.resolve(callback(messageParser.convertTyped(data, this._client), response)) + promise = Promise.resolve(callback(Message.decodeTyped(data, this._client), response)) } catch (err) { promise = Promise.reject(err) } @@ -156,7 +155,7 @@ RpcHandler.prototype._$handle = function (message) { }) ) } else { - rpc.callback(null, messageParser.convertTyped(data, this._client)) + rpc.callback(null, Message.decodeTyped(data, this._client)) } } } diff --git a/src/rpc/rpc-response.js b/src/rpc/rpc-response.js index ed5fc3a4..8b002ef6 100644 --- a/src/rpc/rpc-response.js +++ b/src/rpc/rpc-response.js @@ -1,5 +1,5 @@ const C = require('../constants/constants') -const messageBuilder = require('../message/message-builder') +const Message = require('../message/message') const RpcResponse = function (connection, name, id) { this._connection = connection @@ -40,7 +40,7 @@ RpcResponse.prototype.send = function (data) { this._connection.sendMsg(C.TOPIC.RPC, C.ACTIONS.RESPONSE, [ this._name, this._id, - messageBuilder.typed(data), + Message.encodeTyped(data), ]) }