-
Notifications
You must be signed in to change notification settings - Fork 2
/
mqtt.go
133 lines (122 loc) · 3.49 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package edgex
import (
"context"
"encoding/json"
mqtt "github.com/eclipse/paho.mqtt.golang"
"runtime"
"time"
)
//
// Author: 陈哈哈 [email protected]
//
// mqttSetOptions copies the MQTT settings held in scoped onto opts and
// installs the connection lifecycle handlers.
//
// Credentials are applied only when both the username and the password are
// non-empty. The connection-lost handler logs the error; the on-connect
// handler logs a debug line and then calls onConnectedFunc with the client.
func mqttSetOptions(opts *mqtt.ClientOptions, scoped *Globals, onConnectedFunc func(mqtt.Client)) {
	// Broker endpoint and session behaviour.
	opts.AddBroker(scoped.MqttBroker)
	opts.SetCleanSession(scoped.MqttCleanSession)

	// Timeouts and keep-alive.
	opts.SetKeepAlive(scoped.MqttKeepAlive)
	opts.SetPingTimeout(scoped.MqttPingTimeout)
	opts.SetConnectTimeout(scoped.MqttConnectTimeout)

	// Reconnect policy.
	opts.SetAutoReconnect(scoped.MqttAutoReconnect)
	opts.SetMaxReconnectInterval(scoped.MqttReconnectInterval)

	// Credentials are all-or-nothing: a lone username or password is ignored.
	hasCredentials := scoped.MqttUsername != "" && scoped.MqttPassword != ""
	if hasCredentials {
		opts.SetUsername(scoped.MqttUsername)
		opts.SetPassword(scoped.MqttPassword)
	}

	opts.SetOnConnectHandler(func(c mqtt.Client) {
		log.Debug("Mqtt客户端:已连接[CONNECTED]")
		onConnectedFunc(c)
	})
	opts.SetConnectionLostHandler(func(_ mqtt.Client, cause error) {
		log.Error("Mqtt客户端:丢失连接[CONNECTION-LOST](" + cause.Error() + ")")
	})
}
////
// createStateMessage serializes state to JSON and wraps the result in a
// Message built by NewMessageByUnionId.
//
// When state.UnionId is empty it is first derived with MakeUnionId from the
// node, board, major and minor ids, so the derived id is also part of the
// serialized payload. A JSON marshalling failure is reported via log.Panic.
func createStateMessage(state VirtualNodeState) Message {
	unionID := state.UnionId
	if unionID == "" {
		unionID = MakeUnionId(state.NodeId, state.BoardId, state.MajorId, state.MinorId)
		// Store it back before marshalling so the payload carries the id too.
		state.UnionId = unionID
	}

	payload, err := json.Marshal(state)
	if err != nil {
		log.Panic("数据序列化错误", err)
	}
	return NewMessageByUnionId(unionID, payload, 0)
}
// mqttSendNodeState publishes state on the states topic of state.NodeId.
//
// The payload is the byte form of the message built by createStateMessage,
// published with QoS 0 and the retained flag off. The call waits for the
// publish token and logs any error it carries; nothing is returned.
func mqttSendNodeState(client mqtt.Client, state VirtualNodeState) {
	topic := TopicOfStates(state.NodeId)
	payload := createStateMessage(state).Bytes()

	token := client.Publish(topic, 0, false, payload)
	if !token.Wait() {
		return
	}
	if err := token.Error(); err != nil {
		log.Error("NodeState: 发送消息出错", err)
	}
}
// mqttSendNodeProperties publishes the properties of a main node on the
// properties topic of properties.NodeId.
//
// Before publishing it:
//   - validates NodeType through checkRequired and calls log.Panic when the
//     list of virtual nodes is empty;
//   - defaults HostOS and HostArch to runtime.GOOS and runtime.GOARCH when
//     they are empty;
//   - fills in the UnionId of every virtual node that does not have one.
//
// The JSON payload is logged at debug level when globals.LogVerbose is set.
// A marshalling failure is reported via log.Panic; a publish error is logged
// and not returned.
func mqttSendNodeProperties(globals *Globals, client mqtt.Client, properties MainNodeProperties) {
	checkRequired(properties.NodeType, "NodeType是必须的")
	if len(properties.VirtualNodes) == 0 {
		log.Panic("NodeProperties: 缺少虚拟节点数据")
	}
	if properties.HostOS == "" {
		properties.HostOS = runtime.GOOS
	}
	if properties.HostArch == "" {
		properties.HostArch = runtime.GOARCH
	}
	nodeID := properties.NodeId

	// Fill in missing union ids. Elements are addressed by index instead of
	// through the range value: the range value is a copy, so when the
	// elements are struct values an assignment to it is lost before the
	// properties are marshalled. Indexing behaves the same for pointer
	// elements, so it is correct for either element kind.
	// NOTE(review): assumes VirtualNodes is a slice — confirm against its declaration.
	for i := range properties.VirtualNodes {
		if properties.VirtualNodes[i].UnionId == "" {
			properties.VirtualNodes[i].UnionId = MakeUnionId(
				nodeID,
				properties.VirtualNodes[i].BoardId,
				properties.VirtualNodes[i].MajorId,
				properties.VirtualNodes[i].MinorId,
			)
		}
	}

	propertiesJSON, err := json.Marshal(properties)
	if err != nil {
		log.Panic("数据序列化错误", err)
	} else if globals.LogVerbose {
		log.Debug("NodeProperties: " + string(propertiesJSON))
	}

	token := client.Publish(
		TopicOfProperties(nodeID),
		0,
		false,
		NewMessage(nodeID, nodeID, nodeID, "", propertiesJSON, 0).Bytes(),
	)
	if token.Wait() && token.Error() != nil {
		log.Error("发送消息出错", token.Error())
	}
}
// scheduleSendProperties runs inspectTask on a fixed 10-second tick.
//
// It blocks the calling goroutine and returns after the fifth run of
// inspectTask or as soon as shutdown is done, whichever comes first. The
// first run happens one tick interval after the call, not immediately.
func scheduleSendProperties(shutdown context.Context, inspectTask func()) {
	const (
		interval = 10 * time.Second
		maxRuns  = 5
	)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for runs := 0; runs < maxRuns; runs++ {
		select {
		case <-shutdown.Done():
			return
		case <-ticker.C:
			inspectTask()
		}
	}
}
// mqttAwaitConnection tries to connect client, making at most maxRetry
// attempts with a growing pause between them.
//
// The first attempt is made one second after the call; after the i-th failed
// attempt the next one is delayed by i seconds. Intermediate failures are
// logged at debug level and the final one at error level. Nothing is
// returned, so the caller cannot tell success from failure through this
// function. When maxRetry is less than 1 no attempt is made.
func mqttAwaitConnection(client mqtt.Client, maxRetry int) {
	backoff := time.NewTimer(time.Second)
	defer backoff.Stop()

	for attempt := 1; attempt <= maxRetry; attempt++ {
		<-backoff.C

		token := client.Connect()
		if !token.Wait() || token.Error() == nil {
			// Connected (or the wait did not complete): stop retrying.
			return
		}

		connErr := token.Error()
		if attempt < maxRetry {
			log.Debugf("[%d] Mqtt客户端尝试重新连接,失败:%v", attempt, connErr)
		} else {
			log.Errorf("[%d] Mqtt客户端连接失败,最大次数:%v", attempt, connErr)
		}
		// The timer channel was drained above, so Reset is safe here.
		backoff.Reset(time.Duration(attempt) * time.Second)
	}
}