-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathclient_test.go
246 lines (215 loc) · 5.96 KB
/
client_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
package mqtt_test // import "gosrc.io/mqtt"
import (
"errors"
"log"
"net"
"net/url"
"testing"
"time"
"gosrc.io/mqtt"
)
const (
acceptTimeout = 300 * time.Millisecond
)
// TestClient_ConnectTimeout checks that connect will properly timeout and not
// block forever if server never send CONNACK.
func TestClient_ConnectTimeout(t *testing.T) {
// Setup Mock server
mock := MQTTServerMock{}
if err := mock.Start(t, func(t *testing.T, c net.Conn) { return }); err != nil {
t.Error(err)
return
}
// Test / Check result
client := mqtt.NewClient(testMQTTAddress)
client.ConnectTimeout = 100 * time.Millisecond
if err := client.Connect(nil); err != nil {
if neterr, ok := err.(net.Error); ok && !neterr.Timeout() {
t.Error("MQTT connection should timeout")
}
}
mock.Stop()
}
// TestClient_Connect checks that we can connect to MQTT server and
// get no error when we receive CONNACK.
func TestClient_Connect(t *testing.T) {
// Setup Mock server
mock := MQTTServerMock{}
if err := mock.Start(t, handlerConnackSuccess); err != nil {
t.Error(err)
return
}
// Test / Check result
client := mqtt.NewClient(testMQTTAddress)
client.ConnectTimeout = 30 * time.Second
if err := client.Connect(nil); err != nil {
t.Errorf("MQTT connection failed: %s", err)
}
mock.Stop()
}
// TestClient_Unauthorized checks that MQTT connect fails when we
// received unauthorized response.
func TestClient_Unauthorized(t *testing.T) {
// Setup Mock server
mock := MQTTServerMock{}
if err := mock.Start(t, handlerUnauthorized); err != nil {
t.Error(err)
return
}
// Test / Check result
client := mqtt.NewClient(testMQTTAddress)
client.ClientID = "testClientID"
if err := client.Connect(nil); err == nil {
t.Error("MQTT connection should have failed")
}
mock.Stop()
}
// TestClient_KeepAliveDisable checks that we can connect successfully
// without keepalive.
func TestClient_KeepAliveDisable(t *testing.T) {
// Setup Mock server
mock := MQTTServerMock{}
if err := mock.Start(t, handlerConnackSuccess); err != nil {
t.Error(err)
return
}
// Test / Check result
client := mqtt.NewClient(testMQTTAddress)
client.Keepalive = 0
if err := client.Connect(nil); err != nil {
t.Error("MQTT connection failed")
}
// TODO Check that client does not send PINGREQ to server when keep alive is 0.
// keepalive 0 should disable keep alive.
mock.Stop()
}
//=============================================================================
// Mock MQTT server for testing client
const (
// Default port is not standard MQTT port to avoid interfering
// with local running MQTT server
testMQTTAddress = "tcp://localhost:10883"
)
type testHandler func(t *testing.T, conn net.Conn)
type MQTTServerMock struct {
t *testing.T
handler testHandler
listener net.Listener
connections []net.Conn
done chan struct{}
cleanup chan struct{}
}
func (mock *MQTTServerMock) Start(t *testing.T, handler testHandler) error {
mock.t = t
mock.handler = handler
if err := mock.init(); err != nil {
return err
}
go mock.loop()
return nil
}
func (mock *MQTTServerMock) Stop() {
close(mock.done)
// Check that main MQTT mock server loop is actually terminated
select {
case <-mock.cleanup:
case <-time.After(5 * time.Second):
log.Println("timeout on MQTTServerMock cleanup")
}
// Shutdown server mock
if mock.listener != nil {
if err := mock.listener.Close(); err != nil {
log.Println("cannot close listener", err)
}
}
}
func (mock *MQTTServerMock) init() error {
mock.done = make(chan struct{})
mock.cleanup = make(chan struct{})
// Parse address string
uri, err := url.Parse(testMQTTAddress)
if err != nil {
return err
}
var l net.Listener
switch uri.Scheme {
case "tcp":
l, err = net.Listen("tcp", uri.Host)
if err != nil {
mock.t.Errorf("mqttServerMock cannot listen on tcp address: %s (%s)", uri.Host, err)
return err
}
case "tls":
return errors.New("mqttServerMock does not support TLS yet")
default:
return errors.New("mqttServerMock init error: url scheme must be tcp or tls")
}
mock.listener = l
return nil
}
func (mock *MQTTServerMock) loop() {
defer mock.loopCleanup()
listener := mock.listener
for {
if l, ok := listener.(*net.TCPListener); ok {
l.SetDeadline(time.Now().Add(acceptTimeout))
}
conn, err := listener.Accept()
if err != nil {
select {
case <-mock.done:
return
default:
if err, ok := err.(net.Error); ok && err.Timeout() {
// timeout error
return
}
mock.t.Error("mqttServerMock accept error:", err.Error())
}
return
}
mock.connections = append(mock.connections, conn)
// TODO Create and pass a context to cancel the handler if they are still around = avoid possible leak on complex handlers
go mock.handler(mock.t, conn)
}
}
func (mock *MQTTServerMock) loopCleanup() {
// Close all existing connections
for _, c := range mock.connections {
if err := c.Close(); err != nil {
log.Println("Cannot close connection", c)
}
}
close(mock.cleanup)
}
//=============================================================================
// Basic MQTT Server Mock Handlers.
// handlerConnackSuccess sends connack to client without even reading from socket.
func handlerConnackSuccess(_ *testing.T, c net.Conn) {
ack := mqtt.ConnAckPacket{}
buf := ack.Marshall()
c.Write(buf)
}
func handlerUnauthorized(t *testing.T, c net.Conn) {
var p mqtt.Marshaller
var err error
// Only wait for client response for a small amount of time
c.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
if p, err = mqtt.PacketRead(c); err != nil {
t.Error("did not receive anything from client")
}
c.SetReadDeadline(time.Time{})
switch pType := p.(type) {
case mqtt.ConnectPacket:
if pType.ClientID != "testClientID" {
t.Error("connect packet is not properly parsed")
}
ack := mqtt.ConnAckPacket{ReturnCode: mqtt.ConnRefusedBadUsernameOrPassword}
buf := ack.Marshall()
if _, err := c.Write(buf); err != nil {
log.Println(err)
}
default:
}
log.Println("Unauthorized handler done")
}