Skip to content

Commit

Permalink
feat: rework CacheForPlayout as structured objects SOFIE-2513
Browse files Browse the repository at this point in the history
  • Loading branch information
Julusian committed Oct 10, 2023
1 parent f2281bd commit 2b032f3
Show file tree
Hide file tree
Showing 62 changed files with 783 additions and 703 deletions.
6 changes: 5 additions & 1 deletion packages/corelib/src/dataModel/PieceInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export function rewrapPieceToInstance(
): PieceInstance {
return {
isTemporary,
_id: protectString(`${partInstanceId}_${piece._id}`),
_id: getPieceInstanceIdForPiece(partInstanceId, piece._id),
rundownId: rundownId,
playlistActivationId: playlistActivationId,
partInstanceId: partInstanceId,
Expand All @@ -128,3 +128,7 @@ export function wrapPieceToInstance(
partInstanceId === protectString('') || isTemporary
)
}

export function getPieceInstanceIdForPiece(partInstanceId: PartInstanceId, pieceId: PieceId): PieceInstanceId {
return protectString(`${partInstanceId}_${pieceId}`)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
} from '@sofie-automation/blueprints-integration'
import { ActionExecutionContext, ActionPartChange } from '../context/adlibActions'
import { isTooCloseToAutonext } from '../../playout/lib'
import { PlayoutModel } from '../../playout/cacheModel/PlayoutModel'
import { PlayoutModel } from '../../playout/model/PlayoutModel'
import { WatchedPackagesHelper } from '../context/watchedPackages'
import { MockJobContext, setupDefaultJobEnvironment } from '../../__mocks__/context'
import { runJobWithPlayoutCache } from '../../playout/lock'
Expand All @@ -31,11 +31,11 @@ import {
EmptyPieceTimelineObjectsBlob,
serializePieceTimelineObjectsBlob,
} from '@sofie-automation/corelib/dist/dataModel/Piece'
import { PartInstanceWithPieces } from '../../playout/cacheModel/PartInstanceWithPieces'
import { PlayoutPartInstanceModel } from '../../playout/model/PlayoutPartInstanceModel'
import { convertPartInstanceToBlueprints, convertPieceInstanceToBlueprints } from '../context/lib'
import { TimelineObjRundown, TimelineObjType } from '@sofie-automation/corelib/dist/dataModel/Timeline'
import { PartInstanceWithPiecesImpl } from '../../playout/cacheModel/implementation/PartInstanceWithPiecesImpl'
import { writePartInstancesAndPieceInstances } from '../../playout/cacheModel/implementation/SavePlayoutModel'
import { PlayoutPartInstanceModelImpl } from '../../playout/model/implementation/PlayoutPartInstanceModelImpl'
import { writePartInstancesAndPieceInstances } from '../../playout/model/implementation/SavePlayoutModel'

import * as PlayoutAdlib from '../../playout/adlibUtils'
type TinnerStopPieces = jest.MockedFunction<typeof PlayoutAdlib.innerStopPieces>
Expand Down Expand Up @@ -70,7 +70,7 @@ describe('Test blueprint api context', () => {
context: MockJobContext,
activationId: RundownPlaylistActivationId,
rundownId: RundownId
): Promise<PartInstanceWithPieces[]> {
): Promise<PlayoutPartInstanceModel[]> {
const parts = await context.mockCollections.Parts.findFetch({ rundownId })
for (let i = 0; i < parts.length; i++) {
const part = parts[i]
Expand Down Expand Up @@ -135,7 +135,7 @@ describe('Test blueprint api context', () => {
const pieceInstances = await context.mockCollections.PieceInstances.findFetch({
partInstanceId: partInstance._id,
})
return new PartInstanceWithPiecesImpl(partInstance, pieceInstances, false)
return new PlayoutPartInstanceModelImpl(partInstance, pieceInstances, false)
})
)
}
Expand Down Expand Up @@ -196,7 +196,7 @@ describe('Test blueprint api context', () => {
jobContext: MockJobContext
playlistId: RundownPlaylistId
rundownId: RundownId
allPartInstances: PartInstanceWithPieces[]
allPartInstances: PlayoutPartInstanceModel[]
}> {
const context = setupDefaultJobEnvironment()

Expand Down Expand Up @@ -230,13 +230,13 @@ describe('Test blueprint api context', () => {
async function saveAllToDatabase(
context: JobContext,
cache: PlayoutModel,
allPartInstances: PartInstanceWithPieces[]
allPartInstances: PlayoutPartInstanceModel[]
) {
// We need to push changes back to 'mongo' for these tests
await Promise.all(
writePartInstancesAndPieceInstances(
context,
normalizeArrayToMapFunc(allPartInstances as PartInstanceWithPiecesImpl[], (p) => p.PartInstance._id)
normalizeArrayToMapFunc(allPartInstances as PlayoutPartInstanceModelImpl[], (p) => p.PartInstance._id)
)
)
await cache.saveAllToDatabase()
Expand All @@ -245,11 +245,11 @@ describe('Test blueprint api context', () => {
async function setPartInstances(
jobContext: MockJobContext,
playlistId: RundownPlaylistId,
currentPartInstance: PartInstanceWithPieces | DBPartInstance | PieceInstance | undefined | null,
nextPartInstance: PartInstanceWithPieces | DBPartInstance | PieceInstance | undefined | null,
previousPartInstance?: PartInstanceWithPieces | DBPartInstance | PieceInstance | null
currentPartInstance: PlayoutPartInstanceModel | DBPartInstance | PieceInstance | undefined | null,
nextPartInstance: PlayoutPartInstanceModel | DBPartInstance | PieceInstance | undefined | null,
previousPartInstance?: PlayoutPartInstanceModel | DBPartInstance | PieceInstance | null
) {
const convertInfo = (info: PartInstanceWithPieces | DBPartInstance | PieceInstance | null) => {
const convertInfo = (info: PlayoutPartInstanceModel | DBPartInstance | PieceInstance | null) => {
if (!info) {
return null
} else if ('partInstanceId' in info) {
Expand Down Expand Up @@ -452,7 +452,7 @@ describe('Test blueprint api context', () => {
(
context2: JobContext,
sourceLayers: SourceLayers,
partInstance: PartInstanceWithPieces,
partInstance: PlayoutPartInstanceModel,
now?: number
) => {
expect(context2).toBe(jobContext)
Expand Down Expand Up @@ -1242,7 +1242,7 @@ describe('Test blueprint api context', () => {

// Ensure there are no pending updates already
for (const partInstance of cache.LoadedPartInstances) {
expect((partInstance as PartInstanceWithPiecesImpl).HasChanges).toBeFalsy()
expect((partInstance as PlayoutPartInstanceModelImpl).HasChanges).toBeFalsy()
}

// Update it and expect it to match
Expand Down Expand Up @@ -1289,7 +1289,7 @@ describe('Test blueprint api context', () => {
},
}
expect(pieceInstance1).toEqual(pieceInstance0After)
expect((partInstance1 as PartInstanceWithPiecesImpl).HasChanges).toBeTruthy()
expect((partInstance1 as PlayoutPartInstanceModelImpl).HasChanges).toBeTruthy()
// expect(
// Array.from(cache.PieceInstances.documents.values()).filter((doc) => !doc || !!doc.updated)
// ).toMatchObject([
Expand Down Expand Up @@ -1380,7 +1380,7 @@ describe('Test blueprint api context', () => {
playOffset: 0,
take: undefined,
}
cache.replacePartInstance(new PartInstanceWithPiecesImpl(partInstance, [], true))
cache.replacePartInstance(new PlayoutPartInstanceModelImpl(partInstance, [], true))

expect(isTooCloseToAutonext(partInstance, true)).toBeTruthy()
await expect(context.queuePart({} as any, [{}] as any)).rejects.toThrow(
Expand Down Expand Up @@ -1720,7 +1720,7 @@ describe('Test blueprint api context', () => {
const { context } = await getActionExecutionContext(jobContext, cache)

// Ensure there are no pending updates already
expect((cache.NextPartInstance! as PartInstanceWithPiecesImpl).HasChanges).toBeFalsy()
expect((cache.NextPartInstance! as PlayoutPartInstanceModelImpl).HasChanges).toBeFalsy()

// Update it and expect it to match
const partInstance0Before = clone(partInstance0)
Expand All @@ -1745,7 +1745,7 @@ describe('Test blueprint api context', () => {
},
}
expect(partInstance1.PartInstance).toEqual(pieceInstance0After)
expect((partInstance1 as PartInstanceWithPiecesImpl).HasChanges).toBeTruthy()
expect((partInstance1 as PlayoutPartInstanceModelImpl).HasChanges).toBeTruthy()
// expect(
// Array.from(cache.PartInstances.documents.values()).filter((doc) => !doc || !!doc.updated)
// ).toMatchObject([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { PeripheralDeviceId } from '@sofie-automation/shared-lib/dist/core/model
import { ReadonlyDeep } from 'type-fest'
import { JobContext, ProcessedShowStyleCompound } from '../../jobs'
import { executePeripheralDeviceAction, listPlayoutDevices } from '../../peripheralDevice'
import { PlayoutModel } from '../../playout/cacheModel/PlayoutModel'
import { PlayoutModel } from '../../playout/model/PlayoutModel'
import { RundownEventContext } from './RundownEventContext'
import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import { PieceInstanceId, RundownPlaylistActivationId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import {
PieceInstance,
PieceInstancePiece,
wrapPieceToInstance,
} from '@sofie-automation/corelib/dist/dataModel/PieceInstance'
import { PieceInstanceId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { PieceInstance, PieceInstancePiece } from '@sofie-automation/corelib/dist/dataModel/PieceInstance'
import { clone, normalizeArrayToMap, omit } from '@sofie-automation/corelib/dist/lib'
import { protectString, protectStringArray, unprotectStringArray } from '@sofie-automation/corelib/dist/protectedString'
import { PartInstanceWithPieces } from '../../playout/cacheModel/PartInstanceWithPieces'
import { setupPieceInstanceInfiniteProperties } from '../../playout/pieces'
import { PlayoutPartInstanceModel } from '../../playout/model/PlayoutPartInstanceModel'
import { ReadonlyDeep } from 'type-fest'
import _ = require('underscore')
import { ContextInfo } from './CommonContext'
Expand Down Expand Up @@ -44,16 +39,15 @@ export class SyncIngestUpdateToPartInstanceContext
{
private readonly _proposedPieceInstances: Map<PieceInstanceId, ReadonlyDeep<PieceInstance>>

private partInstance: PartInstanceWithPieces | null
private partInstance: PlayoutPartInstanceModel | null

constructor(
private readonly _context: JobContext,
contextInfo: ContextInfo,
private readonly playlistActivationId: RundownPlaylistActivationId,
studio: ReadonlyDeep<DBStudio>,
showStyleCompound: ReadonlyDeep<ProcessedShowStyleCompound>,
rundown: ReadonlyDeep<DBRundown>,
partInstance: PartInstanceWithPieces,
partInstance: PlayoutPartInstanceModel,
proposedPieceInstances: ReadonlyDeep<PieceInstance[]>,
private playStatus: 'previous' | 'current' | 'next'
) {
Expand Down Expand Up @@ -126,17 +120,8 @@ export class SyncIngestUpdateToPartInstanceContext
this.partInstance.PartInstance.part._id,
this.playStatus === 'current'
)[0]
const newPieceInstance = wrapPieceToInstance(
piece,
this.playlistActivationId,
this.partInstance.PartInstance._id
)

// Ensure the infinite-ness is setup correctly. We assume any piece inserted starts in the current part
setupPieceInstanceInfiniteProperties(newPieceInstance)

// nocommit - this is wrong?
this.partInstance.replacePieceInstance(newPieceInstance)
const newPieceInstance = this.partInstance.insertPlannedPiece(piece)

return convertPieceInstanceToBlueprints(newPieceInstance)
}
Expand Down
12 changes: 6 additions & 6 deletions packages/job-worker/src/blueprints/context/adlibActions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import { PartInstanceId, PeripheralDeviceId, PieceInstanceId } from '@sofie-auto
import { assertNever, getRandomId, omit } from '@sofie-automation/corelib/dist/lib'
import { logger } from '../../logging'
import { ReadonlyDeep } from 'type-fest'
import { PlayoutModel } from '../../playout/cacheModel/PlayoutModel'
import { PartInstanceWithPieces } from '../../playout/cacheModel/PartInstanceWithPieces'
import { PlayoutModel } from '../../playout/model/PlayoutModel'
import { PlayoutPartInstanceModel } from '../../playout/model/PlayoutPartInstanceModel'
import { UserContextInfo } from './CommonContext'
import { ShowStyleUserContext } from './ShowStyleUserContext'
import { WatchedPackagesHelper } from './watchedPackages'
Expand Down Expand Up @@ -66,7 +66,7 @@ import { ProcessedShowStyleConfig } from '../config'
import { DatastorePersistenceMode } from '@sofie-automation/shared-lib/dist/core/model/TimelineDatastore'
import { getDatastoreId } from '../../playout/datastore'
import { executePeripheralDeviceAction, listPlayoutDevices } from '../../peripheralDevice'
import { RundownWithSegments } from '../../playout/cacheModel/RundownWithSegments'
import { PlayoutRundownModel } from '../../playout/model/PlayoutRundownModel'

export enum ActionPartChange {
NONE = 0,
Expand Down Expand Up @@ -123,7 +123,7 @@ export class DatastoreActionExecutionContext
export class ActionExecutionContext extends ShowStyleUserContext implements IActionExecutionContext, IEventContext {
private readonly _context: JobContext
private readonly _cache: PlayoutModel
private readonly rundown: RundownWithSegments
private readonly rundown: PlayoutRundownModel

/** To be set by any mutation methods on this context. Indicates to core how extensive the changes are to the current partInstance */
public currentPartState: ActionPartChange = ActionPartChange.NONE
Expand All @@ -138,7 +138,7 @@ export class ActionExecutionContext extends ShowStyleUserContext implements IAct
cache: PlayoutModel,
showStyle: ReadonlyDeep<ProcessedShowStyleCompound>,
_showStyleBlueprintConfig: ProcessedShowStyleConfig,
rundown: RundownWithSegments,
rundown: PlayoutRundownModel,
watchedPackages: WatchedPackagesHelper
) {
super(contextInfo, context, showStyle, watchedPackages)
Expand All @@ -148,7 +148,7 @@ export class ActionExecutionContext extends ShowStyleUserContext implements IAct
this.takeAfterExecute = false
}

private _getPartInstance(part: 'current' | 'next'): PartInstanceWithPieces | null {
private _getPartInstance(part: 'current' | 'next'): PlayoutPartInstanceModel | null {
switch (part) {
case 'current':
return this._cache.CurrentPartInstance
Expand Down
38 changes: 4 additions & 34 deletions packages/job-worker/src/cache/CacheBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString'
import { DbCacheReadCollection, DbCacheWriteCollection } from './CacheCollection'
import { DbCacheReadObject, DbCacheWriteObject, DbCacheWriteOptionalObject } from './CacheObject'
import { isDbCacheWritable } from './lib'
import { anythingChanged, Changes, sumChanges } from '../db/changes'
import { anythingChanged, sumChanges } from '../db/changes'
import { IS_PRODUCTION } from '../environment'
import { logger } from '../logging'
import { sleep } from '@sofie-automation/corelib/dist/lib'
Expand Down Expand Up @@ -32,10 +32,9 @@ export type ReadOnlyCache<T extends CacheBase<any>> = Omit<
export interface ICacheBase2 {
readonly DisplayName: string

discardChanges(): void
dispose(): void

hasChanges(): boolean
assertNoChanges(): void
}

/** This cache contains data relevant in a studio */
Expand Down Expand Up @@ -85,10 +84,8 @@ export abstract class ReadOnlyCacheBase<T extends ReadOnlyCacheBase<never>> impl
this._deferredBeforeSaveFunctions.length = 0 // clear the array

const { highPrioDBs, lowPrioDBs } = this.getAllCollections()
const customHighPrios = this.saveAllCustomHighPrioCollections()

if (highPrioDBs.length || customHighPrios.length) {
const allSaves = [...highPrioDBs.map(async (db) => db.updateDatabaseWithData()), ...customHighPrios]
if (highPrioDBs.length) {
const allSaves = [...highPrioDBs.map(async (db) => db.updateDatabaseWithData())]
const anyThingChanged = anythingChanged(sumChanges(...(await Promise.all(allSaves))))
if (anyThingChanged && !process.env.JEST_WORKER_ID) {
// Wait a little bit before saving the rest.
Expand All @@ -111,10 +108,6 @@ export abstract class ReadOnlyCacheBase<T extends ReadOnlyCacheBase<never>> impl
if (span) span.end()
}

protected saveAllCustomHighPrioCollections(): Array<Promise<Changes>> {
return []
}

/**
* Discard all changes to documents in the cache.
* This essentially acts as rolling back this transaction, and lets the cache be reused for another operation instead
Expand Down Expand Up @@ -194,29 +187,6 @@ export abstract class ReadOnlyCacheBase<T extends ReadOnlyCacheBase<never>> impl

if (span) span.end()
}

hasChanges(): boolean {
const { allDBs } = this.getAllCollections()

if (this._deferredBeforeSaveFunctions.length > 0) {
logger.silly(`hasChanges: _deferredBeforeSaveFunctions.length=${this._deferredBeforeSaveFunctions.length}`)
return true
}

if (this._deferredAfterSaveFunctions.length > 0) {
logger.silly(`hasChanges: _deferredAfterSaveFunctions.length=${this._deferredAfterSaveFunctions.length}`)
return true
}

for (const db of allDBs) {
if (db.isModified()) {
logger.silly(`hasChanges: db=${db.name}`)
return true
}
}

return false
}
}
export interface ICacheBase<T> {
/** Defer provided function (it will be run just before cache.saveAllToDatabase() ) */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { sleep } from '@sofie-automation/corelib/dist/lib'
import { protectString } from '@sofie-automation/corelib/dist/protectedString'
import { getRundownId } from '../../ingest/lib'
import { CacheForIngest } from '../../ingest/cache'
import { CacheForStudio } from '../../studio/cache'
import { loadStudioPlayoutModel } from '../../studio/StudioPlayoutModelImpl'
import { MockJobContext, setupDefaultJobEnvironment } from '../../__mocks__/context'

describe('DatabaseCaches', () => {
Expand Down Expand Up @@ -293,7 +293,7 @@ describe('DatabaseCaches', () => {
}

{
const cache = await CacheForStudio.create(context)
const cache = await loadStudioPlayoutModel(context)

// Insert a document:
cache.deferBeforeSave(() => {
Expand All @@ -306,7 +306,7 @@ describe('DatabaseCaches', () => {
}

{
const cache = await CacheForStudio.create(context)
const cache = await loadStudioPlayoutModel(context)

// Insert a document:
cache.deferAfterSave(() => {
Expand Down
Loading

0 comments on commit 2b032f3

Please sign in to comment.