From 01c2c181ab8ca09e8a860d82bae3295e2b52717c Mon Sep 17 00:00:00 2001 From: Damian Galli Date: Sat, 13 May 2023 17:00:02 +0200 Subject: [PATCH] Automaticaly reconnect mqtt Connection --- main.go | 35 +++++++++++------------------------ mqtt-to-device.go | 44 ++++++++++++++++++++++++++++++-------------- shared.go | 6 ++++++ 3 files changed, 47 insertions(+), 38 deletions(-) create mode 100644 shared.go diff --git a/main.go b/main.go index 71a23d3..4ed2a0b 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,12 @@ package main import ( - "bufio" - "flag" - mqtt "github.com/eclipse/paho.mqtt.golang" - filename "github.com/keepeye/logrus-filename" - log "github.com/sirupsen/logrus" - "go.bug.st/serial" + "bufio" + "flag" + + filename "github.com/keepeye/logrus-filename" + log "github.com/sirupsen/logrus" + "go.bug.st/serial" ) type SerialMessage struct { @@ -50,12 +50,11 @@ func main() { log.SetLevel(log.TraceLevel) } - serialPort := openSerialPort(*serialPortAddress) - mqttClient := startMQTT(*brokerAddress) - - go mqttToDevice(serialPort, mqttClient, *inputTopic) - go deviceToMqtt(bufio.NewReader(serialPort), mqttClient, *outputTopic) - select {} + serialPort := openSerialPort(*serialPortAddress) + go mqttToDevice(serialPort, *brokerAddress, *inputTopic) + mqttClient := <-MqttChannel + go deviceToMqtt(bufio.NewReader(serialPort), mqttClient, *outputTopic) + select {} } func openSerialPort(serialPort string) serial.Port { @@ -71,15 +70,3 @@ func openSerialPort(serialPort string) serial.Port { return port } - -func startMQTT(address string) mqtt.Client { - client := mqtt.NewClient(mqtt.NewClientOptions().AddBroker(address)) - - if token := client.Connect(); token.Wait() && token.Error() != nil { - log.Fatal(token.Error()) - } - - log.WithField("broker", address).Info("connected to mqtt broker") - - return client -} diff --git a/mqtt-to-device.go b/mqtt-to-device.go index 708a8d8..1817085 100644 --- a/mqtt-to-device.go +++ b/mqtt-to-device.go @@ -10,27 +10,43 @@ import ( "time" ) -func mqttToDevice(serialPort serial.Port, client mqtt.Client, topic string) { - log.WithField("topic", topic).Info("listening for commands to send to device") - - client.Subscribe(topic, 0, func(client mqtt.Client, msg mqtt.Message) { - jsonString := string(msg.Payload()) - serialMessage := jsonToSerialMessage(jsonString) - - log.WithFields(log.Fields{ - "json": jsonString, - "topic": topic, - }).Trace("message received from mqtt") +func mqttToDevice(serialPort serial.Port, address string, topic string) { + + clientOptions := mqtt.NewClientOptions().AddBroker(address) + clientOptions.SetPingTimeout(30 * time.Second) + clientOptions.SetKeepAlive(30 * time.Second) + clientOptions.SetAutoReconnect(true) + clientOptions.SetMaxReconnectInterval(10 * time.Second) + clientOptions.SetOnConnectHandler(func(client mqtt.Client) { + client.Subscribe(topic, 0, func(client mqtt.Client, msg mqtt.Message) { + jsonString := string(msg.Payload()) + serialMessage := jsonToSerialMessage(jsonString) + + log.WithFields(log.Fields{ + "json": jsonString, + "topic": topic, + }).Trace("message received from mqtt") if serialMessage.Command == "" || serialMessage.Value == "" { log.WithField("json", jsonString).Debug("tried to send unsupported command to device") return } - // And finally send the command to the device - var mutex sync.Mutex - sendCommand(serialPort, serialMessage, &mutex) + // And finally send the command to the device + var mutex sync.Mutex + sendCommand(serialPort, serialMessage, &mutex) + }) }) + client := mqtt.NewClient(clientOptions) + MqttChannel <- client + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + + log.WithField("broker", address).Info("connected to mqtt broker") + + log.WithField("topic", topic).Info("listening for commands to send to device") + } func jsonToSerialMessage(jsonString string) SerialMessage { diff --git a/shared.go b/shared.go new file mode 100644 index 0000000..72925bc --- /dev/null +++ b/shared.go @@ -0,0 +1,6 @@ +package main + +import mqtt "github.com/eclipse/paho.mqtt.golang" + +// Declare a shared channel +var MqttChannel = make(chan mqtt.Client)