Skip to content

Commit

Permalink
records update thread: use single thread
Browse files Browse the repository at this point in the history
  • Loading branch information
SteRiccio committed Oct 2, 2023
1 parent 13e9df4 commit c91ae44
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 79 deletions.
32 changes: 21 additions & 11 deletions server/modules/record/service/recordService.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ export const checkIn = async ({ socketId, user, surveyId, recordUuid, draft }) =

if (preview || (Survey.isPublished(surveyInfo) && Authorizer.canEditRecord(user, record))) {
// Create record thread
const thread = RecordsUpdateThreadService.getOrCreatedThread({ surveyId, cycle, draft: preview })
const thread = RecordsUpdateThreadService.getOrCreatedThread()
// initialize record if empty
if (Record.getNodesArray(record).length === 0) {
thread.postMessage({ type: RecordsUpdateThreadMessageTypes.recordInit, user, surveyId, recordUuid })
thread.postMessage({ type: RecordsUpdateThreadMessageTypes.recordInit, user, surveyId, cycle, draft, recordUuid })
}
}
return record
Expand All @@ -177,12 +177,22 @@ export const checkOut = async (socketId, user, surveyId, recordUuid) => {
includePreview: true,
})
if (Record.isPreview(recordSummary)) {
RecordsUpdateThreadService.killThread({ surveyId, cycle: Record.getCycle(recordSummary), draft: true })
RecordsUpdateThreadService.clearSurveyDataFromThread({
surveyId,
cycle: Record.getCycle(recordSummary),
draft: true,
})
await RecordManager.deleteRecordPreview(surveyId, recordUuid)
} else {
const record = await RecordManager.fetchRecordAndNodesByUuid({ surveyId, recordUuid, fetchForUpdate: false })
if (Record.isEmpty(record)) {
await deleteRecord({ socketId, user, surveyId, recordUuid, notifySameUser: true })
RecordsUpdateThreadService.clearRecordDataFromThread({
surveyId,
cycle: Record.getCycle(recordSummary),
draft: false,
recordUuid,
})
}
}
RecordsUpdateThreadService.dissocSocket({ recordUuid, socketId })
Expand Down Expand Up @@ -295,10 +305,10 @@ export const startRecordsCloneJob = ({ user, surveyId, cycleFrom, cycleTo, recor
}

// NODE
const _sendNodeUpdateMessage = ({ socketId, user, surveyId, cycle, recordUuid, draft, msg }) => {
const _sendNodeUpdateMessage = ({ socketId, user, recordUuid, msg }) => {
RecordsUpdateThreadService.assocSocket({ recordUuid, socketId })

const thread = RecordsUpdateThreadService.getOrCreatedThread({ surveyId, cycle, draft })
const thread = RecordsUpdateThreadService.getOrCreatedThread()
thread.postMessage(msg, user)
}

Expand All @@ -323,12 +333,12 @@ export const persistNode = async ({ socketId, user, surveyId, draft, cycle, node
_sendNodeUpdateMessage({
socketId,
user,
surveyId,
cycle,
draft,
recordUuid,
msg: {
type: RecordsUpdateThreadMessageTypes.nodePersist,
surveyId,
cycle,
draft,
node,
user,
},
Expand All @@ -339,12 +349,12 @@ export const deleteNode = ({ socketId, user, surveyId, cycle, draft, recordUuid,
_sendNodeUpdateMessage({
socketId,
user,
surveyId,
cycle,
draft,
recordUuid,
msg: {
type: RecordsUpdateThreadMessageTypes.nodeDelete,
surveyId,
cycle,
draft,
recordUuid,
nodeUuid,
user,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ const threads = new ThreadsCache()
const threadZombies = new Set() // Set of threads marked to be killed

// thread cache
const getKey = ({ surveyId, cycle, draft }) => `${surveyId}_${cycle}_${draft}`
const getKey = () => `survey_records_thread`
const get = (threadKey) => threads.getThread(threadKey)
const getThreadsKeysBySurveyId = ({ surveyId }) => threads.findThreadsKeys((key) => key.startsWith(`${surveyId}_`))
const put = (threadKey, thread) => threads.putThread(threadKey, thread)

const remove = (threadKey) => {
Expand All @@ -24,7 +23,6 @@ const isZombie = (threadKey) => threadZombies.has(threadKey)
export const SurveyRecordsThreadMap = {
getKey,
get,
getThreadsKeysBySurveyId,
put,
remove,
markZombie,
Expand Down
41 changes: 21 additions & 20 deletions server/modules/record/service/update/surveyRecordsThreadService.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { WebSocketEvent, WebSocketServer } from '@openforis/arena-server'

import ThreadManager from '@server/threads/threadManager'
import * as ThreadParams from '@server/threads/threadParams'

import { RecordsUpdateThreadMessageTypes } from './thread/recordsThreadMessageTypes'
import { SurveyRecordsThreadMap } from './surveyRecordsThreadMap'
Expand All @@ -18,13 +17,9 @@ const threadTimeouts = {}
// ======

// ====== CREATE
const _createThread = ({ surveyId, cycle, draft }) => {
const threadData = {
[ThreadParams.keys.surveyId]: surveyId,
[ThreadParams.keys.draft]: draft,
[ThreadParams.keys.cycle]: cycle,
}
const threadKey = getThreadKey({ surveyId, cycle, draft })
const _createThread = () => {
const threadData = {}
const threadKey = getThreadKey()

const handleMessageFromThread = (msg) => {
const { type, content } = msg
Expand Down Expand Up @@ -61,14 +56,19 @@ const _killThreadByKey = (threadKey) => {
}
}

const killThread = ({ surveyId, cycle, draft }) => {
const threadKey = getThreadKey({ surveyId, cycle, draft })
const killThread = () => {
const threadKey = getThreadKey()
_killThreadByKey(threadKey)
}

const killSurveyThreads = ({ surveyId }) => {
const threadKeys = SurveyRecordsThreadMap.getThreadsKeysBySurveyId({ surveyId })
threadKeys.forEach(_killThreadByKey)
const clearSurveyDataFromThread = ({ surveyId, cycle = null, draft = false }) => {
const thread = getThread()
thread?.postMessage({ type: RecordsUpdateThreadMessageTypes.surveyClear, surveyId, cycle, draft })
}

const clearRecordDataFromThread = ({ surveyId, cycle, draft, recordUuid }) => {
const thread = getThread()
thread?.postMessage({ type: RecordsUpdateThreadMessageTypes.recordClear, surveyId, cycle, draft, recordUuid })
}

// ====== READ
Expand All @@ -80,8 +80,8 @@ const _resetThreadInactivityTimeout = (threadKey) => {
threadTimeouts[threadKey] = setTimeout(_killThreadByKey.bind(null, threadKey), inactivityPeriod)
}

const getThread = ({ surveyId, cycle, draft = false }) => {
const threadKey = getThreadKey({ surveyId, cycle, draft })
const getThread = () => {
const threadKey = getThreadKey()
if (SurveyRecordsThreadMap.isZombie(threadKey)) {
SurveyRecordsThreadMap.reviveZombie(threadKey)
}
Expand All @@ -92,11 +92,11 @@ const getThread = ({ surveyId, cycle, draft = false }) => {
return thread
}

const getOrCreatedThread = ({ surveyId, cycle, draft = false }) => {
let thread = getThread({ surveyId, cycle, draft })
const getOrCreatedThread = () => {
let thread = getThread()
if (!thread) {
thread = _createThread({ surveyId, cycle, draft })
const threadKey = getThreadKey({ surveyId, cycle, draft })
thread = _createThread()
const threadKey = getThreadKey()
_resetThreadInactivityTimeout(threadKey)
}
return thread
Expand Down Expand Up @@ -134,7 +134,8 @@ export const RecordsUpdateThreadService = {
getOrCreatedThread,
getThread,
killThread,
killSurveyThreads,
clearSurveyDataFromThread,
clearRecordDataFromThread,
// sockets
assocSocket,
notifyRecordDeleteToSockets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ export const RecordsUpdateThreadMessageTypes = {
recordReload: 'recordReload',
nodePersist: 'nodePersist',
nodeDelete: 'nodeDelete',
recordClear: 'recordClear',
surveyClear: 'surveyClear',
}
Loading

0 comments on commit c91ae44

Please sign in to comment.