Skip to content

Commit

Permalink
feat: batched transport for shared state
Browse files Browse the repository at this point in the history
  • Loading branch information
b-ma committed Jan 6, 2024
1 parent 433970b commit 6688102
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 4 deletions.
11 changes: 9 additions & 2 deletions src/common/BaseSharedState.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,16 @@ ${JSON.stringify(initValues, null, 2)}`);
// remove listeners
this._client.transport.removeAllListeners(`${UPDATE_RESPONSE}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${UPDATE_NOTIFICATION}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${DELETE_RESPONSE}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${UPDATE_ABORT}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${DELETE_NOTIFICATION}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${DETACH_RESPONSE}-${this.id}-${this.remoteId}`);

if (this._isOwner) {
this._client.transport.removeAllListeners(`${DELETE_RESPONSE}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${DELETE_ERROR}-${this.id}-${this.remoteId}`);
} else {
this._client.transport.removeAllListeners(`${DETACH_RESPONSE}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${DETACH_ERROR}-${this.id}-${this.remoteId}`);
}
}

/** @private */
Expand Down
4 changes: 3 additions & 1 deletion src/common/BaseStateManager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { isString, isFunction, isPlainObject } from '@ircam/sc-utils';
import SharedState from './BaseSharedState.js';
import SharedStateCollection from './BaseSharedStateCollection.js';
import BatchedTransport from './BatchedTransport.js';
import PromiseStore from './PromiseStore.js';
import {
CREATE_REQUEST,
Expand Down Expand Up @@ -30,7 +31,8 @@ class BaseStateManager {
* Must implement a basic EventEmitter API.
*/
constructor(id, transport) {
this.client = { id, transport };
// proxy transport with BatchedTransport;
this.client = { id, transport: new BatchedTransport(transport) };

this._statesById = new Map(); // <id, state>
this._cachedSchemas = new Map(); // <shemaName, definition>
Expand Down
60 changes: 60 additions & 0 deletions src/common/BatchedTransport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { BATCHED_TRANSPORT_CHANNEL } from './constants.js'

class BatchedTransport {
constructor(transport) {
this._transport = transport;
this._stack = [];
this._listeners = new Map();
this._sending = false;

this._transport.addListener(BATCHED_TRANSPORT_CHANNEL, stack => {
stack.forEach(entry => {
const [channel, args] = entry;
const callbacks = this._listeners.get(channel);

callbacks.forEach(callback => {
callback(...args);
});
});
});
}

addListener(channel, callback) {
if (!this._listeners.has(channel)) {
this._listeners.set(channel, new Set());
}

const callbacks = this._listeners.get(channel);
callbacks.add(callback);
}

async emit(channel, ...args) {
this._stack.push([channel, args]);

if (!this._sending) {
this._sending = true;
this._sending = await false;
const stack = this._stack;
this._stack = [];
this._transport.emit(BATCHED_TRANSPORT_CHANNEL, stack);
}
}

removeListener(channel, callback) {
if (!this._listeners.has(channel)) {
const callbacks = this._listeners.get(channel);
callbacks.delete(callback);
}
}

removeAllListeners(channel = null) {
if (channel === null) {
this._listeners.clear();
} else if (this._listeners.has(channel)) {
const callbacks = this._listeners.get(channel);
callbacks.clear();
}
}
}

export default BatchedTransport;
3 changes: 3 additions & 0 deletions src/common/constants.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// id of the server when owner of a state
export const SERVER_ID = -1;

// batched transport channel
export const BATCHED_TRANSPORT_CHANNEL = 'b:t';

// shared states channels
export const CREATE_REQUEST = 's:c:req';
export const CREATE_RESPONSE = 's:c:res';
Expand Down
9 changes: 8 additions & 1 deletion src/server/StateManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { idGenerator, isString, isPlainObject } from '@ircam/sc-utils';
import clonedeep from 'lodash/cloneDeep.js';

import BaseStateManager from '../common/BaseStateManager.js';
import BatchedTransport from '../common/BatchedTransport.js';
import ParameterBag from '../common/ParameterBag.js';
import SharedStatePrivate from '../common/SharedStatePrivate.js';
import {
Expand Down Expand Up @@ -322,7 +323,7 @@ class StateManager extends BaseStateManager {
this._observers = new Set();
this._hooksBySchemaName = new Map(); // protected

this.addClient(localClientId, localTransport);
this.addClient(localClientId, this.client.transport);
}

[kIsObservableState](state) {
Expand All @@ -344,6 +345,12 @@ class StateManager extends BaseStateManager {
* @private
*/
addClient(nodeId, transport) {
// server adds itself as client, and its transport is a raw EventEmitter
// so we don't want to proxy it twice with BatchedTransport.
if (nodeId !== SERVER_ID) {
transport = new BatchedTransport(transport);
}

const client = { id: nodeId, transport };
this._clientByNodeId.set(nodeId, client);

Expand Down

0 comments on commit 6688102

Please sign in to comment.