diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index f1ef19c..e2f1604 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -54,7 +54,6 @@ jobs:
         with:
           go-version-file: './go.mod'
           check-latest: true
-          cache: true
      - name: Install Ko
        uses: imjasonh/setup-ko@v0.6
      - name: Release a New Version
diff --git a/README.md b/README.md
index 59fe2a4..b10e70c 100644
--- a/README.md
+++ b/README.md
@@ -58,7 +58,6 @@ This is still a proof of concept, until Prometheus releases https://github.com/p
 
 Here's what will come next!
 
-- Prometheus Metrics!
 - Proxy to route requests to the leader!
 - Release pipeline for the Helm chart?
 - (optional) Notify prometheus using signal ?
diff --git a/api/server.go b/api/server.go
new file mode 100644
index 0000000..d76df93
--- /dev/null
+++ b/api/server.go
@@ -0,0 +1,60 @@
+package api
+
+import (
+	"context"
+	"net/http"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"k8s.io/klog/v2"
+)
+
+type Server struct {
+	httpSrv http.Server
+
+	shutdownGraceDelay time.Duration
+}
+
+func NewServer(listenAddr string, shutdownGraceDelay time.Duration, metricsRegistry prometheus.Gatherer) *Server {
+	var mux http.ServeMux
+
+	mux.HandleFunc("/healthz", func(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusOK) })
+	mux.Handle("/metrics", promhttp.HandlerFor(
+		metricsRegistry,
+		promhttp.HandlerOpts{EnableOpenMetrics: true},
+	))
+
+	return &Server{
+		shutdownGraceDelay: shutdownGraceDelay,
+		httpSrv: http.Server{
+			Addr:    listenAddr,
+			Handler: &mux,
+		},
+	}
+}
+
+func (s *Server) Serve(ctx context.Context) error {
+	shutdownDone := make(chan error)
+
+	go func() {
+		<-ctx.Done()
+
+		shutdownCtx, cancel := context.WithTimeout(context.Background(), s.shutdownGraceDelay)
+		defer cancel()
+
+		err := s.httpSrv.Shutdown(shutdownCtx)
+		if err != nil {
+			klog.Info("Server shutdown reported an error, forcing close")
+			err = s.httpSrv.Close()
+		}
+
+		shutdownDone <- err
+	}()
+
+	if err := s.httpSrv.ListenAndServe(); err != http.ErrServerClosed {
+		return err
+	}
+
+	return <-shutdownDone
+}
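A minimal sketch of how a caller is expected to drive this server: Serve blocks until the given context is cancelled, then attempts a graceful shutdown bounded by shutdownGraceDelay before falling back to a hard close. The signal wiring below is illustrative only, not part of this diff.

    package main

    import (
        "context"
        "os"
        "os/signal"
        "syscall"
        "time"

        "github.com/jlevesy/prometheus-elector/api"
        "github.com/prometheus/client_golang/prometheus"
        "k8s.io/klog/v2"
    )

    func main() {
        // Cancel the context on SIGINT/SIGTERM; Serve then shuts down gracefully.
        ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
        defer stop()

        srv := api.NewServer(":9095", 15*time.Second, prometheus.NewRegistry())

        // Serve returns once the listener has stopped and the shutdown
        // goroutine has reported its result (nil on a clean shutdown).
        if err := srv.Serve(ctx); err != nil {
            klog.Fatal("api server failed: ", err)
        }
    }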
diff --git a/api/server_test.go b/api/server_test.go
new file mode 100644
index 0000000..57c8587
--- /dev/null
+++ b/api/server_test.go
@@ -0,0 +1,58 @@
+package api_test
+
+import (
+	"context"
+	"net/http"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+
+	"github.com/jlevesy/prometheus-elector/api"
+)
+
+func TestServer_Serve(t *testing.T) {
+	var (
+		ctx, cancel = context.WithCancel(context.Background())
+
+		srv = api.NewServer(
+			":63549",
+			15*time.Second,
+			prometheus.NewRegistry(),
+		)
+
+		srvDone = make(chan struct{})
+	)
+
+	go func() {
+		err := srv.Serve(ctx)
+		require.NoError(t, err)
+
+		close(srvDone)
+	}()
+
+	var attempt = 0
+
+	for {
+		time.Sleep(200 * time.Millisecond)
+		attempt++
+
+		if attempt == 5 {
+			t.Fatal("Exhausted max retries getting the /healthz endpoint")
+		}
+
+		resp, err := http.Get("http://localhost:63549/healthz")
+		if err != nil {
+			continue
+		}
+		if resp.StatusCode != http.StatusOK {
+			continue
+		}
+
+		break
+	}
+
+	cancel()
+	<-srvDone
+}
diff --git a/cmd/config.go b/cmd/config.go
index b9d54da..78894ad 100644
--- a/cmd/config.go
+++ b/cmd/config.go
@@ -37,6 +37,12 @@ type cliConfig struct {
 	notifyRetryMaxAttempts int
 	notifyRetryDelay       time.Duration
 
+	// API setup
+	apiListenAddr         string
+	apiShutdownGraceDelay time.Duration
+
+	runtimeMetrics bool
+
 	// Path to a kubeconfig (if running outside from the cluster).
 	kubeConfigPath string
 }
@@ -93,6 +99,14 @@ func (c *cliConfig) validateRuntimeConfig() error {
 		return errors.New("invalid notify-retry-delay, should be >= 1")
 	}
 
+	if c.apiListenAddr == "" {
+		return errors.New("missing api-listen-address")
+	}
+
+	if c.apiShutdownGraceDelay < 0 {
+		return errors.New("invalid api-shutdown-grace-delay, should be >= 0")
+	}
+
 	return nil
 }
@@ -110,6 +124,9 @@ func (c *cliConfig) setupFlags() {
 	flag.IntVar(&c.notifyRetryMaxAttempts, "notify-retry-max-attempts", 5, "How many times to retry notifying prometheus on failure.")
 	flag.DurationVar(&c.notifyRetryDelay, "notify-retry-delay", 10*time.Second, "How much time to wait between two notify retries.")
 	flag.BoolVar(&c.init, "init", false, "Only init the prometheus config file")
+	flag.StringVar(&c.apiListenAddr, "api-listen-address", ":9095", "HTTP listen address to use for the API.")
+	flag.DurationVar(&c.apiShutdownGraceDelay, "api-shutdown-grace-delay", 15*time.Second, "Grace delay to apply when shutting down the API server.")
+	flag.BoolVar(&c.runtimeMetrics, "runtime-metrics", false, "Export Go runtime metrics.")
 }
 
 // this is how the http standard library validates the method in NewRequestWithContext.
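With the defaults registered above, an invocation exercising the new flags might look as follows. The binary name and the pre-existing flag values are illustrative; only the last three flags are introduced by this diff.

    prometheus-elector \
      -lease-name=prometheus-elector \
      -lease-namespace=monitoring \
      -notify-http-url=http://127.0.0.1:9090/-/reload \
      -api-listen-address=:9095 \
      -api-shutdown-grace-delay=15s \
      -runtime-metrics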
notify-http-method"), }, @@ -158,6 +171,8 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) { notifyHTTPMethod: http.MethodPost, notifyRetryMaxAttempts: -1, notifyRetryDelay: 10 * time.Second, + apiListenAddr: ":5678", + apiShutdownGraceDelay: 15 * time.Second, }, wantErr: errors.New("invalid notify-retry-max-attempts, should be >= 1"), }, @@ -170,9 +185,39 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) { notifyHTTPMethod: http.MethodPost, notifyRetryMaxAttempts: 1, notifyRetryDelay: -10 * time.Second, + apiListenAddr: ":5678", + apiShutdownGraceDelay: 15 * time.Second, }, wantErr: errors.New("invalid notify-retry-delay, should be >= 1"), }, + { + desc: "missing api-listen-address", + cfg: cliConfig{ + leaseName: "lease", + leaseNamespace: "namespace", + notifyHTTPURL: "http://reload.com", + notifyHTTPMethod: http.MethodPost, + notifyRetryMaxAttempts: 1, + notifyRetryDelay: 10 * time.Second, + apiListenAddr: "", + apiShutdownGraceDelay: 15 * time.Second, + }, + wantErr: errors.New("missing api-listen-address"), + }, + { + desc: "invalid api-shutdown-grace-delay", + cfg: cliConfig{ + leaseName: "lease", + leaseNamespace: "namespace", + notifyHTTPURL: "http://reload.com", + notifyHTTPMethod: http.MethodPost, + notifyRetryMaxAttempts: 1, + notifyRetryDelay: 10 * time.Second, + apiListenAddr: ":5678", + apiShutdownGraceDelay: -15 * time.Second, + }, + wantErr: errors.New("invalid api-shudown-grace-delay, should be >= 0"), + }, } { t.Run(testCase.desc, func(t *testing.T) { assert.Equal( diff --git a/cmd/main.go b/cmd/main.go index 28fde3b..26061b3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -12,10 +12,13 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + "github.com/jlevesy/prometheus-elector/api" "github.com/jlevesy/prometheus-elector/config" "github.com/jlevesy/prometheus-elector/election" "github.com/jlevesy/prometheus-elector/notifier" "github.com/jlevesy/prometheus-elector/watcher" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" ) func main() { @@ -30,7 +33,7 @@ func main() { flag.Parse() if err := cfg.validateInitConfig(); err != nil { - klog.Fatal("Invalid init config: ", err) + klog.Fatal("Invalid init config: ", err) } reconciller := config.NewReconciller(cfg.configPath, cfg.outputPath) @@ -48,10 +51,21 @@ func main() { klog.Fatal("Invalid election config: ", err) } + metricsRegistry := prometheus.NewRegistry() + if cfg.runtimeMetrics { + metricsRegistry.MustRegister(collectors.NewBuildInfoCollector()) + metricsRegistry.MustRegister(collectors.NewGoCollector( + collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll), + )) + } + notifier := notifier.WithRetry( - notifier.NewHTTP( - cfg.notifyHTTPURL, - cfg.notifyHTTPMethod, + notifier.WithMetrics( + metricsRegistry, + notifier.NewHTTP( + cfg.notifyHTTPURL, + cfg.notifyHTTPMethod, + ), ), cfg.notifyRetryMaxAttempts, cfg.notifyRetryDelay, @@ -86,12 +100,19 @@ func main() { k8sClient, reconciller, notifier, + metricsRegistry, ) if err != nil { klog.Fatal("Can't setup election", err) } + apiServer := api.NewServer( + cfg.apiListenAddr, + cfg.apiShutdownGraceDelay, + metricsRegistry, + ) + grp, grpCtx := errgroup.WithContext(ctx) grp.Go(func() error { @@ -99,9 +120,8 @@ func main() { return nil }) - grp.Go(func() error { - return watcher.Watch(grpCtx) - }) + grp.Go(func() error { return watcher.Watch(grpCtx) }) + grp.Go(func() error { return apiServer.Serve(grpCtx) }) if err := grp.Wait(); err != nil { klog.Fatal("leader-agent failed, 
reason: ", err) diff --git a/election/k8s.go b/election/k8s.go index 426bc31..8a6c167 100644 --- a/election/k8s.go +++ b/election/k8s.go @@ -12,6 +12,7 @@ import ( "github.com/jlevesy/prometheus-elector/config" "github.com/jlevesy/prometheus-elector/notifier" + "github.com/prometheus/client_golang/prometheus" ) type Config struct { @@ -24,7 +25,11 @@ type Config struct { RetryPeriod time.Duration } -func Setup(cfg Config, k8sClient kubernetes.Interface, reconciller *config.Reconciler, notifier notifier.Notifier) (*leaderelection.LeaderElector, error) { +func Setup(cfg Config, k8sClient kubernetes.Interface, reconciller *config.Reconciler, notifier notifier.Notifier, reg prometheus.Registerer) (*leaderelection.LeaderElector, error) { + leaderelection.SetProvider(metricsProvider(func() leaderelection.SwitchMetric { + return newLeaderMetrics(reg) + })) + le, err := leaderelection.NewLeaderElector( leaderelection.LeaderElectionConfig{ Lock: &resourcelock.LeaseLock{ @@ -37,6 +42,7 @@ func Setup(cfg Config, k8sClient kubernetes.Interface, reconciller *config.Recon Identity: cfg.MemberID, }, }, + Name: cfg.MemberID, // required to properly set election metrics. ReleaseOnCancel: cfg.ReleaseOnCancel, LeaseDuration: cfg.LeaseDuration, RenewDeadline: cfg.RenewDeadline, diff --git a/election/metrics.go b/election/metrics.go new file mode 100644 index 0000000..8452b6d --- /dev/null +++ b/election/metrics.go @@ -0,0 +1,48 @@ +package election + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "k8s.io/client-go/tools/leaderelection" +) + +type metricsProvider func() leaderelection.SwitchMetric + +func (mp metricsProvider) NewLeaderMetric() leaderelection.SwitchMetric { + return mp() +} + +type leaderMetrics struct { + isLeader *prometheus.GaugeVec + lastTranstitionTime prometheus.Gauge +} + +func newLeaderMetrics(r prometheus.Registerer) *leaderMetrics { + return &leaderMetrics{ + lastTranstitionTime: promauto.With(r).NewGauge( + prometheus.GaugeOpts{ + Namespace: "prometheus_elector", + Name: "election_last_transition_time_seconds", + Help: "last time the member changed status", + }, + ), + isLeader: promauto.With(r).NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "prometheus_elector", + Name: "election_is_leader", + Help: "Set which member is the actual leader of the cluster", + }, + []string{"member_id"}, + ), + } +} + +func (m *leaderMetrics) On(name string) { + m.isLeader.WithLabelValues(name).Set(1.0) + m.lastTranstitionTime.SetToCurrentTime() +} + +func (m *leaderMetrics) Off(name string) { + m.isLeader.WithLabelValues(name).Set(0.0) + m.lastTranstitionTime.SetToCurrentTime() +} diff --git a/example/k8s/agent-values.yaml b/example/k8s/agent-values.yaml index d6bdff9..1f1d61a 100644 --- a/example/k8s/agent-values.yaml +++ b/example/k8s/agent-values.yaml @@ -20,7 +20,10 @@ configFiles: static_configs: - targets: - localhost:9090 - + - job_name: prometheus-elector + static_configs: + - targets: + - localhost:9095/metrics - job_name: 'kubernetes-nodes' scheme: https tls_config: diff --git a/helm/templates/statefulset.yaml b/helm/templates/statefulset.yaml index b003ff7..1567211 100644 --- a/helm/templates/statefulset.yaml +++ b/helm/templates/statefulset.yaml @@ -52,13 +52,26 @@ spec: - -config=/etc/config/prometheus-elector.yaml - -output=/etc/runtime/prometheus.yaml - -notify-http-url=http://127.0.0.1:9090/-/reload + - -api-listen-address=:9095 {{- range $arg := .Values.elector.args }} - {{ $arg }} {{- end }} + ports: + - 
diff --git a/example/k8s/agent-values.yaml b/example/k8s/agent-values.yaml
index d6bdff9..1f1d61a 100644
--- a/example/k8s/agent-values.yaml
+++ b/example/k8s/agent-values.yaml
@@ -20,7 +20,10 @@ configFiles:
         static_configs:
           - targets:
             - localhost:9090
-
+      - job_name: prometheus-elector
+        static_configs:
+          - targets:
+            - localhost:9095
       - job_name: 'kubernetes-nodes'
         scheme: https
         tls_config:
diff --git a/helm/templates/statefulset.yaml b/helm/templates/statefulset.yaml
index b003ff7..1567211 100644
--- a/helm/templates/statefulset.yaml
+++ b/helm/templates/statefulset.yaml
@@ -52,13 +52,26 @@ spec:
            - -config=/etc/config/prometheus-elector.yaml
            - -output=/etc/runtime/prometheus.yaml
            - -notify-http-url=http://127.0.0.1:9090/-/reload
+           - -api-listen-address=:9095
 {{- range $arg := .Values.elector.args }}
            - {{ $arg }}
 {{- end }}
+          ports:
+            - name: http
+              containerPort: 9095
+              protocol: TCP
          securityContext:
            {{- toYaml .Values.securityContext | nindent 12 }}
          resources:
            {{- toYaml .Values.resources | nindent 12 }}
+          {{- if .Values.elector.readinessProbe }}
+          readinessProbe:
+            {{- toYaml .Values.elector.readinessProbe | nindent 12 }}
+          {{- end }}
+          {{- if .Values.elector.livenessProbe }}
+          livenessProbe:
+            {{- toYaml .Values.elector.livenessProbe | nindent 12 }}
+          {{- end }}
          volumeMounts:
            - name: config-volume
              mountPath: /etc/config
diff --git a/helm/values.yaml b/helm/values.yaml
index 57a3bcb..807092b 100644
--- a/helm/values.yaml
+++ b/helm/values.yaml
@@ -10,6 +10,24 @@ elector:
   image:
     repository: ghcr.io/jlevesy/prometheus-elector
     pullPolicy: IfNotPresent
+  readinessProbe:
+    httpGet:
+      path: /healthz
+      port: http
+    initialDelaySeconds: 30
+    periodSeconds: 5
+    timeoutSeconds: 4
+    failureThreshold: 3
+    successThreshold: 1
+  livenessProbe:
+    httpGet:
+      path: /healthz
+      port: http
+    initialDelaySeconds: 30
+    periodSeconds: 15
+    timeoutSeconds: 10
+    failureThreshold: 3
+    successThreshold: 1
 
 prometheus:
   env:
diff --git a/notifier/metrics.go b/notifier/metrics.go
new file mode 100644
index 0000000..84b4746
--- /dev/null
+++ b/notifier/metrics.go
@@ -0,0 +1,59 @@
+package notifier
+
+import (
+	"context"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type metricsNotifier struct {
+	next Notifier
+
+	total    prometheus.Counter
+	errors   prometheus.Counter
+	duration prometheus.Histogram
+}
+
+func WithMetrics(reg prometheus.Registerer, notifier Notifier) Notifier {
+	return &metricsNotifier{
+		next: notifier,
+
+		total: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Namespace: "prometheus_elector",
+			Name:      "notifier_calls_total",
+			Help:      "The total number of times Prometheus Elector notified Prometheus about a configuration update",
+		}),
+		errors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Namespace: "prometheus_elector",
+			Name:      "notifier_calls_errors",
+			Help:      "The total number of times Prometheus Elector failed to notify Prometheus about a configuration update",
+		}),
+		duration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
+			Namespace: "prometheus_elector",
+			Name:      "notifier_calls_duration_seconds",
+			Help:      "The time it took to notify Prometheus about a configuration update",
+		}),
+	}
+}
+
+func (m *metricsNotifier) Notify(ctx context.Context) error {
+	return m.instrument(func() error {
+		return m.next.Notify(ctx)
+	})
+}
+
+func (m *metricsNotifier) instrument(cb func() error) error {
+	startTime := time.Now()
+
+	err := cb()
+
+	if err != nil {
+		m.errors.Inc()
+	}
+
+	m.duration.Observe(time.Since(startTime).Seconds())
+	m.total.Inc()
+	return err
+}
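One detail worth calling out before the test below: in cmd/main.go the decorators compose as WithRetry(WithMetrics(...)), so the metrics notifier sits inside the retry loop and every individual attempt is counted, not just the overall outcome. That is exactly what the test's expected values encode: four failing attempts followed by one success yields calls_errors 4 and calls_total 5. The composition, restated from the diff:

    // Each retry attempt flows through the metrics decorator before
    // reaching the actual HTTP notifier.
    n := notifier.WithRetry(
        notifier.WithMetrics(reg, notifier.NewHTTP(srv.URL, http.MethodPost)),
        10,            // max attempts
        0*time.Second, // no delay between attempts (test setting)
    )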
diff --git a/notifier/http_test.go b/notifier/notifier_test.go
similarity index 59%
rename from notifier/http_test.go
rename to notifier/notifier_test.go
index 3738866..8ee19b8 100644
--- a/notifier/http_test.go
+++ b/notifier/notifier_test.go
@@ -1,12 +1,15 @@
 package notifier_test
 
 import (
+	"bytes"
 	"context"
 	"net/http"
 	"net/http/httptest"
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -16,6 +19,7 @@ import (
 func TestHTTPNotifierWithRetries(t *testing.T) {
 	var (
 		totalReceived int
+		reg           = prometheus.NewRegistry()
 
 		ctx = context.Background()
 		srv = httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
 			require.Equal(t, r.Method, http.MethodPost)
@@ -26,21 +30,39 @@ func TestHTTPNotifierWithRetries(t *testing.T) {
 			}
 		}))
 		notifier = notifier.WithRetry(
-			notifier.NewHTTP(
-				srv.URL,
-				http.MethodPost,
+			notifier.WithMetrics(
+				reg,
+				notifier.NewHTTP(
+					srv.URL,
+					http.MethodPost,
+				),
 			),
 			10,
 			0*time.Second,
 		)
 	)
 
+	const wantMetrics = `
+# HELP prometheus_elector_notifier_calls_errors The total number of times Prometheus Elector failed to notify Prometheus about a configuration update
+# TYPE prometheus_elector_notifier_calls_errors counter
+prometheus_elector_notifier_calls_errors 4
+# HELP prometheus_elector_notifier_calls_total The total number of times Prometheus Elector notified Prometheus about a configuration update
+# TYPE prometheus_elector_notifier_calls_total counter
+prometheus_elector_notifier_calls_total 5
+`
+
 	defer srv.Close()
 
 	err := notifier.Notify(ctx)
 	require.NoError(t, err)
 
 	assert.Equal(t, 5, totalReceived)
+	assert.NoError(t, testutil.GatherAndCompare(
+		reg,
+		bytes.NewBuffer([]byte(wantMetrics)),
+		"prometheus_elector_notifier_calls_errors",
+		"prometheus_elector_notifier_calls_total",
+	))
 }
 
 func TestHTTPNotifierExhaustRetries(t *testing.T) {