Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/http makeready can reset state #142

Merged
merged 7 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@
"fast-clone": "^1.5.13",
"hyperdeck-connection": "^0.4.3",
"osc": "^2.4.0",
"p-queue": "^6.4.0",
"p-timeout": "^3.2.0",
"request": "^2.88.0",
"sprintf-js": "^1.1.2",
"superfly-timeline": "^7.3.1",
Expand Down
71 changes: 39 additions & 32 deletions src/conductor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import { QuantelDevice, DeviceOptionsQuantelInternal } from './devices/quantel'
import { SisyfosMessageDevice, DeviceOptionsSisyfosInternal } from './devices/sisyfos'
import { SingularLiveDevice, DeviceOptionsSingularLiveInternal } from './devices/singularLive'
import { VizMSEDevice, DeviceOptionsVizMSEInternal } from './devices/vizMSE'
import PQueue from 'p-queue'
import PTimeout from 'p-timeout'
export { DeviceContainer }
export { CommandWithContext }

Expand Down Expand Up @@ -131,6 +133,10 @@ export class Conductor extends EventEmitter {
private _triggerSendStartStopCallbacksTimeout: NodeJS.Timer | null = null
private _sentCallbacks: TimelineCallbacks = {}

private _actionQueue: PQueue = new PQueue({
concurrency: 1
})

private _statMeasureStart: number = 0
private _statMeasureReason: string = ''
private _statReports: StatReport[] = []
Expand Down Expand Up @@ -220,12 +226,7 @@ export class Conductor extends EventEmitter {
// re-resolve timeline
this._mapping = mapping

let ps: Promise<any>[] = []
_.each(this.devices, (d: DeviceContainer) => {
// @ts-ignore
ps.push(d.device.setMapping(mapping))
})
await Promise.all(ps)
await this._mapAllDevices(d => d.device.setMapping(mapping))

if (this._timeline) {
this._resolveTimeline()
Expand Down Expand Up @@ -457,6 +458,8 @@ export class Conductor extends EventEmitter {
// @ts-ignore
await newDevice.device.setMapping(this.mapping)

// TODO - should the device be on this.devices yet? sounds like we could instruct it to do things before it has initialised?

await newDevice.device.init(deviceOptions.options)

await newDevice.reloadProps() // because the device name might have changed after init
Expand Down Expand Up @@ -506,9 +509,7 @@ export class Conductor extends EventEmitter {

if (this._triggerSendStartStopCallbacksTimeout) clearTimeout(this._triggerSendStartStopCallbacksTimeout)

await Promise.all(_.map(_.keys(this.devices), (deviceId: string) => {
return this.removeDevice(deviceId)
}))
await this._mapAllDevices(d => this.removeDevice(d.deviceId))
}
/**
* Resets the resolve-time, so that the resolving will happen for the point-in time NOW
Expand All @@ -527,28 +528,26 @@ export class Conductor extends EventEmitter {
/**
* Send a makeReady-trigger to all devices
*/
public devicesMakeReady (okToDestroyStuff?: boolean, activeRundownId?: string): Promise<void> {
let p = Promise.resolve()
_.each(this.devices, (d: DeviceContainer) => {
p = p.then(async () => {
return d.device.makeReady(okToDestroyStuff, activeRundownId)
})
public async devicesMakeReady (okToDestroyStuff?: boolean, activeRundownId?: string): Promise<void> {
await this._actionQueue.add(async () => {
await this._mapAllDevices((d) => PTimeout(d.device.makeReady(okToDestroyStuff, activeRundownId), 10000, `makeReady for "${d.deviceId}" timed out`))

this._triggerResolveTimeline()
})
this._resolveTimeline()
return p
}
/**
* Send a standDown-trigger to all devices
*/
public devicesStandDown (okToDestroyStuff?: boolean): Promise<void> {
let p = Promise.resolve()
_.each(this.devices, (d: DeviceContainer) => {
p = p.then(async () => {
return d.device.standDown(okToDestroyStuff)
})
public async devicesStandDown (okToDestroyStuff?: boolean): Promise<void> {
await this._actionQueue.add(async () => {
await this._mapAllDevices((d) => PTimeout(d.device.standDown(okToDestroyStuff), 10000, `standDown for "${d.deviceId}" timed out`))
})
return p
}

private _mapAllDevices<T> (fcn: (d: DeviceContainer) => Promise<T>): Promise<T[]> {
return Promise.all(_.map(_.values(this.devices), d => fcn(d)))
}

/**
* This is the main resolve-loop.
*/
Expand Down Expand Up @@ -582,9 +581,12 @@ export class Conductor extends EventEmitter {
}

this._resolveTimelineRunning = true
this._resolveTimelineInner()
.catch(e => {
this.emit('error', 'Caught error in _resolveTimelineInner' + e)

this._actionQueue.add(() => {
return this._resolveTimelineInner()
.catch(e => {
this.emit('error', 'Caught error in _resolveTimelineInner' + e)
})
})
.then((nextResolveTime) => {
this._resolveTimelineRunning = false
Expand Down Expand Up @@ -642,6 +644,13 @@ export class Conductor extends EventEmitter {

// Let all devices know that a new state is about to come in.
// This is done so that they can clear future commands a bit earlier, possibly avoiding double or conflicting commands
// const pPrepareForHandleStates = this._mapAllDevices(async (device: DeviceContainer) => {
// await device.device.prepareForHandleState(resolveTime)
// }).catch(error => {
// this.emit('error', error)
// })
// TODO - the PAll way of doing this provokes https://github.com/nrkno/tv-automation-state-timeline-resolver/pull/139
// The doOnTime calls fire before this, meaning we cleanup the state for a time we have already sent commands for
const pPrepareForHandleStates: Promise<any> = Promise.all(
_.map(this.devices, async (device: DeviceContainer): Promise<any> => {
await device.device.prepareForHandleState(resolveTime)
Expand Down Expand Up @@ -711,8 +720,7 @@ export class Conductor extends EventEmitter {
}

// Push state to the right device:
let pHandleStates: Promise<any>[] = []
pHandleStates = _.map(this.devices, async (device: DeviceContainer): Promise<any> => {
await this._mapAllDevices(async (device: DeviceContainer): Promise<void> => {
// The subState contains only the parts of the state relevant to that device:
let subState: TimelineState = {
time: tlState.time,
Expand All @@ -729,14 +737,14 @@ export class Conductor extends EventEmitter {
}
return o
}

// Pass along the state to the device, it will generate its commands and execute them:
try {
await device.device.handleState(removeParent(subState))
} catch (e) {
this.emit('error', 'Error in device "' + device.deviceId + '"' + e + ' ' + e.stack)
}
})
await Promise.all(pHandleStates)

statTimeStateHandled = Date.now()

Expand Down Expand Up @@ -775,14 +783,13 @@ export class Conductor extends EventEmitter {
} else {
// there's nothing ahead in the timeline,
// Tell the devices that the future is clear:
const pClearFutures = _.map(this.devices, async (device: DeviceContainer) => {
await this._mapAllDevices(async (device: DeviceContainer) => {
try {
await device.device.clearFuture(tlState.time)
} catch (e) {
this.emit('error', 'Error in device "' + device.deviceId + '", clearFuture: ' + e + ' ' + e.stack)
}
})
await Promise.all(pClearFutures)

// resolve at this time then next time (or later):
nextResolveTime = Math.min(tlState.time)
Expand Down
23 changes: 12 additions & 11 deletions src/devices/casparCG.ts
Original file line number Diff line number Diff line change
Expand Up @@ -768,18 +768,20 @@ export class CasparCGDevice extends DeviceWithState<TimelineState> implements ID
const trackedState = this._ccgState.getState()

const channel = currentCasparState.channels[resCommand.channel]

if (!trackedState.channels[resCommand.channel]) {
trackedState.channels[resCommand.channel] = {
channelNo: channel.channelNo,
fps: channel.fps || 0,
videoMode: channel.videoMode || null,
layers: {}
if (channel) {

if (!trackedState.channels[resCommand.channel]) {
trackedState.channels[resCommand.channel] = {
channelNo: channel.channelNo,
fps: channel.fps || 0,
videoMode: channel.videoMode || null,
layers: {}
}
}
// Copy the tracked from current state:
trackedState.channels[resCommand.channel].layers[resCommand.layer] = channel.layers[resCommand.layer]
this._ccgState.setState(trackedState)
}
// Copy the tracked from current state:
trackedState.channels[resCommand.channel].layers[resCommand.layer] = currentCasparState.channels[resCommand.channel].layers[resCommand.layer]
this._ccgState.setState(trackedState)
}
}
}).catch((error) => {
Expand All @@ -804,7 +806,6 @@ export class CasparCGDevice extends DeviceWithState<TimelineState> implements ID
} else if (cmd.payload && !_.isEmpty(cmd.payload)) {
errorString += ', payload: ' + JSON.stringify(cmd.payload)
}
console.log('commandError', errorString)
this.emit('commandError', new Error(errorString), cwc)
if (cmd.name === 'ScheduleSetCommand') {
// delete this._queue[cmd.getParam('command').token]
Expand Down
19 changes: 12 additions & 7 deletions src/devices/httpSend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type CommandContext = string
export class HTTPSendDevice extends DeviceWithState<TimelineState> implements IDevice {

private _makeReadyCommands: HTTPSendCommandContent[]
private _makeReadyDoesReset: boolean
private _doOnTime: DoOnTime

private _commandReceiver: CommandReceiver
Expand All @@ -57,6 +58,7 @@ export class HTTPSendDevice extends DeviceWithState<TimelineState> implements ID
}
init (initOptions: HTTPSendOptions): Promise<boolean> {
this._makeReadyCommands = initOptions.makeReadyCommands || []
this._makeReadyDoesReset = initOptions.makeReadyDoesReset || false

return Promise.resolve(true) // This device doesn't have any initialization procedure
}
Expand Down Expand Up @@ -100,14 +102,17 @@ export class HTTPSendDevice extends DeviceWithState<TimelineState> implements ID
}
}
async makeReady (okToDestroyStuff?: boolean): Promise<void> {
if (okToDestroyStuff && this._makeReadyCommands && this._makeReadyCommands.length > 0) {
if (okToDestroyStuff) {
const time = this.getCurrentTime()
_.each(this._makeReadyCommands, (cmd: HTTPSendCommandContent) => {
// add the new commands to the queue:
this._doOnTime.queue(time, cmd.queueId, (cmd: HTTPSendCommandContent) => {
return this._commandReceiver(time, cmd, 'makeReady', '')
}, cmd)
})

if (this._makeReadyDoesReset) {
this.clearStates()
this._doOnTime.clearQueueAfter(0)
}

for (const cmd of this._makeReadyCommands || []) {
await this._commandReceiver(time, cmd, 'makeReady', '')
}
}
}

Expand Down
20 changes: 13 additions & 7 deletions src/devices/tcpSend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type CommandContext = string
export class TCPSendDevice extends DeviceWithState<TimelineState> implements IDevice {

private _makeReadyCommands: TcpSendCommandContent[]
private _makeReadyDoesReset: boolean

private _doOnTime: DoOnTime
private _tcpClient: Socket | null = null
private _connected: boolean = false
Expand All @@ -67,6 +69,7 @@ export class TCPSendDevice extends DeviceWithState<TimelineState> implements IDe
}
init (initOptions: TCPSendOptions): Promise<boolean> {
this._makeReadyCommands = initOptions.makeReadyCommands || []
this._makeReadyDoesReset = initOptions.makeReadyDoesReset || false

this._host = initOptions.host
this._port = initOptions.port
Expand Down Expand Up @@ -108,17 +111,20 @@ export class TCPSendDevice extends DeviceWithState<TimelineState> implements IDe
}

async makeReady (okToDestroyStuff?: boolean): Promise<void> {
if (okToDestroyStuff && this._makeReadyCommands && this._makeReadyCommands.length > 0) {
if (okToDestroyStuff) {
await this._disconnectTCPClient()
await this._connectTCPClient()

const time = this.getCurrentTime()
_.each(this._makeReadyCommands, (cmd: TcpSendCommandContent) => {
// add the new commands to the queue:
this._doOnTime.queue(time, cmd.queueId, (cmd: TcpSendCommandContent) => {
return this._commandReceiver(time, cmd, 'makeReady', '')
}, cmd)
})

if (this._makeReadyDoesReset) {
this.clearStates()
this._doOnTime.clearQueueAfter(0)
}

for (const cmd of this._makeReadyCommands || []) {
await this._commandReceiver(time, cmd, 'makeReady', '')
}
}
}
async terminate () {
Expand Down
2 changes: 2 additions & 0 deletions src/types/src/httpSend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export interface HTTPSendCommandContent {
}
export interface HTTPSendOptions {
makeReadyCommands?: HTTPSendCommandContent[]
/** Whether a makeReady should be treated as a reset of the device. It should be assumed clean, with the queue discarded, and state reapplied from empty */
makeReadyDoesReset?: boolean
}

export enum TimelineContentTypeHTTP {
Expand Down
2 changes: 2 additions & 0 deletions src/types/src/tcpSend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export interface TcpSendCommandContent {

export interface TCPSendOptions {
makeReadyCommands?: TcpSendCommandContent[]
/** Whether a makeReady should be treated as a reset of the device. It should be assumed clean, with the queue discarded, and state reapplied from empty */
makeReadyDoesReset?: boolean

host: string
port: number
Expand Down
20 changes: 20 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2059,6 +2059,11 @@ esutils@^2.0.2:
resolved "https://registry.yarnpkg.com/esutils/-/esutils-2.0.3.tgz#74d2eb4de0b8da1293711910d50775b9b710ef64"
integrity sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==

eventemitter3@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/eventemitter3/-/eventemitter3-4.0.0.tgz#d65176163887ee59f386d64c82610b696a4a74eb"
integrity sha512-qerSRB0p+UDEssxTtm6EDKcE7W4OaoisfIMl4CngyEhjpYglocpNg6UEqCvemdGhosAsg4sO2dXJOdyBifPGCg==

exec-sh@^0.3.2:
version "0.3.4"
resolved "https://registry.yarnpkg.com/exec-sh/-/exec-sh-0.3.4.tgz#3a018ceb526cc6f6df2bb504b2bfe8e3a4934ec5"
Expand Down Expand Up @@ -4529,6 +4534,14 @@ p-map@^3.0.0:
dependencies:
aggregate-error "^3.0.0"

p-queue@^6.4.0:
version "6.4.0"
resolved "https://registry.yarnpkg.com/p-queue/-/p-queue-6.4.0.tgz#5050b379393ea1814d6f9613a654f687d92c0466"
integrity sha512-X7ddxxiQ+bLR/CUt3/BVKrGcJDNxBr0pEEFKHHB6vTPWNUhgDv36GpIH18RmGM3YGPpBT+JWGjDDqsVGuF0ERw==
dependencies:
eventemitter3 "^4.0.0"
p-timeout "^3.1.0"

p-reduce@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/p-reduce/-/p-reduce-1.0.0.tgz#18c2b0dd936a4690a529f8231f58a0fdb6a47dfa"
Expand All @@ -4542,6 +4555,13 @@ p-retry@^4.1.0:
"@types/retry" "^0.12.0"
retry "^0.12.0"

p-timeout@^3.1.0, p-timeout@^3.2.0:
version "3.2.0"
resolved "https://registry.yarnpkg.com/p-timeout/-/p-timeout-3.2.0.tgz#c7e17abc971d2a7962ef83626b35d635acf23dfe"
integrity sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==
dependencies:
p-finally "^1.0.0"

p-try@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3"
Expand Down