Skip to content

Commit

Permalink
fix(StateManager): race condition between OBSERVE_RESPONSE and OBSERV…
Browse files Browse the repository at this point in the history
…E_NOTIFICATION
  • Loading branch information
b-ma committed Dec 12, 2023
1 parent 97be502 commit 2058d6e
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 191 deletions.
18 changes: 15 additions & 3 deletions src/common/BaseSharedStateCollection.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/**
* @private
*/
let id = 0;

class BaseSharedStateCollection {
constructor(stateManager, schemaName) {
this._stateManager = stateManager;
Expand All @@ -10,6 +12,8 @@ class BaseSharedStateCollection {
this._onAttachCallbacks = new Set();
this._onDetachCallbacks = new Set();
this._unobserve = null;
// testing
this._id = id++;
}

async _init() {
Expand All @@ -36,24 +40,32 @@ class BaseSharedStateCollection {
}

/**
* Size of the collection
* Size of the collection, alias `size`
* @type {number}
*/
get length() {
return this._states.length;
}

/**
* Size of the collection, , alias `length`
* @type {number}
*/
get size() {
return this._states.length;
}

/**
* Detach from the collection, i.e. detach all underlying shared states.
* @type {number}
*/
async detach() {
this._unobserve();
this._onUpdateCallbacks.clear();

const promises = Array.from(this._states).map(state => state.detach());
const promises = this._states.map(state => state.detach());
await Promise.all(promises);

this._onUpdateCallbacks.clear();
this._onDetachCallbacks.clear();
}

Expand Down
22 changes: 20 additions & 2 deletions src/common/BaseStateManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class BaseStateManager {
const [callback, filterSchemaName] = this._observeRequestCallbacks.get(reqId);
this._observeRequestCallbacks.delete(reqId);

// now that the OBSERVE_REPOSNSE callback is executed, store it in
// OBSERVE_NOTIFICATION listeners
this._observeListeners.set(callback, filterSchemaName);

const promises = list.map(([schemaName, stateId, nodeId]) => {
if (filterSchemaName === '*' || filterSchemaName === schemaName) {
return callback(schemaName, stateId, nodeId);
Expand Down Expand Up @@ -231,8 +235,22 @@ class BaseStateManager {
// store the callback for execution on the response. the returned Promise
// is fullfiled once callback has been executed with each existing states
this._observeRequestCallbacks.set(reqId, [callback, filterSchemaName]);
// store the callback for execution on subsequent notifications
this._observeListeners.set(callback, filterSchemaName);

// NOTE: do not store in `_observeListeners` yet as it can produce races, e.g.:
// cf. test `observe should properly behave in race condition`
// ```
// await client.stateManager.observe(async (schemaName, stateId, nodeId) => {});
// // client now receives OBSERVE_NOTIFICATIONS
// await otherClient.stateManager.create('a');
// // second observer added in between
// client.stateManager.observe(async (schemaName, stateId, nodeId) => {});
// ````
// OBSERVE_NOTIFICATION is received before the OBSERVE_RESPONSE, then the
// second observer is called twice:
// - OBSERVE_RESPONSE 1 []
// - OBSERVE_NOTIFICATION [ 'a', 1, 0 ]
// - OBSERVE_NOTIFICATION [ 'a', 1, 0 ] // this should not be executed
// - OBSERVE_RESPONSE 1 [ [ 'a', 1, 0 ] ]

this.client.transport.emit(OBSERVE_REQUEST, reqId);
});
Expand Down
1 change: 1 addition & 0 deletions src/common/SharedStatePrivate.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class SharedStatePrivate {
} else {
// detach only if not creator
client.transport.addListener(`${DETACH_REQUEST}-${this.id}-${remoteId}`, (reqId) => {
console.log('SharedStatePrivate: receive detach request', reqId);
this._detachClient(remoteId, client);
client.transport.emit(`${DETACH_RESPONSE}-${this.id}-${remoteId}`, reqId);
});
Expand Down
18 changes: 18 additions & 0 deletions tests/states/SharedState.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -496,4 +496,22 @@ describe('# ShareState', () => {
}
});
});

describe(`## Race conditions`, () => {
it.skip(`[FIXME #73] should not stuck the program`, async () => {
const aCreated = await server.stateManager.create('a');
const aAttached = await client.stateManager.attach('a');

// DELETE_REQUEST is received first on the SharedStatePrivate which deletes all its listeners
// The DETACH_REQUEST is sent but have not response, then it never resolves so the program is stuck...

// Possible fix:
// keep track of all requests and reject them if a DELETE_NOTIFICATION or
// a DETACH_NOTIFICATION is received
// is received

await aCreated.delete();
await aAttached.detach(); // program is stuck here...
});
});
});
172 changes: 0 additions & 172 deletions tests/states/SharedStateCollection.spec.js

This file was deleted.

Loading

0 comments on commit 2058d6e

Please sign in to comment.