From 99ad40f844f5dc9fb68ed5d1dc2839f2ea988489 Mon Sep 17 00:00:00 2001 From: Matt Siwiec Date: Fri, 20 Oct 2023 14:26:20 -0600 Subject: [PATCH] support max event msg processing Signed-off-by: Matt Siwiec --- .../templates/config.yaml | 3 +- .../loadbalancer-provider-haproxy/values.yaml | 2 + cmd/process.go | 20 +++-- go.mod | 3 +- go.sum | 2 - internal/server/handlers.go | 74 +++++++++---------- internal/server/server.go | 23 +++--- 7 files changed, 66 insertions(+), 61 deletions(-) diff --git a/chart/loadbalancer-provider-haproxy/templates/config.yaml b/chart/loadbalancer-provider-haproxy/templates/config.yaml index c538f36..56e300c 100644 --- a/chart/loadbalancer-provider-haproxy/templates/config.yaml +++ b/chart/loadbalancer-provider-haproxy/templates/config.yaml @@ -13,6 +13,7 @@ data: LOADBALANCERPROVIDERHAPROXY_IPAM_ENDPOINT: "{{ .Values.provider.ipam.endpoint }}" LOADBALANCERPROVIDERHAPROXY_IPBLOCK: "{{ .Values.provider.ipBlock }}" LOADBALANCERPROVIDERHAPROXY_OIDC_CLIENT_ISSUER: "{{ .Values.provider.api.oidc.client.issuer }}" + LOADBALANCERPROVIDERHAPROXY_MAX_MSG_PROCESS_ATTEMPTS: "{{ .Values.provider.events.maxMsgProcessAttempts }}" {{- if .Values.provider.tracing.enabled }} LOADBALANCERPROVIDERHAPROXY_TRACING_ENABLED: "{{ .Values.provider.tracing.enabled }}" LOADBALANCERPROVIDERHAPROXY_TRACING_PROVIDER: "{{ .Values.provider.tracing.provider }}" @@ -27,4 +28,4 @@ data: LOADBALANCERPROVIDERHAPROXY_TRACING_OTLP_INSECURE: "{{ .Values.provider.tracing.otlp.insecure }}" LOADBALANCERPROVIDERHAPROXY_TRACING_OTLP_CERTIFICATE: "{{ .Values.provider.tracing.otlp.certificate }}" {{- end }} -{{- end }} \ No newline at end of file +{{- end }} diff --git a/chart/loadbalancer-provider-haproxy/values.yaml b/chart/loadbalancer-provider-haproxy/values.yaml index 8d83c7c..b1bc5a1 100644 --- a/chart/loadbalancer-provider-haproxy/values.yaml +++ b/chart/loadbalancer-provider-haproxy/values.yaml @@ -51,6 +51,8 @@ provider: - "*.load-balancer" - "events.create.port" location: "" + 
# maximum number of attempts at processing an event message before discarding it (0 disables the limit) + maxMsgProcessAttempts: 0 tracing: # enabled is true if OpenTelemetry tracing should be enabled for load-balancer-operator enabled: false diff --git a/cmd/process.go b/cmd/process.go index de18eb7..1c842e5 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -56,6 +56,9 @@ func init() { processCmd.PersistentFlags().String("ipblock", "", "ip block id to use for requesting load balancer IPs") viperx.MustBindFlag(viper.GetViper(), "ipblock", processCmd.PersistentFlags().Lookup("ipblock")) + processCmd.PersistentFlags().Uint64("max-msg-process-attempts", 0, "maxiumum number of attempts at processing an event message") + viperx.MustBindFlag(viper.GetViper(), "max-msg-process-attempts", processCmd.PersistentFlags().Lookup("max-msg-process-attempts")) + events.MustViperFlags(viper.GetViper(), processCmd.Flags(), appName) oauth2x.MustViperFlags(viper.GetViper(), processCmd.Flags()) otelx.MustViperFlags(viper.GetViper(), processCmd.Flags()) @@ -86,14 +89,15 @@ func process(ctx context.Context, logger *zap.SugaredLogger) error { } server := &server.Server{ - Context: cx, - Debug: viper.GetBool("logging.debug"), - Echo: eSrv, - Locations: viper.GetStringSlice("event-locations"), - Logger: logger, - EventsConnection: conn, - ChangeTopics: viper.GetStringSlice("change-topics"), - IPBlock: viper.GetString("ipblock"), + Context: cx, + Debug: viper.GetBool("logging.debug"), + Echo: eSrv, + Locations: viper.GetStringSlice("event-locations"), + Logger: logger, + EventsConnection: conn, + ChangeTopics: viper.GetStringSlice("change-topics"), + IPBlock: viper.GetString("ipblock"), + MaxProcessMsgAttempts: viper.GetUint64("max-msg-process-attempts"), } // init lbapi client and ipam client diff --git a/go.mod b/go.mod index 66576be..362945b 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/stretchr/testify v1.8.4 go.infratographer.com/ipam-api v0.0.4 go.infratographer.com/load-balancer-api v0.0.26 - 
go.infratographer.com/loadbalancer-manager-haproxy v0.0.4 go.infratographer.com/x v0.3.8 + go.opentelemetry.io/otel v1.16.0 go.uber.org/zap v1.25.0 golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 ) @@ -69,7 +69,6 @@ require ( github.com/valyala/fasttemplate v1.2.2 // indirect go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.42.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect - go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.16.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect diff --git a/go.sum b/go.sum index ca7bf64..18aee53 100644 --- a/go.sum +++ b/go.sum @@ -311,8 +311,6 @@ go.infratographer.com/ipam-api v0.0.4 h1:MDCHXJcl9o4XcM/Lu0xUxkrYEYzIfmtvCEcTuJe go.infratographer.com/ipam-api v0.0.4/go.mod h1:thDypoVMMbTmR1cpOkT2yMNylC/QB233Slm7XTwxc4g= go.infratographer.com/load-balancer-api v0.0.26 h1:8NKOxha1RIe6LJFEIq9FdC0JpZ0f7fqA8iqhy8LWyAY= go.infratographer.com/load-balancer-api v0.0.26/go.mod h1:bg/+0/C+LJAKDsDpzm1IeeBTlC/MMO130z4Eh20nwhw= -go.infratographer.com/loadbalancer-manager-haproxy v0.0.4 h1:Av+aL1duiX6hot+wrRGk3ZoChF3es3Zmmroj8NP0H9I= -go.infratographer.com/loadbalancer-manager-haproxy v0.0.4/go.mod h1:9JG9KPPEvL46GMAX4e5ZEx3SQgxxOk09oTh0uMVLqhE= go.infratographer.com/x v0.3.8 h1:ZKL/oeTO8an4p58ZXtDdCMl9DVr7Y+RAY2EVeTf1/Uc= go.infratographer.com/x v0.3.8/go.mod h1:H8O2vkWmo26WNuQEFS2PlJoms9YLJ7BNiwFNMTwCuuA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= diff --git a/internal/server/handlers.go b/internal/server/handlers.go index 63ecb31..b7f08fc 100644 --- a/internal/server/handlers.go +++ b/internal/server/handlers.go @@ -1,12 +1,12 @@ package server import ( - "errors" "strings" "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" 
"golang.org/x/exp/slices" "go.infratographer.com/x/events" @@ -23,11 +23,31 @@ var ( func (s *Server) ListenChanges(messages <-chan events.Message[events.ChangeMessage]) { for msg := range messages { - s.processChange(msg) + slogger := s.Logger.With( + "event.message.id", msg.ID(), + "event.message.topic", msg.Topic(), + "event.message.source", msg.Source(), + "event.message.timestamp", msg.Timestamp(), + "event.message.deliveries", msg.Deliveries(), + ) + + if err := s.processChange(msg); err != nil { + if s.MaxProcessMsgAttempts != 0 && msg.Deliveries()+1 > s.MaxProcessMsgAttempts { + slogger.Warnw("terminating event, too many attempts") + + if termErr := msg.Term(); termErr != nil { + slogger.Warnw("error occurred while terminating event") + } + } else if nakErr := msg.Nak(defaultNakDelay); nakErr != nil { + slogger.Warnw("error occurred while naking", "error", nakErr) + } + } else if ackErr := msg.Ack(); ackErr != nil { + slogger.Warnw("error occurred while acking", "error", ackErr) + } } } -func (s *Server) processChange(msg events.Message[events.ChangeMessage]) { +func (s *Server) processChange(msg events.Message[events.ChangeMessage]) error { var lb *loadbalancer.LoadBalancer var err error @@ -35,27 +55,21 @@ func (s *Server) processChange(msg events.Message[events.ChangeMessage]) { m := msg.Message() ctx, span := otel.Tracer(instrumentationName).Start(m.GetTraceContext(s.Context), "processChange") - defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + span.End() + }() if slices.ContainsFunc(m.AdditionalSubjectIDs, s.LocationCheck) || len(s.Locations) == 0 { if m.EventType != string(events.DeleteChangeType) { lb, err = loadbalancer.NewLoadBalancer(ctx, s.Logger, s.APIClient, m.SubjectID, m.AdditionalSubjectIDs) if err != nil { s.Logger.Errorw("unable to initialize loadbalancer", "error", err, "messageID", msg.ID(), "message", m) - - if errors.Is(err, lbapi.ErrLBNotfound) { - // ack and 
ignore - if err = msg.Ack(); err != nil { - s.Logger.Errorw("unable to ack message", "error", err, "messageID", msg.ID(), "message", m) - } - } else { - // nack and retry - if err = msg.Nak(defaultNakDelay); err != nil { - s.Logger.Errorw("unable to nack message", "error", err, "messageID", msg.ID(), "message", m) - } - - return - } + return err } else { loc := s.GetLocation(m.AdditionalSubjectIDs) @@ -70,7 +84,7 @@ func (s *Server) processChange(msg events.Message[events.ChangeMessage]) { } } - if lb != nil && lb.LbType != loadbalancer.TypeNoLB { + if err == nil && lb != nil && lb.LbType != loadbalancer.TypeNoLB { span.SetAttributes( attribute.String("loadbalancer.id", lb.LoadBalancerID.String()), attribute.String("message.event", m.EventType), @@ -84,37 +98,23 @@ func (s *Server) processChange(msg events.Message[events.ChangeMessage]) { if err := s.processLoadBalancerChangeCreate(ctx, lb); err != nil { s.Logger.Errorw("handler unable to request address for loadbalancer", "error", err, "loadbalancer", lb.LoadBalancerID.String()) - - if err = msg.Nak(defaultNakDelay); err != nil { - s.Logger.Errorw("unable to nack message", "error", err, "messageID", msg.ID(), "message", m) - } - - return + return err } case m.EventType == string(events.DeleteChangeType) && lb.LbType == loadbalancer.TypeLB: s.Logger.Debugw("releasing address from loadbalancer", "loadbalancer", lb.LoadBalancerID.String()) if err := s.processLoadBalancerChangeDelete(ctx, lb); err != nil { s.Logger.Errorw("handler unable to release address from loadbalancer", "error", err, "loadbalancer", lb.LoadBalancerID.String()) - - if err = msg.Nak(defaultNakDelay); err != nil { - s.Logger.Errorw("unable to nack message", "error", err, "messageID", msg.ID(), "message", m) - } - - return + return err } default: s.Logger.Debugw("Ignoring event", "loadbalancer", lb.LoadBalancerID.String(), "message", m) } } } - - // we need to Acknowledge that we received and processed the message, - // otherwise, it will be resent 
over and over again. - if err = msg.Ack(); err != nil { - s.Logger.Errorw("unable to ack message", "error", err, "messageID", msg.ID(), "message", m) - } } + + return nil } func (s *Server) LocationCheck(i gidx.PrefixedID) bool { diff --git a/internal/server/server.go b/internal/server/server.go index 4eef9b9..83255c9 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -16,17 +16,18 @@ const instrumentationName = "go.infratographer.com/loadbalancer-provider-haproxy // Server holds options for server connectivity and settings type Server struct { - APIClient *lbapi.Client - IPAMClient *ipamclient.Client - Context context.Context - Debug bool - Echo *echox.Server - IPBlock string - Locations []string - Logger *zap.SugaredLogger - Publisher *events.Publisher - EventsConnection events.Connection - ChangeTopics []string + APIClient *lbapi.Client + IPAMClient *ipamclient.Client + Context context.Context + Debug bool + Echo *echox.Server + IPBlock string + Locations []string + Logger *zap.SugaredLogger + Publisher *events.Publisher + EventsConnection events.Connection + ChangeTopics []string + MaxProcessMsgAttempts uint64 ChangeChannels []<-chan events.Message[events.ChangeMessage] }