From 54a3d3adb0888f5b947aef6b8d534baad6c5063d Mon Sep 17 00:00:00 2001 From: Stefano Ricci Date: Tue, 3 Oct 2023 10:55:26 +0200 Subject: [PATCH] added idle timeout cache --- .../service/update/thread/IdleTimeoutCache.js | 71 +++++++++++++++++++ .../update/thread/recordsUpdateThread.js | 54 +++++++------- 2 files changed, 99 insertions(+), 26 deletions(-) create mode 100644 server/modules/record/service/update/thread/IdleTimeoutCache.js diff --git a/server/modules/record/service/update/thread/IdleTimeoutCache.js b/server/modules/record/service/update/thread/IdleTimeoutCache.js new file mode 100644 index 0000000000..14dd98d2cb --- /dev/null +++ b/server/modules/record/service/update/thread/IdleTimeoutCache.js @@ -0,0 +1,71 @@ +const defaultOptions = { + itemIdleTimeoutSeconds: 15 * 60, // 15 mins +} + +/** + * Simple items cache with idle timeout. + * When the timeout for an item is reached, + * the item is automatically removed from the cache to save memory. + */ +export default class IdleTimeoutCache { + constructor(options = {}) { + const { itemIdleTimeoutSeconds } = { ...defaultOptions, ...options } + + this.itemIdleTimeoutSeconds = itemIdleTimeoutSeconds + this.itemsByKey = {} + this._itemTimeoutByKey = {} + } + + get(key) { + this._resetItemIdleTimeout(key) + return this.itemsByKey[key] + } + + has(key) { + return !!this.get(key) + } + + get keys() { + return Object.keys(this.itemsByKey) + } + + get size() { + return this.keys.length + } + + isEmpty() { + return this.size === 0 + } + + findKeys(filterFunction) { + return this.keys.filter(filterFunction) + } + + set(key, item) { + this.itemsByKey[key] = item + this._resetItemIdleTimeout(key) + return this + } + + delete(key) { + this._clearItemIdleTimeout(key) + + delete this.itemsByKey[key] + } + + _clearItemIdleTimeout(key) { + const oldTimeout = this._itemTimeoutByKey[key] + if (oldTimeout) { + clearTimeout(oldTimeout) + } + delete this._itemTimeoutByKey[key] + } + + _resetItemIdleTimeout(key) { + this._clearItemIdleTimeout(key) + + this._itemTimeoutByKey[key] = setTimeout(() => { + this.delete(key) + }, this.itemIdleTimeoutSeconds * 1000) + } +} diff --git a/server/modules/record/service/update/thread/recordsUpdateThread.js b/server/modules/record/service/update/thread/recordsUpdateThread.js index 4a58a7f12d..ca074dd202 100644 --- a/server/modules/record/service/update/thread/recordsUpdateThread.js +++ b/server/modules/record/service/update/thread/recordsUpdateThread.js @@ -14,6 +14,7 @@ import Queue from '@core/queue' import * as RecordManager from '../../../manager/recordManager' import * as SurveyManager from '../../../../survey/manager/surveyManager' import { RecordsUpdateThreadMessageTypes } from './recordsThreadMessageTypes' +import IdleTimeoutCache from './IdleTimeoutCache' const Logger = Log.getLogger('RecordsUpdateThread') @@ -22,7 +23,7 @@ class RecordsUpdateThread extends Thread { super(paramsObj) this.queue = new Queue() - this.surveyDataByKey = {} + this.surveysDataCache = new IdleTimeoutCache() this.processing = false this.messageProcessorByType = { @@ -114,7 +115,7 @@ class RecordsUpdateThread extends Thread { const key = this.getSurveyDataKey(msg) - let data = this.surveyDataByKey[key] + let data = this.surveysDataCache.get(key) if (data) { return data } @@ -135,9 +136,9 @@ class RecordsUpdateThread extends Thread { data = { survey, - recordsByUuid: {}, + recordsCache: new IdleTimeoutCache(), } - this.surveyDataByKey[key] = data + this.surveysDataCache.set(key, data) return data } @@ -149,20 +150,20 @@ class RecordsUpdateThread extends Thread { const messageProcessor = this.messageProcessorByType[type] if (messageProcessor) { messageProcessor(msg) + + if ([RecordsUpdateThreadMessageTypes.nodePersist, RecordsUpdateThreadMessageTypes.nodeDelete].includes(type)) { + const recordUuid = msg.recordUuid || msg.node?.recordUuid + this.postMessage({ type: WebSocketEvent.nodesUpdateCompleted, content: { recordUuid } }) + } } else { Logger.debug(`Skipping unknown message type: ${type}`) } - - if ([RecordsUpdateThreadMessageTypes.nodePersist, RecordsUpdateThreadMessageTypes.nodeDelete].includes(type)) { - const recordUuid = msg.recordUuid || msg.node?.recordUuid - this.postMessage({ type: WebSocketEvent.nodesUpdateCompleted, content: { recordUuid } }) - } } async processRecordInitMsg(msg) { const { surveyId, recordUuid, user } = msg - const { survey, recordsByUuid } = await this.getOrFetchSurveyData(msg) + const { survey, recordsCache } = await this.getOrFetchSurveyData(msg) let record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid }) @@ -173,24 +174,24 @@ class RecordsUpdateThread extends Thread { nodesUpdateListener: (updatedNodes) => this.handleNodesUpdated.bind(this)({ record, updatedNodes }), nodesValidationListener: (validations) => this.handleNodesValidationUpdated.bind(this)({ record, validations }), }) - recordsByUuid[recordUuid] = record + recordsCache.set(recordUuid, record) } async processRecordReloadMsg(msg) { const { surveyId, recordUuid } = msg - const { recordsByUuid } = await this.getOrFetchSurveyData(msg) + const { recordsCache } = await this.getOrFetchSurveyData(msg) - if (recordsByUuid[recordUuid]) { + if (recordsCache.has(recordUuid)) { const record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid }) - recordsByUuid[recordUuid] = record + recordsCache.set(recordUuid, record) } } async processRecordNodePersistMsg(msg) { const { node, user } = msg - const { survey, recordsByUuid } = await this.getOrFetchSurveyData(msg) + const { survey, recordsCache } = await this.getOrFetchSurveyData(msg) const recordUuid = Node.getRecordUuid(node) let record = await this.getOrFetchRecord({ msg, recordUuid }) @@ -203,13 +204,13 @@ class RecordsUpdateThread extends Thread { nodesUpdateListener: (updatedNodes) => this.handleNodesUpdated({ record, updatedNodes }), nodesValidationListener: (validations) => this.handleNodesValidationUpdated({ record, validations }), }) - recordsByUuid[recordUuid] = record + recordsCache.set(recordUuid, record) } async processRecordNodeDeleteMsg(msg) { const { nodeUuid, recordUuid, user } = msg - const { survey, recordsByUuid } = await this.getOrFetchSurveyData(msg) + const { survey, recordsCache } = await this.getOrFetchSurveyData(msg) let record = await this.getOrFetchRecord({ msg, recordUuid }) record = await RecordManager.deleteNode( @@ -220,16 +221,16 @@ class RecordsUpdateThread extends Thread { (updatedNodes) => this.handleNodesUpdated({ record, updatedNodes }), (validations) => this.handleNodesValidationUpdated({ record, validations }) ) - recordsByUuid[recordUuid] = record + recordsCache.set(recordUuid, record) } async processRecordClearMsg(msg) { const { recordUuid } = msg - const { recordsByUuid } = await this.getOrFetchSurveyData(msg) - delete recordsByUuid[recordUuid] + const { recordsCache } = await this.getOrFetchSurveyData(msg) + recordsCache.delete(recordUuid) - if (Object.keys(recordsByUuid).length === 0) { + if (recordsCache.isEmpty()) { await this.processSurveyClearMsg(msg) } } @@ -243,22 +244,23 @@ class RecordsUpdateThread extends Thread { const key = this.getSurveyDataKey(msg) keysToDelete.push(key) } else { - keysToDelete.push(...Object.keys(this.surveyDataByKey).filter((key) => key.startsWith(`${surveyId}_`))) + keysToDelete.push(...this.surveysDataCache.findKeys((key) => key.startsWith(`${surveyId}_`))) } keysToDelete.forEach((key) => { - delete this.surveyDataByKey[key] + this.surveysDataCache.delete(key) }) } async getOrFetchRecord({ msg, recordUuid }) { const { surveyId } = msg - const { recordsByUuid } = await this.getOrFetchSurveyData(msg) + const { recordsCache } = await this.getOrFetchSurveyData(msg) + + let record = recordsCache.get(recordUuid) - let record = recordsByUuid[recordUuid] if (!record) { record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid }) - recordsByUuid[recordUuid] = record + recordsCache.set(recordUuid, record) } return record }