Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaced log with slog, added debug levels and an option to have semi… #13

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"errors"
"log/slog"
"os"
"time"

Expand All @@ -12,6 +13,7 @@ type MQTTConfig struct {
Connection *ConnectionConfig `yaml:"connection"`
Topics *TopicsConfig `yaml:"topics"`
ImageAsBase64 bool `yaml:"image_as_base64"`
SendTimeout time.Duration `yaml:"send_timeout"`
}

type HTTPConfig struct {
Expand Down Expand Up @@ -59,9 +61,11 @@ type MapConfig struct {
}

type Config struct {
Mqtt *MQTTConfig `yaml:"mqtt"`
HTTP *HTTPConfig `yaml:"http"`
Map *MapConfig `yaml:"map"`
Mqtt *MQTTConfig `yaml:"mqtt"`
HTTP *HTTPConfig `yaml:"http"`
Map *MapConfig `yaml:"map"`
LogLevel string `yaml:"logLevel"`
LogType string `yaml:"logType"`
}

func NewConfig(configFile string) (*Config, error) {
Expand Down Expand Up @@ -131,6 +135,11 @@ func validate(c *Config) (*Config, error) {
return nil, errors.New("missing mqtt.topics section")
}

// Set default timeout for messages being published if it's not set
if c.Mqtt.SendTimeout == 0 {
c.Mqtt.SendTimeout = 10 * time.Second
}

// Check MQTT topics section
if c.Mqtt.Topics.ValetudoIdentifier == "" {
return nil, errors.New("missing mqtt.topics.valetudo_identifier value")
Expand All @@ -150,6 +159,34 @@ func validate(c *Config) (*Config, error) {
return nil, errors.New("invalid map.png_compression value")
}

// Check logging
lvl := new(slog.LevelVar)
switch c.LogLevel {
case "debug":
lvl.Set(slog.LevelDebug)
case "info":
lvl.Set(slog.LevelInfo)
case "warn":
lvl.Set(slog.LevelWarn)
case "error":
lvl.Set(slog.LevelError)
default:
lvl.Set(slog.LevelInfo)
}
if c.LogType == "json" {
logger := slog.New(
slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: lvl,
}))
slog.SetDefault(logger)

} else {
logger := slog.New(
slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: lvl,
}))
slog.SetDefault(logger)
}
// Everything else should fail when used (e.g. wrong IP/port will cause
// fatal error when starting http server)

Expand Down
52 changes: 40 additions & 12 deletions pkg/mqtt/consumer.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
package mqtt

import (
"log"
"context"
"log/slog"
"sync"
"time"

mqttgo "github.com/eclipse/paho.mqtt.golang"
"github.com/erkexzcx/valetudopng/pkg/config"
"github.com/erkexzcx/valetudopng/pkg/mqtt/decoder"
)

func startConsumer(c *config.MQTTConfig, mapJSONChan chan []byte) {
func startConsumer(ctx context.Context, wg *sync.WaitGroup, panic chan bool, c *config.MQTTConfig, mapJSONChan chan []byte) {
defer wg.Done()
opts := mqttgo.NewClientOptions()

if c.Connection.TLSEnabled {
opts.AddBroker("ssl://" + c.Connection.Host + ":" + c.Connection.Port)
tlsConfig, err := newTLSConfig(c.Connection.TLSCaPath, c.Connection.TLSInsecure, c.Connection.TLSMinVersion)
if err != nil {
log.Fatalln(err)
panic <- true
}
opts.SetTLSConfig(tlsConfig)
} else {
Expand All @@ -36,34 +39,59 @@ func startConsumer(c *config.MQTTConfig, mapJSONChan chan []byte) {

// On connection
opts.OnConnect = func(client mqttgo.Client) {
log.Println("[MQTT consumer] Connected")
token := client.Subscribe(c.Topics.ValetudoPrefix+"/"+c.Topics.ValetudoIdentifier+"/MapData/map-data", 1, nil)
token.Wait()
log.Println("[MQTT consumer] Subscribed to map data topic")
slog.Info("[MQTT consumer] Connected")
}

// On disconnection
opts.OnConnectionLost = func(client mqttgo.Client, err error) {
log.Printf("[MQTT consumer] Connection lost: %v", err)
slog.Error("[MQTT consumer] Connection lost", slog.String("error", err.Error()))
}

// Initial connection
client := mqttgo.NewClient(opts)
for {

const maxTries = 24
success := false
for i := 1; i <= maxTries; i++ { // try to connect for 2 minutes, then give up
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Printf("[MQTT consumer] Failed to connect: %v. Retrying in 5 seconds...\n", token.Error())
slog.Error("[MQTT consumer] Failed to connect to MQTT, trying again in 5s", slog.Int("tries left", maxTries-i), slog.String("type", "consumer"), slog.String("error", token.Error().Error()))
time.Sleep(5 * time.Second)
} else {
success = true
break
}
}
if !success {
slog.Error("[MQTT consumer] failed to connect to MQTT")
panic <- true
return
}

topic := c.Topics.ValetudoPrefix + "/" + c.Topics.ValetudoIdentifier + "/MapData/map-data"
slog.Info("[MQTT consumer] Subscribing to topic", slog.String("topic", topic))
token := client.Subscribe(topic, 1, nil)
token.Wait()
slog.Info("[MQTT consumer] Subscribed to map data topic")

DONE:
for {
select {
case <-ctx.Done():
break DONE
case <-panic:
break DONE
}
}
client.Disconnect(0)
slog.Info("[MQTT consumer] shutdown")
}

func consumerMapDataReceiveHandler(client mqttgo.Client, msg mqttgo.Message, mapJSONChan chan []byte) {
func consumerMapDataReceiveHandler(_ mqttgo.Client, msg mqttgo.Message, mapJSONChan chan []byte) {
payload, err := decoder.Decode(msg.Payload())
if err != nil {
log.Println("[MQTT consumer] Failed to process raw data:", err)
slog.Error("[MQTT consumer] Failed to parse MQTT message", slog.String("error", err.Error()))
return
}
slog.Debug("[MQTT consumer] Received message")
mapJSONChan <- payload
}
16 changes: 13 additions & 3 deletions pkg/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
package mqtt

import (
"context"
"log/slog"
"sync"

"github.com/erkexzcx/valetudopng/pkg/config"
)

func Start(c *config.MQTTConfig, mapJSONChan, renderedMapChan, calibrationDataChan chan []byte) {
go startConsumer(c, mapJSONChan)
go startProducer(c, renderedMapChan, calibrationDataChan)
func Start(ctx context.Context, pwg *sync.WaitGroup, panic chan bool, c *config.MQTTConfig, mapJSONChan, renderedMapChan, calibrationDataChan chan []byte) {
defer pwg.Done()

var wg sync.WaitGroup
wg.Add(2)
go startConsumer(ctx, &wg, panic, c, mapJSONChan)
go startProducer(ctx, &wg, panic, c, renderedMapChan, calibrationDataChan)
wg.Wait()
slog.Info("MQTT shutting down")
}
Loading