diff --git a/src/client/Context.js b/src/client/Context.js index 9115e332..9bb4f068 100644 --- a/src/client/Context.js +++ b/src/client/Context.js @@ -1,4 +1,5 @@ import Client from './Client.js'; +import PromiseStore from '../common/PromiseStore.js'; import { CONTEXT_ENTER_REQUEST, CONTEXT_ENTER_RESPONSE, @@ -7,11 +8,6 @@ import { CONTEXT_EXIT_RESPONSE, CONTEXT_EXIT_ERROR, } from '../common/constants.js'; -import { - storeRequestPromise, - resolveRequest, - rejectRequest, -} from '../common/promise-store.js'; /** * Base class to extend in order to implement a new Context. @@ -81,12 +77,15 @@ class Context { */ this.status = 'idle'; + /** @private */ + this._promiseStore = new PromiseStore(this.constructor.name); + this.client.socket.addListener(CONTEXT_ENTER_RESPONSE, (reqId, contextName) => { if (contextName !== this.name) { return; } - resolveRequest(reqId); + this._promiseStore.resolve(reqId); }); this.client.socket.addListener(CONTEXT_ENTER_ERROR, (reqId, contextName, msg) => { @@ -94,7 +93,7 @@ class Context { return; } - rejectRequest(reqId, msg); + this._promiseStore.reject(reqId, msg); }); this.client.socket.addListener(CONTEXT_EXIT_RESPONSE, (reqId, contextName) => { @@ -102,7 +101,7 @@ class Context { return; } - resolveRequest(reqId); + this._promiseStore.resolve(reqId); }); this.client.socket.addListener(CONTEXT_EXIT_ERROR, (reqId, contextName, msg) => { @@ -110,7 +109,7 @@ class Context { return; } - rejectRequest(reqId, msg); + this._promiseStore.reject(reqId, msg); }); this.client.contextManager.register(this); @@ -206,7 +205,7 @@ class Context { // we need the try/catch block to change the promise rejection into proper error try { await new Promise((resolve, reject) => { - const reqId = storeRequestPromise(resolve, reject); + const reqId = this._promiseStore.add(resolve, reject, 'enter-context'); this.client.socket.send(CONTEXT_ENTER_REQUEST, reqId, this.name); }); } catch (err) { @@ -240,7 +239,7 @@ class Context { // we need the try/catch block to change the promise rejection into proper error try { await new Promise((resolve, reject) => { - const reqId = storeRequestPromise(resolve, reject); + const reqId = this._promiseStore.add(resolve, reject, 'exit-context'); this.client.socket.send(CONTEXT_EXIT_REQUEST, reqId, this.name); }); } catch (err) { diff --git a/src/common/BaseSharedState.js b/src/common/BaseSharedState.js index 8b609bf4..c713db66 100644 --- a/src/common/BaseSharedState.js +++ b/src/common/BaseSharedState.js @@ -1,5 +1,6 @@ import { isPlainObject } from '@ircam/sc-utils'; import ParameterBag from './ParameterBag.js'; +import PromiseStore from './PromiseStore.js'; import { DELETE_REQUEST, DELETE_RESPONSE, @@ -13,11 +14,6 @@ import { UPDATE_ABORT, UPDATE_NOTIFICATION, } from './constants.js'; -import { - storeRequestPromise, - resolveRequest, - rejectRequest, -} from './promise-store.js'; class BaseSharedState { constructor(id, remoteId, schemaName, schema, client, isOwner, manager, initValues = {}) { @@ -33,8 +29,13 @@ class BaseSharedState { this._client = client; /** @private */ this._manager = manager; - + /** + * true is the state has been detached or deleted + * @private + */ this._detached = false; + /* @private */ + this._promiseStore = new PromiseStore(this.constructor.name); try { /** @private */ @@ -56,13 +57,13 @@ ${JSON.stringify(initValues, null, 2)}`); // add listener for state updates this._client.transport.addListener(`${UPDATE_RESPONSE}-${this.id}-${this.remoteId}`, async (reqId, updates, context) => { const updated = await this._commit(updates, context, true, true); - resolveRequest(reqId, updated); + this._promiseStore.resolve(reqId, updated); }); // retrieve values but do not propagate to subscriptions this._client.transport.addListener(`${UPDATE_ABORT}-${this.id}-${this.remoteId}`, async (reqId, updates, context) => { const updated = await this._commit(updates, context, false, true); - resolveRequest(reqId, updated); + this._promiseStore.resolve(reqId, updated); }); this._client.transport.addListener(`${UPDATE_NOTIFICATION}-${this.id}-${this.remoteId}`, async (updates, context) => { @@ -112,7 +113,9 @@ ${JSON.stringify(initValues, null, 2)}`); } } - this._clearDetach(); + this._onDetachCallbacks.clear(); + this._onDeleteCallbacks.clear(); + this._promiseStore.flush(); }); @@ -132,12 +135,14 @@ ${JSON.stringify(initValues, null, 2)}`); await callback(); } - this._clearDetach(); - resolveRequest(reqId, this); + this._onDetachCallbacks.clear(); + this._onDeleteCallbacks.clear(); + this._promiseStore.resolve(reqId, this); + this._promiseStore.flush(); }); this._client.transport.addListener(`${DELETE_ERROR}-${this.id}`, (reqId, msg) => { - rejectRequest(reqId, msg); + this._promiseStore.reject(reqId, msg); }); } else { @@ -152,13 +157,18 @@ ${JSON.stringify(initValues, null, 2)}`); await callback(); } - this._clearDetach(); - resolveRequest(reqId, this); + this._onDetachCallbacks.clear(); + this._onDeleteCallbacks.clear(); + this._promiseStore.resolve(reqId, this); + this._promiseStore.flush(); }); // the state does not exists anymore in the server (should not happen) this._client.transport.addListener(`${DETACH_ERROR}-${this.id}`, (reqId, msg) => { - rejectRequest(reqId, msg); + this._onDetachCallbacks.clear(); + this._onDeleteCallbacks.clear(); + this._promiseStore.reject(reqId, msg); + this._promiseStore.flush(); }); } } @@ -200,13 +210,6 @@ ${JSON.stringify(initValues, null, 2)}`); return this._isOwner; } - /** @private */ - _clearDetach() { - this._onDetachCallbacks.clear(); - this._onDeleteCallbacks.clear(); - this._detached = true; - } - /** @private */ _clearTransport() { // remove listeners @@ -358,7 +361,7 @@ ${JSON.stringify(initValues, null, 2)}`); // go through server-side normal behavior return new Promise((resolve, reject) => { - const reqId = storeRequestPromise(resolve, reject); + const reqId = this._promiseStore.add(resolve, reject, 'update-request'); this._client.transport.emit(`${UPDATE_REQUEST}-${this.id}-${this.remoteId}`, reqId, updates, context); }); } @@ -477,16 +480,17 @@ ${JSON.stringify(initValues, null, 2)}`); throw new Error(`[SharedState] State "${this.schemaName} (${this.id})" already detached, cannot detach twice`); } + this._detached = true; // mark detached early this._onUpdateCallbacks.clear(); if (this._isOwner) { return new Promise((resolve, reject) => { - const reqId = storeRequestPromise(resolve, reject); + const reqId = this._promiseStore.add(resolve, reject, 'delete-request'); this._client.transport.emit(`${DELETE_REQUEST}-${this.id}-${this.remoteId}`, reqId); }); } else { return new Promise((resolve, reject) => { - const reqId = storeRequestPromise(resolve, reject); + const reqId = this._promiseStore.add(resolve, reject, 'detach-request'); this._client.transport.emit(`${DETACH_REQUEST}-${this.id}-${this.remoteId}`, reqId); }); } diff --git a/src/common/BaseStateManager.js b/src/common/BaseStateManager.js index 4199cc10..35ff3138 100644 --- a/src/common/BaseStateManager.js +++ b/src/common/BaseStateManager.js @@ -1,6 +1,7 @@ import { isString, isFunction } from '@ircam/sc-utils'; import SharedState from './BaseSharedState.js'; import SharedStateCollection from './BaseSharedStateCollection.js'; +import PromiseStore from './PromiseStore.js'; import { CREATE_REQUEST, CREATE_RESPONSE, @@ -14,11 +15,6 @@ import { UNOBSERVE_NOTIFICATION, DELETE_SCHEMA, } from './constants.js'; -import { - storeRequestPromise, - resolveRequest, - rejectRequest, -} from './promise-store.js'; /** * @private @@ -37,6 +33,8 @@ class BaseStateManager { this._cachedSchemas = new Map(); this._observeRequestCallbacks = new Map(); + this._promiseStore = new PromiseStore(); + // --------------------------------------------- // CREATE // --------------------------------------------- @@ -53,11 +51,11 @@ class BaseStateManager { const state = new SharedState(stateId, remoteId, schemaName, schema, this.client, true, this, initValues); this._statesById.set(state.id, state); - resolveRequest(reqId, state); + this._promiseStore.resolve(reqId, state); }); this.client.transport.addListener(CREATE_ERROR, (reqId, msg) => { - rejectRequest(reqId, msg); + this._promiseStore.reject(reqId, msg); }); // --------------------------------------------- @@ -77,11 +75,11 @@ class BaseStateManager { const state = new SharedState(stateId, remoteId, schemaName, schema, this.client, false, this, currentValues); this._statesById.set(state.id, state); - resolveRequest(reqId, state); + this._promiseStore.resolve(reqId, state); }); this.client.transport.addListener(ATTACH_ERROR, (reqId, msg) => { - rejectRequest(reqId, msg); + this._promiseStore.reject(reqId, msg); }); // --------------------------------------------- @@ -115,7 +113,7 @@ class BaseStateManager { } }; - resolveRequest(reqId, unsubscribe); + this._promiseStore.resolve(reqId, unsubscribe); }); this.client.transport.addListener(OBSERVE_NOTIFICATION, (schemaName, stateId, nodeId) => { @@ -148,7 +146,7 @@ class BaseStateManager { */ async create(schemaName, initValues = {}) { return new Promise((resolve, reject) => { - const reqId = storeRequestPromise(resolve, reject); + const reqId = this._promiseStore.add(resolve, reject, 'create-create'); const requireSchema = this._cachedSchemas.has(schemaName) ? false : true; this.client.transport.emit(CREATE_REQUEST, reqId, schemaName, requireSchema, initValues); }); @@ -171,7 +169,7 @@ class BaseStateManager { async attach(schemaName, stateId = null) { return new Promise((resolve, reject) => { // @todo - add a timeout - const reqId = storeRequestPromise(resolve, reject); + const reqId = this._promiseStore.add(resolve, reject, 'attach-request'); const requireSchema = this._cachedSchemas.has(schemaName) ? false : true; this.client.transport.emit(ATTACH_REQUEST, reqId, schemaName, stateId, requireSchema); }); @@ -231,7 +229,7 @@ class BaseStateManager { // resend request to get updated list of states return new Promise((resolve, reject) => { - const reqId = storeRequestPromise(resolve, reject); + const reqId = this._promiseStore.add(resolve, reject, 'observe-request'); // store the callback for execution on the response. the returned Promise // is fullfiled once callback has been executed with each existing states this._observeRequestCallbacks.set(reqId, [callback, filterSchemaName]); diff --git a/src/common/PromiseStore.js b/src/common/PromiseStore.js new file mode 100644 index 00000000..f817d893 --- /dev/null +++ b/src/common/PromiseStore.js @@ -0,0 +1,41 @@ +import { idGenerator } from '@ircam/sc-utils'; + +const generateRequestId = idGenerator(); + +export default class PromiseStore { + constructor(name) { + this.name = name; + this.store = new Map(); + } + + add(resolve, reject, type) { + const reqId = generateRequestId.next().value; + this.store.set(reqId, { resolve, reject, type }); + + return reqId; + } + + resolve(reqId, data) { + const { resolve } = this.store.get(reqId); + this.store.delete(reqId); + + resolve(data); + } + + reject(reqId, msg) { + const { reject } = this.store.get(reqId); + this.store.delete(reqId); + + reject(new Error(msg)); + } + + // reject all pendeing request + flush() { + for (let [_reqId, entry] of this.store) { + const { reject, type } = entry; + reject(new Error(`[${this.name}] Discard promise "${type}", cannot resolve`)); + } + + this.store.clear(); + } +} diff --git a/src/common/promise-store.js b/src/common/promise-store.js deleted file mode 100644 index 1d2d684b..00000000 --- a/src/common/promise-store.js +++ /dev/null @@ -1,25 +0,0 @@ -import { idGenerator } from '@ircam/sc-utils'; - -const generateRequestId = idGenerator(); -const requestPromises = new Map(); - -export function storeRequestPromise(resolve, reject) { - const reqId = generateRequestId.next().value; - requestPromises.set(reqId, { resolve, reject }); - - return reqId; -} - -export function resolveRequest(reqId, data) { - const { resolve } = requestPromises.get(reqId); - requestPromises.delete(reqId); - - resolve(data); -} - -export function rejectRequest(reqId, msg) { - const { reject } = requestPromises.get(reqId); - requestPromises.delete(reqId); - - reject(new Error(msg)); -} diff --git a/tests/states/SharedState.spec.js b/tests/states/SharedState.spec.js index 2e88f39d..c78d3cf1 100644 --- a/tests/states/SharedState.spec.js +++ b/tests/states/SharedState.spec.js @@ -7,7 +7,7 @@ import { Client } from '../../src/client/index.js'; import config from '../utils/config.js'; import { a, b } from '../utils/schemas.js'; -describe('# ShareState', () => { +describe('# SharedState', () => { let server; let client; @@ -498,20 +498,28 @@ describe('# ShareState', () => { }); describe(`## Race conditions`, () => { - it.skip(`[FIXME #73] should not stuck the program`, async () => { + it(`[FIXME #73] should not stuck the program`, async () => { const aCreated = await server.stateManager.create('a'); const aAttached = await client.stateManager.attach('a'); - // DELETE_REQUEST is received first on the SharedStatePrivate which deletes all its listeners - // The DETACH_REQUEST is sent but have not response, then it never resolves so the program is stuck... + // DELETE_REQUEST is received first on the SharedStatePrivate which deletes + // all its listeners. + // Concurrently DETACH_REQUEST is sent but have not response, request is flush when + // DELETE_NOTIFICATION or DETACH_NOTIFICATION is received + aCreated.delete(); - // Possible fix: - // keep track of all requests and reject them if a DELETE_NOTIFICATION or - // a DETACH_NOTIFICATION is received - // is received + let errored = false; - await aCreated.delete(); - await aAttached.detach(); // program is stuck here... + try { + await aAttached.detach(); + } catch (err) { + console.log(err.message); + errored = true; + } + + if (!errored) { + assert.fail('should have thrown'); + } }); }); });