Skip to content

Commit

Permalink
support max event msg processing
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Siwiec <[email protected]>
  • Loading branch information
rizzza committed Oct 20, 2023
1 parent bb0c228 commit 99ad40f
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 61 deletions.
3 changes: 2 additions & 1 deletion chart/loadbalancer-provider-haproxy/templates/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ data:
LOADBALANCERPROVIDERHAPROXY_IPAM_ENDPOINT: "{{ .Values.provider.ipam.endpoint }}"
LOADBALANCERPROVIDERHAPROXY_IPBLOCK: "{{ .Values.provider.ipBlock }}"
LOADBALANCERPROVIDERHAPROXY_OIDC_CLIENT_ISSUER: "{{ .Values.provider.api.oidc.client.issuer }}"
LOADBALANCERPROVIDERHAPROXY_MAX_MSG_PROCESS_ATTEMPTS: "{{ .Values.provider.events.maxMsgProcessAttempts }}"
{{- if .Values.provider.tracing.enabled }}
LOADBALANCERPROVIDERHAPROXY_TRACING_ENABLED: "{{ .Values.provider.tracing.enabled }}"
LOADBALANCERPROVIDERHAPROXY_TRACING_PROVIDER: "{{ .Values.provider.tracing.provider }}"
Expand All @@ -27,4 +28,4 @@ data:
LOADBALANCERPROVIDERHAPROXY_TRACING_OTLP_INSECURE: "{{ .Values.provider.tracing.otlp.insecure }}"
LOADBALANCERPROVIDERHAPROXY_TRACING_OTLP_CERTIFICATE: "{{ .Values.provider.tracing.otlp.certificate }}"
{{- end }}
{{- end }}
{{- end }}
2 changes: 2 additions & 0 deletions chart/loadbalancer-provider-haproxy/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ provider:
- "*.load-balancer"
- "events.create.port"
location: ""
# amount of times to retry a failed message before discarding it
maxMsgProcessAttempts: 0
tracing:
# enabled is true if OpenTelemetry tracing should be enabled for load-balancer-operator
enabled: false
Expand Down
20 changes: 12 additions & 8 deletions cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func init() {
processCmd.PersistentFlags().String("ipblock", "", "ip block id to use for requesting load balancer IPs")
viperx.MustBindFlag(viper.GetViper(), "ipblock", processCmd.PersistentFlags().Lookup("ipblock"))

processCmd.PersistentFlags().Uint64("max-msg-process-attempts", 0, "maxiumum number of attempts at processing an event message")
viperx.MustBindFlag(viper.GetViper(), "max-msg-process-attempts", processCmd.PersistentFlags().Lookup("max-msg-process-attempts"))

events.MustViperFlags(viper.GetViper(), processCmd.Flags(), appName)
oauth2x.MustViperFlags(viper.GetViper(), processCmd.Flags())
otelx.MustViperFlags(viper.GetViper(), processCmd.Flags())
Expand Down Expand Up @@ -86,14 +89,15 @@ func process(ctx context.Context, logger *zap.SugaredLogger) error {
}

server := &server.Server{
Context: cx,
Debug: viper.GetBool("logging.debug"),
Echo: eSrv,
Locations: viper.GetStringSlice("event-locations"),
Logger: logger,
EventsConnection: conn,
ChangeTopics: viper.GetStringSlice("change-topics"),
IPBlock: viper.GetString("ipblock"),
Context: cx,
Debug: viper.GetBool("logging.debug"),
Echo: eSrv,
Locations: viper.GetStringSlice("event-locations"),
Logger: logger,
EventsConnection: conn,
ChangeTopics: viper.GetStringSlice("change-topics"),
IPBlock: viper.GetString("ipblock"),
MaxProcessMsgAttempts: viper.GetUint64("max-msg-process-attempts"),
}

// init lbapi client and ipam client
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/stretchr/testify v1.8.4
go.infratographer.com/ipam-api v0.0.4
go.infratographer.com/load-balancer-api v0.0.26
go.infratographer.com/loadbalancer-manager-haproxy v0.0.4
go.infratographer.com/x v0.3.8
go.opentelemetry.io/otel v1.16.0
go.uber.org/zap v1.25.0
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
)
Expand Down Expand Up @@ -69,7 +69,6 @@ require (
github.com/valyala/fasttemplate v1.2.2 // indirect
go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.42.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ go.infratographer.com/ipam-api v0.0.4 h1:MDCHXJcl9o4XcM/Lu0xUxkrYEYzIfmtvCEcTuJe
go.infratographer.com/ipam-api v0.0.4/go.mod h1:thDypoVMMbTmR1cpOkT2yMNylC/QB233Slm7XTwxc4g=
go.infratographer.com/load-balancer-api v0.0.26 h1:8NKOxha1RIe6LJFEIq9FdC0JpZ0f7fqA8iqhy8LWyAY=
go.infratographer.com/load-balancer-api v0.0.26/go.mod h1:bg/+0/C+LJAKDsDpzm1IeeBTlC/MMO130z4Eh20nwhw=
go.infratographer.com/loadbalancer-manager-haproxy v0.0.4 h1:Av+aL1duiX6hot+wrRGk3ZoChF3es3Zmmroj8NP0H9I=
go.infratographer.com/loadbalancer-manager-haproxy v0.0.4/go.mod h1:9JG9KPPEvL46GMAX4e5ZEx3SQgxxOk09oTh0uMVLqhE=
go.infratographer.com/x v0.3.8 h1:ZKL/oeTO8an4p58ZXtDdCMl9DVr7Y+RAY2EVeTf1/Uc=
go.infratographer.com/x v0.3.8/go.mod h1:H8O2vkWmo26WNuQEFS2PlJoms9YLJ7BNiwFNMTwCuuA=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
Expand Down
74 changes: 37 additions & 37 deletions internal/server/handlers.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package server

import (
"errors"
"strings"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"golang.org/x/exp/slices"

"go.infratographer.com/x/events"
Expand All @@ -23,39 +23,53 @@ var (

func (s *Server) ListenChanges(messages <-chan events.Message[events.ChangeMessage]) {
for msg := range messages {
s.processChange(msg)
slogger := s.Logger.With(
"event.message.id", msg.ID(),
"event.message.topic", msg.Topic(),
"event.message.source", msg.Source(),
"event.message.timestamp", msg.Timestamp(),
"event.message.deliveries", msg.Deliveries(),
)

if err := s.processChange(msg); err != nil {
if s.MaxProcessMsgAttempts != 0 && msg.Deliveries()+1 > s.MaxProcessMsgAttempts {
slogger.Warnw("terminating event, too many attempts")

if termErr := msg.Term(); termErr != nil {
slogger.Warnw("error occurred while terminating event")
}
} else if nakErr := msg.Nak(defaultNakDelay); nakErr != nil {
slogger.Warnw("error occurred while naking", "error", nakErr)
}
} else if ackErr := msg.Ack(); ackErr != nil {
slogger.Warnw("error occurred while acking", "error", ackErr)
}
}
}

func (s *Server) processChange(msg events.Message[events.ChangeMessage]) {
func (s *Server) processChange(msg events.Message[events.ChangeMessage]) error {
var lb *loadbalancer.LoadBalancer

var err error

m := msg.Message()

ctx, span := otel.Tracer(instrumentationName).Start(m.GetTraceContext(s.Context), "processChange")
defer span.End()
defer func() {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

span.End()
}()

if slices.ContainsFunc(m.AdditionalSubjectIDs, s.LocationCheck) || len(s.Locations) == 0 {
if m.EventType != string(events.DeleteChangeType) {
lb, err = loadbalancer.NewLoadBalancer(ctx, s.Logger, s.APIClient, m.SubjectID, m.AdditionalSubjectIDs)

Check failure on line 69 in internal/server/handlers.go

View workflow job for this annotation

GitHub Actions / lint

SA4006: this value of `lb` is never used (staticcheck)
if err != nil {
s.Logger.Errorw("unable to initialize loadbalancer", "error", err, "messageID", msg.ID(), "message", m)

if errors.Is(err, lbapi.ErrLBNotfound) {
// ack and ignore
if err = msg.Ack(); err != nil {
s.Logger.Errorw("unable to ack message", "error", err, "messageID", msg.ID(), "message", m)
}
} else {
// nack and retry
if err = msg.Nak(defaultNakDelay); err != nil {
s.Logger.Errorw("unable to nack message", "error", err, "messageID", msg.ID(), "message", m)
}

return
}
return err
} else {
loc := s.GetLocation(m.AdditionalSubjectIDs)

Expand All @@ -70,7 +84,7 @@ func (s *Server) processChange(msg events.Message[events.ChangeMessage]) {
}
}

if lb != nil && lb.LbType != loadbalancer.TypeNoLB {
if err == nil && lb != nil && lb.LbType != loadbalancer.TypeNoLB {
span.SetAttributes(
attribute.String("loadbalancer.id", lb.LoadBalancerID.String()),
attribute.String("message.event", m.EventType),
Expand All @@ -84,37 +98,23 @@ func (s *Server) processChange(msg events.Message[events.ChangeMessage]) {

if err := s.processLoadBalancerChangeCreate(ctx, lb); err != nil {
s.Logger.Errorw("handler unable to request address for loadbalancer", "error", err, "loadbalancer", lb.LoadBalancerID.String())

if err = msg.Nak(defaultNakDelay); err != nil {
s.Logger.Errorw("unable to nack message", "error", err, "messageID", msg.ID(), "message", m)
}

return
return err
}
case m.EventType == string(events.DeleteChangeType) && lb.LbType == loadbalancer.TypeLB:
s.Logger.Debugw("releasing address from loadbalancer", "loadbalancer", lb.LoadBalancerID.String())

if err := s.processLoadBalancerChangeDelete(ctx, lb); err != nil {
s.Logger.Errorw("handler unable to release address from loadbalancer", "error", err, "loadbalancer", lb.LoadBalancerID.String())

if err = msg.Nak(defaultNakDelay); err != nil {
s.Logger.Errorw("unable to nack message", "error", err, "messageID", msg.ID(), "message", m)
}

return
return err
}
default:
s.Logger.Debugw("Ignoring event", "loadbalancer", lb.LoadBalancerID.String(), "message", m)
}
}
}

// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
if err = msg.Ack(); err != nil {
s.Logger.Errorw("unable to ack message", "error", err, "messageID", msg.ID(), "message", m)
}
}

return nil
}

func (s *Server) LocationCheck(i gidx.PrefixedID) bool {
Expand Down
23 changes: 12 additions & 11 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ const instrumentationName = "go.infratographer.com/loadbalancer-provider-haproxy

// Server holds options for server connectivity and settings
type Server struct {
APIClient *lbapi.Client
IPAMClient *ipamclient.Client
Context context.Context
Debug bool
Echo *echox.Server
IPBlock string
Locations []string
Logger *zap.SugaredLogger
Publisher *events.Publisher
EventsConnection events.Connection
ChangeTopics []string
APIClient *lbapi.Client
IPAMClient *ipamclient.Client
Context context.Context
Debug bool
Echo *echox.Server
IPBlock string
Locations []string
Logger *zap.SugaredLogger
Publisher *events.Publisher
EventsConnection events.Connection
ChangeTopics []string
MaxProcessMsgAttempts uint64

ChangeChannels []<-chan events.Message[events.ChangeMessage]
}
Expand Down

0 comments on commit 99ad40f

Please sign in to comment.