Skip to content

Commit

Permalink
added idle timeout cache
Browse files Browse the repository at this point in the history
  • Loading branch information
SteRiccio committed Oct 3, 2023
1 parent 274a4f8 commit 54a3d3a
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 26 deletions.
71 changes: 71 additions & 0 deletions server/modules/record/service/update/thread/IdleTimeoutCache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
const defaultOptions = {
itemIdleTimeoutSeconds: 15 * 60, // 15 mins
}

/**
* Simple items cache with idle timeout.
* When the timeout for an item is reached,
* the item is automatically removed from the cache to save memory.
*/
export default class IdleTimeoutCache {
constructor(options = {}) {
const { itemIdleTimeoutSeconds } = { ...defaultOptions, ...options }

this.itemIdleTimeoutSeconds = itemIdleTimeoutSeconds
this.itemsByKey = {}
this._itemTimeoutByKey = {}
}

get(key) {
this._resetItemIdleTimeout(key)
return this.itemsByKey[key]
}

has(key) {
return !!this.get(key)
}

get keys() {
return Object.keys(this.itemsByKey)
}

get size() {
return this.keys.length
}

isEmpty() {
return this.size === 0
}

findKeys(filterFunction) {
return this.keys.filter(filterFunction)
}

set(key, item) {
this.itemsByKey[key] = item
this._resetItemIdleTimeout(key)
return this
}

delete(key) {
this._clearItemIdleTimeout(key)

delete this.itemsByKey[key]
}

_clearItemIdleTimeout(key) {
const oldTimeout = this._itemTimeoutByKey[key]
if (oldTimeout) {
clearTimeout(oldTimeout)
}
delete this._itemTimeoutByKey[key]
}

_resetItemIdleTimeout(key) {
this._clearItemIdleTimeout(key)

this._itemTimeoutByKey[key] = setTimeout(() => {
this.delete(key)
}, this.itemIdleTimeoutSeconds * 1000)
}
}
54 changes: 28 additions & 26 deletions server/modules/record/service/update/thread/recordsUpdateThread.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Queue from '@core/queue'
import * as RecordManager from '../../../manager/recordManager'
import * as SurveyManager from '../../../../survey/manager/surveyManager'
import { RecordsUpdateThreadMessageTypes } from './recordsThreadMessageTypes'
import IdleTimeoutCache from './IdleTimeoutCache'

const Logger = Log.getLogger('RecordsUpdateThread')

Expand All @@ -22,7 +23,7 @@ class RecordsUpdateThread extends Thread {
super(paramsObj)

this.queue = new Queue()
this.surveyDataByKey = {}
this.surveysDataCache = new IdleTimeoutCache()
this.processing = false

this.messageProcessorByType = {
Expand Down Expand Up @@ -114,7 +115,7 @@ class RecordsUpdateThread extends Thread {

const key = this.getSurveyDataKey(msg)

let data = this.surveyDataByKey[key]
let data = this.surveysDataCache.get(key)
if (data) {
return data
}
Expand All @@ -135,9 +136,9 @@ class RecordsUpdateThread extends Thread {

data = {
survey,
recordsByUuid: {},
recordsCache: new IdleTimeoutCache(),
}
this.surveyDataByKey[key] = data
this.surveysDataCache.set(key, data)

return data
}
Expand All @@ -149,20 +150,20 @@ class RecordsUpdateThread extends Thread {
const messageProcessor = this.messageProcessorByType[type]
if (messageProcessor) {
messageProcessor(msg)

if ([RecordsUpdateThreadMessageTypes.nodePersist, RecordsUpdateThreadMessageTypes.nodeDelete].includes(type)) {
const recordUuid = msg.recordUuid || msg.node?.recordUuid
this.postMessage({ type: WebSocketEvent.nodesUpdateCompleted, content: { recordUuid } })
}
} else {
Logger.debug(`Skipping unknown message type: ${type}`)
}

if ([RecordsUpdateThreadMessageTypes.nodePersist, RecordsUpdateThreadMessageTypes.nodeDelete].includes(type)) {
const recordUuid = msg.recordUuid || msg.node?.recordUuid
this.postMessage({ type: WebSocketEvent.nodesUpdateCompleted, content: { recordUuid } })
}
}

async processRecordInitMsg(msg) {
const { surveyId, recordUuid, user } = msg

const { survey, recordsByUuid } = await this.getOrFetchSurveyData(msg)
const { survey, recordsCache } = await this.getOrFetchSurveyData(msg)

let record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid })

Expand All @@ -173,24 +174,24 @@ class RecordsUpdateThread extends Thread {
nodesUpdateListener: (updatedNodes) => this.handleNodesUpdated.bind(this)({ record, updatedNodes }),
nodesValidationListener: (validations) => this.handleNodesValidationUpdated.bind(this)({ record, validations }),
})
recordsByUuid[recordUuid] = record
recordsCache.set(recordUuid, record)
}

async processRecordReloadMsg(msg) {
const { surveyId, recordUuid } = msg

const { recordsByUuid } = await this.getOrFetchSurveyData(msg)
const { recordsCache } = await this.getOrFetchSurveyData(msg)

if (recordsByUuid[recordUuid]) {
if (recordsCache.has(recordUuid)) {
const record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid })
recordsByUuid[recordUuid] = record
recordsCache.set(recordUuid, record)
}
}

async processRecordNodePersistMsg(msg) {
const { node, user } = msg

const { survey, recordsByUuid } = await this.getOrFetchSurveyData(msg)
const { survey, recordsCache } = await this.getOrFetchSurveyData(msg)

const recordUuid = Node.getRecordUuid(node)
let record = await this.getOrFetchRecord({ msg, recordUuid })
Expand All @@ -203,13 +204,13 @@ class RecordsUpdateThread extends Thread {
nodesUpdateListener: (updatedNodes) => this.handleNodesUpdated({ record, updatedNodes }),
nodesValidationListener: (validations) => this.handleNodesValidationUpdated({ record, validations }),
})
recordsByUuid[recordUuid] = record
recordsCache.set(recordUuid, record)
}

async processRecordNodeDeleteMsg(msg) {
const { nodeUuid, recordUuid, user } = msg

const { survey, recordsByUuid } = await this.getOrFetchSurveyData(msg)
const { survey, recordsCache } = await this.getOrFetchSurveyData(msg)

let record = await this.getOrFetchRecord({ msg, recordUuid })
record = await RecordManager.deleteNode(
Expand All @@ -220,16 +221,16 @@ class RecordsUpdateThread extends Thread {
(updatedNodes) => this.handleNodesUpdated({ record, updatedNodes }),
(validations) => this.handleNodesValidationUpdated({ record, validations })
)
recordsByUuid[recordUuid] = record
recordsCache.set(recordUuid, record)
}

async processRecordClearMsg(msg) {
const { recordUuid } = msg

const { recordsByUuid } = await this.getOrFetchSurveyData(msg)
delete recordsByUuid[recordUuid]
const { recordsCache } = await this.getOrFetchSurveyData(msg)
recordsCache.delete(recordUuid)

if (Object.keys(recordsByUuid).length === 0) {
if (recordsCache.isEmpty()) {
await this.processSurveyClearMsg(msg)
}
}
Expand All @@ -243,22 +244,23 @@ class RecordsUpdateThread extends Thread {
const key = this.getSurveyDataKey(msg)
keysToDelete.push(key)
} else {
keysToDelete.push(...Object.keys(this.surveyDataByKey).filter((key) => key.startsWith(`${surveyId}_`)))
keysToDelete.push(...this.surveysDataCache.findKeys((key) => key.startsWith(`${surveyId}_`)))
}
keysToDelete.forEach((key) => {
delete this.surveyDataByKey[key]
this.surveysDataCache.delete(key)
})
}

async getOrFetchRecord({ msg, recordUuid }) {
const { surveyId } = msg

const { recordsByUuid } = await this.getOrFetchSurveyData(msg)
const { recordsCache } = await this.getOrFetchSurveyData(msg)

let record = recordsCache.get(recordUuid)

let record = recordsByUuid[recordUuid]
if (!record) {
record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid })
recordsByUuid[recordUuid] = record
recordsCache.set(recordUuid, record)
}
return record
}
Expand Down

0 comments on commit 54a3d3a

Please sign in to comment.