This repository has been archived by the owner on Jul 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
medium.js
115 lines (100 loc) · 2.96 KB
/
medium.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
 * Glue between a task executor and a task storage backend.
 *
 * The executor's `runTask`/`startTask` and the storage's `insertTasks`,
 * `loadTask`, `loadDependencies` and `completeTask` are re-exposed as bound
 * methods on the instance; the remaining methods build on those primitives.
 * NOTE(review): the exact contracts of the executor and storage are not
 * visible here — the code below assumes the storage methods return promises.
 */
class Medium {
  /**
   * @param {object} executor - Must provide `runTask` and `startTask`.
   * @param {object} storage - Must provide `insertTasks`, `loadTask`,
   *   `loadDependencies` and `completeTask`.
   */
  constructor (executor, storage) {
    // Task name -> wrapped handler (see registerHandler)
    this._handlers = {}

    // Delegate executor methods
    for (const method of ['runTask', 'startTask']) {
      this[method] = executor[method].bind(executor)
    }

    // Delegate storage methods
    for (const method of ['insertTasks', 'loadTask', 'loadDependencies', 'completeTask']) {
      this[method] = storage[method].bind(storage)
    }
  }

  /**
   * Registers `handler` under `taskName`, wrapped with task loading and logging.
   *
   * The wrapper loads the pending task (when a task id is given), calls
   * `handler(task.input, task, medium)` and resolves to `undefined`.
   * A TaskAlreadyCompleteError is logged as a warning and swallowed; any
   * other error is logged and re-thrown.
   *
   * @param {string} taskName
   * @param {function(*, ?object, Medium): *} handler
   */
  registerHandler (taskName, handler) {
    this._handlers[taskName] = taskId => {
      console.log('run', taskId)

      // Without a task id there is nothing to load: the handler gets null
      const pending = taskId == null
        ? Promise.resolve(null)
        : this.loadPendingTask(taskId)

      return pending
        .then(task => handler(task && task.input, task, this))
        .then(() => console.log('ok', taskId))
        .catch(error => {
          if (error instanceof TaskAlreadyCompleteError) {
            // An already-complete task is not treated as a failure
            console.error('warn', taskId, error)
            return
          }
          console.error('error', taskId, error)
          throw error
        })
    }
  }

  /**
   * Builds an entry point that forwards its arguments to `runTask` together
   * with a dispatcher. The dispatcher picks the registered handler by task
   * name, falling back to the handler registered as `'default'`.
   *
   * @returns {function(...*): *} Function returning whatever `runTask` returns.
   * @throws {TypeError} From the dispatcher, when no handler is registered
   *   for the requested task name.
   */
  createHandler () {
    return (...args) => this.runTask((taskId, taskName) => {
      const name = taskName || 'default'
      const handler = this._handlers[name]
      if (typeof handler !== 'function') {
        // Same error type as calling `undefined`, but with a usable message
        throw new TypeError(`No handler registered for task "${name}"`)
      }
      return handler(taskId)
    }, ...args)
  }

  /**
   * Loads a task and verifies that it still has to run.
   *
   * @param {*} taskId
   * @returns {Promise<object>} The stored task.
   * @throws {TaskNotFoundError} (as rejection) when storage yields no task.
   * @throws {TaskAlreadyCompleteError} (as rejection) when the task status
   *   is `'complete'`.
   */
  loadPendingTask (taskId) {
    return this.loadTask(taskId)
      .then(task => {
        if (!task) {
          throw new TaskNotFoundError(taskId)
        }
        if (task.status === 'complete') {
          // Pass the id along so the error message identifies the task
          throw new TaskAlreadyCompleteError(taskId)
        }
        return task
      })
  }

  /**
   * Persists all tasks of `graph`, then starts the dependents of its start node.
   *
   * @param {{tasks: *, start: {dependents: Array<object>}}} graph
   * @returns {Promise<Array>} Results of `startTask` for each started task.
   */
  startGraph (graph) {
    return this.insertTasks(graph.tasks)
      .then(() => this.startTasks(graph.start.dependents))
  }

  /**
   * Loads the given dependencies and returns their parsed outputs, in order.
   * Falsy outputs are returned as-is rather than parsed.
   *
   * @param {*} dependencies - Passed through to `loadDependencies`.
   * @returns {Promise<Array>} JSON-parsed outputs.
   * @throws {DependencyNotCompleteError} (as rejection) when any dependency
   *   does not have status `'complete'`.
   */
  loadDependencyOutputs (dependencies) {
    return this.loadDependencies(dependencies)
      .then(loaded => loaded.map(({ id, output, status }) => {
        if (status !== 'complete') {
          throw new DependencyNotCompleteError(id)
        }
        return output && JSON.parse(output)
      }))
  }

  /**
   * Marks the task complete with `output`, then starts the dependents listed
   * on the task returned by `completeTask`.
   *
   * @param {*} taskId
   * @param {*} output
   * @returns {Promise<*>} Resolves to `output` once dependents were started.
   */
  startDependents (taskId, output) {
    return this.completeTask(taskId, output)
      .then(task => this.startTasks(task.dependents))
      .then(() => output)
  }

  /**
   * Starts every task via `startTask`, passing the handler registered for
   * the task's name along with the task id and name.
   *
   * @param {Array<{id: *, name: string}>} tasks
   * @returns {Promise<Array>} Results of all `startTask` calls.
   */
  startTasks (tasks) {
    return Promise.all(tasks.map(({ id, name }) => {
      return this.startTask(this._handlers[name], id, name)
    }))
  }
}
/**
 * Error raised when no task exists for a requested task id.
 */
class TaskNotFoundError extends CustomError {
  /**
   * @param {*} taskId - Id of the task that could not be found.
   */
  constructor (taskId) {
    const message = `Task ${taskId} not found`
    super(message)
  }
}
/**
 * Error raised when a task that should run is already marked complete.
 */
class TaskAlreadyCompleteError extends CustomError {
  /**
   * @param {*} taskId - Id of the task that is already complete.
   */
  constructor (taskId) {
    const message = `Task ${taskId} is already complete`
    super(message)
  }
}
/**
 * Error raised when a dependency's output is requested before that
 * dependency has completed.
 */
class DependencyNotCompleteError extends CustomError {
  /**
   * @param {*} dependencyId - Id of the dependency task that is not complete.
   */
  constructor (dependencyId) {
    const message = `Dependency task ${dependencyId} is not complete`
    super(message)
  }
}
/**
 * Base constructor for this module's error types.
 *
 * Sets `name` from the concrete constructor, stores `message` and captures a
 * stack trace that omits the constructor frames.
 *
 * This must stay a function declaration: it is hoisted, and classes declared
 * earlier in the file use it in their `extends` clause.
 *
 * @param {string} message - Human-readable error description.
 */
function CustomError (message) {
  this.name = this.constructor.name
  this.message = message
  // captureStackTrace is a V8 extension; skip it on engines that lack it
  if (typeof Error.captureStackTrace === 'function') {
    Error.captureStackTrace(this, this.constructor)
  }
}

// Re-parent the existing prototype object in place rather than replacing it.
// Classes that `extends CustomError` before this statement runs have already
// linked to the current prototype object; swapping in a new object would
// leave them outside the Error chain, so their instances would fail
// `instanceof Error` and `instanceof CustomError`.
Object.setPrototypeOf(CustomError.prototype, Error.prototype)
Object.assign(Medium, {
TaskNotFoundError,
TaskAlreadyCompleteError,
DependencyNotCompleteError
})
module.exports = Medium