diff --git a/package.json b/package.json index 5d5eae1492..7b148a0921 100644 --- a/package.json +++ b/package.json @@ -105,8 +105,8 @@ "@mui/material": "^5.12.3", "@mui/x-data-grid": "^5.17.20", "@mui/x-date-pickers": "^5.0.14", - "@openforis/arena-core": "^0.0.142", - "@openforis/arena-server": "^0.1.24", + "@openforis/arena-core": "^0.0.143", + "@openforis/arena-server": "^0.1.25", "@sendgrid/mail": "^7.7.0", "@shopify/draggable": "^1.0.0-beta.8", "ace-builds": "^1.19.0", diff --git a/server/modules/auth/api/authApi.js b/server/modules/auth/api/authApi.js index dadd71906e..7e0e1f8e04 100644 --- a/server/modules/auth/api/authApi.js +++ b/server/modules/auth/api/authApi.js @@ -51,7 +51,7 @@ export const init = (app) => { try { // Before logout checkOut record if there's an opened thread const socketId = Request.getSocketId(req) - RecordService.dissocSocketFromRecordThread(socketId) + RecordService.dissocSocketFromUpdateThread(socketId) req.logout((err) => { if (err) { diff --git a/server/modules/record/api/recordApi.js b/server/modules/record/api/recordApi.js index 08bc5d436b..4732322561 100644 --- a/server/modules/record/api/recordApi.js +++ b/server/modules/record/api/recordApi.js @@ -28,14 +28,14 @@ export const init = (app) => { try { const user = Request.getUser(req) const { surveyId } = Request.getParams(req) - const record = Request.getBody(req) + const recordToCreate = Request.getBody(req) const socketId = Request.getSocketId(req) - if (Record.getOwnerUuid(record) !== User.getUuid(user)) { + if (Record.getOwnerUuid(recordToCreate) !== User.getUuid(user)) { throw new Error('Error record create. User is different') } - await RecordService.createRecord(socketId, user, surveyId, record) + await RecordService.createRecord({ socketId, user, surveyId, recordToCreate }) sendOk(res) } catch (error) { @@ -59,12 +59,12 @@ export const init = (app) => { app.post('/survey/:surveyId/record/:recordUuid/node', requireRecordEditPermission, async (req, res, next) => { try { const user = Request.getUser(req) - const { surveyId } = Request.getParams(req) + const { surveyId, cycle, draft } = Request.getParams(req) const node = Request.getJsonParam(req, 'node') const file = Request.getFile(req) const socketId = Request.getSocketId(req) - await RecordService.persistNode(socketId, user, surveyId, node, file) + await RecordService.persistNode({ socketId, user, surveyId, cycle, draft, node, file }) sendOk(res) } catch (error) { @@ -329,7 +329,7 @@ export const init = (app) => { const user = Request.getUser(req) const socketId = Request.getSocketId(req) - const record = await RecordService.checkIn(socketId, user, surveyId, recordUuid, draft) + const record = await RecordService.checkIn({ socketId, user, surveyId, recordUuid, draft }) res.json({ record }) } catch (error) { @@ -394,11 +394,11 @@ export const init = (app) => { }) app.delete('/survey/:surveyId/record/:recordUuid/node/:nodeUuid', requireRecordEditPermission, (req, res) => { - const { surveyId, recordUuid, nodeUuid } = Request.getParams(req) + const { surveyId, cycle, draft, recordUuid, nodeUuid } = Request.getParams(req) const user = Request.getUser(req) const socketId = Request.getSocketId(req) - RecordService.deleteNode(socketId, user, surveyId, recordUuid, nodeUuid) + RecordService.deleteNode({ socketId, user, surveyId, cycle, draft, recordUuid, nodeUuid }) sendOk(res) }) } diff --git a/server/modules/record/manager/recordManager.js b/server/modules/record/manager/recordManager.js index 91cdeb9c86..ca3d62e28d 100644 --- a/server/modules/record/manager/recordManager.js +++ b/server/modules/record/manager/recordManager.js @@ -29,22 +29,51 @@ export const { persistNodesToRDB } = NodeRdbManager // ==== READ export const fetchRecordsSummaryBySurveyId = async ( - { surveyId, cycle, offset, limit, sortBy, sortOrder, search, step = null, recordUuid = null }, + { + surveyId, + cycle, + offset, + limit, + sortBy, + sortOrder, + search, + step = null, + recordUuid = null, + includeRootKeyValues = true, + includePreview = false, + }, client = db ) => { const surveyInfo = await SurveyRepository.fetchSurveyById({ surveyId, draft: true }, client) const nodeDefsDraft = Survey.isFromCollect(surveyInfo) && !Survey.isPublished(surveyInfo) - const nodeDefRoot = await NodeDefRepository.fetchRootNodeDef(surveyId, nodeDefsDraft, client) - const nodeDefKeys = await NodeDefRepository.fetchRootNodeDefKeysBySurveyId( - surveyId, - NodeDef.getUuid(nodeDefRoot), - nodeDefsDraft, - client - ) + const nodeDefRoot = includeRootKeyValues + ? await NodeDefRepository.fetchRootNodeDef(surveyId, nodeDefsDraft, client) + : null + const nodeDefKeys = includeRootKeyValues + ? await NodeDefRepository.fetchRootNodeDefKeysBySurveyId( + surveyId, + NodeDef.getUuid(nodeDefRoot), + nodeDefsDraft, + client + ) + : null const list = await RecordRepository.fetchRecordsSummaryBySurveyId( - { surveyId, cycle, nodeDefRoot, nodeDefKeys, offset, limit, sortBy, sortOrder, search, step, recordUuid }, + { + surveyId, + cycle, + nodeDefRoot, + nodeDefKeys, + offset, + limit, + sortBy, + sortOrder, + search, + step, + recordUuid, + includePreview, + }, client ) @@ -54,8 +83,14 @@ export const fetchRecordsSummaryBySurveyId = async ( } } -export const fetchRecordSummary = async ({ surveyId, recordUuid }, client = db) => { - const { list } = await fetchRecordsSummaryBySurveyId({ surveyId, recordUuid }, client) +export const fetchRecordSummary = async ( + { surveyId, recordUuid, includeRootKeyValues = true, includePreview = false }, + client = db +) => { + const { list } = await fetchRecordsSummaryBySurveyId( + { surveyId, recordUuid, includeRootKeyValues, includePreview }, + client + ) return list[0] } diff --git a/server/modules/record/repository/recordRepository.js b/server/modules/record/repository/recordRepository.js index 16a78e2111..9c87ab0347 100644 --- a/server/modules/record/repository/recordRepository.js +++ b/server/modules/record/repository/recordRepository.js @@ -130,8 +130,8 @@ export const countRecordsBySurveyIdGroupedByStep = async ({ surveyId, cycle }, c export const fetchRecordsSummaryBySurveyId = async ( { surveyId, - nodeDefRoot, - nodeDefKeys, + nodeDefRoot = null, + nodeDefKeys = null, cycle = null, step = null, offset = 0, @@ -140,25 +140,26 @@ export const fetchRecordsSummaryBySurveyId = async ( sortOrder = 'DESC', search = false, recordUuid = null, + includePreview = false, }, client = db ) => { const rootEntityTableAlias = 'n0' const getNodeDefKeyColumnName = NodeDefTable.getColumnName const getNodeDefKeyColAlias = NodeDef.getName - const nodeDefKeysColumnNamesByAlias = nodeDefKeys.reduce( + const nodeDefKeysColumnNamesByAlias = nodeDefKeys?.reduce( (acc, key) => ({ ...acc, [getNodeDefKeyColAlias(key)]: getNodeDefKeyColumnName(key) }), {} ) const nodeDefKeysSelect = nodeDefKeys - .map( + ?.map( (nodeDefKey) => `${rootEntityTableAlias}.${getNodeDefKeyColumnName(nodeDefKey)} as "${getNodeDefKeyColAlias(nodeDefKey)}"` ) .join(', ') const nodeDefKeysSelectSearch = nodeDefKeys - .map( + ?.map( (nodeDefKey) => ` (${rootEntityTableAlias}.${getNodeDefKeyColumnName(nodeDefKey)})::text ilike '%$/search:value/%'` ) @@ -172,11 +173,19 @@ export const fetchRecordsSummaryBySurveyId = async ( FROM ${schema}.node GROUP BY record_uuid ` + const recordsSelectWhereConditions = [] + if (!includePreview) recordsSelectWhereConditions.push('r.preview = FALSE') + if (!A.isNull(cycle)) recordsSelectWhereConditions.push('r.cycle = $/cycle/') + if (!A.isNull(step)) recordsSelectWhereConditions.push('r.step = $/step/') + if (!A.isNull(recordUuid)) recordsSelectWhereConditions.push('r.uuid = $/recordUuid/') + const recordsSelect = ` SELECT r.uuid, r.owner_uuid, + r.cycle, r.step, + r.preview, ${DbUtils.selectDate('r.date_created', 'date_created')}, r.validation, node_last_modified.date_modified @@ -185,11 +194,7 @@ export const fetchRecordsSummaryBySurveyId = async ( LEFT OUTER JOIN node_last_modified ON r.uuid = node_last_modified.record_uuid - WHERE - r.preview = FALSE - ${A.isNull(cycle) ? '' : 'AND r.cycle = $/cycle/'} - ${A.isNull(step) ? '' : 'AND r.step = $/step/'} - ${A.isNull(recordUuid) ? '' : 'AND r.uuid = $/recordUuid/'} + ${recordsSelectWhereConditions.length > 0 ? `WHERE ${recordsSelectWhereConditions.join(' AND ')}` : ''} ORDER BY r.date_created DESC ` @@ -214,15 +219,19 @@ export const fetchRecordsSummaryBySurveyId = async ( -- GET OWNER NAME JOIN "user" u ON r.owner_uuid = u.uuid - -- join with root entity table to get node key values - LEFT OUTER JOIN - ${SchemaRdb.getName(surveyId)}.${NodeDefTable.getViewName(nodeDefRoot)} as ${rootEntityTableAlias} - ON r.uuid = ${rootEntityTableAlias}.record_uuid + ${ + nodeDefRoot && nodeDefKeys?.length > 0 + ? `-- join with root entity table to get node key values + LEFT OUTER JOIN + ${SchemaRdb.getName(surveyId)}.${NodeDefTable.getViewName(nodeDefRoot)} as ${rootEntityTableAlias} + ON r.uuid = ${rootEntityTableAlias}.record_uuid` + : '' + } ${whereCondition} ORDER BY ${ - Object.keys(nodeDefKeysColumnNamesByAlias).includes(toSnakeCase(sortBy)) + nodeDefKeysColumnNamesByAlias && Object.keys(nodeDefKeysColumnNamesByAlias).includes(toSnakeCase(sortBy)) ? `${rootEntityTableAlias}.${nodeDefKeysColumnNamesByAlias[toSnakeCase(sortBy)]}` : `r.${toSnakeCase(sortBy)}` } ${sortOrder} diff --git a/server/modules/record/service/recordService.js b/server/modules/record/service/recordService.js index c5bec0e432..65daa55256 100644 --- a/server/modules/record/service/recordService.js +++ b/server/modules/record/service/recordService.js @@ -1,7 +1,5 @@ import * as fs from 'fs' -import { WebSocketEvent, WebSocketServer } from '@openforis/arena-server' - import * as Log from '@server/log/log' import * as A from '@core/arena' @@ -11,6 +9,7 @@ import * as Survey from '@core/survey/survey' import * as NodeDef from '@core/survey/nodeDef' import * as Record from '@core/record/record' import * as Node from '@core/record/node' +import { NodeValueFormatter } from '@core/record/nodeValueFormatter' import * as RecordValidationReportItem from '@core/record/recordValidationReportItem' import * as RecordFile from '@core/record/recordFile' import * as Authorizer from '@core/auth/authorizer' @@ -25,31 +24,24 @@ import DataImportJob from '@server/modules/dataImport/service/DataImportJob' import DataImportValidationJob from '@server/modules/dataImport/service/DataImportValidationJob' import * as CSVWriter from '@server/utils/file/csvWriter' import * as Response from '@server/utils/response' +import * as FileUtils from '@server/utils/file/fileUtils' import { ExportFileNameGenerator } from '@server/utils/exportFileNameGenerator' import * as SurveyManager from '../../survey/manager/surveyManager' import * as RecordManager from '../manager/recordManager' import * as FileManager from '../manager/fileManager' -import * as RecordServiceThreads from './update/recordServiceThreads' -import { messageTypes as RecordThreadMessageTypes } from './update/thread/recordThreadMessageTypes' +import { RecordsUpdateThreadMessageTypes } from './update/thread/recordsThreadMessageTypes' import RecordsCloneJob from './recordsCloneJob' -import { NodeValueFormatter } from '@core/record/nodeValueFormatter' -import { FileUtils } from '@webapp/utils/fileUtils' +import { RecordsUpdateThreadService } from './update/surveyRecordsThreadService' const Logger = Log.getLogger('RecordService') // RECORD -export const createRecord = async (socketId, user, surveyId, recordToCreate) => { - Logger.debug('create record: ', recordToCreate) +export const createRecord = async ({ user, surveyId, recordToCreate }) => { + Logger.debug('creating record: ', recordToCreate) - const record = await RecordManager.insertRecord(user, surveyId, recordToCreate) - - // Create record thread and initialize record - const thread = RecordServiceThreads.getOrCreatedRecordThread(socketId, user, surveyId, Record.getUuid(recordToCreate)) - thread.postMessage({ type: RecordThreadMessageTypes.recordInit }) - - return record + return RecordManager.insertRecord(user, surveyId, recordToCreate) } export const createRecordFromSamplingPointDataItem = async ({ user, surveyId, cycle, itemUuid }) => { @@ -132,14 +124,7 @@ export const deleteRecord = async ({ socketId, user, surveyId, recordUuid, notif const survey = await SurveyManager.fetchSurveyAndNodeDefsBySurveyId({ surveyId, cycle: Record.getCycle(record) }) await RecordManager.deleteRecord(user, survey, record) - // Notify other users viewing or editing the record it has been deleted - const socketIds = RecordServiceThreads.getSocketIds(recordUuid) - socketIds.forEach((socketIdCurrent) => { - if (socketIdCurrent !== socketId || notifySameUser) { - WebSocketServer.notifySocket(socketIdCurrent, WebSocketEvent.recordDelete, recordUuid) - } - }) - RecordServiceThreads.dissocSocketsByRecordUuid(recordUuid) + RecordsUpdateThreadService.notifyRecordDeleteToSockets({ socketIdUser: socketId, recordUuid, notifySameUser }) } export const deleteRecords = async ({ user, surveyId, recordUuids }) => { @@ -164,24 +149,35 @@ export const deleteRecordsPreview = async (olderThan24Hours = false) => { return count } -export const checkIn = async (socketId, user, surveyId, recordUuid, draft) => { +export const checkIn = async ({ socketId, user, surveyId, recordUuid, draft }) => { const survey = await SurveyManager.fetchSurveyById({ surveyId, draft }) const surveyInfo = Survey.getSurveyInfo(survey) const record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid, draft }) const preview = Record.isPreview(record) + const cycle = Record.getCycle(record) + + RecordsUpdateThreadService.assocSocket({ recordUuid, socketId }) if (preview || (Survey.isPublished(surveyInfo) && Authorizer.canEditRecord(user, record))) { - RecordServiceThreads.getOrCreatedRecordThread(socketId, user, surveyId, recordUuid) + // Create record thread + const thread = RecordsUpdateThreadService.getOrCreatedThread({ surveyId, cycle, draft: preview }) + // initialize record if empty + if (Record.getNodesArray(record).length === 0) { + thread.postMessage({ type: RecordsUpdateThreadMessageTypes.recordInit, user, surveyId, recordUuid }) + } } - - RecordServiceThreads.assocSocket(recordUuid, socketId) - return record } export const checkOut = async (socketId, user, surveyId, recordUuid) => { - const recordSummary = await RecordManager.fetchRecordSummary({ surveyId, recordUuid }) + const recordSummary = await RecordManager.fetchRecordSummary({ + surveyId, + recordUuid, + includeRootKeyValues: false, + includePreview: true, + }) if (Record.isPreview(recordSummary)) { + RecordsUpdateThreadService.killThread({ surveyId, cycle: Record.getCycle(recordSummary), draft: true }) await RecordManager.deleteRecordPreview(surveyId, recordUuid) } else { const record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid, fetchForUpdate: false }) @@ -189,10 +185,10 @@ export const checkOut = async (socketId, user, surveyId, recordUuid) => { await deleteRecord({ socketId, user, surveyId, recordUuid, notifySameUser: true }) } } - RecordServiceThreads.dissocSocket(socketId) + RecordsUpdateThreadService.dissocSocket({ recordUuid, socketId }) } -export const dissocSocketFromRecordThread = RecordServiceThreads.dissocSocket +export const dissocSocketFromUpdateThread = RecordsUpdateThreadService.dissocSocketBySocketId // VALIDATION REPORT export const { fetchValidationReport, countValidationReportItems } = RecordManager @@ -299,22 +295,16 @@ export const startRecordsCloneJob = ({ user, surveyId, cycleFrom, cycleTo, recor } // NODE -const _sendNodeUpdateMessage = (socketId, user, surveyId, recordUuid, msg) => { - RecordServiceThreads.assocSocket(recordUuid, socketId) - - const singleMessage = !RecordServiceThreads.getRecordThread(recordUuid) +const _sendNodeUpdateMessage = ({ socketId, user, surveyId, cycle, recordUuid, draft, msg }) => { + RecordsUpdateThreadService.assocSocket({ recordUuid, socketId }) - const thread = RecordServiceThreads.getOrCreatedRecordThread(socketId, user, surveyId, recordUuid) + const thread = RecordsUpdateThreadService.getOrCreatedThread({ surveyId, cycle, draft }) thread.postMessage(msg, user) - - if (singleMessage) { - RecordServiceThreads.killRecordThread(recordUuid) - } } export const { fetchNodeByUuid } = RecordManager -export const persistNode = async (socketId, user, surveyId, node, file) => { +export const persistNode = async ({ socketId, user, surveyId, draft, cycle, node, file = null }) => { const recordUuid = Node.getRecordUuid(node) if (file) { @@ -330,18 +320,34 @@ export const persistNode = async (socketId, user, surveyId, node, file) => { await FileManager.insertFile(surveyId, fileObj) } - _sendNodeUpdateMessage(socketId, user, surveyId, recordUuid, { - type: RecordThreadMessageTypes.nodePersist, - node, + _sendNodeUpdateMessage({ + socketId, user, + surveyId, + cycle, + draft, + recordUuid, + msg: { + type: RecordsUpdateThreadMessageTypes.nodePersist, + node, + user, + }, }) } -export const deleteNode = (socketId, user, surveyId, recordUuid, nodeUuid) => - _sendNodeUpdateMessage(socketId, user, surveyId, recordUuid, { - type: RecordThreadMessageTypes.nodeDelete, - nodeUuid, +export const deleteNode = ({ socketId, user, surveyId, cycle, draft, recordUuid, nodeUuid }) => + _sendNodeUpdateMessage({ + socketId, user, + surveyId, + cycle, + draft, + recordUuid, + msg: { + type: RecordsUpdateThreadMessageTypes.nodeDelete, + nodeUuid, + user, + }, }) // generates the record file name in this format: file_SURVEYNAME_KEYVALUES_ATTRIBUTENAME_POSITION.EXTENSION @@ -381,7 +387,7 @@ export const generateNodeFileNameForDownload = async ({ surveyId, nodeUuid, file })(record) const fileName = RecordFile.getName(file) - const extension = FileUtils.getExtension(fileName) + const extension = FileUtils.getFileExtension(fileName) return `file_${surveyName}_${fileNameParts.join('_')}.${extension}` } diff --git a/server/modules/record/service/update/recordServiceThreads.js b/server/modules/record/service/update/recordServiceThreads.js deleted file mode 100644 index efa83b4b0a..0000000000 --- a/server/modules/record/service/update/recordServiceThreads.js +++ /dev/null @@ -1,119 +0,0 @@ -import * as R from 'ramda' - -import { WebSocketEvent, WebSocketServer } from '@openforis/arena-server' - -import ThreadManager from '@server/threads/threadManager' -import * as ThreadParams from '@server/threads/threadParams' - -import * as RecordThreadsMap from './recordThreadsMap' -import * as RecordSocketsMap from './recordSocketsMap' -import * as RecordUpdateThreadParams from './thread/recordUpdateThreadParams' -import { messageTypes as RecordThreadMessageTypes } from './thread/recordThreadMessageTypes' - -const recordThreadTimeouts = {} - -// ====== -// THREAD -// ====== - -// ====== CREATE -const _createRecordThread = (socketId, user, surveyId, recordUuid) => { - const data = { - [ThreadParams.keys.socketId]: socketId, - [ThreadParams.keys.user]: user, - [ThreadParams.keys.surveyId]: surveyId, - [RecordUpdateThreadParams.keys.recordUuid]: recordUuid, - } - - const messageHandler = (msg) => { - if (msg.type === RecordThreadMessageTypes.threadKill) { - if (RecordThreadsMap.isZombie(recordUuid)) { - clearTimeout(recordThreadTimeouts[recordUuid]) - delete recordThreadTimeouts[recordUuid] - - const thread = getRecordThread(recordUuid) - thread.terminate() - } - } else { - // Notify all sockets that have checked in the record - const socketIds = RecordSocketsMap.getSocketIds(recordUuid) - socketIds.forEach((socketIdCurrent) => { - WebSocketServer.notifySocket(socketIdCurrent, msg.type, R.prop('content', msg)) - }) - } - } - - const exitHandler = () => { - RecordSocketsMap.dissocSockets(recordUuid) - RecordThreadsMap.remove(recordUuid) - } - - const thread = new ThreadManager('recordUpdateThread.js', data, messageHandler, exitHandler) - - return RecordThreadsMap.put(recordUuid, thread) -} - -// ====== READ - -const _resetThreadInactivityTimeout = (recordUuid) => { - clearTimeout(recordThreadTimeouts[recordUuid]) - - // After one hour of inactivity, thread gets killed and user is notified - recordThreadTimeouts[recordUuid] = setTimeout(() => { - killRecordThread(recordUuid) - - const userUuids = RecordSocketsMap.getSocketIds(recordUuid) - userUuids.forEach((userUuid) => - WebSocketServer.notifyUser(userUuid, WebSocketEvent.recordSessionExpired, recordUuid) - ) - }, 60 * 60 * 1000) -} - -export const getRecordThread = RecordThreadsMap.get - -export const getOrCreatedRecordThread = (socketId, user, surveyId, recordUuid) => { - if (RecordThreadsMap.isZombie(recordUuid)) { - RecordThreadsMap.reviveZombie(recordUuid) - } - - const thread = getRecordThread(recordUuid) || _createRecordThread(socketId, user, surveyId, recordUuid) - _resetThreadInactivityTimeout(recordUuid) - return thread -} - -// ====== DELETE -export const killRecordThread = (recordUuid) => { - const thread = getRecordThread(recordUuid) - - RecordThreadsMap.markZombie(recordUuid) - thread.postMessage({ type: RecordThreadMessageTypes.threadKill }) -} - -// ====== -// SOCKETS -// ====== - -export const getSocketIds = RecordSocketsMap.getSocketIds - -export const assocSocket = RecordSocketsMap.assocSocket - -const _terminateThreadIfNoSockets = (recordUuid) => { - const thread = getRecordThread(recordUuid) - // Terminate thread if there are no more users editing the record - if (thread && !RecordSocketsMap.hasSockets(recordUuid)) { - killRecordThread(recordUuid) - } -} - -export const dissocSocket = (socketId) => { - const recordUuid = RecordSocketsMap.getRecordUuid(socketId) - if (recordUuid) { - RecordSocketsMap.dissocSocket(recordUuid, socketId) - _terminateThreadIfNoSockets(recordUuid) - } -} - -export const dissocSocketsByRecordUuid = (recordUuid) => { - RecordSocketsMap.dissocSockets(recordUuid) - _terminateThreadIfNoSockets(recordUuid) -} diff --git a/server/modules/record/service/update/recordSocketsMap.js b/server/modules/record/service/update/recordSocketsMap.js index 49b62c3665..2a742a6e2d 100644 --- a/server/modules/record/service/update/recordSocketsMap.js +++ b/server/modules/record/service/update/recordSocketsMap.js @@ -1,28 +1,33 @@ const socketIdsByRecordUuid = new Map() -export const getSocketIds = recordUuid => socketIdsByRecordUuid.get(recordUuid) || new Set() +export const getSocketIdsByRecordUuid = (recordUuid) => socketIdsByRecordUuid.get(recordUuid) || new Set() -export const hasSockets = recordUuid => getSocketIds(recordUuid).size !== 0 +const hasSocketsByRecordUuid = (recordUuid) => getSocketIdsByRecordUuid(recordUuid).size > 0 -export const getRecordUuid = socketId => { +const getRecordUuid = (socketId) => { const recordUuids = [...socketIdsByRecordUuid.keys()] - return recordUuids.find(recordUuid => getSocketIds(recordUuid).has(socketId)) + return recordUuids.find((recordUuid) => getSocketIdsByRecordUuid(recordUuid).has(socketId)) } -export const assocSocket = (recordUuid, socketId) => { +export const assocSocket = ({ recordUuid, socketId }) => { if (!socketIdsByRecordUuid.has(recordUuid)) { socketIdsByRecordUuid.set(recordUuid, new Set()) } - getSocketIds(recordUuid).add(socketId) + getSocketIdsByRecordUuid(recordUuid).add(socketId) } -export const dissocSocket = (recordUuid, socketId) => { - getSocketIds(recordUuid).delete(socketId) +export const dissocSocket = ({ recordUuid, socketId }) => { + getSocketIdsByRecordUuid(recordUuid).delete(socketId) - if (!hasSockets(recordUuid)) { - dissocSockets(recordUuid) + if (!hasSocketsByRecordUuid(recordUuid)) { + dissocSocketsByRecordUuid(recordUuid) } } -export const dissocSockets = recordUuid => socketIdsByRecordUuid.delete(recordUuid) +export const dissocSocketBySocketId = (socketId) => { + const recordUuid = getRecordUuid(socketId) + dissocSocket({ recordUuid, socketId }) +} + +export const dissocSocketsByRecordUuid = (recordUuid) => socketIdsByRecordUuid.delete(recordUuid) diff --git a/server/modules/record/service/update/recordThreadsMap.js b/server/modules/record/service/update/recordThreadsMap.js deleted file mode 100644 index 7e2f69b0ec..0000000000 --- a/server/modules/record/service/update/recordThreadsMap.js +++ /dev/null @@ -1,21 +0,0 @@ -import ThreadsCache from '@server/threads/threadsCache' - -const threads = new ThreadsCache() -const threadZombies = new Set() // Set of threads marked to be killed - -// thread cache -export const get = recordUuid => threads.getThread(recordUuid) - -export const put = (recordUuid, thread) => threads.putThread(recordUuid, thread) - -export const remove = recordUuid => { - threads.removeThread(recordUuid) - threadZombies.delete(recordUuid) -} - -// Thread zombies -export const markZombie = recordUuid => threadZombies.add(recordUuid) - -export const reviveZombie = recordUuid => threadZombies.delete(recordUuid) - -export const isZombie = recordUuid => threadZombies.has(recordUuid) diff --git a/server/modules/record/service/update/surveyRecordsThreadMap.js b/server/modules/record/service/update/surveyRecordsThreadMap.js new file mode 100644 index 0000000000..d978b0d997 --- /dev/null +++ b/server/modules/record/service/update/surveyRecordsThreadMap.js @@ -0,0 +1,33 @@ +import ThreadsCache from '@server/threads/threadsCache' + +const threads = new ThreadsCache() +const threadZombies = new Set() // Set of threads marked to be killed + +// thread cache +const getKey = ({ surveyId, cycle, draft }) => `${surveyId}_${cycle}_${draft}` +const get = (threadKey) => threads.getThread(threadKey) +const getThreadsKeysBySurveyId = ({ surveyId }) => threads.findThreadsKeys((key) => key.startsWith(`${surveyId}_`)) +const put = (threadKey, thread) => threads.putThread(threadKey, thread) + +const remove = (threadKey) => { + threads.removeThread(threadKey) + threadZombies.delete(threadKey) +} + +// Thread zombies (inactive threads, to be deleted at timeout) +const markZombie = (threadKey) => threadZombies.add(threadKey) + +const reviveZombie = (threadKey) => threadZombies.delete(threadKey) + +const isZombie = (threadKey) => threadZombies.has(threadKey) + +export const SurveyRecordsThreadMap = { + getKey, + get, + getThreadsKeysBySurveyId, + put, + remove, + markZombie, + reviveZombie, + isZombie, +} diff --git a/server/modules/record/service/update/surveyRecordsThreadService.js b/server/modules/record/service/update/surveyRecordsThreadService.js new file mode 100644 index 0000000000..5c4e6b4977 --- /dev/null +++ b/server/modules/record/service/update/surveyRecordsThreadService.js @@ -0,0 +1,131 @@ +import { WebSocketEvent, WebSocketServer } from '@openforis/arena-server' + +import ThreadManager from '@server/threads/threadManager' +import * as ThreadParams from '@server/threads/threadParams' + +import { RecordsUpdateThreadMessageTypes } from './thread/recordsThreadMessageTypes' +import { SurveyRecordsThreadMap } from './surveyRecordsThreadMap' +import * as RecordSocketsMap from './recordSocketsMap' + +const { get: getThread, getKey: getThreadKey } = SurveyRecordsThreadMap + +const recordsUpdateThreadFileName = 'recordsUpdateThread.js' +const inactivityPeriod = 10 * 60 * 1000 // 10 mins +const threadTimeouts = {} + +// ====== +// THREAD +// ====== + +// ====== CREATE +const _createThread = ({ surveyId, cycle, draft }) => { + const threadData = { + [ThreadParams.keys.surveyId]: surveyId, + [ThreadParams.keys.draft]: draft, + [ThreadParams.keys.cycle]: cycle, + } + const threadKey = getThreadKey({ surveyId, cycle, draft }) + + const handleMessageFromThread = (msg) => { + const { type, content } = msg + if (type === RecordsUpdateThreadMessageTypes.threadKill) { + if (SurveyRecordsThreadMap.isZombie(threadKey)) { + clearTimeout(threadTimeouts[threadKey]) + delete threadTimeouts[threadKey] + + const thread = getThread(threadKey) + thread.terminate() + } + } else { + notifyRecordUpdateToSockets({ eventType: type, content }) + } + } + + const exitHandler = () => { + SurveyRecordsThreadMap.remove(threadKey) + } + + const thread = new ThreadManager(recordsUpdateThreadFileName, threadData, handleMessageFromThread, exitHandler) + + return SurveyRecordsThreadMap.put(threadKey, thread) +} + +// ====== DELETE +const _killThreadByKey = (threadKey) => { + clearTimeout(threadTimeouts[threadKey]) + const thread = getThread(threadKey) + + if (thread) { + SurveyRecordsThreadMap.markZombie(threadKey) + thread.postMessage({ type: RecordsUpdateThreadMessageTypes.threadKill }) + } +} + +const killThread = ({ surveyId, cycle, draft }) => { + const threadKey = getThreadKey({ surveyId, cycle, draft }) + _killThreadByKey(threadKey) +} + +const killSurveyThreads = ({ surveyId }) => { + const threadKeys = SurveyRecordsThreadMap.getThreadsKeysBySurveyId({ surveyId }) + threadKeys.forEach(_killThreadByKey) +} + +// ====== READ + +const _resetThreadInactivityTimeout = (threadKey) => { + clearTimeout(threadTimeouts[threadKey]) + + // After one hour of inactivity, thread gets killed and user is notified + threadTimeouts[threadKey] = setTimeout(_killThreadByKey.bind(null, threadKey), inactivityPeriod) +} + +const getOrCreatedThread = ({ surveyId, cycle, draft = false }) => { + const threadKey = getThreadKey({ surveyId, cycle, draft }) + if (SurveyRecordsThreadMap.isZombie(threadKey)) { + SurveyRecordsThreadMap.reviveZombie(threadKey) + } + + const thread = getThread(threadKey) || _createThread({ surveyId, cycle, draft }) + _resetThreadInactivityTimeout(threadKey) + return thread +} + +// ====== WebSocket notification + +const { assocSocket, dissocSocket, dissocSocketBySocketId } = RecordSocketsMap + +const notifyRecordUpdateToSockets = ({ eventType, content }) => { + const { recordUuid } = content + const socketIds = RecordSocketsMap.getSocketIdsByRecordUuid(recordUuid) + socketIds.forEach((socketId) => { + if (WebSocketServer.isSocketConnected(socketId)) { + WebSocketServer.notifySocket(socketId, eventType, content) + } else { + // socket has been disconnected without checking out the record + RecordSocketsMap.dissocSocket({ recordUuid, socketId }) + } + }) +} + +const notifyRecordDeleteToSockets = ({ socketIdUser, recordUuid, notifySameUser = true }) => { + // Notify other users viewing or editing the record it has been deleted + const socketIds = RecordSocketsMap.getSocketIdsByRecordUuid(recordUuid) + socketIds.forEach((socketId) => { + if (socketId !== socketIdUser || notifySameUser) { + WebSocketServer.notifySocket(socketId, WebSocketEvent.recordDelete, recordUuid) + } + }) + RecordSocketsMap.dissocSocketsByRecordUuid(recordUuid) +} + +export const RecordsUpdateThreadService = { + getOrCreatedThread, + killThread, + killSurveyThreads, + // sockets + assocSocket, + notifyRecordDeleteToSockets, + dissocSocket, + dissocSocketBySocketId, +} diff --git a/server/modules/record/service/update/thread/recordUpdateThread.js b/server/modules/record/service/update/thread/recordUpdateThread.js deleted file mode 100644 index 7574c21a8b..0000000000 --- a/server/modules/record/service/update/thread/recordUpdateThread.js +++ /dev/null @@ -1,171 +0,0 @@ -import { Objects, SystemError } from '@openforis/arena-core' -import { WebSocketEvent } from '@openforis/arena-server' - -import * as Log from '@server/log/log' - -import Thread from '@server/threads/thread' - -import * as Survey from '@core/survey/survey' -import * as Record from '@core/record/record' -import * as Validation from '@core/validation/validation' -import Queue from '@core/queue' - -import * as RecordManager from '../../../manager/recordManager' -import * as SurveyManager from '../../../../survey/manager/surveyManager' -import * as RecordUpdateThreadParams from './recordUpdateThreadParams' -import { messageTypes } from './recordThreadMessageTypes' - -const Logger = Log.getLogger('RecordUpdateThread') - -class RecordUpdateThread extends Thread { - constructor(paramsObj) { - super(paramsObj) - - this.queue = new Queue() - this.survey = null - this.record = null - this.processing = false - } - - sendThreadInitMsg() { - ;(async () => { - await this.messageHandler({ type: messageTypes.threadInit }) - })() - } - - async handleNodesUpdated(updatedNodes) { - if (!Objects.isEmpty(updatedNodes)) { - this.postMessage({ - type: WebSocketEvent.nodesUpdate, - content: updatedNodes, - }) - } - } - - async handleNodesValidationUpdated(validations) { - const recordUpdated = Record.mergeNodeValidations(validations)(this.record) - - this.postMessage({ - type: WebSocketEvent.nodeValidationsUpdate, - content: { - recordUuid: Record.getUuid(this.record), - recordValid: Validation.isObjValid(recordUpdated), - validations, - }, - }) - } - - async onMessage(msg) { - this.queue.enqueue(msg) - await this.processNext() - } - - async processNext() { - if (!this.processing && !this.queue.isEmpty()) { - this.processing = true - - const msg = this.queue.dequeue() - try { - await this.processMessage(msg) - } catch (error) { - // SystemError is an expected error type, e.g. when there's a problem with expressions. - if (error instanceof SystemError) { - this.postMessage({ - type: WebSocketEvent.applicationError, - content: { - key: error.key, - params: error.params, - }, - }) - return // Stop processing - } - - // Unexpected error: Crash and burn - throw error - } - - this.processing = false - await this.processNext() - } - } - - async init() { - // Init record - this.record = await RecordManager.fetchRecordAndNodesByUuid({ - surveyId: this.surveyId, - recordUuid: RecordUpdateThreadParams.getRecordUuid(this.params), - }) - - // Init survey - const preview = Record.isPreview(this.record) - const surveyDb = await SurveyManager.fetchSurveyAndNodeDefsAndRefDataBySurveyId({ - surveyId: this.surveyId, - cycle: Record.getCycle(this.record), - draft: preview, - advanced: true, - }) - - // If in preview mode, unpublished dependencies have not been stored in the db, so we need to build them - const dependencyGraph = preview - ? Survey.buildDependencyGraph(surveyDb) - : await SurveyManager.fetchDependencies(this.surveyId) - - this.survey = Survey.assocDependencyGraph(dependencyGraph)(surveyDb) - } - - async processMessage(msg) { - Logger.debug('process message', msg) - - switch (msg.type) { - case messageTypes.threadInit: - await this.init() - break - - case messageTypes.recordInit: - this.record = await RecordManager.initNewRecord({ - user: this.user, - survey: this.survey, - record: this.record, - nodesUpdateListener: this.handleNodesUpdated.bind(this), - nodesValidationListener: this.handleNodesValidationUpdated.bind(this), - }) - break - - case messageTypes.nodePersist: - this.record = await RecordManager.persistNode({ - user: msg.user, - survey: this.survey, - record: this.record, - node: msg.node, - nodesUpdateListener: this.handleNodesUpdated.bind(this), - nodesValidationListener: this.handleNodesValidationUpdated.bind(this), - }) - break - - case messageTypes.nodeDelete: - this.record = await RecordManager.deleteNode( - msg.user, - this.survey, - this.record, - msg.nodeUuid, - this.handleNodesUpdated.bind(this), - this.handleNodesValidationUpdated.bind(this) - ) - break - - case messageTypes.threadKill: - this.postMessage(msg) - break - - default: - Logger.debug(`Skipping unknown message type: ${msg.type}`) - } - - if ([messageTypes.nodePersist, messageTypes.nodeDelete].includes(msg.type)) { - this.postMessage({ type: WebSocketEvent.nodesUpdateCompleted }) - } - } -} - -const thread = new RecordUpdateThread() -thread.sendThreadInitMsg() diff --git a/server/modules/record/service/update/thread/recordUpdateThreadParams.js b/server/modules/record/service/update/thread/recordUpdateThreadParams.js deleted file mode 100644 index e9e0934fb2..0000000000 --- a/server/modules/record/service/update/thread/recordUpdateThreadParams.js +++ /dev/null @@ -1,7 +0,0 @@ -import * as R from 'ramda' - -export const keys = { - recordUuid: 'recordUuid', -} - -export const getRecordUuid = R.prop(keys.recordUuid) diff --git a/server/modules/record/service/update/thread/recordThreadMessageTypes.js b/server/modules/record/service/update/thread/recordsThreadMessageTypes.js similarity index 74% rename from server/modules/record/service/update/thread/recordThreadMessageTypes.js rename to server/modules/record/service/update/thread/recordsThreadMessageTypes.js index 837d2d56e8..56f3dc4058 100644 --- a/server/modules/record/service/update/thread/recordThreadMessageTypes.js +++ b/server/modules/record/service/update/thread/recordsThreadMessageTypes.js @@ -1,4 +1,4 @@ -export const messageTypes = { +export const RecordsUpdateThreadMessageTypes = { threadKill: 'threadKill', threadInit: 'threadInit', diff --git a/server/modules/record/service/update/thread/recordsUpdateThread.js b/server/modules/record/service/update/thread/recordsUpdateThread.js new file mode 100644 index 0000000000..81762543df --- /dev/null +++ b/server/modules/record/service/update/thread/recordsUpdateThread.js @@ -0,0 +1,205 @@ +import { Objects, SystemError } from '@openforis/arena-core' +import { WebSocketEvent } from '@openforis/arena-server' + +import * as Log from '@server/log/log' + +import Thread from '@server/threads/thread' + +import * as Survey from '@core/survey/survey' +import * as Record from '@core/record/record' +import * as Node from '@core/record/node' +import * as Validation from '@core/validation/validation' +import Queue from '@core/queue' + +import * as RecordManager from '../../../manager/recordManager' +import * as SurveyManager from '../../../../survey/manager/surveyManager' +import { RecordsUpdateThreadMessageTypes } from './recordsThreadMessageTypes' + +const Logger = Log.getLogger('RecordsUpdateThread') + +class RecordsUpdateThread extends Thread { + constructor(paramsObj) { + super(paramsObj) + + this.queue = new Queue() + this.survey = null + this.record = null + this.processing = false + } + + sendThreadInitMsg() { + ;(async () => { + await this.messageHandler({ type: RecordsUpdateThreadMessageTypes.threadInit }) + })() + } + + async handleNodesUpdated({ record, updatedNodes }) { + if (!Objects.isEmpty(updatedNodes)) { + const recordUuid = Record.getUuid(record) + this.postMessage({ + type: WebSocketEvent.nodesUpdate, + content: { + recordUuid, + updatedNodes, + }, + }) + } + } + + async handleNodesValidationUpdated({ record, validations }) { + const recordUpdated = Record.mergeNodeValidations(validations)(record) + + this.postMessage({ + type: WebSocketEvent.nodeValidationsUpdate, + content: { + recordUuid: Record.getUuid(record), + recordValid: Validation.isObjValid(recordUpdated), + validations, + }, + }) + } + + async onMessage(msg) { + this.queue.enqueue(msg) + await this.processNext() + } + + async processNext() { + if (!this.processing && !this.queue.isEmpty()) { + this.processing = true + + const msg = this.queue.dequeue() + try { + await this.processMessage(msg) + } catch (error) { + // SystemError is an expected error type, e.g. when there's a problem with expressions. + if (error instanceof SystemError) { + this.postMessage({ + type: WebSocketEvent.applicationError, + content: { + key: error.key, + params: error.params, + }, + }) + return // Stop processing + } + // Unexpected error: Crash and burn + throw error + } finally { + this.processing = false + } + await this.processNext() + } + } + + async init() { + const { surveyId, cycle, draft } = this.params + + const surveyDb = await SurveyManager.fetchSurveyAndNodeDefsAndRefDataBySurveyId({ + surveyId, + cycle, + draft, + advanced: true, + }) + + // If in preview mode, unpublished dependencies have not been stored in the db, so we need to build them + const dependencyGraph = draft + ? Survey.buildDependencyGraph(surveyDb) + : await SurveyManager.fetchDependencies(surveyId) + + this.survey = Survey.assocDependencyGraph(dependencyGraph)(surveyDb) + + this.recordsByUuid = {} + } + + async processMessage(msg) { + const { type } = msg + Logger.debug('processing message', type) + + switch (type) { + case RecordsUpdateThreadMessageTypes.threadInit: + await this.init() + break + case RecordsUpdateThreadMessageTypes.recordInit: + await this.processRecordInitMsg(msg) + break + case RecordsUpdateThreadMessageTypes.nodePersist: + await this.processRecordNodePersistMsg(msg) + break + case RecordsUpdateThreadMessageTypes.nodeDelete: + await this.processRecordNodeDeleteMsg(msg) + break + case RecordsUpdateThreadMessageTypes.threadKill: + this.postMessage(msg) + break + default: + Logger.debug(`Skipping unknown message type: ${type}`) + } + + if ([RecordsUpdateThreadMessageTypes.nodePersist, RecordsUpdateThreadMessageTypes.nodeDelete].includes(type)) { + const recordUuid = msg.recordUuid || msg.node?.recordUuid + this.postMessage({ type: WebSocketEvent.nodesUpdateCompleted, content: { recordUuid } }) + } + } + + async processRecordInitMsg(msg) { + const { survey, surveyId } = this + const { recordUuid, user } = msg + + let record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid }) + + record = await RecordManager.initNewRecord({ + user, + survey, + record, + nodesUpdateListener: (updatedNodes) => this.handleNodesUpdated.bind(this)({ record, updatedNodes }), + nodesValidationListener: (validations) => this.handleNodesValidationUpdated.bind(this)({ record, validations }), + }) + this.recordsByUuid[recordUuid] = record + } + + async processRecordNodePersistMsg(msg) { + const { survey } = this + const { node, user } = msg + const recordUuid = Node.getRecordUuid(node) + let record = await this.getOrFetchRecord({ recordUuid }) + record = await RecordManager.persistNode({ + user, + survey, + record, + node, + nodesUpdateListener: (updatedNodes) => this.handleNodesUpdated({ record, updatedNodes }), + nodesValidationListener: (validations) => this.handleNodesValidationUpdated({ record, validations }), + }) + this.recordsByUuid[recordUuid] = record + } + + async processRecordNodeDeleteMsg(msg) { + const { survey } = this + const { nodeUuid, recordUuid, user } = msg + + let record = await this.getOrFetchRecord({ recordUuid }) + record = await RecordManager.deleteNode( + user, + survey, + record, + nodeUuid, + (updatedNodes) => this.handleNodesUpdated({ record, updatedNodes }), + (validations) => this.handleNodesValidationUpdated({ record, validations }) + ) + this.recordsByUuid[recordUuid] = record + } + + async getOrFetchRecord({ recordUuid }) { + const { surveyId, recordsByUuid } = this + let record = recordsByUuid[recordUuid] + if (!record) { + record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid }) + recordsByUuid[recordUuid] = record + } + return record + } +} + +const thread = new RecordsUpdateThread() +thread.sendThreadInitMsg() diff --git a/server/modules/record/service/update/thread/recordsUpdateThreadParams.js b/server/modules/record/service/update/thread/recordsUpdateThreadParams.js new file mode 100644 index 0000000000..52288f71aa --- /dev/null +++ b/server/modules/record/service/update/thread/recordsUpdateThreadParams.js @@ -0,0 +1,7 @@ +import * as A from '@core/arena' + +export const keys = { + recordUuid: 'recordUuid', +} + +export const getRecordUuid = A.prop(keys.recordUuid) diff --git a/server/modules/survey/service/surveyService.js b/server/modules/survey/service/surveyService.js index 614e6e9a6a..4b30f27f97 100644 --- a/server/modules/survey/service/surveyService.js +++ b/server/modules/survey/service/surveyService.js @@ -7,9 +7,12 @@ import SurveyCloneJob from './clone/surveyCloneJob' import ExportCsvDataJob from './export/exportCsvDataJob' import SurveyExportJob from './surveyExport/surveyExportJob' import { SchemaSummary } from './schemaSummary' +import { RecordsUpdateThreadService } from '@server/modules/record/service/update/surveyRecordsThreadService' // JOBS export const startPublishJob = (user, surveyId) => { + RecordsUpdateThreadService.killSurveyThreads({ surveyId }) + const job = new SurveyPublishJob({ user, surveyId }) JobManager.executeJobThread(job) @@ -61,6 +64,12 @@ export const startExportCsvDataJob = ({ export const exportSchemaSummary = async ({ surveyId, cycle, outputStream }) => SchemaSummary.exportSchemaSummary({ surveyId, cycle, outputStream }) +export const deleteSurvey = async (surveyId) => { + RecordsUpdateThreadService.killSurveyThreads({ surveyId }) + + await SurveyManager.deleteSurvey(surveyId) +} + export const { // CREATE insertSurvey, @@ -75,7 +84,6 @@ export const { updateSurveyDependencyGraphs, updateSurveyProps, // DELETE - deleteSurvey, deleteTemporarySurveys, // UTILS validateNewSurvey, diff --git a/server/modules/user/service/userService.js b/server/modules/user/service/userService.js index 33ce151608..cfbb063749 100644 --- a/server/modules/user/service/userService.js +++ b/server/modules/user/service/userService.js @@ -40,22 +40,22 @@ export const insertSystemAdminUserIfNotExisting = async (client = db) => client.tx(async (t) => { Logger.debug('checking if admin users exist...') const aminsCount = await UserManager.countSystemAdministrators(t) - Logger.info(`${aminsCount} admin users found; skipping admin user insert`) if (aminsCount > 0) { + Logger.info(`${aminsCount} admin users found; skipping admin user insert`) return null } - + const throwError = (details) => { + throw new SystemError(`Cannot create system admin user: ${details}`) + } const email = ProcessUtils.ENV.adminEmail const password = ProcessUtils.ENV.adminPassword - if (!email && !password) - throw new SystemError('Cannot create system admin user: email or password not specified in environment variables') + if (!email && !password) throwError(`email or password not specified in environment variables`) const validation = await SystemAdminUserValidator.validate({ email, password }) - if (Validation.isNotValid(validation)) - throw new SystemError('Cannot create admin user: email or password are not valid') + if (Validation.isNotValid(validation)) throwError(`email or password are not valid or password is unsafe`) const existingUser = await UserManager.fetchUserByEmail(email, t) - if (existingUser) throw new SystemError(`Cannot crate system admin user: user with email ${email} already exists`) + if (existingUser) throwError(`user with email ${email} already exists`) Logger.debug(`inserting system admin user with email: ${email}`) const passwordEncrypted = UserPasswordUtils.encryptPassword(password) diff --git a/server/threads/threadParams.js b/server/threads/threadParams.js index 3b4e277dc8..8a5e3ee37c 100644 --- a/server/threads/threadParams.js +++ b/server/threads/threadParams.js @@ -1,6 +1,8 @@ import * as R from 'ramda' export const keys = { + cycle: 'cycle', + draft: 'draft', socketId: 'socketId', surveyId: 'surveyId', user: 'user', diff --git a/server/threads/threadsCache.js b/server/threads/threadsCache.js index a49442f1a0..f19378dd2c 100644 --- a/server/threads/threadsCache.js +++ b/server/threads/threadsCache.js @@ -7,6 +7,16 @@ export default class ThreadsCache { return this.threads.get(key) } + findThreadsKeys(keyFilterFunction) { + const result = [] + for (let key of this.threads.keys()) { + if (keyFilterFunction(key)) { + result.push(key) + } + } + return result + } + putThread(key, worker) { this.threads.set(key, worker) return worker diff --git a/server/utils/file/fileUtils.js b/server/utils/file/fileUtils.js index c72c77c41a..bde0c3de8f 100644 --- a/server/utils/file/fileUtils.js +++ b/server/utils/file/fileUtils.js @@ -39,6 +39,12 @@ export const getFileSize = (path) => { return size } +export const getFileExtension = (file) => { + const fileName = typeof file === 'string' ? file : file.name + const extension = fileName.split('.').pop() + return extension +} + export const deleteFile = (path) => fs.unlinkSync(path) // ======= Temp Files diff --git a/webapp/components/DataQuery/Visualizer/Table/store/hooks/useListenToNodeUpdates.js b/webapp/components/DataQuery/Visualizer/Table/store/hooks/useListenToNodeUpdates.js index 939b0f121a..d80e82b661 100644 --- a/webapp/components/DataQuery/Visualizer/Table/store/hooks/useListenToNodeUpdates.js +++ b/webapp/components/DataQuery/Visualizer/Table/store/hooks/useListenToNodeUpdates.js @@ -52,8 +52,8 @@ export const useListenOnNodeUpdates = ({ data, query, setData }) => { // listening to websocket events when data is loaded in edit mode and rows have record property const listeningToWebSocket = modeEdit && data?.length > 0 && Object.prototype.hasOwnProperty.call(data[0], 'record') - const onNodesUpdate = useCallback((nodes) => { - dataClone.current = updateValues({ data: dataClone.current, nodes }) + const onNodesUpdate = useCallback(({ updatedNodes }) => { + dataClone.current = updateValues({ data: dataClone.current, nodes: updatedNodes }) }, []) const onNodeValidationsUpdate = useCallback( diff --git a/webapp/components/survey/Record/store/useLocalState.js b/webapp/components/survey/Record/store/useLocalState.js index bd9b881fe2..a4667af88e 100644 --- a/webapp/components/survey/Record/store/useLocalState.js +++ b/webapp/components/survey/Record/store/useLocalState.js @@ -48,7 +48,7 @@ export const useLocalState = (props) => { // Add websocket event listeners useOnWebSocketEvent({ eventName: WebSocketEvents.nodesUpdate, - eventHandler: useCallback((content) => dispatch(RecordActions.recordNodesUpdate(content)), []), + eventHandler: useCallback((content) => dispatch(RecordActions.recordNodesUpdate(content.updatedNodes)), []), }) useOnWebSocketEvent({ eventName: WebSocketEvents.nodeValidationsUpdate, diff --git a/webapp/store/ui/record/actions/common.js b/webapp/store/ui/record/actions/common.js index b1ca4c09c3..93303469a3 100644 --- a/webapp/store/ui/record/actions/common.js +++ b/webapp/store/ui/record/actions/common.js @@ -13,5 +13,5 @@ export const recordNodesUpdate = (nodes) => (dispatch, getState) => { dispatch(LoaderActions.hideLoader()) } - dispatch({ type: ActionTypes.nodesUpdate, nodes }) + dispatch({ type: ActionTypes.nodesUpdate, nodes: nodes }) } diff --git a/webapp/store/ui/record/actions/delete.js b/webapp/store/ui/record/actions/delete.js index fdd9413c33..da8b190e3f 100644 --- a/webapp/store/ui/record/actions/delete.js +++ b/webapp/store/ui/record/actions/delete.js @@ -14,8 +14,16 @@ export const removeNode = (nodeDef, node) => async (dispatch, getState) => { dispatch(AppSavingActions.showAppSaving()) dispatch({ type: ActionTypes.nodeDelete, node }) - const surveyId = SurveyState.getSurveyId(getState()) - await axios.delete(`/api/survey/${surveyId}/record/${Node.getRecordUuid(node)}/node/${Node.getUuid(node)}`) + const state = getState() + const record = RecordState.getRecord(state) + const surveyId = SurveyState.getSurveyId(state) + const recordUuid = Record.getUuid(record) + const cycle = Record.getCycle(record) + const nodeUuid = Node.getUuid(node) + + await axios.delete(`/api/survey/${surveyId}/record/${recordUuid}/node/${nodeUuid}`, { + data: { cycle }, + }) } export const recordDeleted = diff --git a/webapp/store/ui/record/actions/update.js b/webapp/store/ui/record/actions/update.js index b217a17c7f..735e4eb41d 100644 --- a/webapp/store/ui/record/actions/update.js +++ b/webapp/store/ui/record/actions/update.js @@ -1,6 +1,7 @@ import axios from 'axios' import * as A from '@core/arena' +import * as Record from '@core/record/record' import * as Node from '@core/record/node' import * as NodeRefData from '@core/record/nodeRefData' @@ -21,10 +22,23 @@ const _updateNodeDebounced = (node, file, delay) => { const action = async (dispatch, getState) => { dispatch(AppSavingActions.showAppSaving()) - const formData = objectToFormData({ node: JSON.stringify(node), ...(file ? { file } : {}) }) + const state = getState() + const cycle = SurveyState.getSurveyCycleKey(state) - const surveyId = SurveyState.getSurveyId(getState()) - await axios.post(`/api/survey/${surveyId}/record/${Node.getRecordUuid(node)}/node`, formData) + const record = RecordState.getRecord(state) + const draft = record && Record.isPreview(record) + + const formData = objectToFormData({ + cycle, + draft, + node: JSON.stringify(node), + ...(file ? { file } : {}), + }) + + const recordUuid = Node.getRecordUuid(node) + + const surveyId = SurveyState.getSurveyId(state) + await axios.post(`/api/survey/${surveyId}/record/${recordUuid}/node`, formData) } return debounceAction(action, `node_update_${Node.getUuid(node)}`, delay) diff --git a/webapp/views/App/views/Data/MapView/CoordinateAttributeDataLayer/useCoordinateAttributeDataLayer.js b/webapp/views/App/views/Data/MapView/CoordinateAttributeDataLayer/useCoordinateAttributeDataLayer.js index be09cf398b..5364d4262f 100644 --- a/webapp/views/App/views/Data/MapView/CoordinateAttributeDataLayer/useCoordinateAttributeDataLayer.js +++ b/webapp/views/App/views/Data/MapView/CoordinateAttributeDataLayer/useCoordinateAttributeDataLayer.js @@ -109,11 +109,11 @@ export const useCoordinateAttributeDataLayer = (props) => { useOnWebSocketEvent({ eventName: WebSocketEvents.nodesUpdate, eventHandler: useCallback( - (nodesUpdated) => { + ({ updatedNodes }) => { if (editingRecordUuid) { const rootKeyDefs = Survey.getNodeDefRootKeys(survey) const rootKeyDefsUuids = rootKeyDefs.map(NodeDef.getUuid) - const shouldFetchRecordData = Object.values(nodesUpdated).some( + const shouldFetchRecordData = Object.values(updatedNodes).some( (nodeUpdated) => Node.getRecordUuid(nodeUpdated) === editingRecordUuid && (Node.getNodeDefUuid(nodeUpdated) === NodeDef.getUuid(attributeDef) || diff --git a/webpack.config.server.babel.js b/webpack.config.server.babel.js index 549b6d0b4f..8969029d0a 100644 --- a/webpack.config.server.babel.js +++ b/webpack.config.server.babel.js @@ -36,7 +36,7 @@ export default { entry: { server: entry('server/server.js'), jobThread: entry('server/job/jobThread.js'), - recordUpdateThread: entry('server/modules/record/service/update/thread/recordUpdateThread.js'), + recordsUpdateThread: entry('server/modules/record/service/update/thread/recordsUpdateThread.js'), }, output: { publicPath: 'dist/', diff --git a/yarn.lock b/yarn.lock index 62958a810f..c8f094e4a8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2095,10 +2095,10 @@ proj4 "^2.8.0" uuid "^9.0.0" -"@openforis/arena-core@^0.0.142": - version "0.0.142" - resolved "https://npm.pkg.github.com/download/@openforis/arena-core/0.0.142/a4a18a237a9c2aa498e9c2a96139cb64093c707b#a4a18a237a9c2aa498e9c2a96139cb64093c707b" - integrity sha512-YPH4AdWHznMKvoZRj2kB9g1F/rzzpYc2ykgc9i8+uJ1CJlEW+KkLLpDBqKIJbqb7izg7Vxr5WaszBzuwjdvjGw== +"@openforis/arena-core@^0.0.143": + version "0.0.143" + resolved "https://npm.pkg.github.com/download/@openforis/arena-core/0.0.143/ea286088606f22c4a520bfc219bae084224c690b#ea286088606f22c4a520bfc219bae084224c690b" + integrity sha512-eyCSf6hClYRGdrP6h9yBpTAccLCpj3eI7LgFyYaRYHCvFHHlE74RH+nqrFrC7LfLTAxREld6rQQVSm9nAPNduA== dependencies: "@esri/proj-codes" "^3.1.0" "@jsep-plugin/regex" "^1.0.3" @@ -2112,10 +2112,10 @@ proj4 "^2.9.0" uuid "^9.0.0" -"@openforis/arena-server@^0.1.24": - version "0.1.24" - resolved "https://npm.pkg.github.com/download/@openforis/arena-server/0.1.24/a4c1b1d93d9f76720e5b2ce07be23ac15537f034#a4c1b1d93d9f76720e5b2ce07be23ac15537f034" - integrity sha512-wif6ZsedqWhbqTgTY+H6Zau9hUAlbZNtn2mkTAvxA4/ZTR/ixVWIPbpiaVpQhy7UjGlNlFgYcE83qcjnz30Bsw== +"@openforis/arena-server@^0.1.25": + version "0.1.25" + resolved "https://npm.pkg.github.com/download/@openforis/arena-server/0.1.25/d164aa101afb984dca9f8c636cd5b764c2214c77#d164aa101afb984dca9f8c636cd5b764c2214c77" + integrity sha512-vIBXxw9ENWMXMscZ1RGyZW85Gr2y864gpAfP0aRDQe2t3NiBfE46cVzIKgQXyuO5xJejZJyKUlETD1xhjjmU0g== dependencies: "@godaddy/terminus" "^4.12.0" "@openforis/arena-core" "^0.0.136"