diff --git a/cmd/run.go b/cmd/run.go index 39f8f7a0..7b762fa1 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -30,12 +30,10 @@ var cmdRun = &cobra.Command{ // run worker command var ( - useStatusKV bool dryrun bool faultInjection bool facilityCode string storeKind string - replicas int ) var ( @@ -79,7 +77,7 @@ func runWorker(ctx context.Context) { flasher.Logger.Fatal("--facility-code parameter required") } - natsURL, natsCreds, connectTimeout, err := flasher.NatsParams() + natsCfg, err := flasher.NatsParams() if err != nil { flasher.Logger.Fatal(err) } @@ -88,13 +86,13 @@ func runWorker(ctx context.Context) { model.AppName, facilityCode, "firmwareInstall", - natsURL, - natsCreds, + natsCfg.NatsURL, + natsCfg.CredsFile, "firmwareInstall", - controller.WithConcurrency(10), - controller.WithKVReplicas(1), + controller.WithConcurrency(flasher.Config.Concurrency), + controller.WithKVReplicas(natsCfg.KVReplicas), controller.WithLogger(flasher.Logger), - controller.WithConnectionTimeout(connectTimeout), + controller.WithConnectionTimeout(natsCfg.ConnectTimeout), ) if err := nc.Connect(ctx); err != nil { @@ -126,9 +124,7 @@ func initStore(ctx context.Context, config *app.Configuration, logger *logrus.Lo func init() { cmdRun.PersistentFlags().StringVar(&storeKind, "store", "", "Inventory store to lookup devices for update - 'serverservice' or an inventory file with a .yml/.yaml extenstion") cmdRun.PersistentFlags().BoolVarP(&dryrun, "dry-run", "", false, "In dryrun mode, the worker actions the task without installing firmware") - cmdRun.PersistentFlags().BoolVarP(&useStatusKV, "use-kv", "", false, "When this is true, flasher writes status to a NATS KV store instead of sending reply messages (requires --facility-code)") cmdRun.PersistentFlags().BoolVarP(&faultInjection, "fault-injection", "", false, "Tasks can include a Fault attribute to allow fault injection for development purposes") - cmdRun.PersistentFlags().IntVarP(&replicas, "replica-count", "r", 3, "The number of replicas to use for NATS data") cmdRun.PersistentFlags().StringVar(&facilityCode, "facility-code", "", "The facility code this flasher instance is associated with") if err := cmdRun.MarkPersistentFlagRequired("store"); err != nil { diff --git a/go.mod b/go.mod index 86e2d909..c3c33bc7 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jinzhu/copier v0.4.0 github.com/jpillora/backoff v1.0.0 github.com/metal-toolbox/fleetdb v0.17.1 - github.com/metal-toolbox/rivets v1.0.2 + github.com/metal-toolbox/rivets v1.0.3-0.20240404085830-34516b35bd8e github.com/mitchellh/mapstructure v1.5.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 diff --git a/go.sum b/go.sum index a0e495b8..5d5a6e80 100644 --- a/go.sum +++ b/go.sum @@ -570,6 +570,8 @@ github.com/metal-toolbox/fleetdb v0.17.1 h1:eyaCg4yGQnbXIjiBMZPsLqmEpxbltlEUNDG+ github.com/metal-toolbox/fleetdb v0.17.1/go.mod h1:TbRbU+ppHIknqiAONR7JDQfzKij30uiPkehgxlA1Hv0= github.com/metal-toolbox/rivets v1.0.2 h1:cdLXPcGZOw0v8BP2driAMvBTf+PF9Eua2UnUpBFyDrs= github.com/metal-toolbox/rivets v1.0.2/go.mod h1:EMQJRT1mjIyFRXxvKNaBlz7Z4Sp88rTaGO8W18olN2I= +github.com/metal-toolbox/rivets v1.0.3-0.20240404085830-34516b35bd8e h1:EFarcZokTOhcvGvd0VQkpgvEHtlsbAy/nKSMTnbnA4Q= +github.com/metal-toolbox/rivets v1.0.3-0.20240404085830-34516b35bd8e/go.mod h1:EMQJRT1mjIyFRXxvKNaBlz7Z4Sp88rTaGO8W18olN2I= github.com/microsoft/go-mssqldb v0.17.0/go.mod h1:OkoNGhGEs8EZqchVTtochlXruEhEOaO4S0d2sB5aeGQ= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= diff --git a/internal/app/config.go b/internal/app/config.go index cfdb3d14..e96f4188 100644 --- a/internal/app/config.go +++ b/internal/app/config.go @@ -109,6 +109,10 @@ func (a *App) LoadConfiguration(cfgFile string, storeKind model.StoreKind) error } } + if a.Config.Concurrency == 0 { + a.Config.Concurrency = WorkerConcurrency + } + return nil } @@ -145,29 +149,39 @@ func (a *App) envBindVars() error { return nil } -func (a *App) NatsParams() (nurl, credsFile string, connectTimeout time.Duration, err error) { - if a.v.GetString("nats.url") != "" { - nurl = a.v.GetString("nats.url") +type NatsConfig struct { + NatsURL string + CredsFile string + KVReplicas int + ConnectTimeout time.Duration +} + +func (a *App) NatsParams() (NatsConfig, error) { + cfg := NatsConfig{ + ConnectTimeout: defaultNatsConnectTimeout, } - if nurl == "" { - return "", "", 0, errors.New("missing parameter: nats.url") + if a.v.GetString("nats.url") != "" { + cfg.NatsURL = a.v.GetString("nats.url") + } else { + return NatsConfig{}, errors.New("missing parameter: nats.url") } if a.v.GetString("nats.creds.file") != "" { - credsFile = a.v.GetString("nats.creds.file") + cfg.CredsFile = a.v.GetString("nats.creds.file") + } else { + return NatsConfig{}, errors.New("missing parameter: nats.creds.file") } - if credsFile == "" { - return "", "", 0, errors.New("missing parameter: nats.creds.file") + if a.v.GetDuration("nats.connect.timeout") != 0 { + cfg.ConnectTimeout = a.v.GetDuration("nats.connect.timeout") } - connectTimeout = defaultNatsConnectTimeout - if a.v.GetDuration("nats.connect.timeout") != 0 { - connectTimeout = a.v.GetDuration("nats.connect.timeout") + if a.v.GetInt("nats.kv.replicas") != 0 { + cfg.KVReplicas = a.v.GetInt("nats.kv.replicas") } - return nurl, credsFile, connectTimeout, nil + return cfg, nil } // Server service configuration options