Skip to content

Commit

Permalink
Wait for Prometheus to be Ready before trying to take leadership (#34)
Browse files Browse the repository at this point in the history
* feat(readiness): add minimal implementation to wait for readiness

* feat(cmd): wait for prometheus to be ready before starting

* chore(*): cleaner logging

* feat(helm): wait for prometheus to be ready

* chore(README): update configuration reference
  • Loading branch information
jlevesy authored May 25, 2023
1 parent 29723ff commit 6ec6364
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 16 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ If the leader proxy is enabled, all HTTP calls received on the port 9095 are for
How many times to retry notifying prometheus on failure. (default 5)
-output string
Path to write the active prometheus configuration
-readiness-http-url string
URL to Prometheus ready endpoint
-readiness-poll-period duration
Poll period prometheus readiness check (default 5s)
-runtime-metrics
Export go runtime metrics
````
```
10 changes: 10 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type cliConfig struct {
notifyRetryMaxAttempts int
notifyRetryDelay time.Duration

// How to wait for prometheus to be ready.
readinessHTTPURL string
readinessPollPeriod time.Duration

// API setup
apiListenAddr string
apiShutdownGraceDelay time.Duration
Expand Down Expand Up @@ -103,6 +107,10 @@ func (c *cliConfig) validateRuntimeConfig() error {
return errors.New("invalid notify-retry-delay, should be >= 1")
}

if c.readinessPollPeriod < 1 {
return errors.New("invalid readiness-poll-period, should be >= 1")
}

if c.apiListenAddr == "" {
return errors.New("missing api-listen-address")
}
Expand Down Expand Up @@ -137,6 +145,8 @@ func (c *cliConfig) setupFlags() {
flag.StringVar(&c.kubeConfigPath, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&c.configPath, "config", "", "Path of the prometheus-elector configuration")
flag.StringVar(&c.outputPath, "output", "", "Path to write the active prometheus configuration")
flag.StringVar(&c.readinessHTTPURL, "readiness-http-url", "", "URL to Prometheus ready endpoint")
flag.DurationVar(&c.readinessPollPeriod, "readiness-poll-period", 5*time.Second, "Poll period prometheus readiness check")
flag.StringVar(&c.notifyHTTPURL, "notify-http-url", "", "URL to the reload configuration endpoint")
flag.StringVar(&c.notifyHTTPMethod, "notify-http-method", http.MethodPost, "HTTP method to use when sending the reload config request.")
flag.IntVar(&c.notifyRetryMaxAttempts, "notify-retry-max-attempts", 5, "How many times to retry notifying prometheus on failure.")
Expand Down
30 changes: 30 additions & 0 deletions cmd/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
},
Expand All @@ -86,6 +87,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
},
Expand All @@ -101,6 +103,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
},
Expand All @@ -115,6 +118,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
},
Expand All @@ -129,6 +133,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
},
Expand All @@ -143,6 +148,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: "",
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
},
Expand All @@ -157,6 +163,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: "///3eee",
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
},
Expand All @@ -171,6 +178,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: -1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
},
Expand All @@ -185,6 +193,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: -10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
},
Expand All @@ -199,6 +208,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: "",
apiShutdownGraceDelay: 15 * time.Second,
},
Expand All @@ -213,11 +223,28 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: -15 * time.Second,
},
wantErr: errors.New("invalid api-shudown-grace-delay, should be >= 0"),
},
{
desc: "invalid readiness poll period",
cfg: cliConfig{
leaseName: "lease",
leaseNamespace: "namespace",
notifyHTTPURL: "http://reload.com",
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: -10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
},
wantErr: errors.New("invalid readiness-poll-period, should be >= 1"),
},

{
desc: "proxy enabled invalid prometheus local port",
cfg: cliConfig{
Expand All @@ -227,6 +254,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
apiProxyEnabled: true,
Expand All @@ -245,6 +273,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
apiProxyEnabled: true,
Expand All @@ -263,6 +292,7 @@ func TestCliConfig_ValidateRuntimeConfig(t *testing.T) {
notifyHTTPMethod: http.MethodPost,
notifyRetryMaxAttempts: 1,
notifyRetryDelay: 10 * time.Second,
readinessPollPeriod: 10 * time.Second,
apiListenAddr: ":5678",
apiShutdownGraceDelay: 15 * time.Second,
apiProxyEnabled: true,
Expand Down
27 changes: 19 additions & 8 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/jlevesy/prometheus-elector/config"
"github.com/jlevesy/prometheus-elector/election"
"github.com/jlevesy/prometheus-elector/notifier"
"github.com/jlevesy/prometheus-elector/readiness"
"github.com/jlevesy/prometheus-elector/watcher"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
Expand Down Expand Up @@ -81,12 +82,6 @@ func main() {
klog.Fatal("Can't build the k8s client: ", err)
}

watcher, err := watcher.New(filepath.Dir(cfg.configPath), reconciller, notifier)
if err != nil {
klog.Fatal("Can't create the watcher: ", err)
}
defer watcher.Close()

elector, err := election.Setup(
election.Config{
LeaseName: cfg.leaseName,
Expand All @@ -107,6 +102,12 @@ func main() {
klog.Fatal("Can't setup election", err)
}

watcher, err := watcher.New(filepath.Dir(cfg.configPath), reconciller, notifier)
if err != nil {
klog.Fatal("Can't create the watcher: ", err)
}
defer watcher.Close()

apiServer, err := api.NewServer(
api.Config{
ListenAddress: cfg.apiListenAddr,
Expand All @@ -124,9 +125,19 @@ func main() {
klog.Fatal("Can't set up API server", err)
}

var readinessWaiter readiness.Waiter = readiness.NoopWaiter{}

if cfg.readinessHTTPURL != "" {
readinessWaiter = readiness.NewHTTP(cfg.readinessHTTPURL, cfg.readinessPollPeriod)
}

grp, grpCtx := errgroup.WithContext(ctx)

grp.Go(func() error {
if err := readinessWaiter.Wait(grpCtx); err != nil {
return err
}

elector.Run(grpCtx)
return nil
})
Expand All @@ -135,8 +146,8 @@ func main() {
grp.Go(func() error { return apiServer.Serve(grpCtx) })

if err := grp.Wait(); err != nil {
klog.Fatal("leader-agent failed, reason: ", err)
klog.Fatal("Error while running prometheus-elector, reason: ", err)
}

klog.Info("Leader-Agent exited successfully")
klog.Info("prometheus-elector exited successfully")
}
7 changes: 4 additions & 3 deletions election/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Setup(cfg Config, k8sClient kubernetes.Interface, reconciller *config.Recon
RetryPeriod: cfg.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Info("I'm leading, setting leader configuration.")
klog.Info("Leading, applying leader configuration.")

if err := reconciller.Reconcile(ctx); err != nil {
klog.ErrorS(err, "Failed to reconcile configurations")
Expand All @@ -62,12 +62,13 @@ func Setup(cfg Config, k8sClient kubernetes.Interface, reconciller *config.Recon
}
},
OnStoppedLeading: func() {
klog.Info("I stopped leading, setting follower configuration.")
klog.Info("Stopped leading, applying follower configuration.")

ctx := context.Background()

if err := reconciller.Reconcile(ctx); err != nil {
klog.ErrorS(err, "failed to sync")
klog.ErrorS(err, "Failed to reconcile configurations")
return
}

if err := notifier.Notify(ctx); err != nil {
Expand Down
1 change: 1 addition & 0 deletions helm/templates/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ spec:
- -config=/etc/config/prometheus-elector.yaml
- -output=/etc/runtime/prometheus.yaml
- -notify-http-url=http://127.0.0.1:9090/-/reload
- -readiness-http-url=http://127.0.0.1:9090/-/ready
- -api-listen-address=:9095
{{- if .Values.enableLeaderProxy }}
- -api-proxy-enabled
Expand Down
2 changes: 1 addition & 1 deletion notifier/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (r *retryNotifier) Notify(ctx context.Context) error {
}

if j > 0 {
klog.ErrorS(err, "failed to notify prometheus, will retry...", "attempt", r.maxAttempts-j, "maxAttempts", r.maxAttempts)
klog.ErrorS(err, "Failed to notify prometheus, will retry...", "attempt", r.maxAttempts-j, "maxAttempts", r.maxAttempts)
time.Sleep(r.delay)
}
}
Expand Down
68 changes: 68 additions & 0 deletions readiness/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package readiness

import (
	"context"
	"io"
	"net/http"
	"time"

	"k8s.io/klog/v2"
)

// httpWaiter polls an HTTP endpoint until it answers with 200 OK,
// signalling that Prometheus is ready.
type httpWaiter struct {
	// url is the readiness endpoint to poll (e.g. Prometheus /-/ready).
	url string
	// pollPeriod is the interval between two readiness checks.
	pollPeriod time.Duration

	// httpClient performs the readiness probe requests.
	httpClient *http.Client
}

// NewHTTP returns a Waiter that polls the given URL every pollPeriod and
// reports readiness once the endpoint answers with http.StatusOK.
func NewHTTP(url string, pollPeriod time.Duration) Waiter {
	return &httpWaiter{
		url:        url,
		pollPeriod: pollPeriod,
		// Bound each probe by the poll period: http.DefaultClient has no
		// timeout, so a hung request would stall the wait loop until the
		// surrounding context is cancelled.
		httpClient: &http.Client{Timeout: pollPeriod},
	}
}

// Wait blocks until the configured endpoint reports ready, the check fails
// with a transport-level setup error, or ctx is cancelled. The first probe
// happens one poll period after the call.
func (w *httpWaiter) Wait(ctx context.Context) error {
	klog.InfoS("Waiting for prometheus to be ready", "poll_period", w.pollPeriod, "url", w.url)

	ticker := time.NewTicker(w.pollPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ready, err := w.checkReadiness(ctx)
			switch {
			case err != nil:
				return err
			case ready:
				klog.Info("Prometheus is ready")
				return nil
			}
			// Not ready yet, keep polling.
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// checkReadiness performs one readiness probe against w.url. It returns
// (true, nil) when the endpoint answers 200 OK, (false, nil) when the probe
// fails transiently (transport error or non-200 status, both retryable), and
// a non-nil error only when the request itself cannot be built.
func (w *httpWaiter) checkReadiness(ctx context.Context) (bool, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, w.url, http.NoBody)
	if err != nil {
		return false, err
	}

	rsp, err := w.httpClient.Do(req)
	if err != nil {
		// Transport errors are expected while Prometheus is starting up;
		// report "not ready" and let the caller retry.
		klog.ErrorS(err, "Failed to check if Prometheus is ready")
		return false, nil
	}
	// Close (and drain) the body so the transport can reuse the connection;
	// the original code leaked the body on every probe.
	defer rsp.Body.Close()
	_, _ = io.Copy(io.Discard, rsp.Body)

	if rsp.StatusCode != http.StatusOK {
		// klog.Error is unstructured (Sprint-style); key/value pairs require
		// the structured InfoS/ErrorS variants.
		klog.InfoS("Prometheus isn't ready yet", "status", rsp.StatusCode)
		return false, nil
	}

	return true, nil
}
36 changes: 36 additions & 0 deletions readiness/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package readiness_test

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/jlevesy/prometheus-elector/readiness"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestHTTPWaiter checks that Wait keeps polling past a non-200 answer and
// returns successfully once the endpoint reports 200 OK.
func TestHTTPWaiter(t *testing.T) {
	// Flipped by the handler on the first probe so the second probe succeeds.
	// NOTE(review): written by the server goroutine and read by the test
	// goroutine; an atomic would keep -race unambiguously happy — confirm.
	var checkCalled bool

	srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
		assert.Equal(t, "/foo", r.URL.Path)

		if !checkCalled {
			// First probe: answer with an arbitrary non-200 status.
			checkCalled = true
			rw.WriteHeader(http.StatusInsufficientStorage)
			return
		}

		rw.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	waiter := readiness.NewHTTP(srv.URL+"/foo", 200*time.Millisecond)

	require.NoError(t, waiter.Wait(context.Background()))
	assert.True(t, checkCalled)
}
Loading

0 comments on commit 6ec6364

Please sign in to comment.