Skip to content

Commit

Permalink
Merge pull request #81 from collective-soundworks/feat/batched-transport
Browse files Browse the repository at this point in the history
feat: batched transport
  • Loading branch information
b-ma authored Jan 6, 2024
2 parents c22bfe2 + 6827cd6 commit f3aa1ce
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 30 deletions.
11 changes: 9 additions & 2 deletions src/common/BaseSharedState.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,16 @@ ${JSON.stringify(initValues, null, 2)}`);
// remove listeners
this._client.transport.removeAllListeners(`${UPDATE_RESPONSE}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${UPDATE_NOTIFICATION}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${DELETE_RESPONSE}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${UPDATE_ABORT}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${DELETE_NOTIFICATION}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${DETACH_RESPONSE}-${this.id}-${this.remoteId}`);

if (this._isOwner) {
this._client.transport.removeAllListeners(`${DELETE_RESPONSE}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${DELETE_ERROR}-${this.id}-${this.remoteId}`);
} else {
this._client.transport.removeAllListeners(`${DETACH_RESPONSE}-${this.id}-${this.remoteId}`);
this._client.transport.removeAllListeners(`${DETACH_ERROR}-${this.id}-${this.remoteId}`);
}
}

/** @private */
Expand Down
4 changes: 3 additions & 1 deletion src/common/BaseStateManager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { isString, isFunction, isPlainObject } from '@ircam/sc-utils';
import SharedState from './BaseSharedState.js';
import SharedStateCollection from './BaseSharedStateCollection.js';
import BatchedTransport from './BatchedTransport.js';
import PromiseStore from './PromiseStore.js';
import {
CREATE_REQUEST,
Expand Down Expand Up @@ -30,7 +31,8 @@ class BaseStateManager {
* Must implement a basic EventEmitter API.
*/
constructor(id, transport) {
this.client = { id, transport };
// proxy transport with BatchedTransport;
this.client = { id, transport: new BatchedTransport(transport) };

this._statesById = new Map(); // <id, state>
this._cachedSchemas = new Map(); // <shemaName, definition>
Expand Down
62 changes: 62 additions & 0 deletions src/common/BatchedTransport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { BATCHED_TRANSPORT_CHANNEL } from './constants.js'

class BatchedTransport {
constructor(transport) {
this._transport = transport;
this._stack = [];
this._listeners = new Map();
this._sending = false;

this._transport.addListener(BATCHED_TRANSPORT_CHANNEL, stack => {
stack.forEach(entry => {
const [channel, args] = entry;
// server side the transport is the same EventEmitter instance
// for both state manager server and client, so channel might not exist
// one side or the other.
if (this._listeners.has(channel)) {
const callbacks = this._listeners.get(channel);
callbacks.forEach(callback => callback(...args));
};
});
});
}

addListener(channel, callback) {
if (!this._listeners.has(channel)) {
this._listeners.set(channel, new Set());
}

const callbacks = this._listeners.get(channel);
callbacks.add(callback);
}

async emit(channel, ...args) {
this._stack.push([channel, args]);

if (!this._sending) {
this._sending = true;
this._sending = await false;
const stack = this._stack;
this._stack = [];
this._transport.emit(BATCHED_TRANSPORT_CHANNEL, stack);
}
}

removeListener(channel, callback) {
if (!this._listeners.has(channel)) {
const callbacks = this._listeners.get(channel);
callbacks.delete(callback);
}
}

removeAllListeners(channel = null) {
if (channel === null) {
this._listeners.clear();
} else if (this._listeners.has(channel)) {
const callbacks = this._listeners.get(channel);
callbacks.clear();
}
}
}

export default BatchedTransport;
3 changes: 3 additions & 0 deletions src/common/constants.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// id of the server when owner of a state
export const SERVER_ID = -1;

// batched transport channel
export const BATCHED_TRANSPORT_CHANNEL = 'b:t';

// shared states channels
export const CREATE_REQUEST = 's:c:req';
export const CREATE_RESPONSE = 's:c:res';
Expand Down
5 changes: 1 addition & 4 deletions src/server/Sockets.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { Worker } from 'node:worker_threads';
import path from 'node:path';
// import { fileURLToPath } from 'node:url';

import querystring from 'querystring';
import { WebSocketServer } from 'ws';
import WebSocket from 'ws';
import { default as WebSocket, WebSocketServer } from 'ws';

import Socket from './Socket.js';
import networkLatencyWorker from './audit-network-latency.worker.js';
Expand Down
3 changes: 3 additions & 0 deletions src/server/StateManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { idGenerator, isString, isPlainObject } from '@ircam/sc-utils';
import clonedeep from 'lodash/cloneDeep.js';

import BaseStateManager from '../common/BaseStateManager.js';
import BatchedTransport from '../common/BatchedTransport.js';
import ParameterBag from '../common/ParameterBag.js';
import SharedStatePrivate from '../common/SharedStatePrivate.js';
import {
Expand Down Expand Up @@ -344,6 +345,8 @@ class StateManager extends BaseStateManager {
* @private
*/
addClient(nodeId, transport) {
transport = new BatchedTransport(transport);

const client = { id: nodeId, transport };
this._clientByNodeId.set(nodeId, client);

Expand Down
89 changes: 71 additions & 18 deletions tests/essentials/Server.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import fs from 'node:fs';
import path from 'node:path';
import * as url from 'node:url';

import { delay } from '@ircam/sc-utils';
import { assert } from 'chai';
import dotenv from 'dotenv';
import merge from 'lodash.merge';
Expand Down Expand Up @@ -650,33 +651,85 @@ describe('# server::Server', () => {
const server = new Server(config);
await server.start();

// const auditState = await server.getAuditState();
const auditState = await server.getAuditState();

// {
// const numClients = auditState.get('numClients');
// assert.equal(numClients.test, 0);
// }
{
const numClients = auditState.get('numClients');
assert.equal(numClients.test, 0);
}

// // const client = new Client({ role: 'test', ...config });
// // await client.start();
const client = new Client({ role: 'test', ...config });
await client.start();

// // {
// // const numClients = auditState.get('numClients');
// // assert.equal(numClients.test, 1);
// // }
{
const numClients = auditState.get('numClients');
assert.equal(numClients.test, 1);
}

// // await client.stop();
// // // wait for the server to clean things
// // await new Promise(resolve => setTimeout(resolve, 50));
await client.stop();
// wait for the server to clean things
await new Promise(resolve => setTimeout(resolve, 50));

// {
// const numClients = auditState.get('numClients');
// assert.equal(numClients.test, 0);
// }
{
const numClients = auditState.get('numClients');
assert.equal(numClients.test, 0);
}

// await auditState.delete();
await server.stop();
console.log('server stopped');
});
});

describe(`## server.onClientConnect(func)`, () => {
it(`should be called`, async () => {
const server = new Server(config);
await server.start();

let onConnectCalled = false;
let onConnectClientId = null;

server.onClientConnect(client => {
onConnectCalled = true;
onConnectClientId = client.id;
});

const client = new Client({ role: 'test', ...config });
await client.start();

await delay(20);

assert.equal(onConnectCalled, true);
assert.equal(onConnectClientId, client.id);

await client.stop();
await server.stop();
});
});

describe(`## server.onClientDisconnect(func)`, () => {
it(`should be called`, async () => {
const server = new Server(config);
await server.start();

let onDisconnectCalled = false;
let onDisconnectClientId = null;

server.onClientDisconnect(client => {
onDisconnectCalled = true;
onDisconnectClientId = client.id;
});

const client = new Client({ role: 'test', ...config });
await client.start();
await client.stop();

await delay(20);

assert.equal(onDisconnectCalled, true);
assert.equal(onDisconnectClientId, client.id);

await server.stop();
});
});
});
59 changes: 54 additions & 5 deletions tests/states/SharedState.spec.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { assert } from 'chai';
import { delay } from '@ircam/sc-utils';
import { assert } from 'chai';

import { Server } from '../../src/server/index.js';
import { Client } from '../../src/client/index.js';
import { BATCHED_TRANSPORT_CHANNEL } from '../../src/common/constants.js';

import config from '../utils/config.js';
import { a, b } from '../utils/schemas.js';
Expand Down Expand Up @@ -502,10 +503,11 @@ describe('# SharedState', () => {
const aCreated = await server.stateManager.create('a');
const aAttached = await client.stateManager.attach('a');

// DELETE_REQUEST is received first on the SharedStatePrivate which deletes
// all its listeners.
// Concurrently DETACH_REQUEST is sent but cannot have a response,
// flush pending requests when DELETE_NOTIFICATION is received
// - DELETE_REQUEST sent by `aCreated` is received first on the
// SharedStatePrivate which deletes all its listeners.
// - Concurrently DETACH_REQUEST is sent by `aAttached` but cannot have a response,
// - Flush pending requests on `aAttached` when DELETE_NOTIFICATION is received

aCreated.delete();

let errored = false;
Expand All @@ -522,4 +524,51 @@ describe('# SharedState', () => {
}
});
});

describe(`## Batched transport`, () => {
it(`should send only one message on several consecutive update requests`, async () => {
// launch new server so we can grab the server side representation of the client
const localConfig = structuredClone(config);
localConfig.env.port = 8082;

const server = new Server(localConfig);
server.stateManager.registerSchema('a', a);
await server.start();

let batchedRequests = 0;
let batchedResponses = 0;

server.onClientConnect(client => {
client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => {
batchedRequests += 1;
});
});

const client = new Client({ role: 'test', ...localConfig });
await client.start();

// update response
client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => {
batchedResponses += 1;
});

const state = await client.stateManager.create('a');

state.set({ bool: true });
for (let i = 1; i < 42; i++) {
state.set({ int: i });
}

await delay(20);

// 1 message for create request / response (i.e.await client.stateManager.create)
// 1 message for the batched updates requests / responses
assert.equal(batchedRequests, 2);
assert.equal(batchedResponses, 2);

state.delete();
await client.stop();
await server.stop();
});
});
});

0 comments on commit f3aa1ce

Please sign in to comment.