forked from donovanhide/eventsource
-
Notifications
You must be signed in to change notification settings - Fork 8
/
stream_reconnect_test.go
169 lines (141 loc) · 5.25 KB
/
stream_reconnect_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package eventsource
import (
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/launchdarkly/go-test-helpers/v2/httphelpers"
)
func TestStreamReconnectsIfConnectionIsBroken(t *testing.T) {
streamHandler, streamControl := httphelpers.SSEHandler(nil)
defer streamControl.Close()
httpServer := httptest.NewServer(streamHandler)
defer httpServer.Close()
stream := mustSubscribe(t, httpServer.URL, StreamOptionInitialRetry(time.Millisecond))
defer stream.Close()
event := httphelpers.SSEEvent{ID: "123"}
streamControl.Enqueue(event)
select {
case <-stream.Events:
case <-time.After(timeToWaitForEvent):
t.Error("Timed out waiting for event")
return
}
streamControl.EndAll()
// Expect at least one error
select {
case <-stream.Errors:
case <-time.After(timeToWaitForEvent):
t.Error("Timed out waiting for event")
return
}
streamControl.Enqueue(event)
// Consume errors until we've got another event
for {
select {
case <-stream.Errors:
case <-time.After(2 * time.Second):
t.Error("Timed out waiting for event")
return
case receivedEvent := <-stream.Events:
assert.Equal(t, &publication{id: "123", lastEventID: "123"}, receivedEvent)
return
}
}
}
func TestStreamCanUseBackoff(t *testing.T) {
streamHandler, streamControl := httphelpers.SSEHandler(nil)
defer streamControl.Close()
httpServer := httptest.NewServer(streamHandler)
defer httpServer.Close()
baseDelay := time.Millisecond
stream := mustSubscribe(t, httpServer.URL,
StreamOptionInitialRetry(baseDelay),
StreamOptionUseBackoff(time.Minute))
defer stream.Close()
retry := stream.getRetryDelayStrategy()
assert.False(t, retry.hasJitter())
d0 := retry.NextRetryDelay(time.Now())
d1 := retry.NextRetryDelay(time.Now())
d2 := retry.NextRetryDelay(time.Now())
assert.Equal(t, baseDelay, d0)
assert.Equal(t, baseDelay*2, d1)
assert.Equal(t, baseDelay*4, d2)
}
func TestStreamCanUseJitter(t *testing.T) {
streamHandler, streamControl := httphelpers.SSEHandler(nil)
defer streamControl.Close()
httpServer := httptest.NewServer(streamHandler)
defer httpServer.Close()
baseDelay := time.Millisecond
stream := mustSubscribe(t, httpServer.URL,
StreamOptionInitialRetry(baseDelay),
StreamOptionUseBackoff(time.Minute),
StreamOptionUseJitter(0.5))
defer stream.Close()
retry := stream.getRetryDelayStrategy()
assert.True(t, retry.hasJitter())
d0 := retry.NextRetryDelay(time.Now())
d1 := retry.NextRetryDelay(time.Now())
assert.True(t, d0 >= baseDelay/2)
assert.True(t, d0 <= baseDelay)
assert.True(t, d1 >= baseDelay)
assert.True(t, d1 <= baseDelay*2)
}
func TestStreamCanSetMaximumDelayWithBackoff(t *testing.T) {
streamHandler, streamControl := httphelpers.SSEHandler(nil)
defer streamControl.Close()
httpServer := httptest.NewServer(streamHandler)
defer httpServer.Close()
baseDelay := time.Millisecond
max := baseDelay * 3
stream := mustSubscribe(t, httpServer.URL,
StreamOptionInitialRetry(baseDelay),
StreamOptionUseBackoff(max))
defer stream.Close()
retry := stream.getRetryDelayStrategy()
assert.False(t, retry.hasJitter())
d0 := retry.NextRetryDelay(time.Now())
d1 := retry.NextRetryDelay(time.Now())
d2 := retry.NextRetryDelay(time.Now())
assert.Equal(t, baseDelay, d0)
assert.Equal(t, baseDelay*2, d1)
assert.Equal(t, max, d2)
}
func TestStreamBackoffCanUseResetInterval(t *testing.T) {
// In this test, streamHandler1 sends an event, then breaks the connection too soon for the delay to be
// reset. We ask the retryDelayStrategy to compute the next delay; it should be higher than the initial
// value. Then streamHandler2 sends an event, and we verify that the next delay that *would* come from the
// retryDelayStrategy if the reset interval elapsed would be the initial delay, not a higher value.
streamHandler1, streamControl1 := httphelpers.SSEHandler(nil)
defer streamControl1.Close()
streamHandler2, streamControl2 := httphelpers.SSEHandler(nil)
defer streamControl2.Close()
httpServer := httptest.NewServer(httphelpers.SequentialHandler(streamHandler1, streamHandler2))
defer httpServer.Close()
baseDelay := time.Millisecond
resetInterval := time.Millisecond * 200
stream := mustSubscribe(t, httpServer.URL,
StreamOptionInitialRetry(baseDelay),
StreamOptionUseBackoff(time.Hour),
StreamOptionRetryResetInterval(resetInterval))
defer stream.Close()
retry := stream.getRetryDelayStrategy()
// The first stream connection sends an event, so the stream state becomes "good".
event := httphelpers.SSEEvent{ID: "123"}
streamControl1.Enqueue(event)
// We ask the retryDelayStrategy to compute the next two delay values; they should show an increase.
d0 := retry.NextRetryDelay(time.Now())
d1 := retry.NextRetryDelay(time.Now())
assert.Equal(t, baseDelay, d0)
assert.Equal(t, baseDelay*2, d1)
// The first connection is broken; the state becomes "bad".
streamControl1.EndAll()
// After it reconnects, the second connection receives an event and the state becomes "good" again.
streamControl2.Enqueue(event)
<-stream.Events
// Now, ask the retryDelayStrategy what the next delay value would be if the next attempt happened
// 200 milliseconds from now (assuming the stream remained good). It should go back to baseDelay.
d2 := retry.NextRetryDelay(time.Now().Add(resetInterval))
assert.Equal(t, baseDelay, d2)
}