Skip to content

Commit

Permalink
feat: introduce collection controller proxy see #64, requires more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
b-ma committed Dec 13, 2023
1 parent 6786c72 commit 9319a08
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 132 deletions.
23 changes: 15 additions & 8 deletions src/common/BaseSharedStateCollection.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import {
kCreateCollectionController,
kAttachInCollection,
} from './BaseStateManager.js';

/**
* @private
*/
Expand All @@ -7,6 +12,7 @@ class BaseSharedStateCollection {
this._schemaName = schemaName;
this._options = Object.assign({ excludeLocal: false }, options);

this._controller = null;
this._states = [];

this._onUpdateCallbacks = new Set();
Expand All @@ -16,15 +22,10 @@ class BaseSharedStateCollection {
}

async _init() {
// used for global set improvements
// this._controlState = await this._stateManager.create(schemaName);
this._controller = await this._stateManager[kCreateCollectionController](this._schemaName);

this._unobserve = await this._stateManager.observe(this._schemaName, async (schemaName, stateId) => {
// we don't want the control state to be listed


const state = await this._stateManager.attach(schemaName, stateId);

const state = await this._stateManager[kAttachInCollection](schemaName, stateId);
this._states.push(state);

state.onDetach(() => {
Expand Down Expand Up @@ -68,6 +69,8 @@ class BaseSharedStateCollection {
this._unobserve();
this._onUpdateCallbacks.clear();

await this._controller.delete();

const promises = this._states.map(state => state.detach());
await Promise.all(promises);

Expand Down Expand Up @@ -100,7 +103,11 @@ class BaseSharedStateCollection {
* current call and will be passed as third argument to all update listeners.
*/
async set(updates, context = null) {
const promises = this._states.map(state => state.set(updates, context));
const updatesResult = await this._controller.set(updates, context);
const promises = this._states.map(state => {
return state._commit(updatesResult, context, true, false);
});

return Promise.all(promises);
}

Expand Down
23 changes: 18 additions & 5 deletions src/common/BaseStateManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import {
DELETE_SCHEMA,
} from './constants.js';

export const kCreateCollectionController = Symbol('BaseStateManager::createCollectionController');
export const kAttachInCollection = Symbol('BaseStateManager::attachInCollection');

/**
* @private
*/
Expand Down Expand Up @@ -182,13 +185,15 @@ class BaseStateManager {
});
}

async _createCollectionController(schemaName) {

/**
* @private
*/
async [kCreateCollectionController](schemaName) {
return new Promise((resolve, reject) => {
const reqId = this._promiseStore.add(resolve, reject, 'create-collection-controller-request');
const requireSchema = this._cachedSchemas.has(schemaName) ? false : true;
this.client.transport.emit(CREATE_REQUEST, reqId, schemaName, requireSchema, {}, {
collectionController: true,
});
this.client.transport.emit(CREATE_REQUEST, reqId, schemaName, requireSchema, {}, true);
});
}

Expand All @@ -208,13 +213,20 @@ class BaseStateManager {
*/
async attach(schemaName, stateId = null) {
return new Promise((resolve, reject) => {
// @todo - add a timeout
const reqId = this._promiseStore.add(resolve, reject, 'attach-request');
const requireSchema = this._cachedSchemas.has(schemaName) ? false : true;
this.client.transport.emit(ATTACH_REQUEST, reqId, schemaName, stateId, requireSchema);
});
}

async [kAttachInCollection](schemaName, stateId) {
return new Promise((resolve, reject) => {
const reqId = this._promiseStore.add(resolve, reject, 'attach-in-collection-request');
const requireSchema = this._cachedSchemas.has(schemaName) ? false : true;
this.client.transport.emit(ATTACH_REQUEST, reqId, schemaName, stateId, requireSchema, true);
});
}

/**
* Observe all the `SharedState` instances that are created on the network.
* This can be usefull for clients with some controller role that might want to track
Expand Down Expand Up @@ -376,6 +388,7 @@ class BaseStateManager {
try {
await collection._init();
} catch (err) {
console.log(err.message)
throw new Error(`Cannot create collection, schema "${schemaName}" does not exists`);
}

Expand Down
135 changes: 98 additions & 37 deletions src/common/SharedStatePrivate.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class SharedStatePrivate {
constructor(id, schemaName, schema, manager, initValues = {}) {
this.id = id;
this.schemaName = schemaName;
this.isCollectionController = false;

this._manager = manager;
this._parameters = new ParameterBag(schema, initValues);
Expand All @@ -29,10 +30,16 @@ class SharedStatePrivate {
this._creatorId = null;
}

_attachClient(remoteId, client, isOwner = false) {
this._attachedClients.set(remoteId, client);
_attachClient(remoteId, client, isOwner = false, inCollection = false) {

// isOwner && inCollection -> collection controller
// !isOwner && inCollection -> attached in collection

const clientInfos = { client, isOwner, inCollection };
this._attachedClients.set(remoteId, clientInfos);

if (isOwner) {
this.isCollectionController = inCollection;
this._creatorRemoteId = remoteId;
this._creatorId = client.id;
}
Expand Down Expand Up @@ -89,44 +96,97 @@ class SharedStatePrivate {
}

if (hasUpdates) {
// We need to handle cases where:
// - client state (client.id: 2) sends a request
// - server attached state (client.id: -1) spot a problem and overrides the value
// We want the remote client (id: 2) to receive in the right order:
// * 1. the value it requested,
// * 2. the value overriden by the server-side attached state (id: -1)
//
// such problem is now better solved with the the upateHook system, none
// nonetheway we don't want to introduce inconsistencies here
//
// Then we propagate server-side last, because as the server transport
// is synchronous it can break ordering if a subscription function makes
// itself an update in reaction to an update. Propagating to server last
// alllows to maintain network messages order consistent.

// @note - remoteId correspond to unique remote state id

// propagate RESPONSE to the client that originates the request if not the server
if (client.id !== -1) {
client.transport.emit(`${UPDATE_RESPONSE}-${this.id}-${remoteId}`, reqId, filteredUpdates, context);
}
// Collection Controller logic
if (isOwner && inCollection) {
// notify the requester back
// no need to check for attached clients as it is not observable
client.transport.emit(
`${UPDATE_RESPONSE}-${this.id}-${remoteId}`,
reqId, filteredUpdates, context
);

// loop through all private states
for (let [stateId, state] of this._manager._sharedStatePrivateById) {
// pick all states with same schema name and not this
if (state !== this && this.schemaName === state.schemaName) {
// notify all attached clients except those who belong to a collection
// i.e. !isOwner && inCollection, they will be notified by their own
// collection controller
for (let [remoteId, clientInfos] of state._attachedClients) {
const { client, isOwner, inCollection } = clientInfos;

if (!clientInfos.isOwner && clientInfos.inCollection) {
continue;
} else {
const peer = clientInfos.client;

peer.transport.emit(
`${UPDATE_NOTIFICATION}-${state.id}-${remoteId}`,
filteredUpdates, context
);
}
}
}
}
} else {

// propagate NOTIFICATION to all other attached clients except server
for (let [peerRemoteId, peer] of this._attachedClients) {
if (remoteId !== peerRemoteId && peer.id !== -1) {
peer.transport.emit(`${UPDATE_NOTIFICATION}-${this.id}-${peerRemoteId}`, filteredUpdates, context);
// Normal case

// We need to handle cases where:
// - client state (client.id: 2) sends a request
// - server attached state (client.id: -1) spot a problem and overrides the value
// We want the remote client (id: 2) to receive in the right order:
// * 1. the value it requested,
// * 2. the value overriden by the server-side attached state (id: -1)
//
// such problem is now better solved with the the upateHook system, none
// nonetheway we don't want to introduce inconsistencies here
//
// Then we propagate server-side last, because as the server transport
// is synchronous it can break ordering if a subscription function makes
// itself an update in reaction to an update. Propagating to server last
// alllows to maintain network messages order consistent.

// @note - remoteId correspond to unique remote state id

// propagate RESPONSE to the client that originates the request if not the server
if (client.id !== -1) {
client.transport.emit(
`${UPDATE_RESPONSE}-${this.id}-${remoteId}`,
reqId, filteredUpdates, context
);
}
}

// propagate RESPONSE to server if it is the requester
if (client.id === -1) {
client.transport.emit(`${UPDATE_RESPONSE}-${this.id}-${remoteId}`, reqId, filteredUpdates, context);
}
// propagate NOTIFICATION to all other attached clients except server
for (let [peerRemoteId, clientInfos] of this._attachedClients) {
const peer = clientInfos.client;

if (remoteId !== peerRemoteId && peer.id !== -1) {
peer.transport.emit(
`${UPDATE_NOTIFICATION}-${this.id}-${peerRemoteId}`,
filteredUpdates, context
);
}
}

// propagate RESPONSE to server if it is the requester
if (client.id === -1) {
client.transport.emit(
`${UPDATE_RESPONSE}-${this.id}-${remoteId}`,
reqId, filteredUpdates, context
);
}

// propagate NOTIFICATION other attached state that belongs to server
for (let [peerRemoteId, clientInfos] of this._attachedClients) {
const peer = clientInfos.client;

// propagate NOTIFICATION other attached state that belongs to server
for (let [peerRemoteId, peer] of this._attachedClients) {
if (remoteId !== peerRemoteId && peer.id === -1) {
peer.transport.emit(`${UPDATE_NOTIFICATION}-${this.id}-${peerRemoteId}`, filteredUpdates, context);
if (remoteId !== peerRemoteId && peer.id === -1) {
peer.transport.emit(
`${UPDATE_NOTIFICATION}-${this.id}-${peerRemoteId}`,
filteredUpdates, context
);
}
}
}
} else {
Expand Down Expand Up @@ -156,7 +216,8 @@ class SharedStatePrivate {
// @todo - propagate server-side last, because if a subscription function sends a
// message to a client, network messages order are kept coherent
// this._subscriptions.forEach(func => func(updated));
for (let [remoteId, attached] of this._attachedClients.entries()) {
for (let [remoteId, clientInfos] of this._attachedClients) {
const attached = clientInfos.client;
this._detachClient(remoteId, attached);

if (remoteId === this._creatorRemoteId) {
Expand Down
Loading

0 comments on commit 9319a08

Please sign in to comment.