Skip to content

Commit

Permalink
refactor: promise store, flush pending request when state is deleted -
Browse files Browse the repository at this point in the history
…fix #73
  • Loading branch information
b-ma committed Dec 12, 2023
1 parent 25c8063 commit 7f0e7c2
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 84 deletions.
21 changes: 10 additions & 11 deletions src/client/Context.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Client from './Client.js';
import PromiseStore from '../common/PromiseStore.js';
import {
CONTEXT_ENTER_REQUEST,
CONTEXT_ENTER_RESPONSE,
Expand All @@ -7,11 +8,6 @@ import {
CONTEXT_EXIT_RESPONSE,
CONTEXT_EXIT_ERROR,
} from '../common/constants.js';
import {
storeRequestPromise,
resolveRequest,
rejectRequest,
} from '../common/promise-store.js';

/**
* Base class to extend in order to implement a new Context.
Expand Down Expand Up @@ -81,36 +77,39 @@ class Context {
*/
this.status = 'idle';

/** @private */
this._promiseStore = new PromiseStore(this.constructor.name);

this.client.socket.addListener(CONTEXT_ENTER_RESPONSE, (reqId, contextName) => {
if (contextName !== this.name) {
return;
}

resolveRequest(reqId);
this._promiseStore.resolve(reqId);
});

this.client.socket.addListener(CONTEXT_ENTER_ERROR, (reqId, contextName, msg) => {
if (contextName !== this.name) {
return;
}

rejectRequest(reqId, msg);
this._promiseStore.reject(reqId, msg);
});

this.client.socket.addListener(CONTEXT_EXIT_RESPONSE, (reqId, contextName) => {
if (contextName !== this.name) {
return;
}

resolveRequest(reqId);
this._promiseStore.resolve(reqId);
});

this.client.socket.addListener(CONTEXT_EXIT_ERROR, (reqId, contextName, msg) => {
if (contextName !== this.name) {
return;
}

rejectRequest(reqId, msg);
this._promiseStore.reject(reqId, msg);
});

this.client.contextManager.register(this);
Expand Down Expand Up @@ -206,7 +205,7 @@ class Context {
// we need the try/catch block to change the promise rejection into proper error
try {
await new Promise((resolve, reject) => {
const reqId = storeRequestPromise(resolve, reject);
const reqId = this._promiseStore.add(resolve, reject, 'enter-context');
this.client.socket.send(CONTEXT_ENTER_REQUEST, reqId, this.name);
});
} catch (err) {
Expand Down Expand Up @@ -240,7 +239,7 @@ class Context {
// we need the try/catch block to change the promise rejection into proper error
try {
await new Promise((resolve, reject) => {
const reqId = storeRequestPromise(resolve, reject);
const reqId = this._promiseStore.add(resolve, reject, 'exit-context');
this.client.socket.send(CONTEXT_EXIT_REQUEST, reqId, this.name);
});
} catch (err) {
Expand Down
54 changes: 29 additions & 25 deletions src/common/BaseSharedState.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { isPlainObject } from '@ircam/sc-utils';
import ParameterBag from './ParameterBag.js';
import PromiseStore from './PromiseStore.js';
import {
DELETE_REQUEST,
DELETE_RESPONSE,
Expand All @@ -13,11 +14,6 @@ import {
UPDATE_ABORT,
UPDATE_NOTIFICATION,
} from './constants.js';
import {
storeRequestPromise,
resolveRequest,
rejectRequest,
} from './promise-store.js';

class BaseSharedState {
constructor(id, remoteId, schemaName, schema, client, isOwner, manager, initValues = {}) {
Expand All @@ -33,8 +29,13 @@ class BaseSharedState {
this._client = client;
/** @private */
this._manager = manager;

/**
* true is the state has been detached or deleted
* @private
*/
this._detached = false;
/* @private */
this._promiseStore = new PromiseStore(this.constructor.name);

try {
/** @private */
Expand All @@ -56,13 +57,13 @@ ${JSON.stringify(initValues, null, 2)}`);
// add listener for state updates
this._client.transport.addListener(`${UPDATE_RESPONSE}-${this.id}-${this.remoteId}`, async (reqId, updates, context) => {
const updated = await this._commit(updates, context, true, true);
resolveRequest(reqId, updated);
this._promiseStore.resolve(reqId, updated);
});

// retrieve values but do not propagate to subscriptions
this._client.transport.addListener(`${UPDATE_ABORT}-${this.id}-${this.remoteId}`, async (reqId, updates, context) => {
const updated = await this._commit(updates, context, false, true);
resolveRequest(reqId, updated);
this._promiseStore.resolve(reqId, updated);
});

this._client.transport.addListener(`${UPDATE_NOTIFICATION}-${this.id}-${this.remoteId}`, async (updates, context) => {
Expand Down Expand Up @@ -112,7 +113,9 @@ ${JSON.stringify(initValues, null, 2)}`);
}
}

this._clearDetach();
this._onDetachCallbacks.clear();
this._onDeleteCallbacks.clear();
this._promiseStore.flush();
});


Expand All @@ -132,12 +135,14 @@ ${JSON.stringify(initValues, null, 2)}`);
await callback();
}

this._clearDetach();
resolveRequest(reqId, this);
this._onDetachCallbacks.clear();
this._onDeleteCallbacks.clear();
this._promiseStore.resolve(reqId, this);
this._promiseStore.flush();
});

this._client.transport.addListener(`${DELETE_ERROR}-${this.id}`, (reqId, msg) => {
rejectRequest(reqId, msg);
this._promiseStore.reject(reqId, msg);
});

} else {
Expand All @@ -152,13 +157,18 @@ ${JSON.stringify(initValues, null, 2)}`);
await callback();
}

this._clearDetach();
resolveRequest(reqId, this);
this._onDetachCallbacks.clear();
this._onDeleteCallbacks.clear();
this._promiseStore.resolve(reqId, this);
this._promiseStore.flush();
});

// the state does not exists anymore in the server (should not happen)
this._client.transport.addListener(`${DETACH_ERROR}-${this.id}`, (reqId, msg) => {
rejectRequest(reqId, msg);
this._onDetachCallbacks.clear();
this._onDeleteCallbacks.clear();
this._promiseStore.reject(reqId, msg);
this._promiseStore.flush();
});
}
}
Expand Down Expand Up @@ -200,13 +210,6 @@ ${JSON.stringify(initValues, null, 2)}`);
return this._isOwner;
}

/** @private */
_clearDetach() {
this._onDetachCallbacks.clear();
this._onDeleteCallbacks.clear();
this._detached = true;
}

/** @private */
_clearTransport() {
// remove listeners
Expand Down Expand Up @@ -358,7 +361,7 @@ ${JSON.stringify(initValues, null, 2)}`);

// go through server-side normal behavior
return new Promise((resolve, reject) => {
const reqId = storeRequestPromise(resolve, reject);
const reqId = this._promiseStore.add(resolve, reject, 'update-request');
this._client.transport.emit(`${UPDATE_REQUEST}-${this.id}-${this.remoteId}`, reqId, updates, context);
});
}
Expand Down Expand Up @@ -477,16 +480,17 @@ ${JSON.stringify(initValues, null, 2)}`);
throw new Error(`[SharedState] State "${this.schemaName} (${this.id})" already detached, cannot detach twice`);
}

this._detached = true; // mark detached early
this._onUpdateCallbacks.clear();

if (this._isOwner) {
return new Promise((resolve, reject) => {
const reqId = storeRequestPromise(resolve, reject);
const reqId = this._promiseStore.add(resolve, reject, 'delete-request');
this._client.transport.emit(`${DELETE_REQUEST}-${this.id}-${this.remoteId}`, reqId);
});
} else {
return new Promise((resolve, reject) => {
const reqId = storeRequestPromise(resolve, reject);
const reqId = this._promiseStore.add(resolve, reject, 'detach-request');
this._client.transport.emit(`${DETACH_REQUEST}-${this.id}-${this.remoteId}`, reqId);
});
}
Expand Down
24 changes: 11 additions & 13 deletions src/common/BaseStateManager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { isString, isFunction } from '@ircam/sc-utils';
import SharedState from './BaseSharedState.js';
import SharedStateCollection from './BaseSharedStateCollection.js';
import PromiseStore from './PromiseStore.js';
import {
CREATE_REQUEST,
CREATE_RESPONSE,
Expand All @@ -14,11 +15,6 @@ import {
UNOBSERVE_NOTIFICATION,
DELETE_SCHEMA,
} from './constants.js';
import {
storeRequestPromise,
resolveRequest,
rejectRequest,
} from './promise-store.js';

/**
* @private
Expand All @@ -37,6 +33,8 @@ class BaseStateManager {
this._cachedSchemas = new Map();
this._observeRequestCallbacks = new Map();

this._promiseStore = new PromiseStore();

// ---------------------------------------------
// CREATE
// ---------------------------------------------
Expand All @@ -53,11 +51,11 @@ class BaseStateManager {
const state = new SharedState(stateId, remoteId, schemaName, schema, this.client, true, this, initValues);
this._statesById.set(state.id, state);

resolveRequest(reqId, state);
this._promiseStore.resolve(reqId, state);
});

this.client.transport.addListener(CREATE_ERROR, (reqId, msg) => {
rejectRequest(reqId, msg);
this._promiseStore.reject(reqId, msg);
});

// ---------------------------------------------
Expand All @@ -77,11 +75,11 @@ class BaseStateManager {
const state = new SharedState(stateId, remoteId, schemaName, schema, this.client, false, this, currentValues);
this._statesById.set(state.id, state);

resolveRequest(reqId, state);
this._promiseStore.resolve(reqId, state);
});

this.client.transport.addListener(ATTACH_ERROR, (reqId, msg) => {
rejectRequest(reqId, msg);
this._promiseStore.reject(reqId, msg);
});

// ---------------------------------------------
Expand Down Expand Up @@ -115,7 +113,7 @@ class BaseStateManager {
}
};

resolveRequest(reqId, unsubscribe);
this._promiseStore.resolve(reqId, unsubscribe);
});

this.client.transport.addListener(OBSERVE_NOTIFICATION, (schemaName, stateId, nodeId) => {
Expand Down Expand Up @@ -148,7 +146,7 @@ class BaseStateManager {
*/
async create(schemaName, initValues = {}) {
return new Promise((resolve, reject) => {
const reqId = storeRequestPromise(resolve, reject);
const reqId = this._promiseStore.add(resolve, reject, 'create-create');
const requireSchema = this._cachedSchemas.has(schemaName) ? false : true;
this.client.transport.emit(CREATE_REQUEST, reqId, schemaName, requireSchema, initValues);
});
Expand All @@ -171,7 +169,7 @@ class BaseStateManager {
async attach(schemaName, stateId = null) {
return new Promise((resolve, reject) => {
// @todo - add a timeout
const reqId = storeRequestPromise(resolve, reject);
const reqId = this._promiseStore.add(resolve, reject, 'attach-request');
const requireSchema = this._cachedSchemas.has(schemaName) ? false : true;
this.client.transport.emit(ATTACH_REQUEST, reqId, schemaName, stateId, requireSchema);
});
Expand Down Expand Up @@ -231,7 +229,7 @@ class BaseStateManager {

// resend request to get updated list of states
return new Promise((resolve, reject) => {
const reqId = storeRequestPromise(resolve, reject);
const reqId = this._promiseStore.add(resolve, reject, 'observe-request');
// store the callback for execution on the response. the returned Promise
// is fullfiled once callback has been executed with each existing states
this._observeRequestCallbacks.set(reqId, [callback, filterSchemaName]);
Expand Down
41 changes: 41 additions & 0 deletions src/common/PromiseStore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { idGenerator } from '@ircam/sc-utils';

const generateRequestId = idGenerator();

export default class PromiseStore {
constructor(name) {
this.name = name;
this.store = new Map();
}

add(resolve, reject, type) {
const reqId = generateRequestId.next().value;
this.store.set(reqId, { resolve, reject, type });

return reqId;
}

resolve(reqId, data) {
const { resolve } = this.store.get(reqId);
this.store.delete(reqId);

resolve(data);
}

reject(reqId, msg) {
const { reject } = this.store.get(reqId);
this.store.delete(reqId);

reject(new Error(msg));
}

// reject all pendeing request
flush() {
for (let [_reqId, entry] of this.store) {
const { reject, type } = entry;
reject(new Error(`[${this.name}] Discard promise "${type}", cannot resolve`));
}

this.store.clear();
}
}
25 changes: 0 additions & 25 deletions src/common/promise-store.js

This file was deleted.

Loading

0 comments on commit 7f0e7c2

Please sign in to comment.