forked from xmidt-org/kratos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
115 lines (100 loc) · 3.11 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package kratos
import (
"sync"
"github.com/go-kit/kit/log"
"github.com/goph/emperror"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/wrp-go/v3"
)
// Client is what function calls we expose to the user of kratos
type Client interface {
Hostname() string
HandlerRegistry() HandlerRegistry
Send(message *wrp.Message)
Close() error
}
// sendWRPFunc is the function for sending a message downstream.
type sendWRPFunc func(*wrp.Message)
type client struct {
deviceID string
userAgent string
deviceProtocols string
hostname string
registry HandlerRegistry
handlePingMiss HandlePingMiss
encoderSender encoderSender
decoderSender decoderSender
connection websocketConnection
headerInfo *clientHeader
logger log.Logger
done chan struct{}
wg sync.WaitGroup
pingConfig PingConfig
once sync.Once
}
// used to track everything that we want to know about the client headers
type clientHeader struct {
deviceName string
firmwareName string
modelName string
manufacturer string
}
// websocketConnection maintains the websocket connection upstream (to XMiDT).
type websocketConnection interface {
WriteMessage(messageType int, data []byte) error
ReadMessage() (messageType int, p []byte, err error)
Close() error
}
// Hostname provides the client's hostname.
func (c *client) Hostname() string {
return c.hostname
}
// HandlerRegistry returns the HandlerRegistry that the client maintains.
func (c *client) HandlerRegistry() HandlerRegistry {
return c.registry
}
// Send is used to open a channel for writing to XMiDT
func (c *client) Send(message *wrp.Message) {
c.encoderSender.EncodeAndSend(message)
}
// Close closes connections downstream and the socket upstream.
func (c *client) Close() error {
var connectionErr error
c.once.Do(func() {
logging.Info(c.logger).Log(logging.MessageKey(), "Closing client...")
close(c.done)
c.wg.Wait()
c.decoderSender.Close()
c.encoderSender.Close()
connectionErr = c.connection.Close()
c.connection = nil
// TODO: if this fails, can we really do anything. Is there potential for leaks?
// if err != nil {
// return emperror.Wrap(err, "Failed to close connection")
// }
logging.Info(c.logger).Log(logging.MessageKey(), "Client Closed")
})
return connectionErr
}
// going to be used to access the HandleMessage() function
func (c *client) read() {
defer c.wg.Done()
logging.Info(c.logger).Log(logging.MessageKey(), "Watching socket for messages.")
for {
select {
case <-c.done:
logging.Info(c.logger).Log(logging.MessageKey(), "Stopped reading from socket.")
return
default:
logging.Debug(c.logger).Log(logging.MessageKey(), "Reading message...")
_, serverMessage, err := c.connection.ReadMessage()
if err != nil {
logging.Error(c.logger, emperror.Context(err)...).
Log(logging.MessageKey(), "Failed to read message. Exiting out of read loop.", logging.ErrorKey(), err.Error())
return
}
c.decoderSender.DecodeAndSend(serverMessage)
logging.Debug(c.logger).Log(logging.MessageKey(), "Message sent to be decoded")
}
}
}