-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
100 additions
and
93 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,108 +1,120 @@ | ||
package bmc | ||
|
||
import ( | ||
"crypto/tls" | ||
"crypto/x509" | ||
"encoding/json" | ||
"fmt" | ||
"strings" | ||
"time" | ||
"os" | ||
|
||
"github.com/metal-stack/go-hal" | ||
"github.com/nsqio/go-nsq" | ||
) | ||
|
||
"github.com/metal-stack/metal-lib/bus" | ||
const ( | ||
mqChannel = "core" | ||
) | ||
|
||
// timeout for the nsq handler methods | ||
const receiverHandlerTimeout = 15 * time.Second | ||
|
||
func mapLogLevel(level string) bus.Level { | ||
switch strings.ToLower(level) { | ||
case "debug": | ||
return bus.Debug | ||
case "info": | ||
return bus.Info | ||
case "warn", "warning": | ||
return bus.Warning | ||
case "error": | ||
return bus.Error | ||
default: | ||
return bus.Warning | ||
func (b *BMCService) InitConsumer() error { | ||
caCert, err := os.ReadFile(b.mqCACertFile) | ||
if err != nil { | ||
return fmt.Errorf("failed to load cert: %w", err) | ||
} | ||
} | ||
|
||
func (b *BMCService) timeoutHandler(err bus.TimeoutError) error { | ||
b.log.Errorw("timeout processing event", "event", err.Event()) | ||
return nil | ||
} | ||
caCertPool := x509.NewCertPool() | ||
caCertPool.AppendCertsFromPEM(caCert) | ||
|
||
func (b *BMCService) InitConsumer() error { | ||
tlsCfg := &bus.TLSConfig{ | ||
CACertFile: b.mqCACertFile, | ||
ClientCertFile: b.mqClientCertFile, | ||
cert, err := tls.LoadX509KeyPair(b.mqClientCertFile, b.mqClientCertFile) // FIXME: where is the key? | ||
if err != nil { | ||
return err | ||
} | ||
c, err := bus.NewConsumer(b.log.Desugar(), tlsCfg, b.mqAddress) | ||
|
||
tlsConfig := &tls.Config{ | ||
Certificates: []tls.Certificate{cert}, | ||
ClientCAs: caCertPool, | ||
ClientAuth: tls.RequireAndVerifyClientCert, | ||
MinVersion: tls.VersionTLS13, | ||
} | ||
|
||
config := nsq.NewConfig() | ||
config.TlsConfig = tlsConfig | ||
consumer, err := nsq.NewConsumer(b.machineTopic, mqChannel, config) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = c.With(bus.LogLevel(mapLogLevel(b.mqLogLevel))). | ||
MustRegister(b.machineTopic, "core"). | ||
Consume(MachineEvent{}, func(message interface{}) error { | ||
event := message.(*MachineEvent) | ||
b.log.Debugw("got message", "topic", b.machineTopic, "channel", "core", "event", event) | ||
consumer.AddHandler(b) | ||
|
||
if event.Cmd.IPMI == nil { | ||
return fmt.Errorf("event does not contain ipmi details:%v", event) | ||
} | ||
outBand, err := b.outBand(event.Cmd.IPMI) | ||
if err != nil { | ||
b.log.Errorw("error creating outband connection", "error", err) | ||
return err | ||
} | ||
err = consumer.ConnectToNSQD(b.mqAddress) // FIXME: must point to NSQd, not lookupd | ||
if err != nil { | ||
return err | ||
} | ||
|
||
switch event.Type { | ||
case Delete: | ||
err := outBand.BootFrom(hal.BootTargetPXE) | ||
if err != nil { | ||
return err | ||
} | ||
return outBand.PowerCycle() | ||
case Command: | ||
switch event.Cmd.Command { | ||
case MachineOnCmd: | ||
return outBand.PowerOn() | ||
case MachineOffCmd: | ||
return outBand.PowerOff() | ||
case MachineResetCmd: | ||
return outBand.PowerReset() | ||
case MachineCycleCmd: | ||
return outBand.PowerCycle() | ||
case MachineBiosCmd: | ||
return outBand.BootFrom(hal.BootTargetBIOS) | ||
case MachineDiskCmd: | ||
return outBand.BootFrom(hal.BootTargetDisk) | ||
case MachinePxeCmd: | ||
return outBand.BootFrom(hal.BootTargetPXE) | ||
case MachineReinstallCmd: | ||
err := outBand.BootFrom(hal.BootTargetPXE) | ||
if err != nil { | ||
return err | ||
} | ||
return outBand.PowerReset() | ||
case ChassisIdentifyLEDOnCmd: | ||
return outBand.IdentifyLEDOn() | ||
case ChassisIdentifyLEDOffCmd: | ||
return outBand.IdentifyLEDOff() | ||
case UpdateFirmwareCmd: | ||
return b.UpdateFirmware(outBand, event) | ||
default: | ||
b.log.Errorw("unhandled command", "topic", b.machineTopic, "channel", "core", "event", event) | ||
} | ||
case Create, Update: | ||
fallthrough | ||
default: | ||
b.log.Warnw("unhandled event", "topic", b.machineTopic, "channel", "core", "event", event) | ||
} | ||
return nil | ||
}, 5, bus.Timeout(receiverHandlerTimeout, b.timeoutHandler), bus.TTL(b.machineTopicTTL)) | ||
// TODO: Do we need a timeout and ttl handler? | ||
|
||
return err | ||
} | ||
|
||
func (b *BMCService) HandleMessage(message *nsq.Message) error { | ||
var event MachineEvent | ||
err := json.Unmarshal(message.Body, &event) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
b.log.Debugw("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event) | ||
|
||
if event.Cmd.IPMI == nil { | ||
return fmt.Errorf("event does not contain ipmi details:%v", event) | ||
} | ||
outBand, err := b.outBand(event.Cmd.IPMI) | ||
if err != nil { | ||
b.log.Errorw("error creating outband connection", "error", err) | ||
return err | ||
} | ||
|
||
switch event.Type { | ||
case Delete: | ||
err := outBand.BootFrom(hal.BootTargetPXE) | ||
if err != nil { | ||
return err | ||
} | ||
return outBand.PowerCycle() | ||
case Command: | ||
switch event.Cmd.Command { | ||
case MachineOnCmd: | ||
return outBand.PowerOn() | ||
case MachineOffCmd: | ||
return outBand.PowerOff() | ||
case MachineResetCmd: | ||
return outBand.PowerReset() | ||
case MachineCycleCmd: | ||
return outBand.PowerCycle() | ||
case MachineBiosCmd: | ||
return outBand.BootFrom(hal.BootTargetBIOS) | ||
case MachineDiskCmd: | ||
return outBand.BootFrom(hal.BootTargetDisk) | ||
case MachinePxeCmd: | ||
return outBand.BootFrom(hal.BootTargetPXE) | ||
case MachineReinstallCmd: | ||
err := outBand.BootFrom(hal.BootTargetPXE) | ||
if err != nil { | ||
return err | ||
} | ||
return outBand.PowerReset() | ||
case ChassisIdentifyLEDOnCmd: | ||
return outBand.IdentifyLEDOn() | ||
case ChassisIdentifyLEDOffCmd: | ||
return outBand.IdentifyLEDOff() | ||
case UpdateFirmwareCmd: | ||
return b.UpdateFirmware(outBand, &event) | ||
default: | ||
b.log.Errorw("unhandled command", "topic", b.machineTopic, "channel", "core", "event", event) | ||
} | ||
case Create, Update: | ||
fallthrough | ||
default: | ||
b.log.Warnw("unhandled event", "topic", b.machineTopic, "channel", "core", "event", event) | ||
} | ||
return nil | ||
} |