diff --git a/README.md b/README.md index 2b72310..a3e6db9 100644 --- a/README.md +++ b/README.md @@ -39,9 +39,10 @@ You can have more than one adapter configurations Provider options: `mysensors_v2` and `raw` ```yaml logger: - mode: development # logger mode: development, production - encoding: console # encoding options: console, json - level: info # log levels: debug, info, warn, error, fatal + mode: development # logger mode: development, production + encoding: console # encoding options: console, json + level: info # log levels: debug, info, warn, error, fatal + enable_stacktrace: false # enable or disable error stack trace adapters: # you can have more than one adapter - name: adapter1 # name of the adapter @@ -63,6 +64,7 @@ adapters: # you can have more than one adapter qos: 0 # qos number: 0, 1, 2 transmit_pre_delay: 0s reconnect_delay: 5s + connection_timeout: 30s # mqtt connection timeout (default 30 seconds) ``` ### Source device configuration Based on the source type the configurations will be different. @@ -136,9 +138,10 @@ In `raw` provider we can add script to support custom specification. If we leave ### Configuration file with raw provider and a script support ```yaml logger: - mode: development # logger mode: development, production - encoding: console # encoding options: console, json - level: info # log levels: debug, info, warn, error, fatal + mode: development # logger mode: development, production + encoding: console # encoding options: console, json + level: info # log levels: debug, info, warn, error, fatal + enable_stacktrace: false # enable or disable error stack trace adapters: # you can have more than one adapter - name: adapter1 # name of the adapter @@ -160,6 +163,7 @@ adapters: # you can have more than one adapter qos: 0 # qos number: 0, 1, 2 transmit_pre_delay: 0s reconnect_delay: 5s + connection_timeout: 30s # mqtt connection timeout (default 30 seconds) formatter_script: # script used to perform custom formatting to_mqtt: | // your multiline javascript diff --git a/cmd/helper/helper.go b/cmd/helper/helper.go new file mode 100644 index 0000000..b8efef9 --- /dev/null +++ b/cmd/helper/helper.go @@ -0,0 +1,37 @@ +package helper + +import ( + "context" + + "github.com/mycontroller-org/2mqtt/pkg/version" + "go.uber.org/zap" + + contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context" + + cfgTY "github.com/mycontroller-org/2mqtt/pkg/types/config" + coreScheduler "github.com/mycontroller-org/server/v2/pkg/service/core_scheduler" + schedulerTY "github.com/mycontroller-org/server/v2/pkg/types/scheduler" + loggerUtils "github.com/mycontroller-org/server/v2/pkg/utils/logger" +) + +// loads logger +func loadLogger(ctx context.Context, cfg cfgTY.LoggerConfig) (context.Context, *zap.Logger) { + logger := loggerUtils.GetLogger(cfg.Mode, cfg.Level, cfg.Encoding, false, 0, cfg.EnableStacktrace) + logger.Info("welcome to the 2mqtt adapter server :)") + ver := version.Get() + logger.Info("server information", zap.Any("version", ver), zap.Any("logger", cfg)) + + // in some places still using "z.L()...", which needs global logger should be enabled + // enabling global logger. + // to fix this, do `grep -rl "zap\.L()"` and fix those manually. + zap.ReplaceGlobals(logger) + + return contextTY.LoggerWithContext(ctx, logger), logger +} + +// load core scheduler +func loadCoreScheduler(ctx context.Context) (context.Context, schedulerTY.CoreScheduler) { + coreScheduler := coreScheduler.New() + ctx = schedulerTY.WithContext(ctx, coreScheduler) + return ctx, coreScheduler +} diff --git a/cmd/helper/service.go b/cmd/helper/service.go new file mode 100644 index 0000000..8cf7285 --- /dev/null +++ b/cmd/helper/service.go @@ -0,0 +1,81 @@ +package helper + +import ( + "context" + "time" + + adapterSVC "github.com/mycontroller-org/2mqtt/pkg/service/adapter" + "github.com/mycontroller-org/2mqtt/pkg/service/scheduler" + "github.com/mycontroller-org/2mqtt/pkg/types/config" + schedulerTY "github.com/mycontroller-org/server/v2/pkg/types/scheduler" + "go.uber.org/zap" +) + +type ToMqtt struct { + ctx context.Context + config *config.Config + logger *zap.Logger + + // services + coreSchedulerSVC schedulerTY.CoreScheduler // core scheduler, used to execute all the cron jobs +} + +func (g *ToMqtt) Start(ctx context.Context, cfg *config.Config) error { + startTime := time.Now() + + g.ctx = ctx + + // load logger + ctx, logger := loadLogger(ctx, cfg.Logger) + + // get core scheduler and inject into context + ctx, coreScheduler := loadCoreScheduler(ctx) + err := coreScheduler.Start() + if err != nil { + logger.Error("error on starting core scheduler", zap.Error(err)) + return err + } + + // inject custom scheduler into context + customScheduler, err := scheduler.New(ctx) + if err != nil { + logger.Error("error on loading custom scheduler", zap.Error(err)) + return err + } + ctx = scheduler.WithContext(ctx, customScheduler) + + // add into struct + g.ctx = ctx + g.config = cfg + g.logger = logger + g.coreSchedulerSVC = coreScheduler + + // start adapter services + err = adapterSVC.Start(ctx, cfg.Adapters) + if err != nil { + logger.Error("error on starting adapter services", zap.Error(err)) + return err + } + + logger.Info("services are started", zap.String("timeTaken", time.Since(startTime).String())) + + // call shutdown hook + shutdownHook := NewShutdownHook(g.logger, g.stop) + shutdownHook.Start() + + return nil +} + +func (g *ToMqtt) stop() { + // stop services + + // stop adapter services + g.logger.Debug("closing adapter services") + adapterSVC.Close() + + g.logger.Debug("closing core scheduler") + // stop core scheduler + if err := g.coreSchedulerSVC.Close(); err != nil { + g.logger.Error("error on closing core scheduler", zap.Error(err)) + } +} diff --git a/cmd/helper/shutdown_hook.go b/cmd/helper/shutdown_hook.go new file mode 100644 index 0000000..54c7e76 --- /dev/null +++ b/cmd/helper/shutdown_hook.go @@ -0,0 +1,77 @@ +package helper + +import ( + "os" + "os/signal" + "syscall" + "time" + + "go.uber.org/zap" +) + +var ( + graceTerminationPeriod = time.Second * 30 // shutdown grace termination period +) + +type ShutdownHook struct { + logger *zap.Logger + callbackFunc func() +} + +func NewShutdownHook(logger *zap.Logger, callbackFunc func()) *ShutdownHook { + return &ShutdownHook{ + logger: logger.Named("shutdown_hook"), + callbackFunc: callbackFunc, + } +} + +func (sh *ShutdownHook) Start() { + sh.handleShutdownSignal() +} + +// handel process shutdown signal +func (sh *ShutdownHook) handleShutdownSignal() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + // waiting for signal + sig := <-sigs + close(sigs) + + sh.logger.Info("shutdown initiated..", zap.Any("signal", sig)) + sh.triggerShutdown() +} + +func (sh *ShutdownHook) triggerShutdown() { + start := time.Now() + + // force termination block + ticker := time.NewTicker(graceTerminationPeriod) + done := make(chan bool) + go func() { + for { + select { + case <-done: + return + case <-ticker.C: + sh.logger.Warn("some services are not terminating on graceful period. Performing force termination", zap.String("gracePeriod", graceTerminationPeriod.String())) + os.Exit(-1) + } + } + }() + + // trigger callback function + if sh.callbackFunc != nil { + sh.callbackFunc() + } + + // stop force termination ticker + ticker.Stop() + done <- true + + sh.logger.Info("closing services completed", zap.String("timeTaken", time.Since(start).String())) + sh.logger.Debug("bye, see you soon :)") + + // stop web/api service + os.Exit(0) +} diff --git a/cmd/sub/generate_config.go b/cmd/sub/generate_config.go index cf1c766..fe0b43d 100644 --- a/cmd/sub/generate_config.go +++ b/cmd/sub/generate_config.go @@ -21,9 +21,10 @@ var generateConfigCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { sampleConfig := cfgTY.Config{ Logger: cfgTY.LoggerConfig{ - Mode: "development", - Encoding: "console", - Level: "info", + Mode: "development", + Encoding: "console", + Level: "info", + EnableStacktrace: false, }, Adapters: []cfgTY.AdapterConfig{ { diff --git a/cmd/sub/root.go b/cmd/sub/root.go index e5a878b..ffafc0e 100644 --- a/cmd/sub/root.go +++ b/cmd/sub/root.go @@ -1,10 +1,11 @@ package sub import ( + "context" "fmt" "os" - "github.com/mycontroller-org/2mqtt/pkg/service/start" + "github.com/mycontroller-org/2mqtt/cmd/helper" cfgTY "github.com/mycontroller-org/2mqtt/pkg/types/config" "github.com/mycontroller-org/2mqtt/pkg/version" loggerUtils "github.com/mycontroller-org/server/v2/pkg/utils/logger" @@ -50,11 +51,10 @@ var rootCmd = &cobra.Command{ logger.Fatal("failed to parse config file", zap.Error(err)) } - // load actual logger - start.InitLogger(cfg.Logger) - - // start services - start.StartServices(cfg) + // start service + ctx := context.Background() + toMqtt := helper.ToMqtt{} + toMqtt.Start(ctx, cfg) }, } diff --git a/go.mod b/go.mod index ce0e0c4..a73bf7a 100644 --- a/go.mod +++ b/go.mod @@ -1,41 +1,43 @@ module github.com/mycontroller-org/2mqtt -go 1.20 +go 1.21 + +toolchain go1.21.4 require ( - github.com/eclipse/paho.mqtt.golang v1.4.2 - github.com/mycontroller-org/server/v2 v2.0.0-20230219022220-c22a66475357 - github.com/spf13/cobra v1.6.1 - github.com/stretchr/testify v1.8.1 + github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/mycontroller-org/server/v2 v2.0.1-0.20240330143153-d3837c02560c + github.com/spf13/cobra v1.8.0 + github.com/stretchr/testify v1.9.0 github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07 - go.uber.org/zap v1.24.0 + go.uber.org/zap v1.27.0 gopkg.in/yaml.v3 v3.0.1 ) require ( - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/dlclark/regexp2 v1.8.0 // indirect - github.com/dop251/goja v0.0.0-20230216180835-5937a312edda // indirect - github.com/dop251/goja_nodejs v0.0.0-20230207183254-2229640ea097 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dlclark/regexp2 v1.11.0 // indirect + github.com/dop251/goja v0.0.0-20240220182346-e401ed450204 // indirect + github.com/dop251/goja_nodejs v0.0.0-20240221231712-27eeffc9c235 // indirect github.com/fatih/structs v1.1.0 // indirect github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect - github.com/google/go-cmp v0.5.9 // indirect - github.com/google/uuid v1.3.0 // indirect - github.com/gorilla/mux v1.8.0 // indirect - github.com/gorilla/websocket v1.5.0 // indirect + github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/mux v1.8.1 // indirect + github.com/gorilla/websocket v1.5.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/robfig/cron/v3 v3.0.2-0.20210106135023-bc59245fe10e // indirect github.com/spf13/pflag v1.0.5 // indirect - go.mongodb.org/mongo-driver v1.11.2 // indirect - go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.9.0 // indirect - golang.org/x/net v0.7.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.5.0 // indirect - golang.org/x/text v0.7.0 // indirect + go.mongodb.org/mongo-driver v1.14.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.22.0 // indirect + golang.org/x/sync v0.6.0 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect ) diff --git a/go.sum b/go.sum index abb534c..ca63315 100644 --- a/go.sum +++ b/go.sum @@ -1,144 +1,125 @@ -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= -github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/chzyer/logex v1.2.0/go.mod h1:9+9sk7u7pGNWYMkh0hdiL++6OeibzJccyQU4p4MedaY= +github.com/chzyer/readline v1.5.0/go.mod h1:x22KAscuvRqlLoK9CsoYsmxoXZMMFVyOl86cAH8qUic= +github.com/chzyer/test v0.0.0-20210722231415-061457976a23/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= -github.com/dlclark/regexp2 v1.8.0 h1:rJD5HeGIT/2b5CDk63FVCwZA3qgYElfg+oQK7uH5pfE= -github.com/dlclark/regexp2 v1.8.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI= +github.com/dlclark/regexp2 v1.11.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/dop251/goja v0.0.0-20211022113120-dc8c55024d06/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk= -github.com/dop251/goja v0.0.0-20221118162653-d4bf6fde1b86/go.mod h1:yRkwfj0CBpOGre+TwBsqPV0IH0Pk73e4PXJOeNDboGs= -github.com/dop251/goja v0.0.0-20230216180835-5937a312edda h1:yWEvdMtib3RbPysHDTNf/c3gerF5r+iMcmhlAeE6hEk= -github.com/dop251/goja v0.0.0-20230216180835-5937a312edda/go.mod h1:yRkwfj0CBpOGre+TwBsqPV0IH0Pk73e4PXJOeNDboGs= +github.com/dop251/goja v0.0.0-20240220182346-e401ed450204 h1:O7I1iuzEA7SG+dK8ocOBSlYAA9jBUmCYl/Qa7ey7JAM= +github.com/dop251/goja v0.0.0-20240220182346-e401ed450204/go.mod h1:QMWlm50DNe14hD7t24KEqZuUdC9sOTy8W6XbCU1mlw4= github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8aVqWbuLRMHItjPUyqdj+HWPvnQe8V8y1nDpIbM= -github.com/dop251/goja_nodejs v0.0.0-20230207183254-2229640ea097 h1:WsLyDk8yHsVT1puf/32883ZxEb6Pgqd19AlQH9mxVK0= -github.com/dop251/goja_nodejs v0.0.0-20230207183254-2229640ea097/go.mod h1:0tlktQL7yHfYEtjcRGi/eiOkbDR5XF7gyFFvbC5//E0= -github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= -github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/dop251/goja_nodejs v0.0.0-20240221231712-27eeffc9c235 h1:5870ijWGCGCw7Ty4IGCquT6EfTck6f5zriYzFpPwOJ0= +github.com/dop251/goja_nodejs v0.0.0-20240221231712-27eeffc9c235/go.mod h1:bhGPmCgCCTSRfiMYWjpS46IDo9EUZXlsuUaPXSWGbv0= +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= -github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg= +github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 h1:pUa4ghanp6q4IJHwE9RwLgmVFfReJN+KbQ8ExNEUUoQ= +github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= -github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c h1:cqn374mizHuIWj+OSJCajGr/phAmuMug9qIX3l9CflE= +github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= -github.com/mycontroller-org/server/v2 v2.0.0-20230219022220-c22a66475357 h1:EtOXc7JgFmkPi7j1yFS1qviPdLdkEO2GmP0kj1muAk0= -github.com/mycontroller-org/server/v2 v2.0.0-20230219022220-c22a66475357/go.mod h1:6VQxtsfzUNn/EKU/8/GV96ye4PLcTqQgGXLRYdXQShk= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/mycontroller-org/server/v2 v2.0.1-0.20240330143153-d3837c02560c h1:O2l/cEgkQABTlpe9CFCP0zdwcRLOBTsDdHXhEt5ehRA= +github.com/mycontroller-org/server/v2 v2.0.1-0.20240330143153-d3837c02560c/go.mod h1:A28LnFZWxnh4rZvwOjb0JPvfcO1rceDFth1vsVlb3FI= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.2-0.20210106135023-bc59245fe10e h1:0xChnl3lhHiXbgSJKgChye0D+DvoItkOdkGcwelDXH0= github.com/robfig/cron/v3 v3.0.2-0.20210106135023-bc59245fe10e/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA= -github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY= +github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= +github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07 h1:UyzmZLoiDWMRywV4DUYb9Fbt8uiOSooupjTq10vpvnU= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= -github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= -github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= -github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.mongodb.org/mongo-driver v1.11.2 h1:+1v2rDQUWNcGW7/7E0Jvdz51V38XXxJfhzbV17aNHCw= -go.mongodb.org/mongo-driver v1.11.2/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= -go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= -go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= -go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= -go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= -go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= +go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= -golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= -golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= @@ -146,6 +127,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/queue/message.go b/pkg/queue/message.go deleted file mode 100644 index 30be320..0000000 --- a/pkg/queue/message.go +++ /dev/null @@ -1,50 +0,0 @@ -package queue - -import ( - "sync" - - "github.com/mycontroller-org/2mqtt/pkg/types" - "go.uber.org/zap" -) - -// MessageQueue struct -type MessageQueue struct { - ID string - Limit int - Messages []*types.Message - mutex *sync.RWMutex -} - -// New creates a brand new MessageQueue -func New(ID string, limit int) *MessageQueue { - return &MessageQueue{ - ID: ID, - Messages: make([]*types.Message, 0), - Limit: limit, - mutex: &sync.RWMutex{}, - } -} - -// Add a message into the queue -func (q *MessageQueue) Add(msg *types.Message) { - q.mutex.Lock() - defer q.mutex.Unlock() - - q.Messages = append(q.Messages, msg) - if len(q.Messages) > q.Limit { - zap.L().Warn("dropping a message", zap.String("id", q.ID)) - } -} - -// Get a message from the queue, if empty returns nil -func (q *MessageQueue) Get() *types.Message { - q.mutex.Lock() - defer q.mutex.Unlock() - - if len(q.Messages) > 0 { - message := q.Messages[0] - q.Messages = q.Messages[1:] - return message - } - return nil -} diff --git a/pkg/service/adapter/service.go b/pkg/service/adapter/service.go index 7d6e157..6d79e64 100644 --- a/pkg/service/adapter/service.go +++ b/pkg/service/adapter/service.go @@ -1,72 +1,85 @@ package adapter import ( + "context" "fmt" "sync" "time" - "github.com/mycontroller-org/2mqtt/pkg/queue" scheduler "github.com/mycontroller-org/2mqtt/pkg/service/scheduler" "github.com/mycontroller-org/2mqtt/pkg/types" config "github.com/mycontroller-org/2mqtt/pkg/types/config" + contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context" devicePlugin "github.com/mycontroller-org/2mqtt/plugin/device" providerPlugin "github.com/mycontroller-org/2mqtt/plugin/provider" - "github.com/mycontroller-org/server/v2/pkg/utils/concurrency" + queue "github.com/mycontroller-org/server/v2/pkg/utils/queue" "go.uber.org/zap" ) // default settings const ( - SourceQueueLimit = 1000 - MQTTQueueLimit = 1000 + SourceQueueSize = 1000 + MQTTQueueSize = 1000 DefaultReconnectDelay = "30s" MqttDeviceName = "mqtt" - - defaultMessageProcessTick = 1 * time.Second ) // Service component of the provider type Service struct { - adapterConfig *config.AdapterConfig - provider types.Formatter - sourceDevice types.Device - mqttDevice types.Device - sourceMessageQueue *queue.MessageQueue - mqttMessageQueue *queue.MessageQueue - statusSource types.State - statusMqtt types.State - mutex *sync.RWMutex - reconnectDelay string - sourceID string - mqttID string - terminateMqttChan *concurrency.Channel - terminateSourceChan *concurrency.Channel + ctx context.Context + logger *zap.Logger + scheduler *scheduler.Scheduler + adapterConfig *config.AdapterConfig + provider types.Formatter + sourceDevice types.Device + mqttDevice types.Device + sourceMessageQueue *queue.Queue + mqttMessageQueue *queue.Queue + statusSource types.State + statusMqtt types.State + mutex *sync.RWMutex + reconnectDelay string + sourceID string + mqttID string } // NewService creates brand new Service -func NewService(adapterCfg *config.AdapterConfig) (*Service, error) { - provider, err := providerPlugin.Create(adapterCfg.Provider, adapterCfg.Source, adapterCfg.FormatterScript) +func NewService(ctx context.Context, adapterCfg *config.AdapterConfig) (*Service, error) { + logger, err := contextTY.LoggerFromContext(ctx) + if err != nil { + return nil, err + } + + scheduler, err := scheduler.FromContext(ctx) + if err != nil { + logger.Error("error on getting scheduler", zap.Error(err)) + return nil, err + } + + provider, err := providerPlugin.Create(ctx, adapterCfg.Provider, adapterCfg.Source, adapterCfg.FormatterScript) if err != nil { + logger.Error("error on get a provider", zap.String("name", adapterCfg.Name), zap.String("provider", adapterCfg.Provider), zap.Error(err)) return nil, err } s := &Service{ - adapterConfig: adapterCfg, - provider: provider, - mutex: &sync.RWMutex{}, - sourceID: fmt.Sprintf("%s_adapter_source", adapterCfg.Name), - mqttID: fmt.Sprintf("%s_adapter_mqtt", adapterCfg.Name), - terminateMqttChan: concurrency.NewChannel(1), - terminateSourceChan: concurrency.NewChannel(1), + ctx: ctx, + logger: logger.Named("adapter"), + scheduler: scheduler, + adapterConfig: adapterCfg, + provider: provider, + mutex: &sync.RWMutex{}, + sourceID: fmt.Sprintf("%s_adapter_source", adapterCfg.Name), + mqttID: fmt.Sprintf("%s_adapter_mqtt", adapterCfg.Name), } - s.sourceMessageQueue = queue.New(s.sourceID, SourceQueueLimit) - s.mqttMessageQueue = queue.New(s.mqttID, MQTTQueueLimit) + s.sourceMessageQueue = queue.New(logger, s.sourceID, SourceQueueSize, func(item interface{}) {}, 1) + s.mqttMessageQueue = queue.New(logger, s.mqttID, MQTTQueueSize, func(item interface{}) {}, 1) // update reconnectDelay _, err = time.ParseDuration(adapterCfg.ReconnectDelay) if err != nil { - zap.L().Info("error on parsing reconnect delay, running with default", zap.String("reconnectDelay", adapterCfg.ReconnectDelay), zap.String("default", DefaultReconnectDelay), zap.Error(err)) + logger.Info("error on parsing reconnect delay, running with default", zap.String("reconnectDelay", adapterCfg.ReconnectDelay), zap.String("default", DefaultReconnectDelay), zap.Error(err)) s.reconnectDelay = DefaultReconnectDelay } else { s.reconnectDelay = adapterCfg.ReconnectDelay @@ -79,8 +92,9 @@ func NewService(adapterCfg *config.AdapterConfig) (*Service, error) { func (s *Service) Start() { s.reconnectMqttDevice() s.reconnectSourceDevice() - go s.sourceMessageProcessor() - go s.mqttMessageProcessor() + + s.sourceMessageQueue.Queue.StartConsumers(1, s.sourceMessageProcessor) + s.mqttMessageQueue.Queue.StartConsumers(1, s.mqttMessageProcessor) } // Start stops a adapter service @@ -88,94 +102,92 @@ func (s *Service) Stop() { if s.sourceDevice != nil { err := s.sourceDevice.Close() if err != nil { - zap.L().Error("error on closing a source connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) + s.logger.Error("error on closing a source connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) } } if s.mqttDevice != nil { err := s.mqttDevice.Close() if err != nil { - zap.L().Error("error on closing a mqtt connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) + s.logger.Error("error on closing a mqtt connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) } } + // close message queues - s.terminateMqttChan.SafeClose() - s.terminateSourceChan.SafeClose() + if s.mqttMessageQueue != nil { + s.mqttMessageQueue.Queue.Stop() + } + if s.sourceMessageQueue != nil { + s.sourceMessageQueue.Queue.Stop() + } } -func (s *Service) mqttMessageProcessor() { - ticker := time.NewTicker(defaultMessageProcessTick) - defer ticker.Stop() - - for { - select { - case <-s.terminateMqttChan.CH: - return - case <-ticker.C: - for { - if s.statusMqtt.Status == types.StatusUP { - message := s.mqttMessageQueue.Get() - if message == nil { - break - } - message.Others.Set(types.KeyMqttQoS, int(s.adapterConfig.MQTT.GetInt64(types.KeyMqttQoS)), nil) - err := s.mqttDevice.Write(message) - if err != nil { - zap.L().Error("error on writing a message to mqtt", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) - } - } else { - break - } - } +func (s *Service) mqttMessageProcessor(item interface{}) { + if item == nil { + return + } + message, ok := item.(*types.Message) + if !ok { + s.logger.Error("error on cast a message", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Any("message", item)) + return + } + if message == nil { + s.logger.Error("message can not be nil", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) + return + } + + if s.statusMqtt.Status == types.StatusUP { + message.Others.Set(types.KeyMqttQoS, int(s.adapterConfig.MQTT.GetInt64(types.KeyMqttQoS)), nil) + err := s.mqttDevice.Write(message) + if err != nil { + s.logger.Error("error on writing a message to mqtt", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) } + } else { + // TODO: this message will be dropped, needs to handle this message } } -func (s *Service) sourceMessageProcessor() { - ticker := time.NewTicker(defaultMessageProcessTick) - defer ticker.Stop() - - for { - select { - case <-s.terminateSourceChan.CH: - return - case <-ticker.C: - for { - if s.statusSource.Status == types.StatusUP { - message := s.sourceMessageQueue.Get() - if message == nil { - break - } - zap.L().Debug("posting a message to source device", zap.String("message", message.ToString()), zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) - err := s.sourceDevice.Write(message) - if err != nil { - zap.L().Error("error on writing a message to source device", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) - } - } else { - break - } - } +func (s *Service) sourceMessageProcessor(item interface{}) { + if item == nil { + return + } + message, ok := item.(*types.Message) + if !ok { + s.logger.Error("error on cast a message", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Any("message", item)) + return + } + if message == nil { + s.logger.Error("message can not be nil", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) + return + } + if s.statusSource.Status == types.StatusUP { + s.logger.Debug("posting a message to source device", zap.String("message", message.ToString()), zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) + err := s.sourceDevice.Write(message) + if err != nil { + s.logger.Error("error on writing a message to source device", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) } + } else { + // TODO: this message will be dropped, needs to handle this message } } func (s *Service) onMqttMessage(message *types.Message) { - zap.L().Debug("received a mqtt message", zap.String("message", message.ToString()), zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) + s.logger.Debug("received a mqtt message", zap.String("message", message.ToString()), zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) formattedMsg, err := s.provider.ToSourceMessage(message) if err != nil { - zap.L().Error("error on formatting to source type", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) + s.logger.Error("error on formatting to source type", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) return } - s.sourceMessageQueue.Add(formattedMsg) + s.sourceMessageQueue.Produce(formattedMsg) } func (s *Service) onSourceMessage(message *types.Message) { - zap.L().Debug("received a message from source device", zap.String("message", message.ToString()), zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) + s.logger.Debug("received a message from source device", zap.String("message", message.ToString()), zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) formattedMsg, err := s.provider.ToMQTTMessage(message) if err != nil { - zap.L().Error("error on formatting to mqtt", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) + s.logger.Error("error on formatting to mqtt", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) return } - s.mqttMessageQueue.Add(formattedMsg) + s.mqttMessageQueue.Produce(formattedMsg) } func (s *Service) onMqttStatus(state *types.State) { @@ -189,9 +201,9 @@ func (s *Service) onMqttStatus(state *types.State) { } // schedule a job for reconnect - err := scheduler.Schedule(s.mqttID, s.reconnectDelay, s.reconnectMqttDevice) + err := s.scheduler.Schedule(s.mqttID, s.reconnectDelay, s.reconnectMqttDevice) if err != nil { - zap.L().Error("error on configuring a schedule", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.String("id", s.mqttID), zap.Error(err)) + s.logger.Error("error on configuring a schedule", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.String("id", s.mqttID), zap.Error(err)) } } @@ -207,15 +219,15 @@ func (s *Service) onSourceStatus(state *types.State) { } // schedule a job for reconnect - err := scheduler.Schedule(s.sourceID, s.reconnectDelay, s.reconnectSourceDevice) + err := s.scheduler.Schedule(s.sourceID, s.reconnectDelay, s.reconnectSourceDevice) if err != nil { - zap.L().Error("error on configuring a schedule", zap.String("id", s.sourceID), zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) + s.logger.Error("error on configuring a schedule", zap.String("id", s.sourceID), zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) } } func (s *Service) reconnectMqttDevice() { - scheduler.Unschedule(s.mqttID) + s.scheduler.Unschedule(s.mqttID) if s.statusMqtt.Status == types.StatusUP { return } @@ -223,11 +235,11 @@ func (s *Service) reconnectMqttDevice() { if s.mqttDevice != nil { err := s.mqttDevice.Close() if err != nil { - zap.L().Error("error on colsing a mqtt connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) + s.logger.Error("error on colsing a mqtt connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) } } - mqttDevice, err := devicePlugin.Create(MqttDeviceName, s.adapterConfig.Name, s.adapterConfig.MQTT, s.onMqttMessage, s.onMqttStatus) + mqttDevice, err := devicePlugin.Create(s.ctx, MqttDeviceName, s.adapterConfig.Name, s.adapterConfig.MQTT, s.onMqttMessage, s.onMqttStatus) if err == nil { s.mqttDevice = mqttDevice // update status UP @@ -235,19 +247,19 @@ func (s *Service) reconnectMqttDevice() { Status: types.StatusUP, Since: time.Now(), } - zap.L().Info("connected to the mqtt broker", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) + s.logger.Info("connected to the mqtt broker", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) return } - zap.L().Error("error on getting mqtt connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.String("reconnectDelay", s.reconnectDelay), zap.Error(err)) + s.logger.Error("error on getting mqtt connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.String("reconnectDelay", s.reconnectDelay), zap.Error(err)) // schedule a job for reconnect - err = scheduler.Schedule(s.mqttID, s.reconnectDelay, s.reconnectMqttDevice) + err = s.scheduler.Schedule(s.mqttID, s.reconnectDelay, s.reconnectMqttDevice) if err != nil { - zap.L().Error("error on configuring a schedule", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.String("id", s.mqttID), zap.Error(err)) + s.logger.Error("error on configuring a schedule", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.String("id", s.mqttID), zap.Error(err)) } } func (s *Service) reconnectSourceDevice() { - scheduler.Unschedule(s.sourceID) + s.scheduler.Unschedule(s.sourceID) if s.statusSource.Status == types.StatusUP { return } @@ -255,11 +267,11 @@ func (s *Service) reconnectSourceDevice() { if s.sourceDevice != nil { err := s.sourceDevice.Close() if err != nil { - zap.L().Error("error on closing a source connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) + s.logger.Error("error on closing a source connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.Error(err)) } } - sourceDevice, err := devicePlugin.Create(s.adapterConfig.Source.GetString(types.KeyType), s.adapterConfig.Name, s.adapterConfig.Source, s.onSourceMessage, s.onSourceStatus) + sourceDevice, err := devicePlugin.Create(s.ctx, s.adapterConfig.Source.GetString(types.KeyType), s.adapterConfig.Name, s.adapterConfig.Source, s.onSourceMessage, s.onSourceStatus) if err == nil { s.sourceDevice = sourceDevice // update status UP @@ -267,13 +279,13 @@ func (s *Service) reconnectSourceDevice() { Status: types.StatusUP, Since: time.Now(), } - zap.L().Info("connected to the source device", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) + s.logger.Info("connected to the source device", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider)) return } - zap.L().Error("error on getting source connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.String("reconnectDelay", s.reconnectDelay), zap.Error(err)) + s.logger.Error("error on getting source connection", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.String("reconnectDelay", s.reconnectDelay), zap.Error(err)) // schedule a job for reconnect - err = scheduler.Schedule(s.sourceID, s.reconnectDelay, s.reconnectSourceDevice) + err = s.scheduler.Schedule(s.sourceID, s.reconnectDelay, s.reconnectSourceDevice) if err != nil { - zap.L().Error("error on configuring a schedule", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.String("id", s.sourceID), zap.Error(err)) + s.logger.Error("error on configuring a schedule", zap.String("adapterName", s.adapterConfig.Name), zap.String("provider", s.adapterConfig.Provider), zap.String("id", s.sourceID), zap.Error(err)) } } diff --git a/pkg/service/adapter/store.go b/pkg/service/adapter/store.go index 6ba3101..a429e14 100644 --- a/pkg/service/adapter/store.go +++ b/pkg/service/adapter/store.go @@ -1,9 +1,11 @@ package adapter import ( + "context" "sync" config "github.com/mycontroller-org/2mqtt/pkg/types/config" + contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context" "go.uber.org/zap" ) @@ -39,21 +41,28 @@ func (s *store) StopAll() { } // Start all the services -func Start(adapters []config.AdapterConfig) { +func Start(ctx context.Context, adapters []config.AdapterConfig) error { + logger, err := contextTY.LoggerFromContext(ctx) + if err != nil { + return err + } + for index := range adapters { adapterCfg := adapters[index] if !adapterCfg.Enabled { continue } - zap.L().Debug("starting an adapter", zap.String("name", adapterCfg.Name), zap.String("provider", adapterCfg.Provider)) - service, err := NewService(&adapterCfg) + logger.Debug("starting an adapter", zap.String("name", adapterCfg.Name), zap.String("provider", adapterCfg.Provider)) + service, err := NewService(ctx, &adapterCfg) if err != nil { - zap.L().Error("error on starting a service", zap.Error(err), zap.String("adapterName", adapterCfg.Name)) + logger.Error("error on starting a service", zap.Error(err), zap.String("adapterName", adapterCfg.Name)) continue } service.Start() servicesStore.Add(service) } + + return nil } // Close stops all the services diff --git a/pkg/service/scheduler/service.go b/pkg/service/scheduler/service.go index 2105491..6ada467 100644 --- a/pkg/service/scheduler/service.go +++ b/pkg/service/scheduler/service.go @@ -1,33 +1,76 @@ package scheduler import ( + "context" + "errors" "fmt" - coreScheduler "github.com/mycontroller-org/server/v2/pkg/service/core_scheduler" + contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context" + schedulerTY "github.com/mycontroller-org/server/v2/pkg/types/scheduler" "go.uber.org/zap" ) +type Scheduler struct { + logger *zap.Logger + coreScheduler schedulerTY.CoreScheduler +} + +func FromContext(ctx context.Context) (*Scheduler, error) { + _scheduler, ok := ctx.Value(contextTY.CUSTOM_SCHEDULER).(*Scheduler) + if !ok { + return nil, errors.New("invalid scheduler instance received in context") + } + if _scheduler == nil { + return nil, errors.New("scheduler instance not provided in context") + } + return _scheduler, nil +} + +func WithContext(ctx context.Context, scheduler *Scheduler) context.Context { + return context.WithValue(ctx, contextTY.CUSTOM_SCHEDULER, scheduler) +} + +func New(ctx context.Context) (*Scheduler, error) { + logger, err := contextTY.LoggerFromContext(ctx) + if err != nil { + return nil, err + } + + coreScheduler, err := schedulerTY.FromContext(ctx) + if err != nil { + logger.Error("error on getting a core scheduler", zap.Error(err)) + return nil, err + } + + _scheduler := &Scheduler{ + logger: logger.Named("scheduler"), + coreScheduler: coreScheduler, + } + + return _scheduler, nil +} + // Schedule adds a schedule -func Schedule(schedulerID, interval string, triggerFunc func()) error { - Unschedule(schedulerID) - zap.L().Info("schedule", zap.String("id", schedulerID), zap.String("interval", interval), zap.Any("triggerFunc", triggerFunc)) +func (sh *Scheduler) Schedule(schedulerID, interval string, triggerFunc func()) error { + sh.Unschedule(schedulerID) + sh.logger.Info("schedule", zap.String("id", schedulerID), zap.String("interval", interval), zap.Any("triggerFunc", triggerFunc)) cronSpec := fmt.Sprintf("@every %s", interval) - err := coreScheduler.SVC.AddFunc(schedulerID, cronSpec, triggerFunc) + err := sh.coreScheduler.AddFunc(schedulerID, cronSpec, triggerFunc) if err != nil { - zap.L().Error("error on adding schedule", zap.Error(err)) + sh.logger.Error("error on adding schedule", zap.Error(err)) return err } - zap.L().Debug("added a schedule", zap.String("schedulerID", schedulerID), zap.String("interval", interval)) + sh.logger.Debug("added a schedule", zap.String("schedulerID", schedulerID), zap.String("interval", interval)) return nil } // UnscheduleAll removes all with prefix -func UnscheduleAll(prefix string) { - coreScheduler.SVC.RemoveWithPrefix(prefix) +func (sh *Scheduler) UnscheduleAll(prefix string) { + sh.coreScheduler.RemoveWithPrefix(prefix) } // Unschedule removes a schedule -func Unschedule(scheduleID string) { - coreScheduler.SVC.RemoveFunc(scheduleID) +func (sh *Scheduler) Unschedule(scheduleID string) { + sh.coreScheduler.RemoveFunc(scheduleID) } diff --git a/pkg/service/start/logger.go b/pkg/service/start/logger.go deleted file mode 100644 index 645ca4b..0000000 --- a/pkg/service/start/logger.go +++ /dev/null @@ -1,16 +0,0 @@ -package start - -import ( - cfgTY "github.com/mycontroller-org/2mqtt/pkg/types/config" - "github.com/mycontroller-org/2mqtt/pkg/version" - loggerUtils "github.com/mycontroller-org/server/v2/pkg/utils/logger" - "go.uber.org/zap" -) - -// InitLogger func -func InitLogger(loggerCfg cfgTY.LoggerConfig) { - logger := loggerUtils.GetLogger(loggerCfg.Mode, loggerCfg.Level, loggerCfg.Encoding, false, 0, false) - zap.ReplaceGlobals(logger) - zap.L().Info("welcome to the 2mqtt adapter server :)") - zap.L().Info("server information", zap.Any("version", version.Get()), zap.Any("logger", loggerCfg)) -} diff --git a/pkg/service/start/start.go b/pkg/service/start/start.go deleted file mode 100644 index d39a49c..0000000 --- a/pkg/service/start/start.go +++ /dev/null @@ -1,55 +0,0 @@ -package start - -import ( - "os" - "os/signal" - "syscall" - "time" - - adapterSVC "github.com/mycontroller-org/2mqtt/pkg/service/adapter" - cfgTY "github.com/mycontroller-org/2mqtt/pkg/types/config" - sch "github.com/mycontroller-org/server/v2/pkg/service/core_scheduler" - "go.uber.org/zap" -) - -func StartServices(cfg *cfgTY.Config) { - start := time.Now() - - sch.Init() // scheduler - - // start adapter services - adapterSVC.Start(cfg.Adapters) - - zap.L().Info("services started", zap.String("timeTaken", time.Since(start).String())) - - handleShutdownSignal() -} - -// handleShutdownSignal func -func handleShutdownSignal() { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - - // waiting for signal - sig := <-sigs - close(sigs) - - zap.L().Info("shutdown initiated..", zap.Any("signal", sig)) - triggerShutdown() -} - -func triggerShutdown() { - start := time.Now() - - // close adapter services - adapterSVC.Close() - - if sch.SVC != nil { - sch.SVC.Close() - } - - zap.L().Debug("closing services are done", zap.String("timeTaken", time.Since(start).String())) - zap.L().Debug("bye, see you soon :)") - - os.Exit(0) -} diff --git a/pkg/types/config/types.go b/pkg/types/config/types.go index cf9f0d8..29e961e 100644 --- a/pkg/types/config/types.go +++ b/pkg/types/config/types.go @@ -10,9 +10,10 @@ type Config struct { // LoggerConfig struct type LoggerConfig struct { - Mode string `yaml:"mode"` - Encoding string `yaml:"encoding"` - Level string `yaml:"level"` + Mode string `yaml:"mode"` + Encoding string `yaml:"encoding"` + Level string `yaml:"level"` + EnableStacktrace bool `yaml:"enable_stacktrace"` } // AdapterConfig struct diff --git a/pkg/types/context/types.go b/pkg/types/context/types.go new file mode 100644 index 0000000..c1f59c4 --- /dev/null +++ b/pkg/types/context/types.go @@ -0,0 +1,31 @@ +package context + +import ( + "context" + "errors" + + "go.uber.org/zap" +) + +const ( + CONFIG ContextKey = "mc_config" + LOGGER ContextKey = "mc_logger" + CUSTOM_SCHEDULER ContextKey = "mc_custom_scheduler" +) + +type ContextKey string + +func LoggerFromContext(ctx context.Context) (*zap.Logger, error) { + logger, ok := ctx.Value(LOGGER).(*zap.Logger) + if !ok { + return nil, errors.New("invalid logger instance received in context") + } + if logger == nil { + return nil, errors.New("logger instance not provided in context") + } + return logger, nil +} + +func LoggerWithContext(ctx context.Context, logger *zap.Logger) context.Context { + return context.WithValue(ctx, LOGGER, logger) +} diff --git a/plugin/device/ethernet/device.go b/plugin/device/ethernet/device.go index 9544fe9..bc2c6d6 100644 --- a/plugin/device/ethernet/device.go +++ b/plugin/device/ethernet/device.go @@ -1,11 +1,13 @@ package ethernet import ( + "context" "net" "net/url" "time" "github.com/mycontroller-org/2mqtt/pkg/types" + contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context" deviceType "github.com/mycontroller-org/2mqtt/plugin/device/types" "github.com/mycontroller-org/server/v2/pkg/types/cmap" "github.com/mycontroller-org/server/v2/pkg/utils" @@ -32,6 +34,7 @@ type Config struct { // Endpoint data type Endpoint struct { + logger *zap.Logger ID string Config Config connUrl *url.URL @@ -43,10 +46,16 @@ type Endpoint struct { } // NewDevice ethernet driver -func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *types.Message), statusFunc func(state *types.State)) (deviceType.Plugin, error) { +func NewDevice(ctx context.Context, ID string, config cmap.CustomMap, rxFunc func(msg *types.Message), statusFunc func(state *types.State)) (deviceType.Plugin, error) { + logger, err := contextTY.LoggerFromContext(ctx) + if err != nil { + return nil, err + } + var cfg Config - err := utils.MapToStruct(utils.TagNameYaml, config, &cfg) + err = utils.MapToStruct(utils.TagNameYaml, config, &cfg) if err != nil { + logger.Error("error on converting map to struct", zap.Error(err)) return nil, err } @@ -55,7 +64,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *types.Message) cfg.MessageSplitter = &splitter } - zap.L().Debug("source device config", zap.String("id", ID), zap.Any("config", cfg)) + logger.Debug("source device config", zap.String("id", ID), zap.Any("config", cfg)) serverURL, err := url.Parse(cfg.Server) if err != nil { @@ -68,6 +77,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *types.Message) } endpoint := &Endpoint{ + logger: logger.Named("ethernet_client"), ID: ID, Config: cfg, connUrl: serverURL, @@ -107,7 +117,7 @@ func (ep *Endpoint) Close() error { if ep.conn != nil { err := ep.conn.Close() if err != nil { - zap.L().Error("error on closing a connection", zap.String("adapterID", ep.ID), zap.String("server", ep.Config.Server), zap.Error(err)) + ep.logger.Error("error on closing a connection", zap.String("adapterID", ep.ID), zap.String("server", ep.Config.Server), zap.Error(err)) } ep.conn = nil } @@ -121,12 +131,12 @@ func (ep *Endpoint) dataListener() { for { select { case <-ep.safeClose.CH: - zap.L().Info("received close signal", zap.String("adapterID", ep.ID), zap.String("server", ep.Config.Server)) + ep.logger.Info("received close signal", zap.String("adapterID", ep.ID), zap.String("server", ep.Config.Server)) return default: rxLength, err := ep.conn.Read(readBuf) if err != nil { - zap.L().Error("error on reading data from a ethernet connection", zap.String("adapterID", ep.ID), zap.String("server", ep.Config.Server), zap.Error(err)) + ep.logger.Error("error on reading data from a ethernet connection", zap.String("adapterID", ep.ID), zap.String("server", ep.Config.Server), zap.Error(err)) state := &types.State{ Status: types.StatusError, Message: err.Error(), diff --git a/plugin/device/http/basic_authentication.go b/plugin/device/http/basic_authentication.go index 6316a7f..ec2a995 100644 --- a/plugin/device/http/basic_authentication.go +++ b/plugin/device/http/basic_authentication.go @@ -4,7 +4,7 @@ import ( "net/http" "strings" - handlerUtils "github.com/mycontroller-org/server/v2/cmd/server/app/handler/utils" + handlerUtils "github.com/mycontroller-org/server/v2/pkg/utils/http_handler" ) const ( diff --git a/plugin/device/http/device.go b/plugin/device/http/device.go index 9169f02..0fa9921 100644 --- a/plugin/device/http/device.go +++ b/plugin/device/http/device.go @@ -1,12 +1,14 @@ package http import ( + "context" "fmt" "net" "net/http" "time" model "github.com/mycontroller-org/2mqtt/pkg/types" + contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context" deviceType "github.com/mycontroller-org/2mqtt/plugin/device/types" "github.com/mycontroller-org/server/v2/pkg/types/cmap" "github.com/mycontroller-org/server/v2/pkg/utils" @@ -32,6 +34,7 @@ type Config struct { // Endpoint data type Endpoint struct { + logger *zap.Logger ID string Config *Config receiveMsgFunc func(rm *model.Message) @@ -40,22 +43,29 @@ type Endpoint struct { } // New http client -func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (deviceType.Plugin, error) { +func NewDevice(ctx context.Context, ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (deviceType.Plugin, error) { + logger, err := contextTY.LoggerFromContext(ctx) + if err != nil { + return nil, err + } + var cfg Config - err := utils.MapToStruct(utils.TagNameYaml, config, &cfg) + err = utils.MapToStruct(utils.TagNameYaml, config, &cfg) if err != nil { + logger.Error("error on converting map to struct", zap.Error(err)) return nil, err } - zap.L().Debug("source device config", zap.String("id", ID), zap.Any("config", cfg)) + logger.Debug("source device config", zap.String("id", ID), zap.Any("config", cfg)) - zap.L().Info("opening the listening address", zap.String("adapterName", ID), zap.String("listenAddress", cfg.ListenAddress)) + logger.Info("opening the listening address", zap.String("adapterName", ID), zap.String("listenAddress", cfg.ListenAddress)) listener, err := net.Listen("tcp", cfg.ListenAddress) if err != nil { return nil, err } endpoint := &Endpoint{ + logger: logger.Named("http_client"), ID: ID, Config: &cfg, listener: listener, @@ -64,7 +74,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message) } // handler - var handler http.Handler = deviceHandler{receiveMsgFunc: rxFunc, ID: ID} + var handler http.Handler = deviceHandler{logger: endpoint.logger, receiveMsgFunc: rxFunc, ID: ID} // include basic auth if enabled if cfg.IsAuthEnabled { handler = MiddlewareBasicAuthentication(cfg.Username, cfg.Password, handler) @@ -80,7 +90,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message) go func() { err = server.Serve(endpoint.listener) if err != nil { - zap.L().Error("error serve", zap.String("adapterName", ID), zap.String("listenAddress", cfg.ListenAddress), zap.Error(err)) + endpoint.logger.Error("error serve", zap.String("adapterName", ID), zap.String("listenAddress", cfg.ListenAddress), zap.Error(err)) endpoint.statusFunc(&model.State{ Status: model.StatusError, Message: err.Error(), @@ -111,7 +121,7 @@ func (ep *Endpoint) Close() error { if ep.listener != nil { err := ep.listener.Close() ep.listener = nil - zap.L().Error("error on closing listener", zap.Error(err)) + ep.logger.Error("error on closing listener", zap.Error(err)) return err } return nil diff --git a/plugin/device/http/handler.go b/plugin/device/http/handler.go index 8caeaaf..40431f2 100644 --- a/plugin/device/http/handler.go +++ b/plugin/device/http/handler.go @@ -22,6 +22,7 @@ type RequestData struct { } type deviceHandler struct { + logger *zap.Logger ID string receiveMsgFunc func(rm *types.Message) } @@ -34,7 +35,7 @@ func (h deviceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Method == http.MethodDelete { data, err := io.ReadAll(r.Body) if err != nil { - zap.L().Error("error on reading body", zap.String("adapterName", h.ID), zap.String("path", r.URL.Path), zap.Error(err)) + h.logger.Error("error on reading body", zap.String("adapterName", h.ID), zap.String("path", r.URL.Path), zap.Error(err)) return } body = string(data) @@ -65,7 +66,7 @@ func (h deviceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { bytes, err := json.Marshal(&requestData) if err != nil { - zap.L().Error("error on converting request data to json bytes", zap.String("adapterName", h.ID), zap.String("path", r.URL.Path), zap.Error(err)) + h.logger.Error("error on converting request data to json bytes", zap.String("adapterName", h.ID), zap.String("path", r.URL.Path), zap.Error(err)) return } diff --git a/plugin/device/mqtt/device.go b/plugin/device/mqtt/device.go index f0e93a1..143c33c 100644 --- a/plugin/device/mqtt/device.go +++ b/plugin/device/mqtt/device.go @@ -1,6 +1,7 @@ package mqtt import ( + "context" "crypto/tls" "fmt" "strings" @@ -8,6 +9,7 @@ import ( paho "github.com/eclipse/paho.mqtt.golang" model "github.com/mycontroller-org/2mqtt/pkg/types" + contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context" deviceType "github.com/mycontroller-org/2mqtt/plugin/device/types" "github.com/mycontroller-org/server/v2/pkg/types/cmap" "github.com/mycontroller-org/server/v2/pkg/utils" @@ -19,26 +21,29 @@ import ( const ( PluginMQTT = "mqtt" - transmitPreDelayDefault = time.Microsecond * 1 // 1 micro second - reconnectDelayDefault = time.Second * 10 // 10 seconds + transmitPreDelayDefault = time.Microsecond * 1 // 1 micro second + reconnectDelayDefault = time.Second * 10 // 10 seconds + connectionTimeoutDefault = time.Second * 30 // 30 seconds ) // Config struct type Config struct { - Name string `yaml:"name"` - Broker string `yaml:"broker"` - Insecure bool `yaml:"insecure"` - Username string `yaml:"username"` - Password string `yaml:"password" json:"-"` - Subscribe string `yaml:"subscribe"` - Publish string `yaml:"publish"` - QoS int `yaml:"qos"` - TransmitPreDelay string `yaml:"transmit_pre_delay"` - ReconnectDelay string `yaml:"reconnect_delay"` + Name string `yaml:"name"` + Broker string `yaml:"broker"` + Insecure bool `yaml:"insecure"` + Username string `yaml:"username"` + Password string `yaml:"password" json:"-"` + Subscribe string `yaml:"subscribe"` + Publish string `yaml:"publish"` + QoS int `yaml:"qos"` + TransmitPreDelay string `yaml:"transmit_pre_delay"` + ReconnectDelay string `yaml:"reconnect_delay"` + ConnectionTimeout string `yaml:"connection_timeout"` } // Endpoint data type Endpoint struct { + logger *zap.Logger ID string Config *Config receiveMsgFunc func(msg *model.Message) @@ -48,19 +53,25 @@ type Endpoint struct { } // NewDevice mqtt driver -func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (deviceType.Plugin, error) { +func NewDevice(ctx context.Context, ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (deviceType.Plugin, error) { + logger, err := contextTY.LoggerFromContext(ctx) + if err != nil { + return nil, err + } start := time.Now() var cfg Config - err := utils.MapToStruct(utils.TagNameYaml, config, &cfg) + err = utils.MapToStruct(utils.TagNameYaml, config, &cfg) if err != nil { + logger.Error("error on converting map to struct", zap.Error(err)) return nil, err } - zap.L().Debug("mqtt config", zap.Any("adapterName", ID), zap.Any("config", cfg)) + logger.Debug("mqtt config", zap.Any("adapterName", ID), zap.Any("config", cfg)) // endpoint endpoint := &Endpoint{ + logger: logger.Named("mqtt_client"), ID: ID, Config: &cfg, receiveMsgFunc: rxFunc, @@ -78,12 +89,15 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message) opts.SetConnectRetryInterval(utils.ToDuration(cfg.ReconnectDelay, reconnectDelayDefault)) opts.SetOnConnectHandler(endpoint.onConnectionHandler) opts.SetConnectionLostHandler(endpoint.onConnectionLostHandler) + opts.SetConnectTimeout(utils.ToDuration(cfg.ConnectionTimeout, connectionTimeoutDefault)) // update tls config tlsConfig := &tls.Config{InsecureSkipVerify: cfg.Insecure} opts.SetTLSConfig(tlsConfig) endpoint.Client = paho.NewClient(opts) + + endpoint.logger.Debug("mqtt client connecting to broker", zap.Any("adapterName", ID), zap.Any("clientConfig", cfg)) token := endpoint.Client.Connect() for !token.WaitTimeout(3 * time.Second) { } @@ -91,7 +105,7 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message) return nil, err } - zap.L().Debug("mqtt client connected successfully", zap.Any("adapterName", ID), zap.String("timeTaken", time.Since(start).String()), zap.Any("clientConfig", cfg)) + endpoint.logger.Debug("mqtt client connected successfully", zap.Any("adapterName", ID), zap.String("timeTaken", time.Since(start).String()), zap.Any("clientConfig", cfg)) return endpoint, nil } @@ -100,11 +114,11 @@ func (ep *Endpoint) Name() string { } func (ep *Endpoint) onConnectionHandler(c paho.Client) { - zap.L().Debug("mqtt connection success", zap.Any("adapterName", ep.ID)) + ep.logger.Debug("mqtt connection success", zap.Any("adapterName", ep.ID)) err := ep.Subscribe(ep.Config.Subscribe) if err != nil { - zap.L().Error("error on subscribe topics", zap.Any("adapterName", ep.ID), zap.String("topics", ep.Config.Subscribe), zap.Error(err)) + ep.logger.Error("error on subscribe topics", zap.Any("adapterName", ep.ID), zap.String("topics", ep.Config.Subscribe), zap.Error(err)) } ep.statusFunc(&model.State{ @@ -115,7 +129,7 @@ func (ep *Endpoint) onConnectionHandler(c paho.Client) { } func (ep *Endpoint) onConnectionLostHandler(c paho.Client, err error) { - zap.L().Error("mqtt connection lost", zap.Any("adapterName", ep.ID), zap.Error(err)) + ep.logger.Error("mqtt connection lost", zap.Any("adapterName", ep.ID), zap.Error(err)) // Report connection lost if err != nil { ep.statusFunc(&model.State{ @@ -131,7 +145,7 @@ func (ep *Endpoint) Write(message *model.Message) error { if message == nil { return nil } - zap.L().Debug("about to send a message", zap.Any("adapterName", ep.ID), zap.String("message", message.ToString())) + ep.logger.Debug("about to send a message", zap.Any("adapterName", ep.ID), zap.String("message", message.ToString())) topic := message.Others.GetString(model.KeyMqttTopic) qos := byte(ep.Config.QoS) @@ -157,7 +171,7 @@ func (ep *Endpoint) Close() error { if ep.Client.IsConnected() { ep.Client.Unsubscribe(ep.Config.Subscribe) ep.Client.Disconnect(0) - zap.L().Debug("mqtt client connection closed", zap.String("adapterName", ep.ID)) + ep.logger.Debug("mqtt client connection closed", zap.String("adapterName", ep.ID)) } return nil } @@ -182,10 +196,10 @@ func (ep *Endpoint) Subscribe(topicsStr string) error { token := ep.Client.Subscribe(topic, 0, ep.getCallBack()) token.WaitTimeout(3 * time.Second) if token.Error() != nil { - zap.L().Error("error on subscription", zap.String("adapterName", ep.ID), zap.String("topic", topic), zap.Error(token.Error())) + ep.logger.Error("error on subscription", zap.String("adapterName", ep.ID), zap.String("topic", topic), zap.Error(token.Error())) return token.Error() } - zap.L().Debug("subscribed a topic", zap.String("adapterName", ep.ID), zap.String("topic", topic)) + ep.logger.Debug("subscribed a topic", zap.String("adapterName", ep.ID), zap.String("topic", topic)) } return nil } diff --git a/plugin/device/plugin.go b/plugin/device/plugin.go index 6d335a0..fc1d134 100644 --- a/plugin/device/plugin.go +++ b/plugin/device/plugin.go @@ -1,6 +1,7 @@ package plugin import ( + "context" "fmt" model "github.com/mycontroller-org/2mqtt/pkg/types" @@ -10,7 +11,7 @@ import ( ) // CreatorFn func type -type CreatorFn func(ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (deviceType.Plugin, error) +type CreatorFn func(ctx context.Context, ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (deviceType.Plugin, error) // Creators is used for create plugins. var creators = make(map[string]CreatorFn) @@ -23,9 +24,9 @@ func Register(name string, fn CreatorFn) { creators[name] = fn } -func Create(name, ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (p deviceType.Plugin, err error) { +func Create(ctx context.Context, name, ID string, config cmap.CustomMap, rxFunc func(msg *model.Message), statusFunc func(state *model.State)) (p deviceType.Plugin, err error) { if fn, ok := creators[name]; ok { - p, err = fn(ID, config, rxFunc, statusFunc) + p, err = fn(ctx, ID, config, rxFunc, statusFunc) } else { err = fmt.Errorf("device plugin [%s] is not registered", name) } diff --git a/plugin/device/serial/device.go b/plugin/device/serial/device.go index 2bff835..5532954 100644 --- a/plugin/device/serial/device.go +++ b/plugin/device/serial/device.go @@ -1,9 +1,11 @@ package serial import ( + "context" "time" "github.com/mycontroller-org/2mqtt/pkg/types" + contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context" deviceType "github.com/mycontroller-org/2mqtt/plugin/device/types" "github.com/mycontroller-org/server/v2/pkg/types/cmap" "github.com/mycontroller-org/server/v2/pkg/utils" @@ -32,6 +34,7 @@ type Config struct { // Endpoint data type Endpoint struct { + logger *zap.Logger ID string Config *Config serCfg *ser.Config @@ -43,10 +46,16 @@ type Endpoint struct { } // New serial client -func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *types.Message), statusFunc func(state *types.State)) (deviceType.Plugin, error) { +func NewDevice(ctx context.Context, ID string, config cmap.CustomMap, rxFunc func(msg *types.Message), statusFunc func(state *types.State)) (deviceType.Plugin, error) { + logger, err := contextTY.LoggerFromContext(ctx) + if err != nil { + return nil, err + } + var cfg Config - err := utils.MapToStruct(utils.TagNameYaml, config, &cfg) + err = utils.MapToStruct(utils.TagNameYaml, config, &cfg) if err != nil { + logger.Error("error on converting map to struct", zap.Error(err)) return nil, err } @@ -55,17 +64,18 @@ func NewDevice(ID string, config cmap.CustomMap, rxFunc func(msg *types.Message) cfg.MessageSplitter = &splitter } - zap.L().Debug("source device config", zap.String("id", ID), zap.Any("config", cfg)) + logger.Debug("source device config", zap.String("id", ID), zap.Any("config", cfg)) serCfg := &ser.Config{Name: cfg.Port, Baud: cfg.BaudRate} - zap.L().Info("opening a serial port", zap.String("adapterName", ID), zap.String("port", cfg.Port)) + logger.Info("opening a serial port", zap.String("adapterName", ID), zap.String("port", cfg.Port)) port, err := ser.OpenPort(serCfg) if err != nil { return nil, err } endpoint := &Endpoint{ + logger: logger.Named("serial_client"), ID: ID, Config: &cfg, serCfg: serCfg, @@ -87,7 +97,7 @@ func (ep *Endpoint) Name() string { } func (ep *Endpoint) Write(message *types.Message) error { - if message == nil && len(message.Data) > 0 { + if message == nil || len(message.Data) == 0 { return nil } @@ -111,11 +121,11 @@ func (ep *Endpoint) Close() error { if ep.Port != nil { if err := ep.Port.Flush(); err != nil { - zap.L().Error("error on flushing into serial port", zap.String("adapterName", ep.ID), zap.String("port", ep.serCfg.Name), zap.Error(err)) + ep.logger.Error("error on flushing into serial port", zap.String("adapterName", ep.ID), zap.String("port", ep.serCfg.Name), zap.Error(err)) } err := ep.Port.Close() if err != nil { - zap.L().Error("error on closing the serial port", zap.String("adapterName", ep.ID), zap.String("port", ep.serCfg.Name), zap.Error(err)) + ep.logger.Error("error on closing the serial port", zap.String("adapterName", ep.ID), zap.String("port", ep.serCfg.Name), zap.Error(err)) } return err } @@ -129,23 +139,21 @@ func (ep *Endpoint) dataListener() { for { select { case <-ep.safeClose.CH: - zap.L().Info("received a close signal.", zap.String("id", ep.ID), zap.String("port", ep.serCfg.Name)) + ep.logger.Info("received a close signal.", zap.String("id", ep.ID), zap.String("port", ep.serCfg.Name)) return default: rxLength, err := ep.Port.Read(readBuf) if err != nil { - zap.L().Error("error on reading data from a serial port", zap.String("adapterName", ep.ID), zap.String("port", ep.serCfg.Name), zap.Error(err)) + ep.logger.Error("error on reading data from a serial port", zap.String("adapterName", ep.ID), zap.String("port", ep.serCfg.Name), zap.Error(err)) // notify failed - if err != nil { - ep.statusFunc(&types.State{ - Status: types.StatusError, - Message: err.Error(), - Since: time.Now(), - }) - } + ep.statusFunc(&types.State{ + Status: types.StatusError, + Message: err.Error(), + Since: time.Now(), + }) return } - //zap.L().Debug("data", zap.Any("data", string(data))) + //ep.logger.Debug("data", zap.Any("data", string(data))) for index := 0; index < rxLength; index++ { b := readBuf[index] if b == *ep.Config.MessageSplitter { diff --git a/plugin/provider/mysensors_v2/formatter.go b/plugin/provider/mysensors_v2/formatter.go index ca91c7d..9815c0d 100644 --- a/plugin/provider/mysensors_v2/formatter.go +++ b/plugin/provider/mysensors_v2/formatter.go @@ -1,10 +1,12 @@ package mysensors import ( + "context" "errors" "strings" "github.com/mycontroller-org/2mqtt/pkg/types" + contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context" "go.uber.org/zap" ) @@ -12,18 +14,35 @@ const ( MessageSplitter = '\n' ) -type SourceType string +type MySensorsFormatter struct { + logger *zap.Logger + name string +} + +func New(ctx context.Context, name string) (*MySensorsFormatter, error) { + logger, err := contextTY.LoggerFromContext(ctx) + if err != nil { + return nil, err + } + + formatter := &MySensorsFormatter{ + logger: logger.Named("mys_formatter"), + name: name, + } + + return formatter, nil +} -func (st SourceType) Name() string { +func (mys *MySensorsFormatter) Name() string { return PluginMySensors } -func (st SourceType) ToSourceMessage(mqttMessage *types.Message) (*types.Message, error) { +func (mys *MySensorsFormatter) ToSourceMessage(mqttMessage *types.Message) (*types.Message, error) { // node-id;child-sensor-id;command;ack;type;payload\n topic := mqttMessage.Others.GetString(types.KeyMqttTopic) topicSlice := strings.Split(topic, "/") if len(topicSlice) < 5 { - zap.L().Warn("invalid topic", zap.Any("message", mqttMessage)) + mys.logger.Warn("invalid topic", zap.Any("message", mqttMessage)) return nil, errors.New("invalid topic") } topicSlice = topicSlice[len(topicSlice)-5:] @@ -45,7 +64,7 @@ func (st SourceType) ToSourceMessage(mqttMessage *types.Message) (*types.Message return formattedMessage, nil } -func (st SourceType) ToMQTTMessage(sourceMessage *types.Message) (*types.Message, error) { +func (mys *MySensorsFormatter) ToMQTTMessage(sourceMessage *types.Message) (*types.Message, error) { // structure: node-id/child-sensor-id/command/ack/type payload data := "" if len(sourceMessage.Data) > 0 { @@ -53,7 +72,7 @@ func (st SourceType) ToMQTTMessage(sourceMessage *types.Message) (*types.Message } dataSlice := strings.Split(data, ";") if len(dataSlice) != 6 { - zap.L().Warn("invalid message format", zap.String("message", data)) + mys.logger.Warn("invalid message format", zap.String("message", data)) return nil, errors.New("invalid message format") } topic := strings.Join(dataSlice[:5], "/") diff --git a/plugin/provider/mysensors_v2/mysensors.go b/plugin/provider/mysensors_v2/mysensors.go index 511bede..a217206 100644 --- a/plugin/provider/mysensors_v2/mysensors.go +++ b/plugin/provider/mysensors_v2/mysensors.go @@ -1,6 +1,7 @@ package mysensors import ( + "context" "fmt" "github.com/mycontroller-org/2mqtt/pkg/types" @@ -11,14 +12,14 @@ import ( const PluginMySensors = "mysensors_v2" -func NewProvider(config cmap.CustomMap, formatter cfgTY.FormatterScript) (providerType.Plugin, error) { +func NewProvider(ctx context.Context, config cmap.CustomMap, formatter cfgTY.FormatterScript) (providerType.Plugin, error) { sourceType := config.GetString(types.KeyType) name := config.GetString(types.KeyName) switch sourceType { case types.DeviceSerial, types.DeviceEthernet: config.Set(types.KeyMessageSplitter, MessageSplitter, nil) - return SourceType(name), nil + return New(ctx, name) default: return nil, fmt.Errorf("unsupported source type:%s", sourceType) diff --git a/plugin/provider/plugin.go b/plugin/provider/plugin.go index 75879d8..6d77d74 100644 --- a/plugin/provider/plugin.go +++ b/plugin/provider/plugin.go @@ -1,6 +1,7 @@ package plugin import ( + "context" "fmt" cfgTY "github.com/mycontroller-org/2mqtt/pkg/types/config" @@ -10,7 +11,7 @@ import ( ) // CreatorFn func type -type CreatorFn func(config cmap.CustomMap, formatter cfgTY.FormatterScript) (providerType.Plugin, error) +type CreatorFn func(ctx context.Context, config cmap.CustomMap, formatter cfgTY.FormatterScript) (providerType.Plugin, error) // Creators is used for create plugins. var creators = make(map[string]CreatorFn) @@ -23,9 +24,9 @@ func Register(name string, fn CreatorFn) { creators[name] = fn } -func Create(name string, config cmap.CustomMap, formatter cfgTY.FormatterScript) (p providerType.Plugin, err error) { +func Create(ctx context.Context, name string, config cmap.CustomMap, formatter cfgTY.FormatterScript) (p providerType.Plugin, err error) { if fn, ok := creators[name]; ok { - p, err = fn(config, formatter) + p, err = fn(ctx, config, formatter) } else { err = fmt.Errorf("device plugin [%s] is not registered", name) } diff --git a/plugin/provider/raw/formatter.go b/plugin/provider/raw/formatter.go index de0ccd7..1237d05 100644 --- a/plugin/provider/raw/formatter.go +++ b/plugin/provider/raw/formatter.go @@ -1,13 +1,17 @@ package raw import ( + "context" "fmt" "strings" + "time" "github.com/mycontroller-org/2mqtt/pkg/types" cfgTY "github.com/mycontroller-org/2mqtt/pkg/types/config" + contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context" "github.com/mycontroller-org/server/v2/pkg/types/cmap" js "github.com/mycontroller-org/server/v2/pkg/utils/javascript" + "go.uber.org/zap" ) const ( @@ -18,10 +22,25 @@ const ( ) type RawProvider struct { + logger *zap.Logger name string formatter cfgTY.FormatterScript } +func New(ctx context.Context, name string, formatter cfgTY.FormatterScript) (*RawProvider, error) { + logger, err := contextTY.LoggerFromContext(ctx) + if err != nil { + return nil, err + } + + _formatter := &RawProvider{ + logger: logger.Named("raw"), + name: name, + formatter: formatter, + } + return _formatter, nil +} + func (rp *RawProvider) Name() string { return PluginRaw } @@ -94,7 +113,8 @@ func (rp *RawProvider) executeScript(script string, msg *types.Message) (*types. } // executes script - response, err := js.Execute(script, input) + _timeout := time.Second * 2 // timeout 2 seconds + response, err := js.Execute(rp.logger, script, input, &_timeout) if err != nil { return nil, err } diff --git a/plugin/provider/raw/raw.go b/plugin/provider/raw/raw.go index a4b8f77..c48950c 100644 --- a/plugin/provider/raw/raw.go +++ b/plugin/provider/raw/raw.go @@ -1,6 +1,7 @@ package raw import ( + "context" "fmt" "github.com/mycontroller-org/2mqtt/pkg/types" @@ -11,13 +12,13 @@ import ( const PluginRaw = "raw" -func NewProvider(cfg cmap.CustomMap, formatter cfgTY.FormatterScript) (providerType.Plugin, error) { +func NewProvider(ctx context.Context, cfg cmap.CustomMap, formatter cfgTY.FormatterScript) (providerType.Plugin, error) { sourceType := cfg.GetString(types.KeyType) name := cfg.GetString(types.KeyName) switch sourceType { case types.DeviceSerial, types.DeviceEthernet, types.DeviceHTTP: - return &RawProvider{name: name, formatter: formatter}, nil + return New(ctx, name, formatter) default: return nil, fmt.Errorf("unsupported source type:%s", sourceType)