forked from chebyrash/promise
-
Notifications
You must be signed in to change notification settings - Fork 0
/
promise.go
319 lines (268 loc) · 8.14 KB
/
promise.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
package promise
import (
"errors"
"sync"
)
const (
pending = iota
fulfilled
rejected
)
// A Promise is a proxy for a value not necessarily known when
// the promise is created. It allows you to associate handlers
// with an asynchronous action's eventual success value or failure reason.
// This lets asynchronous methods return values like synchronous methods:
// instead of immediately returning the final value, the asynchronous method
// returns a promise to supply the value at some point in the future.
type Promise struct {
// A Promise is in one of these states:
// Pending - 0. Initial state, neither fulfilled nor rejected.
// Fulfilled - 1. Operation completed successfully.
// Rejected - 2. Operation failed.
state int
// A function that is passed with the arguments resolve and reject.
// The executor function is executed immediately by the Promise implementation,
// passing resolve and reject functions (the executor is called
// before the Promise constructor even returns the created object).
// The resolve and reject functions, when called, resolve or reject
// the promise, respectively. The executor normally initiates some
// asynchronous work, and then, once that completes, either calls the
// resolve function to resolve the promise or else rejects it if
// an error or panic occurred.
executor func(resolve func(interface{}), reject func(error))
// Appends fulfillment to the promise,
// and returns a new promise.
then []func(data interface{}) interface{}
// Appends a rejection handler to the promise,
// and returns a new promise.
catch []func(err error) error
// Stores the result passed to resolve()
result interface{}
// Stores the error passed to reject()
err error
// Mutex protects against data race conditions.
mutex *sync.Mutex
// WaitGroup allows to block until all callbacks are executed.
wg *sync.WaitGroup
}
// New instantiates and returns a pointer to the Promise.
func New(executor func(resolve func(interface{}), reject func(error))) *Promise {
var wg = &sync.WaitGroup{}
wg.Add(1)
var promise = &Promise{
state: pending,
executor: executor,
then: make([]func(interface{}) interface{}, 0),
catch: make([]func(error) error, 0),
result: nil,
err: nil,
mutex: &sync.Mutex{},
wg: wg,
}
go func() {
defer promise.handlePanic()
promise.executor(promise.resolve, promise.reject)
}()
return promise
}
func (promise *Promise) resolve(resolution interface{}) {
promise.mutex.Lock()
if promise.state != pending {
promise.mutex.Unlock()
return
}
switch result := resolution.(type) {
case *Promise:
res, err := result.Await()
if err != nil {
promise.mutex.Unlock()
promise.reject(err)
return
}
promise.result = res
default:
promise.result = result
}
promise.wg.Done()
for range promise.catch {
promise.wg.Done()
}
for _, fn := range promise.then {
switch result := fn(promise.result).(type) {
case *Promise:
res, err := result.Await()
if err != nil {
promise.mutex.Unlock()
promise.reject(err)
return
}
promise.result = res
default:
promise.result = result
}
promise.wg.Done()
}
promise.state = fulfilled
promise.mutex.Unlock()
}
func (promise *Promise) reject(err error) {
promise.mutex.Lock()
defer promise.mutex.Unlock()
if promise.state != pending {
return
}
promise.err = err
promise.wg.Done()
for range promise.then {
promise.wg.Done()
}
for _, fn := range promise.catch {
promise.err = fn(promise.err)
promise.wg.Done()
}
promise.state = rejected
}
func (promise *Promise) handlePanic() {
var r = recover()
if r != nil {
promise.reject(errors.New(r.(string)))
}
}
// Then appends fulfillment handler to the Promise, and returns a new promise.
func (promise *Promise) Then(fulfillment func(data interface{}) interface{}) *Promise {
promise.mutex.Lock()
defer promise.mutex.Unlock()
switch promise.state {
case pending:
promise.wg.Add(1)
promise.then = append(promise.then, fulfillment)
case fulfilled:
promise.result = fulfillment(promise.result)
}
return promise
}
// Catch appends a rejection handler callback to the Promise, and returns a new promise.
func (promise *Promise) Catch(rejection func(err error) error) *Promise {
promise.mutex.Lock()
defer promise.mutex.Unlock()
switch promise.state {
case pending:
promise.wg.Add(1)
promise.catch = append(promise.catch, rejection)
case rejected:
promise.err = rejection(promise.err)
}
return promise
}
// Await is a blocking function that waits for all callbacks to be executed.
// Returns value and error.
// Call on an already resolved Promise to get its result and error
func (promise *Promise) Await() (interface{}, error) {
promise.wg.Wait()
return promise.result, promise.err
}
// All waits for all promises to be resolved, or for any to be rejected.
// If the returned promise resolves, it is resolved with an aggregating array of the values
// from the resolved promises in the same order as defined in the iterable of multiple promises.
// If it rejects, it is rejected with the reason from the first promise in the iterable that was rejected.
func All(promises ...*Promise) *Promise {
psLen := len(promises)
if psLen == 0 {
return Resolve(make([]interface{}, 0))
}
return New(func(resolve func(interface{}), reject func(error)) {
resolutionsChan := make(chan []interface{}, psLen)
errorChan := make(chan error, psLen)
for index, promise := range promises {
func(i int) {
promise.Then(func(data interface{}) interface{} {
resolutionsChan <- []interface{}{i, data}
return data
}).Catch(func(err error) error {
errorChan <- err
return err
})
}(index)
}
resolutions := make([]interface{}, psLen)
for x := 0; x < psLen; x++ {
select {
case resolution := <-resolutionsChan:
resolutions[resolution[0].(int)] = resolution[1]
case err := <-errorChan:
reject(err)
return
}
}
resolve(resolutions)
})
}
// Race waits until any of the promises is resolved or rejected.
// If the returned promise resolves, it is resolved with the value of the first promise in the iterable
// that resolved. If it rejects, it is rejected with the reason from the first promise that was rejected.
func Race(promises ...*Promise) *Promise {
psLen := len(promises)
if psLen == 0 {
return Resolve(nil)
}
return New(func(resolve func(interface{}), reject func(error)) {
resolutionsChan := make(chan interface{}, psLen)
errorChan := make(chan error, psLen)
for _, promise := range promises {
promise.Then(func(data interface{}) interface{} {
resolutionsChan <- data
return data
}).Catch(func(err error) error {
errorChan <- err
return err
})
}
select {
case resolution := <-resolutionsChan:
resolve(resolution)
case err := <-errorChan:
reject(err)
}
})
}
// AllSettled waits until all promises have settled (each may resolve, or reject).
// Returns a promise that resolves after all of the given promises have either resolved or rejected,
// with an array of objects that each describe the outcome of each promise.
func AllSettled(promises ...*Promise) *Promise {
psLen := len(promises)
if psLen == 0 {
return Resolve(make([]interface{}, 0))
}
return New(func(resolve func(interface{}), reject func(error)) {
resolutionsChan := make(chan []interface{}, psLen)
for index, promise := range promises {
func(i int) {
promise.Then(func(data interface{}) interface{} {
resolutionsChan <- []interface{}{i, data}
return data
}).Catch(func(err error) error {
resolutionsChan <- []interface{}{i, err}
return err
})
}(index)
}
resolutions := make([]interface{}, psLen)
for x := 0; x < psLen; x++ {
resolution := <-resolutionsChan
resolutions[resolution[0].(int)] = resolution[1]
}
resolve(resolutions)
})
}
// Resolve returns a Promise that has been resolved with a given value.
func Resolve(resolution interface{}) *Promise {
return New(func(resolve func(interface{}), reject func(error)) {
resolve(resolution)
})
}
// Reject returns a Promise that has been rejected with a given error.
func Reject(err error) *Promise {
return New(func(resolve func(interface{}), reject func(error)) {
reject(err)
})
}