Skip to content

Commit

Permalink
status: don't serve /healthz until all probes completed once
Browse files Browse the repository at this point in the history
Ensure that all status probes have completed at least once before
starting serving the agent /healthz API. This prevents possible
race conditions causing the agent to exit early from the startup
probe phase if the healthz endpoint is queried before initializing
any of the fields. Overall, the additional wait time is not expected
to introduce any concern, considering that practically all probes
should return quickly, as they don't involve expensive operations or API
calls. The only exception is the k8s probe, but at that point of
the initialization we are already guaranteed to have connected
to the k8s API server.

Signed-off-by: Marco Iorio <[email protected]>
  • Loading branch information
giorio94 authored and julianwiedmann committed Nov 25, 2024
1 parent 6d5fd6d commit dbb4bda
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 4 deletions.
4 changes: 3 additions & 1 deletion daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1866,7 +1866,9 @@ func startDaemon(d *Daemon, restoredEndpoints *endpointRestoreState, cleaner *da
}
bootstrapStats.healthCheck.End(true)

d.startStatusCollector(cleaner)
if err := d.startStatusCollector(d.ctx, cleaner); err != nil {
return fmt.Errorf("failed to start status collector: %w", err)
}

d.startAgentHealthHTTPService()
if option.Config.KubeProxyReplacementHealthzBindAddr != "" {
Expand Down
10 changes: 9 additions & 1 deletion daemon/cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func (d *Daemon) getIdentityRange() *models.IdentityRange {
return s
}

func (d *Daemon) startStatusCollector(cleaner *daemonCleanup) {
func (d *Daemon) startStatusCollector(ctx context.Context, cleaner *daemonCleanup) error {
probes := []status.Probe{
{
Name: "kvstore",
Expand Down Expand Up @@ -931,6 +931,12 @@ func (d *Daemon) startStatusCollector(cleaner *daemonCleanup) {

d.statusCollector = status.NewCollector(probes, status.DefaultConfig)

// Block until all probes have been executed at least once, to make sure that
// the status has been fully initialized once we exit from this function.
if err := d.statusCollector.WaitForFirstRun(ctx); err != nil {
return fmt.Errorf("waiting for first run: %w", err)
}

// Set up a signal handler function which prints out logs related to daemon status.
cleaner.cleanupFuncs.Add(func() {
// If the KVstore state is not OK, print help for user.
Expand All @@ -949,4 +955,6 @@ func (d *Daemon) startStatusCollector(cleaner *daemonCleanup) {

d.statusCollector.Close()
})

return nil
}
27 changes: 25 additions & 2 deletions pkg/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type Collector struct {
// lastStackdumpTime is the last time we dumped stack; only do it
// every 5 minutes so we don't waste resources.
lastStackdumpTime atomic.Int64

// Tracks whether all probes have been executed at least once.
firstRunSwg *lock.StoppableWaitGroup
}

// Config is the collector configuration
Expand All @@ -99,6 +102,7 @@ func NewCollector(probes []Probe, config Config) *Collector {
stop: make(chan struct{}),
staleProbes: make(map[string]struct{}),
probeStartTime: make(map[string]time.Time),
firstRunSwg: lock.NewStoppableWaitGroup(),
}

if c.config.Interval == time.Duration(0) {
Expand All @@ -114,12 +118,25 @@ func NewCollector(probes []Probe, config Config) *Collector {
}

for i := range probes {
c.spawnProbe(&probes[i])
c.firstRunSwg.Add()
c.spawnProbe(&probes[i], c.firstRunSwg.Done)
}
c.firstRunSwg.Stop()

return c
}

// WaitForFirstRun blocks until every probe has completed its initial
// execution, ensuring the collected status is fully populated. It returns
// nil on success, or the context's error if ctx is canceled first.
func (c *Collector) WaitForFirstRun(ctx context.Context) error {
	select {
	case <-ctx.Done():
		// Caller gave up (timeout or cancellation) before all probes finished.
		return ctx.Err()
	case <-c.firstRunSwg.WaitChannel():
		// All probes have reported their first result.
		return nil
	}
}

// Close exits all probes and shuts down the collector
func (c *Collector) Close() {
close(c.stop)
Expand All @@ -143,11 +160,17 @@ func (c *Collector) GetStaleProbes() map[string]time.Time {
}

// spawnProbe starts a goroutine which invokes the probe at the particular interval.
func (c *Collector) spawnProbe(p *Probe) {
func (c *Collector) spawnProbe(p *Probe, firstRunCompleted func()) {
go func() {
for {
c.runProbe(p)

// The first run of the probe has completed.
if firstRunCompleted != nil {
firstRunCompleted()
firstRunCompleted = nil
}

interval := c.config.Interval
if p.Interval != nil {
interval = p.Interval(p.consecutiveFailures)
Expand Down
33 changes: 33 additions & 0 deletions pkg/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,36 @@ func TestCollectorSuccessAfterTimeout(t *testing.T) {
}, 1*time.Second))
require.Empty(t, collector.GetStaleProbes())
}

// TestWaitForFirstRun verifies that WaitForFirstRun only returns success
// once every registered probe has completed at least one execution.
func TestWaitForFirstRun(t *testing.T) {
	s := setUpTest(t)

	// Each probe blocks until it receives a value on this channel, letting
	// the test release the probes one at a time.
	release := make(chan struct{})
	blockingProbe := func(ctx context.Context) (interface{}, error) {
		<-release
		return nil, nil
	}

	var probes []Probe
	for i := 0; i < 3; i++ {
		probes = append(probes, Probe{Probe: blockingProbe, OnStatusUpdate: func(status Status) {}})
	}

	collector := NewCollector(probes, s.Config())
	defer collector.Close()

	// wait invokes WaitForFirstRun with a short timeout, so an error means
	// "not all probes have completed yet".
	wait := func() error {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		defer cancel()
		return collector.WaitForFirstRun(ctx)
	}

	// While any probe is still blocked, WaitForFirstRun must time out;
	// release one probe per iteration.
	for i := 0; i < 3; i++ {
		require.Error(t, wait())
		release <- struct{}{}
	}

	// All three probes have now run once: the wait must succeed.
	require.NoError(t, wait())
}

0 comments on commit dbb4bda

Please sign in to comment.