Skip to content

Commit

Permalink
managed lbID should be gidx. Add debug logging (#52)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Siwiec <[email protected]>
  • Loading branch information
rizzza authored Aug 8, 2023
1 parent 2289b0b commit 95701fe
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 50 deletions.
13 changes: 6 additions & 7 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,17 @@ func run(cmdCtx context.Context, v *viper.Viper) error {
cancel()
}()

managedLBID, err := gidx.Parse(viper.GetString("loadbalancer.id"))
if err != nil {
logger.Fatalw("failed to parse loadbalancer.id gidx: %w", err, "loadbalancerID", viper.GetString("loadbalancer.id"))
}

mgr := &manager.Manager{
Context: ctx,
Logger: logger,
DataPlaneClient: dataplaneapi.NewClient(viper.GetString("dataplane.url")),
LBClient: lbapi.NewClient(viper.GetString("loadbalancerapi.url")),
ManagedLBID: viper.GetString("loadbalancer.id"),
ManagedLBID: managedLBID,
BaseCfgPath: viper.GetString("haproxy.config.base"),
}

Expand Down Expand Up @@ -162,12 +167,6 @@ func validateMandatoryFlags() error {

if viper.GetString("loadbalancer.id") == "" {
errs = append(errs, ErrLBIDRequired)
} else {
// check if the loadbalancer id is a valid gidx
_, err := gidx.Parse(viper.GetString("loadbalancer.id"))
if err != nil {
errs = append(errs, ErrLBIDInvalid)
}
}

if len(errs) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/manager/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

var (
// errLoadBalancerIDParamInvalid is returned when an invalid load balancer ID is provided
errLoadBalancerIDParamInvalid = errors.New("optional loadbalancer ID param must be not set or set to a singular loadbalancer ID")
errLoadBalancerIDParamInvalid = errors.New("loadbalancer ID is empty")

// errFrontendSectionLabelFailure is returned when a frontend section cannot be created
errFrontendSectionLabelFailure = errors.New("failed to create frontend section with label")
Expand Down
55 changes: 30 additions & 25 deletions internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.infratographer.com/loadbalancer-manager-haproxy/pkg/lbapi"

"go.infratographer.com/x/events"
"go.infratographer.com/x/gidx"
"go.uber.org/zap"

"github.com/ThreeDotsLabs/watermill/message"
Expand Down Expand Up @@ -46,7 +47,7 @@ type Manager struct {
Subscriber eventSubscriber
DataPlaneClient dataPlaneAPI
LBClient lbAPI
ManagedLBID string
ManagedLBID gidx.PrefixedID
BaseCfgPath string

// currentConfig for unit testing
Expand Down Expand Up @@ -90,11 +91,13 @@ func (m *Manager) Run() error {
// loadbalancerTargeted returns true if this ChangeMessage is targeted to the
// loadbalancerID the manager is configured to act on
func (m Manager) loadbalancerTargeted(msg *events.ChangeMessage) bool {
if msg.SubjectID.String() == m.ManagedLBID {
m.Logger.Debugw("change msg received", "event-type", msg.EventType, "subjectID", msg.SubjectID, "additonalSubjects", msg.AdditionalSubjectIDs)

if msg.SubjectID == m.ManagedLBID {
return true
} else {
for _, subject := range msg.AdditionalSubjectIDs {
if subject.String() == m.ManagedLBID {
if subject == m.ManagedLBID {
return true
}
}
Expand Down Expand Up @@ -122,19 +125,23 @@ func (m *Manager) ProcessMsg(msg *message.Message) error {
return nil
}

m.Logger.Debugw("msg received",
zap.String("loadbalancerID", m.ManagedLBID),
m.Logger.Infow("msg received",
zap.String("loadbalancerID", m.ManagedLBID.String()),
zap.String("event-type", changeMsg.EventType),
zap.String("messageID", msg.UUID),
"message", msg.Payload)
zap.String("message", string(msg.Payload)),
zap.String("subjectID", changeMsg.SubjectID.String()),
"additionalSubjects", changeMsg.AdditionalSubjectIDs)

if err := m.updateConfigToLatest(m.ManagedLBID); err != nil {
if err := m.updateConfigToLatest(); err != nil {
m.Logger.Errorw("failed to update haproxy config",
zap.String("loadbalancerID", m.ManagedLBID),
zap.String("loadbalancerID", m.ManagedLBID.String()),
zap.String("event-type", changeMsg.EventType),
zap.Error(err),
zap.String("messageID", msg.UUID),
"message", msg.Payload)
zap.String("message", string(msg.Payload)),
zap.String("subjectID", changeMsg.SubjectID.String()),
"additionalSubjects", changeMsg.AdditionalSubjectIDs)

return err
}
Expand All @@ -149,31 +156,29 @@ func (m *Manager) ProcessMsg(msg *message.Message) error {
}

// updateConfigToLatest update the haproxy cfg to either baseline or one requested from lbapi with optional lbID param
func (m *Manager) updateConfigToLatest(lbID ...string) error {
if len(lbID) > 1 {
func (m *Manager) updateConfigToLatest() error {
m.Logger.Infow("updating haproxy config", zap.String("loadbalancerID", m.ManagedLBID.String()))

if m.ManagedLBID == "" {
return errLoadBalancerIDParamInvalid
}

m.Logger.Infow("updating haproxy config", zap.String("loadbalancerID", m.ManagedLBID))

// load base config
cfg, err := parser.New(options.Path(m.BaseCfgPath), options.NoNamedDefaultsFrom)
if err != nil {
m.Logger.Fatalw("failed to load haproxy base config", zap.Error(err))
}

if len(lbID) == 1 {
// get desired state from lbapi
lb, err := m.LBClient.GetLoadBalancer(m.Context, m.ManagedLBID)
if err != nil {
return err
}
// get desired state from lbapi
lb, err := m.LBClient.GetLoadBalancer(m.Context, m.ManagedLBID.String())
if err != nil {
return err
}

// merge response
cfg, err = mergeConfig(cfg, lb)
if err != nil {
return err
}
// merge response
cfg, err = mergeConfig(cfg, lb)
if err != nil {
return err
}

// check dataplaneapi to see if a valid config
Expand All @@ -186,7 +191,7 @@ func (m *Manager) updateConfigToLatest(lbID ...string) error {
return err
}

m.Logger.Infow("config successfully updated", zap.String("loadbalancerID", m.ManagedLBID))
m.Logger.Infow("config successfully updated", zap.String("loadbalancerID", m.ManagedLBID.String()))
m.currentConfig = cfg.String() // for testing

return nil
Expand Down
69 changes: 52 additions & 17 deletions internal/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ func TestUpdateConfigToLatest(t *testing.T) {
Logger: logger,
LBClient: mockLBAPI,
BaseCfgPath: testBaseCfgPath,
ManagedLBID: gidx.PrefixedID("loadbal-testing"),
}

err := mgr.updateConfigToLatest("loadbal-test")
err := mgr.updateConfigToLatest()
assert.NotNil(t, err)
})

Expand All @@ -113,9 +114,30 @@ func TestUpdateConfigToLatest(t *testing.T) {
require.Error(t, err)
})

t.Run("errors when manager loadbalancerID is empty", func(t *testing.T) {
mgr := Manager{
Logger: logger,
BaseCfgPath: testBaseCfgPath,
}

err := mgr.updateConfigToLatest()
require.ErrorIs(t, err, errLoadBalancerIDParamInvalid)
})

t.Run("successfully sets initial base config", func(t *testing.T) {
t.Parallel()

mockLBAPI := &mock.LBAPIClient{
DoGetLoadBalancer: func(ctx context.Context, id string) (*lbapi.GetLoadBalancer, error) {
return &lbapi.GetLoadBalancer{
LoadBalancer: lbapi.LoadBalancer{
ID: "loadbal-test",
Ports: lbapi.Ports{},
},
}, nil
},
}

mockDataplaneAPI := &mock.DataplaneAPIClient{
DoPostConfig: func(ctx context.Context, config string) error {
return nil
Expand All @@ -128,7 +150,9 @@ func TestUpdateConfigToLatest(t *testing.T) {
mgr := Manager{
Logger: logger,
DataPlaneClient: mockDataplaneAPI,
LBClient: mockLBAPI,
BaseCfgPath: testBaseCfgPath,
ManagedLBID: gidx.PrefixedID("loadbal-test"),
}

err := mgr.updateConfigToLatest()
Expand Down Expand Up @@ -220,9 +244,10 @@ func TestUpdateConfigToLatest(t *testing.T) {
LBClient: mockLBAPI,
DataPlaneClient: mockDataplaneAPI,
BaseCfgPath: testBaseCfgPath,
ManagedLBID: gidx.PrefixedID("loadbal-test"),
}

err := mgr.updateConfigToLatest("loadbal-test")
err := mgr.updateConfigToLatest()
require.Nil(t, err)

expCfg, err := os.ReadFile(fmt.Sprintf("%s/%s", testDataBaseDir, "lb-ex-1-exp.cfg"))
Expand All @@ -233,33 +258,43 @@ func TestUpdateConfigToLatest(t *testing.T) {
}

func TestLoadBalancerTargeted(t *testing.T) {
l, _ := zap.NewDevelopmentConfig().Build()
logger := l.Sugar()

testcases := []struct {
name string
pubsubMsg events.ChangeMessage
pubsubMsg *events.ChangeMessage
msgTargetedForLB bool
}{
{
name: "subjectID targeted for loadbalancer",
pubsubMsg: events.ChangeMessage{SubjectID: "loadbal-test",
AdditionalSubjectIDs: []gidx.PrefixedID{"loadpol-test"}},
pubsubMsg: &events.ChangeMessage{
SubjectID: gidx.PrefixedID("loadbal-testing"),
AdditionalSubjectIDs: []gidx.PrefixedID{"loadpol-testing"},
},
msgTargetedForLB: true,
},
{
name: "AdditionalSubjectID is targeted for loadbalancer",
pubsubMsg: events.ChangeMessage{SubjectID: "loadprt-test",
AdditionalSubjectIDs: []gidx.PrefixedID{"loadbal-test"}},
pubsubMsg: &events.ChangeMessage{
SubjectID: gidx.PrefixedID("loadprt-testing"),
AdditionalSubjectIDs: []gidx.PrefixedID{"loadbal-testing"},
},
msgTargetedForLB: true,
},
{
name: "msg is not targeted for loadbalancer",
pubsubMsg: events.ChangeMessage{SubjectID: "loadprt-notme",
AdditionalSubjectIDs: []gidx.PrefixedID{"loadbal-notme"}},
pubsubMsg: &events.ChangeMessage{
SubjectID: gidx.PrefixedID("loadprt-nottargeted"),
AdditionalSubjectIDs: []gidx.PrefixedID{"loadbal-nottargeted"},
},
msgTargetedForLB: false,
},
}

mgr := Manager{
ManagedLBID: "loadbal-test",
ManagedLBID: gidx.PrefixedID("loadbal-testing"),
Logger: logger,
}

for _, tt := range testcases {
Expand All @@ -269,7 +304,7 @@ func TestLoadBalancerTargeted(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

targeted := mgr.loadbalancerTargeted(&tt.pubsubMsg)
targeted := mgr.loadbalancerTargeted(tt.pubsubMsg)
assert.Equal(t, tt.msgTargetedForLB, targeted)
})
}
Expand All @@ -283,7 +318,7 @@ func TestProcessMsg(t *testing.T) {

mgr := Manager{
Logger: logger,
ManagedLBID: "loadbal-managedbythisprocess",
ManagedLBID: gidx.PrefixedID("loadbal-managedbythisprocess"),
}

ProcessMsgTests := []struct {
Expand All @@ -302,7 +337,7 @@ func TestProcessMsg(t *testing.T) {
},
{
name: "ignores messages not targeted for this lb",
pubsubMsg: events.ChangeMessage{SubjectID: "loadbal-test", EventType: string(events.CreateChangeType)},
pubsubMsg: events.ChangeMessage{SubjectID: gidx.PrefixedID("loadbal-test"), EventType: string(events.CreateChangeType)},
},
}

Expand Down Expand Up @@ -357,11 +392,11 @@ func TestProcessMsg(t *testing.T) {
Logger: logger,
DataPlaneClient: mockDataplaneAPI,
LBClient: mockLBAPI,
ManagedLBID: "loadbal-managedbythisprocess",
ManagedLBID: gidx.PrefixedID("loadbal-managedbythisprocess"),
}

data, _ := json.Marshal(events.ChangeMessage{
SubjectID: "loadbal-managedbythisprocess",
SubjectID: gidx.PrefixedID("loadbal-managedbythisprocess"),
EventType: string(events.CreateChangeType),
})

Expand Down Expand Up @@ -457,7 +492,7 @@ func TestEventsIntegration(t *testing.T) {
Logger: logger,
DataPlaneClient: mockDataplaneAPI,
LBClient: mockLBAPI,
ManagedLBID: "loadbal-managedbythisprocess",
ManagedLBID: gidx.PrefixedID("loadbal-managedbythisprocess"),
}

// setup timeout context to break free from pubsub Listen()
Expand All @@ -482,7 +517,7 @@ func TestEventsIntegration(t *testing.T) {
context.Background(),
"loadbalancer",
events.ChangeMessage{
SubjectID: "loadbal-managedbythisprocess",
SubjectID: gidx.PrefixedID("loadbal-managedbythisprocess"),
EventType: string(events.CreateChangeType),
})
require.NoError(t, err)
Expand Down

0 comments on commit 95701fe

Please sign in to comment.