forked from donovanhide/eventsource
-
Notifications
You must be signed in to change notification settings - Fork 8
/
stream_options.go
261 lines (224 loc) · 9.21 KB
/
stream_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
package eventsource
import (
"net/http"
"net/url"
"time"
)
// streamOptions collects the settings that StreamOption values can modify. Each option's
// apply method in this file writes to one of these fields.
type streamOptions struct {
	// initialRetry is the initial retry delay (see StreamOptionInitialRetry).
	initialRetry time.Duration
	// httpClient is the HTTP client for the stream (see StreamOptionHTTPClient).
	httpClient *http.Client
	// lastEventID is the initial last event ID (see StreamOptionLastEventID).
	lastEventID string
	// logger is the stream's logger (see StreamOptionLogger).
	logger Logger
	// backoffMaxDelay is the maximum backoff delay (see StreamOptionUseBackoff).
	backoffMaxDelay time.Duration
	// jitterRatio is the jitter proportion (see StreamOptionUseJitter).
	jitterRatio float64
	// readTimeout is the read timeout interval (see StreamOptionReadTimeout).
	readTimeout time.Duration
	// retryResetInterval is the backoff reset threshold (see StreamOptionRetryResetInterval).
	retryResetInterval time.Duration
	// initialRetryTimeout is the first-connection retry timeout (see
	// StreamOptionCanRetryFirstConnection).
	initialRetryTimeout time.Duration
	// errorHandler is the stream error callback (see StreamOptionErrorHandler).
	errorHandler StreamErrorHandler
	// queryParamsFunc points to the query parameter generator (see
	// StreamOptionDynamicQueryParams).
	queryParamsFunc *func(existing url.Values) url.Values
}
// StreamOption is the common interface implemented by every optional configuration
// parameter that can be supplied when creating a stream.
type StreamOption interface {
	// apply records the option's setting in opts and reports any error.
	apply(opts *streamOptions) error
}
// dynamicQueryParamsOption is the StreamOption implementation returned by
// StreamOptionDynamicQueryParams.
type dynamicQueryParamsOption struct {
	// queryParamsFunc is the caller-supplied query parameter generator; it may be nil.
	queryParamsFunc func(existing url.Values) url.Values
}

// apply stores a pointer to the option's function in s.queryParamsFunc, replacing any
// previous value. It always returns nil.
//
// A nil function is stored as a nil pointer rather than as a non-nil pointer to a nil
// function, so that code which only compares s.queryParamsFunc against nil can never end
// up calling a nil function value.
func (o dynamicQueryParamsOption) apply(s *streamOptions) error {
	if o.queryParamsFunc == nil {
		s.queryParamsFunc = nil
		return nil
	}
	s.queryParamsFunc = &o.queryParamsFunc
	return nil
}
// StreamOptionDynamicQueryParams returns an option that installs a function for generating
// query parameters. The function is used each time the stream needs to make a fresh
// connection.
func StreamOptionDynamicQueryParams(f func(existing url.Values) url.Values) StreamOption {
	opt := dynamicQueryParamsOption{}
	opt.queryParamsFunc = f
	return opt
}
// readTimeoutOption is the StreamOption implementation returned by StreamOptionReadTimeout.
type readTimeoutOption struct {
	// timeout is the read timeout interval to configure.
	timeout time.Duration
}

// apply copies the timeout into s.readTimeout. It always returns nil.
func (rt readTimeoutOption) apply(s *streamOptions) error {
	s.readTimeout = rt.timeout
	return nil
}
// StreamOptionReadTimeout returns an option that configures the stream's read timeout
// interval when the stream is created. When no new data is received within that length of
// time, the stream restarts the connection.
//
// No read timeout is applied by default.
func StreamOptionReadTimeout(timeout time.Duration) StreamOption {
	return readTimeoutOption{timeout}
}
// initialRetryOption is the StreamOption implementation returned by StreamOptionInitialRetry.
type initialRetryOption struct {
	// retry is the initial retry delay to configure.
	retry time.Duration
}

// apply copies the delay into s.initialRetry. It always returns nil.
func (ir initialRetryOption) apply(s *streamOptions) error {
	s.initialRetry = ir.retry
	return nil
}
// StreamOptionInitialRetry returns an option that sets the stream's initial retry delay
// when the stream is created.
//
// The delay applies the first time the stream has to be restarted. On subsequent
// reconnections the interval grows exponentially if backoff is enabled (see
// StreamOptionUseBackoff), and a pseudo-random jitter shortens each delay if jitter is
// enabled (see StreamOptionUseJitter). For example, with an initial delay of 1 second,
// backoff enabled, and a jitter ratio of 0.5, the first reconnection waits between 0.5s and
// 1s inclusive, and later reconnections wait 1s-2s, 2s-4s, and so on.
//
// The default is DefaultInitialRetry. That default may change in a future version, so set
// the value explicitly if a specific delay is required.
func StreamOptionInitialRetry(retry time.Duration) StreamOption {
	return initialRetryOption{retry}
}
// useBackoffOption is the StreamOption implementation returned by StreamOptionUseBackoff.
type useBackoffOption struct {
	// maxDelay is the maximum backoff delay to configure.
	maxDelay time.Duration
}

// apply copies the maximum delay into s.backoffMaxDelay. It always returns nil.
func (ub useBackoffOption) apply(s *streamOptions) error {
	s.backoffMaxDelay = ub.maxDelay
	return nil
}
// StreamOptionUseBackoff returns an option that controls whether reconnection delays follow
// an exponential backoff.
//
// Backoff is enabled when maxDelay is greater than zero. In that case the retry delay
// interval (not counting jitter - see StreamOptionUseJitter) doubles for consecutive stream
// reconnections, but never exceeds maxDelay.
//
// The current default is zero (disabled), kept for consistency with earlier versions. That
// may change in a future version, so pass zero explicitly if backoff is not wanted. Using
// both backoff and jitter is recommended, to avoid "thundering herd" behavior in the case
// of a server outage.
func StreamOptionUseBackoff(maxDelay time.Duration) StreamOption {
	return useBackoffOption{maxDelay: maxDelay}
}
// canRetryFirstConnectionOption is the StreamOption implementation returned by
// StreamOptionCanRetryFirstConnection.
type canRetryFirstConnectionOption struct {
	// initialRetryTimeout is the first-connection retry timeout to configure.
	initialRetryTimeout time.Duration
}

// apply copies the timeout into s.initialRetryTimeout. It always returns nil.
func (cr canRetryFirstConnectionOption) apply(s *streamOptions) error {
	s.initialRetryTimeout = cr.initialRetryTimeout
	return nil
}
// StreamOptionCanRetryFirstConnection returns an option that controls whether the stream's
// first connection attempt is subject to retry behavior.
//
// With a nonzero timeout, a failure of the initial connection when subscribing does not
// cause an error result; instead it triggers the same retry logic as the failure of an
// existing connection. With a positive timeout, the stream constructor does not return
// until a connection has been made or the timeout expires. With a negative timeout, it
// keeps retrying indefinitely.
//
// The default is zero, meaning an initial connection failure is not retried.
func StreamOptionCanRetryFirstConnection(initialRetryTimeout time.Duration) StreamOption {
	return canRetryFirstConnectionOption{initialRetryTimeout: initialRetryTimeout}
}
// useJitterOption is the StreamOption implementation returned by StreamOptionUseJitter.
type useJitterOption struct {
	// jitterRatio is the jitter proportion to configure.
	jitterRatio float64
}

// apply copies the ratio, unmodified, into s.jitterRatio. It always returns nil.
func (uj useJitterOption) apply(s *streamOptions) error {
	s.jitterRatio = uj.jitterRatio
	return nil
}
// StreamOptionUseJitter returns an option that controls whether reconnection delays are
// shortened by a randomized jitter.
//
// A jitterRatio greater than zero is a proportion, up to 1.0 (100%), that is deducted from
// the retry delay interval that would otherwise be used. For instance, 0.5 means the delay
// is randomly decreased by up to 50%. A value greater than 1.0 is treated as equal to 1.0.
//
// The current default is zero (disabled), kept for consistency with earlier versions. That
// may change in a future version, so pass zero explicitly if jitter is not wanted. Using
// both backoff and jitter is recommended, to avoid "thundering herd" behavior in the case
// of a server outage.
func StreamOptionUseJitter(jitterRatio float64) StreamOption {
	return useJitterOption{jitterRatio: jitterRatio}
}
// retryResetIntervalOption is the StreamOption implementation returned by
// StreamOptionRetryResetInterval.
type retryResetIntervalOption struct {
	// retryResetInterval is the backoff reset threshold to configure.
	retryResetInterval time.Duration
}

// apply copies the interval into s.retryResetInterval. It always returns nil.
func (rr retryResetIntervalOption) apply(s *streamOptions) error {
	s.retryResetInterval = rr.retryResetInterval
	return nil
}
// StreamOptionRetryResetInterval returns an option that sets the minimum amount of time a
// connection must stay open before the Stream resets its backoff delay. It is only relevant
// when backoff is enabled (see StreamOptionUseBackoff).
//
// When a connection fails before this threshold has elapsed, the delay before reconnecting
// is greater than the last delay. When it fails after the threshold, the delay starts over
// at the initial minimum value. This prevents long delays from occurring on connections
// that are only rarely restarted.
//
// The default value is DefaultRetryResetInterval.
func StreamOptionRetryResetInterval(retryResetInterval time.Duration) StreamOption {
	return retryResetIntervalOption{retryResetInterval}
}
// lastEventIDOption is the StreamOption implementation returned by StreamOptionLastEventID.
type lastEventIDOption struct {
	// lastEventID is the initial last event ID to configure.
	lastEventID string
}

// apply copies the ID into s.lastEventID. It always returns nil.
func (le lastEventIDOption) apply(s *streamOptions) error {
	s.lastEventID = le.lastEventID
	return nil
}
// StreamOptionLastEventID returns an option that sets the stream's initial last event ID
// when the stream is created. A specified value is sent to the server in case the server
// can replay missed events.
func StreamOptionLastEventID(lastEventID string) StreamOption {
	return lastEventIDOption{lastEventID}
}
// httpClientOption is the StreamOption implementation returned by StreamOptionHTTPClient.
type httpClientOption struct {
	// client is the HTTP client to configure; nil means "leave the current client alone".
	client *http.Client
}

// apply stores the client in s.httpClient, unless the client is nil, in which case
// s.httpClient keeps its current value. It always returns nil.
func (hc httpClientOption) apply(s *streamOptions) error {
	if hc.client == nil {
		// Nothing to override with; keep whatever client is already configured.
		return nil
	}
	s.httpClient = hc.client
	return nil
}
// StreamOptionHTTPClient returns an option that replaces the default HTTP client used by a
// stream when the stream is created. A nil client leaves the configured client unchanged.
func StreamOptionHTTPClient(client *http.Client) StreamOption {
	opt := httpClientOption{}
	opt.client = client
	return opt
}
// loggerOption is the StreamOption implementation returned by StreamOptionLogger.
type loggerOption struct {
	// logger is the logger to configure.
	logger Logger
}

// apply copies the logger into s.logger. It always returns nil.
func (lo loggerOption) apply(s *streamOptions) error {
	s.logger = lo.logger
	return nil
}
// StreamOptionLogger returns an option that sets a stream's logger when the stream is
// created; SetLogger can be used to change it later. There is no logger by default.
func StreamOptionLogger(logger Logger) StreamOption {
	return loggerOption{logger}
}
// streamErrorHandlerOption is the StreamOption implementation returned by
// StreamOptionErrorHandler.
type streamErrorHandlerOption struct {
	// handler is the stream error callback to configure.
	handler StreamErrorHandler
}

// apply copies the handler into s.errorHandler. It always returns nil.
func (eh streamErrorHandlerOption) apply(s *streamOptions) error {
	s.errorHandler = eh.handler
	return nil
}
// StreamOptionErrorHandler returns an option that makes a Stream call the given function
// for stream errors.
//
// A non-nil handler is called whenever the Stream encounters a network error or an HTTP
// error response status. Its return value determines whether the Stream retries as usual
// or stops immediately, as if Close had been called.
//
// Using a handler replaces the Errors channel: that channel is pre-closed and the Stream
// pushes no errors to it, so the caller does not need to consume it.
//
// A handler is also the only way to control how the Stream handles errors during the
// initial connection attempt, because the caller has no way to consume the Errors channel
// before the Subscribe method has returned.
func StreamOptionErrorHandler(handler StreamErrorHandler) StreamOption {
	return streamErrorHandlerOption{handler: handler}
}
// Documented default values for stream options.
const (
	// DefaultInitialRetry is the default value for StreamOptionInitialRetry.
	DefaultInitialRetry = 3 * time.Second
	// DefaultRetryResetInterval is the default value for StreamOptionRetryResetInterval.
	DefaultRetryResetInterval = time.Minute
)