-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconnection.go
153 lines (129 loc) · 3.75 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package slack
import (
// "github.com/gorilla/websocket"
"github.com/james-bowman/websocket"
"log"
"sync"
"time"
)
// Timing parameters for the websocket read and write loops.
const (
	// writeWait bounds how long a single write to the peer may take.
	writeWait = 10 * time.Second

	// pongWait is how long the reader waits for the next pong from the peer
	// before the read deadline expires.
	pongWait = 60 * time.Second

	// pingPeriod is the interval between pings sent to the peer. It is derived
	// as nine tenths of pongWait so that it is always shorter than pongWait.
	pingPeriod = pongWait * 9 / 10
)
// Connection represents the duplex websocket connection to Slack. It bundles the
// underlying socket with the channels and synchronisation state shared by the
// socketReader and socketWriter loops.
type Connection struct {
// The websocket connection. start replaces it with the new socket after each
// successful reconnection.
ws *websocket.Conn
// waitgroup to wait for all go routines to terminate before attempting to reconnect.
// socketReader and socketWriter each register themselves on entry and signal
// completion on exit.
wg sync.WaitGroup
// channel used to signal termination to socket writer go routine. start creates a
// fresh channel for every connection cycle and closes it once the reader returns.
finish chan struct{}
// Buffered channel of outbound messages, filled by Write and drained by
// socketWriter. The channel is created outside this view, so its capacity is
// not visible here.
out chan []byte
// Buffered channel of inbound messages, filled by socketReader and drained by
// Read. The channel is created outside this view, so its capacity is not
// visible here.
in chan []byte
// information about the current Slack connection and team settings. start
// overwrites it with the value returned by the reconnection handler.
config Config
}
// write sends a single websocket message of type mt carrying payload.
//
// The write deadline is set to writeWait from now before the message is written.
// It returns the error from setting the deadline, if any, otherwise the error
// from writing the message.
func (c *Connection) write(mt int, payload []byte) error {
	// Surface a failure to arm the deadline instead of writing without one.
	if err := c.ws.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
		return err
	}
	return c.ws.WriteMessage(mt, payload)
}
// socketWriter writes queued messages to the websocket connection.
//
// It loops until one of the following happens:
//   - the out channel is closed, in which case a close message is sent first;
//   - a text message or ping cannot be written;
//   - the finish channel is signalled.
//
// While running it sends a ping every pingPeriod. On exit it stops the ping
// ticker, closes the websocket and marks itself done on the wait group.
func (c *Connection) socketWriter() {
	// NOTE(review): Add is called from inside the method, so when this method is
	// launched with `go` the wait group only sees the writer once the goroutine
	// has actually started running.
	c.wg.Add(1)
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		log.Println("Closing socket writer")
		ticker.Stop()
		c.ws.Close()
		c.wg.Done()
	}()
	for {
		select {
		case message, ok := <-c.out:
			if !ok {
				// channel closed so close the websocket; the socket is torn down
				// either way, so a failed close message is only reported.
				if err := c.write(websocket.CloseMessage, []byte{}); err != nil {
					log.Printf("Error sending close message on slack websocket: %s", err)
				}
				log.Println("Closing socket")
				return
			}
			if err := c.write(websocket.TextMessage, message); err != nil {
				// error writing to websocket
				log.Printf("Error writing to slack websocket: %s", err)
				return
			}
		case <-ticker.C:
			// if idle send a ping
			if err := c.write(websocket.PingMessage, []byte{}); err != nil {
				log.Printf("Error sending ping on slack websocket: %s", err)
				return
			}
		case <-c.finish:
			return
		}
	}
}
// socketReader pumps messages from the websocket onto the in channel until a
// read fails.
//
// The read deadline starts at pongWait from now and is pushed out by another
// pongWait each time a pong arrives. On exit the websocket is closed and the
// reader marks itself done on the wait group.
func (c *Connection) socketReader() {
	c.wg.Add(1)
	defer func() {
		log.Println("Closing socket reader")
		c.ws.Close()
		c.wg.Done()
	}()

	// Shared by the initial arming below and by the pong handler.
	extendDeadline := func() {
		c.ws.SetReadDeadline(time.Now().Add(pongWait))
	}
	extendDeadline()
	c.ws.SetPongHandler(func(string) error {
		extendDeadline()
		return nil
	})

	for {
		_, payload, readErr := c.ws.ReadMessage()
		if readErr != nil {
			log.Printf("Error reading from slack websocket: %s", readErr)
			return
		}
		c.in <- payload
	}
}
// Write queues the specified event for delivery to Slack. The data is placed on
// the outbound channel and sent asynchronously by socketWriter; the call blocks
// for as long as that channel cannot accept another message.
func (c *Connection) Write(data []byte) {
c.out <- data
}
// Read returns the next event received from the connection to Slack, blocking
// until one is available on the inbound channel filled by socketReader.
func (c *Connection) Read() []byte {
return <-c.in
}
// start begins servicing the connection in a background goroutine and returns
// immediately.
//
// Each cycle runs socketWriter on its own goroutine and socketReader on the
// supervising goroutine. When the reader returns, the writer is signalled via
// the finish channel and waited for. reconnectionHandler is then called
// repeatedly, with a doubling delay capped at maxReconnectDelay seconds, until
// it yields a usable connection and configuration. Those are installed on c and
// the cycle repeats.
//
// reconnectionHandler must return a non-nil *Config and *websocket.Conn when it
// returns a nil error; anything else is treated as a failed attempt and retried.
func (c *Connection) start(reconnectionHandler func() (*Config, *websocket.Conn, error)) {
	// Upper bound, in seconds, on the pause between reconnection attempts so
	// that a long outage does not push the next attempt hours into the future.
	const maxReconnectDelay = 300

	go func() {
		for {
			c.finish = make(chan struct{})

			// Register the writer before launching it: a WaitGroup Add must
			// happen before the Wait it is meant to hold up, which is not
			// guaranteed if the only Add runs inside the new goroutine.
			c.wg.Add(1)
			go func() {
				defer c.wg.Done()
				c.socketWriter()
			}()

			// The reader runs on this goroutine and returns once a read fails.
			c.socketReader()

			// Stop the writer and wait until it has fully exited before the
			// socket is replaced.
			close(c.finish)
			c.wg.Wait()

			var ws *websocket.Conn
			var config *Config
			for delay := 1; ; delay *= 2 {
				if delay > maxReconnectDelay {
					delay = maxReconnectDelay
				}
				log.Printf("Lost connection to Slack - reconnecting in %d seconds", delay)
				// back off before trying to reconnect
				time.Sleep(time.Duration(delay) * time.Second)

				var err error
				config, ws, err = reconnectionHandler()
				if err != nil {
					log.Printf("Error reconnecting: %s", err)
					continue
				}
				if config == nil || ws == nil {
					// Guard against dereferencing a nil config or installing a
					// nil socket below.
					log.Printf("Error reconnecting: %s", "reconnection handler returned no connection or configuration")
					continue
				}
				log.Printf("Connected to Slack")
				break
			}

			c.ws = ws
			c.config = *config
		}
	}()
}