Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Automaticaly reconnect mqtt Connection #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 11 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package main

import (
"bufio"
"flag"
mqtt "github.com/eclipse/paho.mqtt.golang"
filename "github.com/keepeye/logrus-filename"
log "github.com/sirupsen/logrus"
"go.bug.st/serial"
"bufio"
"flag"

filename "github.com/keepeye/logrus-filename"
log "github.com/sirupsen/logrus"
"go.bug.st/serial"
)

type SerialMessage struct {
Expand Down Expand Up @@ -50,12 +50,11 @@ func main() {
log.SetLevel(log.TraceLevel)
}

serialPort := openSerialPort(*serialPortAddress)
mqttClient := startMQTT(*brokerAddress)

go mqttToDevice(serialPort, mqttClient, *inputTopic)
go deviceToMqtt(bufio.NewReader(serialPort), mqttClient, *outputTopic)
select {}
serialPort := openSerialPort(*serialPortAddress)
go mqttToDevice(serialPort, *brokerAddress, *inputTopic)
mqttClient := <-MqttChannel
go deviceToMqtt(bufio.NewReader(serialPort), mqttClient, *outputTopic)
select {}
}

func openSerialPort(serialPort string) serial.Port {
Expand All @@ -71,15 +70,3 @@ func openSerialPort(serialPort string) serial.Port {

return port
}

func startMQTT(address string) mqtt.Client {
client := mqtt.NewClient(mqtt.NewClientOptions().AddBroker(address))

if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}

log.WithField("broker", address).Info("connected to mqtt broker")

return client
}
44 changes: 30 additions & 14 deletions mqtt-to-device.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,43 @@ import (
"time"
)

func mqttToDevice(serialPort serial.Port, client mqtt.Client, topic string) {
log.WithField("topic", topic).Info("listening for commands to send to device")

client.Subscribe(topic, 0, func(client mqtt.Client, msg mqtt.Message) {
jsonString := string(msg.Payload())
serialMessage := jsonToSerialMessage(jsonString)

log.WithFields(log.Fields{
"json": jsonString,
"topic": topic,
}).Trace("message received from mqtt")
func mqttToDevice(serialPort serial.Port, address string, topic string) {

clientOptions := mqtt.NewClientOptions().AddBroker(address)
clientOptions.SetPingTimeout(30 * time.Second)
clientOptions.SetKeepAlive(30 * time.Second)
clientOptions.SetAutoReconnect(true)
clientOptions.SetMaxReconnectInterval(10 * time.Second)
clientOptions.SetOnConnectHandler(func(client mqtt.Client) {
client.Subscribe(topic, 0, func(client mqtt.Client, msg mqtt.Message) {
jsonString := string(msg.Payload())
serialMessage := jsonToSerialMessage(jsonString)

log.WithFields(log.Fields{
"json": jsonString,
"topic": topic,
}).Trace("message received from mqtt")

if serialMessage.Command == "" || serialMessage.Value == "" {
log.WithField("json", jsonString).Debug("tried to send unsupported command to device")
return
}

// And finally send the command to the device
var mutex sync.Mutex
sendCommand(serialPort, serialMessage, &mutex)
// And finally send the command to the device
var mutex sync.Mutex
sendCommand(serialPort, serialMessage, &mutex)
})
})
client := mqtt.NewClient(clientOptions)
MqttChannel <- client
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}

log.WithField("broker", address).Info("connected to mqtt broker")

log.WithField("topic", topic).Info("listening for commands to send to device")

}

func jsonToSerialMessage(jsonString string) SerialMessage {
Expand Down
6 changes: 6 additions & 0 deletions shared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package main

import mqtt "github.com/eclipse/paho.mqtt.golang"

// Declare a shared channel
var MqttChannel = make(chan mqtt.Client)