From c91ae445367338228cd4f913b199f752b2c22845 Mon Sep 17 00:00:00 2001 From: Stefano Ricci Date: Mon, 2 Oct 2023 21:54:15 +0200 Subject: [PATCH] records update thread: use single thread --- .../modules/record/service/recordService.js | 32 +++-- .../service/update/surveyRecordsThreadMap.js | 4 +- .../update/surveyRecordsThreadService.js | 41 +++--- .../thread/recordsThreadMessageTypes.js | 2 + .../update/thread/recordsUpdateThread.js | 135 ++++++++++++------ .../modules/survey/service/surveyService.js | 6 +- 6 files changed, 141 insertions(+), 79 deletions(-) diff --git a/server/modules/record/service/recordService.js b/server/modules/record/service/recordService.js index fdcd00b540..e4ee69862f 100644 --- a/server/modules/record/service/recordService.js +++ b/server/modules/record/service/recordService.js @@ -160,10 +160,10 @@ export const checkIn = async ({ socketId, user, surveyId, recordUuid, draft }) = if (preview || (Survey.isPublished(surveyInfo) && Authorizer.canEditRecord(user, record))) { // Create record thread - const thread = RecordsUpdateThreadService.getOrCreatedThread({ surveyId, cycle, draft: preview }) + const thread = RecordsUpdateThreadService.getOrCreatedThread() // initialize record if empty if (Record.getNodesArray(record).length === 0) { - thread.postMessage({ type: RecordsUpdateThreadMessageTypes.recordInit, user, surveyId, recordUuid }) + thread.postMessage({ type: RecordsUpdateThreadMessageTypes.recordInit, user, surveyId, cycle, draft, recordUuid }) } } return record @@ -177,12 +177,22 @@ export const checkOut = async (socketId, user, surveyId, recordUuid) => { includePreview: true, }) if (Record.isPreview(recordSummary)) { - RecordsUpdateThreadService.killThread({ surveyId, cycle: Record.getCycle(recordSummary), draft: true }) + RecordsUpdateThreadService.clearSurveyDataFromThread({ + surveyId, + cycle: Record.getCycle(recordSummary), + draft: true, + }) await RecordManager.deleteRecordPreview(surveyId, recordUuid) } else { const record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid, fetchForUpdate: false }) if (Record.isEmpty(record)) { await deleteRecord({ socketId, user, surveyId, recordUuid, notifySameUser: true }) + RecordsUpdateThreadService.clearRecordDataFromThread({ + surveyId, + cycle: Record.getCycle(recordSummary), + draft: false, + recordUuid, + }) } } RecordsUpdateThreadService.dissocSocket({ recordUuid, socketId }) @@ -295,10 +305,10 @@ export const startRecordsCloneJob = ({ user, surveyId, cycleFrom, cycleTo, recor } // NODE -const _sendNodeUpdateMessage = ({ socketId, user, surveyId, cycle, recordUuid, draft, msg }) => { +const _sendNodeUpdateMessage = ({ socketId, user, recordUuid, msg }) => { RecordsUpdateThreadService.assocSocket({ recordUuid, socketId }) - const thread = RecordsUpdateThreadService.getOrCreatedThread({ surveyId, cycle, draft }) + const thread = RecordsUpdateThreadService.getOrCreatedThread() thread.postMessage(msg, user) } @@ -323,12 +333,12 @@ export const persistNode = async ({ socketId, user, surveyId, draft, cycle, node _sendNodeUpdateMessage({ socketId, user, - surveyId, - cycle, - draft, recordUuid, msg: { type: RecordsUpdateThreadMessageTypes.nodePersist, + surveyId, + cycle, + draft, node, user, }, @@ -339,12 +349,12 @@ export const deleteNode = ({ socketId, user, surveyId, cycle, draft, recordUuid, _sendNodeUpdateMessage({ socketId, user, - surveyId, - cycle, - draft, recordUuid, msg: { type: RecordsUpdateThreadMessageTypes.nodeDelete, + surveyId, + cycle, + draft, recordUuid, nodeUuid, user, diff --git a/server/modules/record/service/update/surveyRecordsThreadMap.js b/server/modules/record/service/update/surveyRecordsThreadMap.js index d978b0d997..159b8020b7 100644 --- a/server/modules/record/service/update/surveyRecordsThreadMap.js +++ b/server/modules/record/service/update/surveyRecordsThreadMap.js @@ -4,9 +4,8 @@ const threads = new ThreadsCache() const threadZombies = new Set() // Set of threads marked to be killed // thread cache -const getKey = ({ surveyId, cycle, draft }) => `${surveyId}_${cycle}_${draft}` +const getKey = () => `survey_records_thread` const get = (threadKey) => threads.getThread(threadKey) -const getThreadsKeysBySurveyId = ({ surveyId }) => threads.findThreadsKeys((key) => key.startsWith(`${surveyId}_`)) const put = (threadKey, thread) => threads.putThread(threadKey, thread) const remove = (threadKey) => { @@ -24,7 +23,6 @@ const isZombie = (threadKey) => threadZombies.has(threadKey) export const SurveyRecordsThreadMap = { getKey, get, - getThreadsKeysBySurveyId, put, remove, markZombie, diff --git a/server/modules/record/service/update/surveyRecordsThreadService.js b/server/modules/record/service/update/surveyRecordsThreadService.js index 08b97f1232..2941ceb07b 100644 --- a/server/modules/record/service/update/surveyRecordsThreadService.js +++ b/server/modules/record/service/update/surveyRecordsThreadService.js @@ -1,7 +1,6 @@ import { WebSocketEvent, WebSocketServer } from '@openforis/arena-server' import ThreadManager from '@server/threads/threadManager' -import * as ThreadParams from '@server/threads/threadParams' import { RecordsUpdateThreadMessageTypes } from './thread/recordsThreadMessageTypes' import { SurveyRecordsThreadMap } from './surveyRecordsThreadMap' @@ -18,13 +17,9 @@ const threadTimeouts = {} // ====== // ====== CREATE -const _createThread = ({ surveyId, cycle, draft }) => { - const threadData = { - [ThreadParams.keys.surveyId]: surveyId, - [ThreadParams.keys.draft]: draft, - [ThreadParams.keys.cycle]: cycle, - } - const threadKey = getThreadKey({ surveyId, cycle, draft }) +const _createThread = () => { + const threadData = {} + const threadKey = getThreadKey() const handleMessageFromThread = (msg) => { const { type, content } = msg @@ -61,14 +56,19 @@ const _killThreadByKey = (threadKey) => { } } -const killThread = ({ surveyId, cycle, draft }) => { - const threadKey = getThreadKey({ surveyId, cycle, draft }) +const killThread = () => { + const threadKey = getThreadKey() _killThreadByKey(threadKey) } -const killSurveyThreads = ({ surveyId }) => { - const threadKeys = SurveyRecordsThreadMap.getThreadsKeysBySurveyId({ surveyId }) - threadKeys.forEach(_killThreadByKey) +const clearSurveyDataFromThread = ({ surveyId, cycle = null, draft = false }) => { + const thread = getThread() + thread?.postMessage({ type: RecordsUpdateThreadMessageTypes.surveyClear, surveyId, cycle, draft }) +} + +const clearRecordDataFromThread = ({ surveyId, cycle, draft, recordUuid }) => { + const thread = getThread() + thread?.postMessage({ type: RecordsUpdateThreadMessageTypes.recordClear, surveyId, cycle, draft, recordUuid }) } // ====== READ @@ -80,8 +80,8 @@ const _resetThreadInactivityTimeout = (threadKey) => { threadTimeouts[threadKey] = setTimeout(_killThreadByKey.bind(null, threadKey), inactivityPeriod) } -const getThread = ({ surveyId, cycle, draft = false }) => { - const threadKey = getThreadKey({ surveyId, cycle, draft }) +const getThread = () => { + const threadKey = getThreadKey() if (SurveyRecordsThreadMap.isZombie(threadKey)) { SurveyRecordsThreadMap.reviveZombie(threadKey) } @@ -92,11 +92,11 @@ const getThread = ({ surveyId, cycle, draft = false }) => { return thread } -const getOrCreatedThread = ({ surveyId, cycle, draft = false }) => { - let thread = getThread({ surveyId, cycle, draft }) +const getOrCreatedThread = () => { + let thread = getThread() if (!thread) { - thread = _createThread({ surveyId, cycle, draft }) - const threadKey = getThreadKey({ surveyId, cycle, draft }) + thread = _createThread() + const threadKey = getThreadKey() _resetThreadInactivityTimeout(threadKey) } return thread @@ -134,7 +134,8 @@ export const RecordsUpdateThreadService = { getOrCreatedThread, getThread, killThread, - killSurveyThreads, + clearSurveyDataFromThread, + clearRecordDataFromThread, // sockets assocSocket, notifyRecordDeleteToSockets, diff --git a/server/modules/record/service/update/thread/recordsThreadMessageTypes.js b/server/modules/record/service/update/thread/recordsThreadMessageTypes.js index b2f89bd5ef..ca6af7f04a 100644 --- a/server/modules/record/service/update/thread/recordsThreadMessageTypes.js +++ b/server/modules/record/service/update/thread/recordsThreadMessageTypes.js @@ -6,4 +6,6 @@ export const RecordsUpdateThreadMessageTypes = { recordReload: 'recordReload', nodePersist: 'nodePersist', nodeDelete: 'nodeDelete', + recordClear: 'recordClear', + surveyClear: 'surveyClear', } diff --git a/server/modules/record/service/update/thread/recordsUpdateThread.js b/server/modules/record/service/update/thread/recordsUpdateThread.js index a7f7702e3d..3f930824e0 100644 --- a/server/modules/record/service/update/thread/recordsUpdateThread.js +++ b/server/modules/record/service/update/thread/recordsUpdateThread.js @@ -22,9 +22,19 @@ class RecordsUpdateThread extends Thread { super(paramsObj) this.queue = new Queue() - this.survey = null - this.record = null + this.surveyDataByKey = {} this.processing = false + + this.messageProcessorByType = { + [RecordsUpdateThreadMessageTypes.threadInit]: this.init.bind(this), + [RecordsUpdateThreadMessageTypes.recordInit]: this.processRecordInitMsg.bind(this), + [RecordsUpdateThreadMessageTypes.recordReload]: this.processRecordReloadMsg.bind(this), + [RecordsUpdateThreadMessageTypes.nodePersist]: this.processRecordNodePersistMsg.bind(this), + [RecordsUpdateThreadMessageTypes.nodeDelete]: this.processRecordNodeDeleteMsg.bind(this), + [RecordsUpdateThreadMessageTypes.recordClear]: this.processRecordClearMsg.bind(this), + [RecordsUpdateThreadMessageTypes.surveyClear]: this.processSurveyClearMsg.bind(this), + [RecordsUpdateThreadMessageTypes.threadKill]: this.postMessage.bind(this), + } } sendThreadInitMsg() { @@ -92,8 +102,22 @@ class RecordsUpdateThread extends Thread { } } - async init() { - const { surveyId, cycle, draft } = this.params + getSurveyDataKey(msg) { + const { surveyId, cycle, draft } = msg + return `${surveyId}_${cycle}_${draft}` + } + + async init() {} + + async getOrFetchSurveyData(msg) { + const { surveyId, cycle, draft } = msg + + const key = this.getSurveyDataKey({ surveyId, cycle, draft }) + + let data = this.surveyDataByKey[key] + if (data) { + return data + } const surveyDb = await SurveyManager.fetchSurveyAndNodeDefsAndRefDataBySurveyId({ surveyId, @@ -107,36 +131,26 @@ class RecordsUpdateThread extends Thread { ? Survey.buildDependencyGraph(surveyDb) : await SurveyManager.fetchDependencies(surveyId) - this.survey = Survey.assocDependencyGraph(dependencyGraph)(surveyDb) + const survey = Survey.assocDependencyGraph(dependencyGraph)(surveyDb) - this.recordsByUuid = {} + data = { + survey, + recordsByUuid: {}, + } + this.surveyDataByKey[key] = data + + return data } async processMessage(msg) { const { type } = msg Logger.debug('processing message', type) - switch (type) { - case RecordsUpdateThreadMessageTypes.threadInit: - await this.init() - break - case RecordsUpdateThreadMessageTypes.recordInit: - await this.processRecordInitMsg(msg) - break - case RecordsUpdateThreadMessageTypes.recordReload: - await this.processRecordReloadMsg(msg) - break - case RecordsUpdateThreadMessageTypes.nodePersist: - await this.processRecordNodePersistMsg(msg) - break - case RecordsUpdateThreadMessageTypes.nodeDelete: - await this.processRecordNodeDeleteMsg(msg) - break - case RecordsUpdateThreadMessageTypes.threadKill: - this.postMessage(msg) - break - default: - Logger.debug(`Skipping unknown message type: ${type}`) + const messageProcessor = this.messageProcessorByType[type] + if (messageProcessor) { + messageProcessor(msg) + } else { + Logger.debug(`Skipping unknown message type: ${type}`) } if ([RecordsUpdateThreadMessageTypes.nodePersist, RecordsUpdateThreadMessageTypes.nodeDelete].includes(type)) { @@ -146,8 +160,9 @@ class RecordsUpdateThread extends Thread { } async processRecordInitMsg(msg) { - const { survey, surveyId } = this - const { recordUuid, user } = msg + const { surveyId, recordUuid, user } = msg + + const { survey, recordsByUuid } = await this.getOrFetchSurveyData(msg) let record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid }) @@ -158,24 +173,28 @@ class RecordsUpdateThread extends Thread { nodesUpdateListener: (updatedNodes) => this.handleNodesUpdated.bind(this)({ record, updatedNodes }), nodesValidationListener: (validations) => this.handleNodesValidationUpdated.bind(this)({ record, validations }), }) - this.recordsByUuid[recordUuid] = record + recordsByUuid[recordUuid] = record } async processRecordReloadMsg(msg) { - const { surveyId } = this - const { recordUuid } = msg + const { surveyId, recordUuid } = msg - if (this.recordsByUuid[recordUuid]) { + const { recordsByUuid } = await this.getOrFetchSurveyData(msg) + + if (recordsByUuid[recordUuid]) { const record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid }) - this.recordsByUuid[recordUuid] = record + recordsByUuid[recordUuid] = record } } async processRecordNodePersistMsg(msg) { - const { survey } = this const { node, user } = msg + + const { survey, recordsByUuid } = await this.getOrFetchSurveyData(msg) + const recordUuid = Node.getRecordUuid(node) - let record = await this.getOrFetchRecord({ recordUuid }) + let record = await this.getOrFetchRecord({ msg, recordUuid }) + record = await RecordManager.persistNode({ user, survey, @@ -184,14 +203,16 @@ class RecordsUpdateThread extends Thread { nodesUpdateListener: (updatedNodes) => this.handleNodesUpdated({ record, updatedNodes }), nodesValidationListener: (validations) => this.handleNodesValidationUpdated({ record, validations }), }) - this.recordsByUuid[recordUuid] = record + recordsByUuid[recordUuid] = record } async processRecordNodeDeleteMsg(msg) { - const { survey } = this const { nodeUuid, recordUuid, user } = msg - let record = await this.getOrFetchRecord({ recordUuid }) + const surveyKey = this.getSurveyDataKey(msg) + const { survey, recordsByUuid } = await this.getOrFetchSurveyData(msg) + + let record = await this.getOrFetchRecord({ surveyKey, recordUuid }) record = await RecordManager.deleteNode( user, survey, @@ -200,11 +221,41 @@ class RecordsUpdateThread extends Thread { (updatedNodes) => this.handleNodesUpdated({ record, updatedNodes }), (validations) => this.handleNodesValidationUpdated({ record, validations }) ) - this.recordsByUuid[recordUuid] = record + recordsByUuid[recordUuid] = record + } + + async processRecordClearMsg(msg) { + const { recordUuid } = msg + + const { recordsByUuid } = await this.getOrFetchSurveyData(msg) + delete recordsByUuid[recordUuid] + + if (Object.keys(recordsByUuid).length === 0) { + await this.processSurveyClearMsg(msg) + } } - async getOrFetchRecord({ recordUuid }) { - const { surveyId, recordsByUuid } = this + async processSurveyClearMsg(msg) { + const { surveyId, cycle, draft } = msg + + let keysToDelete = [] + + if (!Objects.isNil(cycle) && !Objects.isNil(draft)) { + const key = this.getSurveyDataKey(msg) + keysToDelete.push(key) + } else { + keysToDelete.push(...Object.keys(this.surveyDataByKey).filter((key) => key.startsWith(`${surveyId}_`))) + } + keysToDelete.forEach((key) => { + delete this.surveyDataByKey[key] + }) + } + + async getOrFetchRecord({ msg, recordUuid }) { + const { surveyId } = msg + + const { recordsByUuid } = await this.getOrFetchSurveyData(msg) + let record = recordsByUuid[recordUuid] if (!record) { record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid }) diff --git a/server/modules/survey/service/surveyService.js b/server/modules/survey/service/surveyService.js index 2e27db2cfb..5b812f549f 100644 --- a/server/modules/survey/service/surveyService.js +++ b/server/modules/survey/service/surveyService.js @@ -13,7 +13,7 @@ import { SchemaSummary } from './schemaSummary' // JOBS export const startPublishJob = (user, surveyId) => { - RecordsUpdateThreadService.killSurveyThreads({ surveyId }) + RecordsUpdateThreadService.clearSurveyDataFromThread({ surveyId }) const job = new SurveyPublishJob({ user, surveyId }) @@ -23,7 +23,7 @@ export const startPublishJob = (user, surveyId) => { } export const startUnpublishJob = (user, surveyId) => { - RecordsUpdateThreadService.killSurveyThreads({ surveyId }) + RecordsUpdateThreadService.clearSurveyDataFromThread({ surveyId }) const job = new SurveyUnpublishJob({ user, surveyId }) @@ -77,7 +77,7 @@ export const exportSchemaSummary = async ({ surveyId, cycle, outputStream }) => SchemaSummary.exportSchemaSummary({ surveyId, cycle, outputStream }) export const deleteSurvey = async (surveyId) => { - RecordsUpdateThreadService.killSurveyThreads({ surveyId }) + RecordsUpdateThreadService.clearSurveyDataFromThread({ surveyId }) await SurveyManager.deleteSurvey(surveyId) }