diff --git a/go.mod b/go.mod index d14b69f..67d09fc 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,8 @@ require ( github.com/kelseyhightower/envconfig v1.4.0 github.com/metal-stack/go-hal v0.4.2 github.com/metal-stack/metal-go v0.18.5 - github.com/metal-stack/metal-lib v0.9.2 github.com/metal-stack/v v1.0.3 + github.com/nsqio/go-nsq v1.1.0 github.com/stretchr/testify v1.8.0 go.uber.org/zap v1.21.0 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d @@ -49,9 +49,9 @@ require ( github.com/lestrrat-go/option v1.0.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/metal-stack/masterdata-api v0.8.12 // indirect + github.com/metal-stack/metal-lib v0.9.2 // indirect github.com/metal-stack/security v0.6.4 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/nsqio/go-nsq v1.1.0 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/go.sum b/go.sum index c59f2c5..b9c8703 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,6 @@ github.com/avast/retry-go/v4 v4.1.0 h1:CwudD9anYv6JMVnDuTRlK6kLo4dBamiL+F3U8YDiy github.com/avast/retry-go/v4 v4.1.0/go.mod h1:HqmLvS2VLdStPCGDFjSuZ9pzlTqVRldCI4w2dO4m1Ms= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= -github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ= -github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -208,7 +206,6 @@ github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFF github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= -github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4= github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= @@ -265,10 +262,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/nsqio/go-diskqueue v1.1.0 h1:r0dJ0DMXT3+2mOq+79cvCjnhoBxyGC2S9O+OjQrpe4Q= github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= -github.com/nsqio/nsq v1.2.1 h1:ZVjANYLnX1vPLmuSNCOdiw4nNPnzWgAC4t8wFhznMqU= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= diff --git a/internal/bmc/nsq.go b/internal/bmc/nsq.go index 31f52e3..118d5a7 100644 --- a/internal/bmc/nsq.go +++ b/internal/bmc/nsq.go @@ -1,108 +1,120 @@ package bmc import ( + "crypto/tls" + "crypto/x509" + "encoding/json" "fmt" - "strings" - "time" + "os" "github.com/metal-stack/go-hal" + "github.com/nsqio/go-nsq" +) - "github.com/metal-stack/metal-lib/bus" +const ( + mqChannel = "core" ) -// timeout for the nsq handler methods -const receiverHandlerTimeout = 15 * time.Second - -func mapLogLevel(level string) bus.Level { - switch strings.ToLower(level) { - case "debug": - return bus.Debug - case "info": - return bus.Info - case "warn", "warning": - return bus.Warning - case "error": - return bus.Error - default: - return bus.Warning +func (b *BMCService) InitConsumer() error { + caCert, err := os.ReadFile(b.mqCACertFile) + if err != nil { + return fmt.Errorf("failed to load cert: %w", err) } -} -func (b *BMCService) timeoutHandler(err bus.TimeoutError) error { - b.log.Errorw("timeout processing event", "event", err.Event()) - return nil -} + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) -func (b *BMCService) InitConsumer() error { - tlsCfg := &bus.TLSConfig{ - CACertFile: b.mqCACertFile, - ClientCertFile: b.mqClientCertFile, + cert, err := tls.LoadX509KeyPair(b.mqClientCertFile, b.mqClientCertFile) // FIXME: where is the key? + if err != nil { + return err } - c, err := bus.NewConsumer(b.log.Desugar(), tlsCfg, b.mqAddress) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientCAs: caCertPool, + ClientAuth: tls.RequireAndVerifyClientCert, + MinVersion: tls.VersionTLS13, + } + + config := nsq.NewConfig() + config.TlsConfig = tlsConfig + consumer, err := nsq.NewConsumer(b.machineTopic, mqChannel, config) if err != nil { return err } - err = c.With(bus.LogLevel(mapLogLevel(b.mqLogLevel))). - MustRegister(b.machineTopic, "core"). - Consume(MachineEvent{}, func(message interface{}) error { - event := message.(*MachineEvent) - b.log.Debugw("got message", "topic", b.machineTopic, "channel", "core", "event", event) + consumer.AddHandler(b) - if event.Cmd.IPMI == nil { - return fmt.Errorf("event does not contain ipmi details:%v", event) - } - outBand, err := b.outBand(event.Cmd.IPMI) - if err != nil { - b.log.Errorw("error creating outband connection", "error", err) - return err - } + err = consumer.ConnectToNSQD(b.mqAddress) // FIXME: must point to NSQd, not lookupd + if err != nil { + return err + } - switch event.Type { - case Delete: - err := outBand.BootFrom(hal.BootTargetPXE) - if err != nil { - return err - } - return outBand.PowerCycle() - case Command: - switch event.Cmd.Command { - case MachineOnCmd: - return outBand.PowerOn() - case MachineOffCmd: - return outBand.PowerOff() - case MachineResetCmd: - return outBand.PowerReset() - case MachineCycleCmd: - return outBand.PowerCycle() - case MachineBiosCmd: - return outBand.BootFrom(hal.BootTargetBIOS) - case MachineDiskCmd: - return outBand.BootFrom(hal.BootTargetDisk) - case MachinePxeCmd: - return outBand.BootFrom(hal.BootTargetPXE) - case MachineReinstallCmd: - err := outBand.BootFrom(hal.BootTargetPXE) - if err != nil { - return err - } - return outBand.PowerReset() - case ChassisIdentifyLEDOnCmd: - return outBand.IdentifyLEDOn() - case ChassisIdentifyLEDOffCmd: - return outBand.IdentifyLEDOff() - case UpdateFirmwareCmd: - return b.UpdateFirmware(outBand, event) - default: - b.log.Errorw("unhandled command", "topic", b.machineTopic, "channel", "core", "event", event) - } - case Create, Update: - fallthrough - default: - b.log.Warnw("unhandled event", "topic", b.machineTopic, "channel", "core", "event", event) - } - return nil - }, 5, bus.Timeout(receiverHandlerTimeout, b.timeoutHandler), bus.TTL(b.machineTopicTTL)) + // TODO: Do we need a timeout and ttl handler? return err } + +func (b *BMCService) HandleMessage(message *nsq.Message) error { + var event MachineEvent + err := json.Unmarshal(message.Body, &event) + if err != nil { + return err + } + + b.log.Debugw("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event) + + if event.Cmd.IPMI == nil { + return fmt.Errorf("event does not contain ipmi details:%v", event) + } + outBand, err := b.outBand(event.Cmd.IPMI) + if err != nil { + b.log.Errorw("error creating outband connection", "error", err) + return err + } + + switch event.Type { + case Delete: + err := outBand.BootFrom(hal.BootTargetPXE) + if err != nil { + return err + } + return outBand.PowerCycle() + case Command: + switch event.Cmd.Command { + case MachineOnCmd: + return outBand.PowerOn() + case MachineOffCmd: + return outBand.PowerOff() + case MachineResetCmd: + return outBand.PowerReset() + case MachineCycleCmd: + return outBand.PowerCycle() + case MachineBiosCmd: + return outBand.BootFrom(hal.BootTargetBIOS) + case MachineDiskCmd: + return outBand.BootFrom(hal.BootTargetDisk) + case MachinePxeCmd: + return outBand.BootFrom(hal.BootTargetPXE) + case MachineReinstallCmd: + err := outBand.BootFrom(hal.BootTargetPXE) + if err != nil { + return err + } + return outBand.PowerReset() + case ChassisIdentifyLEDOnCmd: + return outBand.IdentifyLEDOn() + case ChassisIdentifyLEDOffCmd: + return outBand.IdentifyLEDOff() + case UpdateFirmwareCmd: + return b.UpdateFirmware(outBand, &event) + default: + b.log.Errorw("unhandled command", "topic", b.machineTopic, "channel", "core", "event", event) + } + case Create, Update: + fallthrough + default: + b.log.Warnw("unhandled event", "topic", b.machineTopic, "channel", "core", "event", event) + } + return nil +}