This repository has been archived by the owner on Jul 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler.js
129 lines (114 loc) · 3.36 KB
/
handler.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
const uuid = require('uuid')
const { DependencyNotCompleteError } = require('./medium')
/*
* Creates a flow compute handler wrapping the given function.
*
* A compute handler will be given the input and the task, and the result will be stored as output
* and the task's dependents will be started.
*/
function compute (fn) {
return (input, task, medium) => {
return fn(input, task, medium)
.then(output => medium.startDependents(task.id, output))
}
}
/*
* Creates a flow graph handler, wrapping the given function.
*
* A graph handler generates a subgraph to compute task output. A graph should be terminated by a
* sink task.
*
* Graph handlers ignore the result of the function, as output will be computed for the tasks in the
* graph.
*/
function graph (fn) {
return (input, task, medium) => {
const graph = new TaskGraph()
return fn(graph)(input, task, medium)
.then(() => {
if (task) {
if (graph.tasks.length === 0) {
// If no tasks were defined, resort to `compute` behaviour (with no output)
return medium.startDependents(task.id, task.output)
}
if (!graph.end.dependencies.length === 1) {
throw new Error('A graph task must end with exactly one dependency')
}
// Override the sink input to set the source task ID to the graph task's ID
const sink = graph.tasks.find(task => task.id === graph.end.dependencies[0])
sink.input = {
sourceTaskId: task.id,
input: sink.input
}
}
return medium.startGraph(graph)
})
}
}
/*
* Creates a flow sink handler, wrapping the given function.
*
* A sink handler ensures all its dependencies are complete before invoking, and passes the
* dependency output as input to the task. It also handles saving back to a source task if
* specified.
*/
function sink (fn) {
return (input, task, medium) => {
return medium.loadDependencyOutputs(task.dependencies)
.then(_sink)
.catch(error => {
if (error instanceof DependencyNotCompleteError) {
// Fail silently. The sink will be triggered again when the dependency is completed.
return
}
throw error
})
function _sink (dependencies) {
let sourceTaskId = null
if (input && input.sourceTaskId) {
({ sourceTaskId, input } = input)
}
return fn(dependencies)(input, task, medium)
.then(output => medium.startDependents(task.id, output))
.then(output => sourceTaskId ? medium.startDependents(sourceTaskId, output) : output)
}
}
}
/*
* Provides an interface for defining a graph of tasks.
*/
class TaskGraph {
constructor (sourceTask) {
this.tasks = []
this.start = { dependents: [] }
this.end = { dependencies: [] }
}
task (task) {
task = Object.assign({
id: uuid.v4(),
name: null,
input: null,
output: null,
dependencies: [],
dependents: [],
status: 'pending',
statusUpdatedAt: new Date()
}, task)
this.tasks.push(task)
return task
}
pipe (source, destination) {
if (destination.id) {
source.dependents.push({ id: destination.id, name: destination.name })
}
if (source.id) {
destination.dependencies.push(source.id)
}
}
}
module.exports = {
compute,
graph,
sink,
TaskGraph
}