-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.go
143 lines (120 loc) · 2.67 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package mqttx
import (
"encoding/json"
"sync"
)
// MQTTxClientPool is a pool of MQTT client connections.
//
// Clients is exported and is the only field emitted when the pool is
// JSON-encoded; mux is unexported and therefore not serialized.
type MQTTxClientPool struct {
// Clients holds the pooled MQTT clients; it is encoded under the "clients" JSON key.
Clients []*MQTTxClient `json:"clients"` // pooled MQTT client connections
// mux is taken by the pool's pointer-receiver methods while they access Clients.
mux *sync.RWMutex // read-write lock
}
// GetClients returns a snapshot of every client currently held by the pool.
//
// The returned slice is a copy made while the read lock is held, so callers
// may range over it after the lock has been released without observing
// elements being shifted in place by a concurrent Remove. The elements are the
// same *MQTTxClient pointers that the pool stores. A nil receiver yields nil,
// and a nil Clients slice is returned as nil.
func (p *MQTTxClientPool) GetClients() []*MQTTxClient {
	if p == nil {
		return nil
	}
	p.mux.RLock()
	defer p.mux.RUnlock()

	// Preserve the nil/empty distinction of the underlying slice.
	if p.Clients == nil {
		return nil
	}
	snapshot := make([]*MQTTxClient, len(p.Clients))
	copy(snapshot, p.Clients)
	return snapshot
}
// Len reports how many clients the pool currently holds.
// A nil receiver reports 0.
func (p *MQTTxClientPool) Len() int {
	if p == nil {
		return 0
	}
	p.mux.RLock()
	size := len(p.Clients)
	p.mux.RUnlock()
	return size
}
// Add appends client to the pool under the write lock.
// The call is a no-op when either the receiver or client is nil.
func (p *MQTTxClientPool) Add(client *MQTTxClient) {
	if p == nil || client == nil {
		return
	}
	p.mux.Lock()
	defer p.mux.Unlock()
	p.Clients = append(p.Clients, client)
}
// Remove deletes the first client whose Server() equals the result of
// FormatServerAddr(server). It does nothing when the receiver is nil, server
// is empty, or no client matches. The relative order of the remaining clients
// is preserved.
func (p *MQTTxClientPool) Remove(server string) {
	if p == nil || server == "" {
		return
	}
	p.mux.Lock()
	defer p.mux.Unlock()

	// The formatted address does not change between iterations; compute it once.
	target := FormatServerAddr(server)
	for i, c := range p.Clients {
		if c.Server() != target {
			continue
		}
		last := len(p.Clients) - 1
		copy(p.Clients[i:], p.Clients[i+1:])
		// Clear the vacated tail slot so the backing array does not keep a
		// stale pointer alive once the slice has been shortened.
		p.Clients[last] = nil
		p.Clients = p.Clients[:last]
		return
	}
}
// Get returns the first client whose Server() equals the result of
// FormatServerAddr(server). It returns nil when the receiver is nil, server is
// empty, or no client matches.
func (p *MQTTxClientPool) Get(server string) *MQTTxClient {
	if p == nil || server == "" {
		return nil
	}
	p.mux.RLock()
	defer p.mux.RUnlock()

	// The formatted address does not change between iterations; compute it once.
	target := FormatServerAddr(server)
	for _, c := range p.Clients {
		if c.Server() == target {
			return c
		}
	}
	return nil
}
// GetMinConnectionCountClient returns the connected client that reports the
// smallest GetServerConnectionCount().
//
// Clients whose Client.IsConnected() is false are skipped. When several
// clients tie, the earliest one in the slice wins because only a strictly
// smaller count replaces the current pick. The result is nil when the receiver
// is nil, when no client is connected, or when the chosen client's count
// equals MaxInt.
func (p *MQTTxClientPool) GetMinConnectionCountClient() *MQTTxClient {
	if p == nil {
		return nil
	}
	p.mux.RLock()
	defer p.mux.RUnlock()

	var best *MQTTxClient
	for _, candidate := range p.Clients {
		if !candidate.Client.IsConnected() {
			continue
		}
		// The first connected client seeds the search; later ones must beat it.
		if best == nil || candidate.GetServerConnectionCount() < best.GetServerConnectionCount() {
			best = candidate
		}
	}

	// A pick whose count equals MaxInt is reported as "no client".
	if best != nil && best.GetServerConnectionCount() == MaxInt {
		return nil
	}
	return best
}
// Iterate calls f once for every client in the pool, in slice order. It is
// mainly intended for periodically checking connection state and reconnecting
// where necessary.
//
// f runs while the pool's read lock is held, so it must not call methods that
// take the write lock (Add, Remove) on the same pool. A nil receiver is a
// no-op.
func (p *MQTTxClientPool) Iterate(f func(c *MQTTxClient)) {
	if p == nil {
		return
	}
	p.mux.RLock()
	defer p.mux.RUnlock()

	clients := p.Clients
	for i := 0; i < len(clients); i++ {
		f(clients[i])
	}
}
// String renders the pool as its JSON encoding, or returns the empty string
// if encoding fails.
//
// NOTE: this value-receiver method does not acquire mux, unlike the
// pointer-receiver methods on the pool.
func (p MQTTxClientPool) String() string {
	if encoded, err := json.Marshal(p); err == nil {
		return string(encoded)
	}
	return ""
}
// NewMQTTxClientPool builds an MQTT client pool with an empty, non-nil client
// slice and a freshly allocated read-write lock.
func NewMQTTxClientPool() *MQTTxClientPool {
	return &MQTTxClientPool{
		mux:     &sync.RWMutex{},
		Clients: []*MQTTxClient{},
	}
}