diff --git a/meteor/server/api/deviceTriggers/RundownContentObserver.ts b/meteor/server/api/deviceTriggers/RundownContentObserver.ts index 3758160320..e0d7dddcef 100644 --- a/meteor/server/api/deviceTriggers/RundownContentObserver.ts +++ b/meteor/server/api/deviceTriggers/RundownContentObserver.ts @@ -32,7 +32,7 @@ export class RundownContentObserver { #observers: Meteor.LiveQueryHandle[] = [] #cache: ContentCache #cancelCache: () => void - #cleanup: () => void = () => { + #cleanup: (() => void) | undefined = () => { throw new Error('RundownContentObserver.#cleanup has not been set!') } #disposed = false @@ -45,8 +45,11 @@ export class RundownContentObserver { ) { logger.silly(`Creating RundownContentObserver for playlist "${rundownPlaylistId}"`) const { cache, cancel: cancelCache } = createReactiveContentCache(() => { + if (this.#disposed) { + this.#cleanup?.() + return + } this.#cleanup = onChanged(cache) - if (this.#disposed) this.#cleanup() }, REACTIVITY_DEBOUNCE) this.#cache = cache @@ -157,5 +160,6 @@ export class RundownContentObserver { this.#cancelCache() this.#observers.forEach((observer) => observer.stop()) this.#cleanup?.() + this.#cleanup = undefined } } diff --git a/meteor/server/api/deviceTriggers/StudioDeviceTriggerManager.ts b/meteor/server/api/deviceTriggers/StudioDeviceTriggerManager.ts index fc1b6ac0c2..bc3f205a82 100644 --- a/meteor/server/api/deviceTriggers/StudioDeviceTriggerManager.ts +++ b/meteor/server/api/deviceTriggers/StudioDeviceTriggerManager.ts @@ -38,20 +38,23 @@ export class StudioDeviceTriggerManager { StudioActionManagers.set(studioId, new StudioActionManager()) } - updateTriggers(cache: ContentCache, showStyleBaseId: ShowStyleBaseId): void { + async updateTriggers(cache: ContentCache, showStyleBaseId: ShowStyleBaseId): Promise { const studioId = this.studioId this.#lastShowStyleBaseId = showStyleBaseId - const rundownPlaylist = cache.RundownPlaylists.findOne({ - activationId: { - $exists: true, - }, - }) - if (!rundownPlaylist) { + const [showStyleBase, rundownPlaylist] = await Promise.all([ + cache.ShowStyleBases.findOneAsync(showStyleBaseId), + cache.RundownPlaylists.findOneAsync({ + activationId: { + $exists: true, + }, + }), + ]) + if (!showStyleBase || !rundownPlaylist) { return } - const context = createCurrentContextFromCache(cache) + const context = await createCurrentContextFromCache(cache) const actionManager = StudioActionManagers.get(studioId) if (!actionManager) throw new Meteor.Error( @@ -60,18 +63,13 @@ export class StudioDeviceTriggerManager { ) actionManager.setContext(context) - const showStyleBase = cache.ShowStyleBases.findOne(showStyleBaseId) - if (!showStyleBase) { - return - } - const { obj: sourceLayers } = applyAndValidateOverrides(showStyleBase.sourceLayersWithOverrides) - const allTriggeredActions = cache.TriggeredActions.find({ + const allTriggeredActions = await cache.TriggeredActions.find({ showStyleBaseId: { $in: [showStyleBaseId, null], }, - }) + }).fetchAsync() const upsertedDeviceTriggerMountedActionIds: DeviceTriggerMountedActionId[] = [] const touchedActionIds: DeviceActionId[] = [] @@ -83,116 +81,126 @@ export class StudioDeviceTriggerManager { const addedPreviewIds: PreviewWrappedAdLibId[] = [] - Object.entries(triggeredAction.actions).forEach(([key, action]) => { - // Since the compiled action is cached using this actionId as a key, having the action - // and the filterChain allows for a quicker invalidation without doing a deepEquals - const actionId = protectString( - `${studioId}_${triggeredAction._id}_${key}_${JSON.stringify(action)}` - ) - const existingAction = actionManager.getAction(actionId) - let thisAction: ExecutableAction - // Use the cached action or put a new one in the cache - if (existingAction) { - thisAction = existingAction - } else { - const compiledAction = createAction(action, sourceLayers) - actionManager.setAction(actionId, compiledAction) - thisAction = compiledAction - } - touchedActionIds.push(actionId) - - Object.entries(triggeredAction.triggers).forEach(([key, trigger]) => { - if (!isDeviceTrigger(trigger)) { - return + const updateActionsPromises = Object.entries(triggeredAction.actions).map( + async ([key, action]) => { + // Since the compiled action is cached using this actionId as a key, having the action + // and the filterChain allows for a quicker invalidation without doing a deepEquals + const actionId = protectString( + `${studioId}_${triggeredAction._id}_${key}_${JSON.stringify(action)}` + ) + const existingAction = actionManager.getAction(actionId) + let thisAction: ExecutableAction + // Use the cached action or put a new one in the cache + if (existingAction) { + thisAction = existingAction + } else { + const compiledAction = createAction(action, sourceLayers) + actionManager.setAction(actionId, compiledAction) + thisAction = compiledAction } + touchedActionIds.push(actionId) - let deviceActionArguments: ShiftRegisterActionArguments | undefined = undefined + const updateMountedActionsPromises = Object.entries( + triggeredAction.triggers + ).map(async ([key, trigger]) => { + if (!isDeviceTrigger(trigger)) { + return + } - if (action.action === DeviceActions.modifyShiftRegister) { - deviceActionArguments = { - type: 'modifyRegister', - register: action.register, - operation: action.operation, - value: action.value, - limitMin: action.limitMin, - limitMax: action.limitMax, + let deviceActionArguments: ShiftRegisterActionArguments | undefined = undefined + + if (action.action === DeviceActions.modifyShiftRegister) { + deviceActionArguments = { + type: 'modifyRegister', + register: action.register, + operation: action.operation, + value: action.value, + limitMin: action.limitMin, + limitMax: action.limitMax, + } } - } - const deviceTriggerMountedActionId = protectString( - `${actionId}_${key}` - ) - DeviceTriggerMountedActions.upsert(deviceTriggerMountedActionId, { - $set: { - studioId, - showStyleBaseId, - actionType: thisAction.action, - actionId, - deviceId: trigger.deviceId, - deviceTriggerId: trigger.triggerId, - values: trigger.values, - deviceActionArguments, - name: triggeredAction.name, - }, - }) - upsertedDeviceTriggerMountedActionIds.push(deviceTriggerMountedActionId) - }) - - if (!isPreviewableAction(thisAction)) { - const adLibPreviewId = protectString(`${actionId}_preview`) - DeviceTriggerMountedActionAdlibsPreview.upsert(adLibPreviewId, { - $set: literal({ - _id: adLibPreviewId, - _rank: 0, - partId: null, - type: undefined, - label: thisAction.action, - item: null, - triggeredActionId: triggeredAction._id, - actionId, - studioId, - showStyleBaseId, - sourceLayerType: undefined, - sourceLayerName: undefined, - styleClassNames: triggeredAction.styleClassNames, - }), + const deviceTriggerMountedActionId = protectString( + `${actionId}_${key}` + ) + upsertedDeviceTriggerMountedActionIds.push(deviceTriggerMountedActionId) + return DeviceTriggerMountedActions.upsertAsync(deviceTriggerMountedActionId, { + $set: { + studioId, + showStyleBaseId, + actionType: thisAction.action, + actionId, + deviceId: trigger.deviceId, + deviceTriggerId: trigger.triggerId, + values: trigger.values, + deviceActionArguments, + name: triggeredAction.name, + }, + }) }) - addedPreviewIds.push(adLibPreviewId) - } else { - const previewedAdLibs = thisAction.preview(context) + if (!isPreviewableAction(thisAction)) { + const adLibPreviewId = protectString(`${actionId}_preview`) - previewedAdLibs.forEach((adLib) => { - const adLibPreviewId = protectString( - `${triggeredAction._id}_${studioId}_${key}_${adLib._id}` - ) - DeviceTriggerMountedActionAdlibsPreview.upsert(adLibPreviewId, { + addedPreviewIds.push(adLibPreviewId) + await DeviceTriggerMountedActionAdlibsPreview.upsertAsync(adLibPreviewId, { $set: literal({ - ...adLib, _id: adLibPreviewId, + _rank: 0, + partId: null, + type: undefined, + label: thisAction.action, + item: null, triggeredActionId: triggeredAction._id, actionId, studioId, showStyleBaseId, - sourceLayerType: adLib.sourceLayerId - ? sourceLayers[adLib.sourceLayerId]?.type - : undefined, - sourceLayerName: adLib.sourceLayerId - ? { - name: sourceLayers[adLib.sourceLayerId]?.name, - abbreviation: sourceLayers[adLib.sourceLayerId]?.abbreviation, - } - : undefined, + sourceLayerType: undefined, + sourceLayerName: undefined, styleClassNames: triggeredAction.styleClassNames, }), }) + } else { + const previewedAdLibs = thisAction.preview(context) + + const previewedAdlibsUpdatePromises = previewedAdLibs.map(async (adLib) => { + const adLibPreviewId = protectString( + `${triggeredAction._id}_${studioId}_${key}_${adLib._id}` + ) + + addedPreviewIds.push(adLibPreviewId) + return DeviceTriggerMountedActionAdlibsPreview.upsertAsync(adLibPreviewId, { + $set: literal({ + ...adLib, + _id: adLibPreviewId, + triggeredActionId: triggeredAction._id, + actionId, + studioId, + showStyleBaseId, + sourceLayerType: adLib.sourceLayerId + ? sourceLayers[adLib.sourceLayerId]?.type + : undefined, + sourceLayerName: adLib.sourceLayerId + ? { + name: sourceLayers[adLib.sourceLayerId]?.name, + abbreviation: sourceLayers[adLib.sourceLayerId]?.abbreviation, + } + : undefined, + styleClassNames: triggeredAction.styleClassNames, + }), + }) + }) - addedPreviewIds.push(adLibPreviewId) - }) + await Promise.all(previewedAdlibsUpdatePromises) + } + + await Promise.all(updateMountedActionsPromises) } - }) + ) - DeviceTriggerMountedActionAdlibsPreview.remove({ + await Promise.all(updateActionsPromises) + + await DeviceTriggerMountedActionAdlibsPreview.removeAsync({ triggeredActionId: triggeredAction._id, _id: { $nin: addedPreviewIds, @@ -200,7 +208,7 @@ export class StudioDeviceTriggerManager { }) } - DeviceTriggerMountedActions.remove({ + await DeviceTriggerMountedActions.removeAsync({ studioId, showStyleBaseId, _id: { @@ -211,7 +219,7 @@ export class StudioDeviceTriggerManager { actionManager.deleteActionsOtherThan(touchedActionIds) } - clearTriggers(): void { + async clearTriggers(): Promise { const studioId = this.studioId const showStyleBaseId = this.#lastShowStyleBaseId @@ -226,28 +234,34 @@ export class StudioDeviceTriggerManager { `No Studio Action Manager available to handle action context in Studio "${studioId}"` ) - DeviceTriggerMountedActions.find({ + const mountedActions = await DeviceTriggerMountedActions.find({ studioId, showStyleBaseId, - }).forEach((mountedAction) => { + }).fetchAsync() + for (const mountedAction of mountedActions) { actionManager.deleteAction(mountedAction.actionId) - }) - DeviceTriggerMountedActions.remove({ - studioId, - showStyleBaseId, - }) - DeviceTriggerMountedActionAdlibsPreview.remove({ - studioId, - showStyleBaseId, - }) + } + + await Promise.all([ + DeviceTriggerMountedActions.removeAsync({ + studioId, + showStyleBaseId, + }), + DeviceTriggerMountedActionAdlibsPreview.removeAsync({ + studioId, + showStyleBaseId, + }), + ]) + actionManager.deleteContext() this.#lastShowStyleBaseId = null + return } - stop(): void { - this.clearTriggers() + async stop(): Promise { StudioActionManagers.delete(this.studioId) + return await this.clearTriggers() } } @@ -266,8 +280,8 @@ function convertDocument(doc: ReadonlyObjectDeep): UITrigger }) } -function createCurrentContextFromCache(cache: ContentCache): ReactivePlaylistActionContext { - const rundownPlaylist = cache.RundownPlaylists.findOne({ +async function createCurrentContextFromCache(cache: ContentCache): Promise { + const rundownPlaylist = await cache.RundownPlaylists.findOneAsync({ activationId: { $exists: true, }, @@ -275,24 +289,26 @@ function createCurrentContextFromCache(cache: ContentCache): ReactivePlaylistAct if (!rundownPlaylist) throw new Error('There should be an active RundownPlaylist!') - const currentPartInstance = rundownPlaylist.currentPartInfo - ? cache.PartInstances.findOne(rundownPlaylist.currentPartInfo.partInstanceId) - : undefined - const nextPartInstance = rundownPlaylist.nextPartInfo - ? cache.PartInstances.findOne(rundownPlaylist.nextPartInfo.partInstanceId) - : undefined + const [currentPartInstance, nextPartInstance] = await Promise.all([ + rundownPlaylist.currentPartInfo + ? cache.PartInstances.findOneAsync(rundownPlaylist.currentPartInfo.partInstanceId) + : undefined, + rundownPlaylist.nextPartInfo + ? cache.PartInstances.findOneAsync(rundownPlaylist.nextPartInfo.partInstanceId) + : undefined, + ]) const currentSegmentPartIds = currentPartInstance - ? cache.Parts.find({ + ? await cache.Parts.find({ segmentId: currentPartInstance.part.segmentId, - }).map((part) => part._id) + }).mapAsync((part) => part._id) : [] const nextSegmentPartIds = nextPartInstance ? nextPartInstance.part.segmentId === currentPartInstance?.part.segmentId ? currentSegmentPartIds - : cache.Parts.find({ + : await cache.Parts.find({ segmentId: nextPartInstance.part.segmentId, - }).map((part) => part._id) + }).mapAsync((part) => part._id) : [] return { diff --git a/meteor/server/api/deviceTriggers/observer.ts b/meteor/server/api/deviceTriggers/observer.ts index bcc7fa8853..ef3be0679c 100644 --- a/meteor/server/api/deviceTriggers/observer.ts +++ b/meteor/server/api/deviceTriggers/observer.ts @@ -29,28 +29,28 @@ Meteor.startup(() => { const studioObserversAndManagers = new Map() const jobQueue = new JobQueueWithClasses({ autoStart: true, - executionWrapper: Meteor.bindEnvironment, - resolutionWrapper: Meteor.defer, }) - function workInQueue(fnc: () => Promise) { - jobQueue.add(fnc).catch((e) => { - logger.error(`Error in DeviceTriggers Studio observer reaction: ${stringifyError(e)}`) - }) + function workInQueue(fnc: () => Promise) { + jobQueue + .add(async () => { + const res = await fnc() + return res + }) + .catch((e) => { + logger.error(`Error in DeviceTriggers Studio observer reaction: ${stringifyError(e)}`) + }) } function createObserverAndManager(studioId: StudioId) { logger.debug(`Creating observer for studio "${studioId}"`) const manager = new StudioDeviceTriggerManager(studioId) const observer = new StudioObserver(studioId, (showStyleBaseId, cache) => { - workInQueue(async () => { - manager.updateTriggers(cache, showStyleBaseId) - }) + logger.silly(`Studio observer updating triggers for "${studioId}":"${showStyleBaseId}"`) + workInQueue(async () => manager.updateTriggers(cache, showStyleBaseId)) return () => { - workInQueue(async () => { - manager.clearTriggers() - }) + workInQueue(async () => manager.clearTriggers()) } }) @@ -58,15 +58,16 @@ Meteor.startup(() => { } function destroyObserverAndManager(studioId: StudioId) { - logger.debug(`Destroying observer for studio "${studioId}"`) - const toRemove = studioObserversAndManagers.get(studioId) - if (toRemove) { - toRemove.observer.stop() - toRemove.manager.stop() - studioObserversAndManagers.delete(studioId) - } else { - logger.error(`Observer for studio "${studioId}" not found`) - } + workInQueue(async () => { + const toRemove = studioObserversAndManagers.get(studioId) + if (toRemove) { + toRemove.observer.stop() + await toRemove.manager.stop() + studioObserversAndManagers.delete(studioId) + } else { + logger.error(`Observer for studio "${studioId}" not found`) + } + }) } Studios.observeChanges( diff --git a/meteor/server/publications/lib/ReactiveCacheCollection.ts b/meteor/server/publications/lib/ReactiveCacheCollection.ts index 294430cab1..6b7df53a34 100644 --- a/meteor/server/publications/lib/ReactiveCacheCollection.ts +++ b/meteor/server/publications/lib/ReactiveCacheCollection.ts @@ -69,6 +69,55 @@ export class ReactiveCacheCollection, + options?: Omit, 'limit'> + ): Promise { + return this.#collection.findOne(selector as any, options) + } + + async insertAsync(doc: Mongo.OptionalId): Promise { + const id = await this.#collection.insertAsync(doc) + this.runReaction() + return id + } + + async removeAsync(selector: Document['_id'] | MongoQuery): Promise { + const num = await this.#collection.removeAsync(selector as any) + if (num > 0) { + this.runReaction() + } + return num + } + + async updateAsync( + selector: Document['_id'] | MongoQuery, + modifier: MongoModifier, + options?: { + multi?: boolean + upsert?: boolean + arrayFilters?: { [identifier: string]: any }[] + } + ): Promise { + const num = await this.#collection.updateAsync(selector as any, modifier as any, options) + if (num > 0) { + this.runReaction() + } + return num + } + + async upsertAsync( + selector: Document['_id'] | Mongo.Selector, + modifier: Mongo.Modifier, + options?: { multi?: boolean } + ): Promise<{ numberAffected?: number; insertedId?: string }> { + const res = await this.#collection.upsertAsync(selector as any, modifier as any, options) + if (res.numberAffected || res.insertedId) { + this.runReaction() + } + return res + } + link(cb?: () => void): ObserveChangesCallbacks { return { added: (id: Document['_id'], fields: Partial) => {