-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathdelay.go
96 lines (80 loc) · 2.19 KB
/
delay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package repeat
import (
"context"
"time"
)
// SetErrorsTimeout specifies the maximum timeout for repetition
// in case of error. This timeout is reset each time when the
// repetition operation is successfully completed.
//
// Default value is maximum time.Duration value.
func SetErrorsTimeout(t time.Duration) func(*DelayOptions) {
return func(do *DelayOptions) {
do.ErrorsTimeout = t
}
}
// SetContext allows to set a context instead of default one.
func SetContext(ctx context.Context) func(*DelayOptions) {
return func(do *DelayOptions) {
do.Context = ctx
}
}
// SetContextHintStop instructs to use HintStop(nil)
// instead of context error in case of context expiration.
func SetContextHintStop() func(*DelayOptions) {
return func(do *DelayOptions) {
do.ContextHintStop = true
}
}
// WithDelay constructs HeartbeatPredicate.
func WithDelay(options ...func(hb *DelayOptions)) Operation {
do := applyOptions(applyOptions(&DelayOptions{}, defaultOptions()), options)
shift := func() time.Time {
return time.Now().Add(do.ErrorsTimeout)
}
deadline := shift()
return func(e error) error {
// Shift the deadline in case of success.
if e == nil {
deadline = shift()
}
delayT := time.NewTimer(do.Backoff())
defer delayT.Stop()
deadlineT := time.NewTimer(deadline.Sub(time.Now()))
defer deadlineT.Stop()
select {
case <-do.Context.Done():
// Let out caller know that the op is cancelled.
if do.ContextHintStop {
return HintStop(nil)
}
return do.Context.Err()
case <-deadlineT.C:
// The reason of a deadline is the previous error. Let our
// caller to take care of it.
return Cause(e)
case <-delayT.C:
return e
}
}
}
// DelayOptions holds parameters for a heartbeat process.
type DelayOptions struct {
ErrorsTimeout time.Duration
Backoff func() time.Duration
Context context.Context
ContextHintStop bool
}
func defaultOptions() []func(hb *DelayOptions) {
return []func(do *DelayOptions){
SetContext(context.Background()),
SetErrorsTimeout(1<<63 - 1),
FixedBackoff(time.Second).Set(),
}
}
func applyOptions(do *DelayOptions, options []func(*DelayOptions)) *DelayOptions {
for _, o := range options {
o(do)
}
return do
}