feat: allow dynamically exposing diagnostics for multiple instances
czeslavo committed Feb 7, 2025
1 parent c9e6ea5 commit 16523dc
Showing 10 changed files with 429 additions and 70 deletions.
25 changes: 18 additions & 7 deletions internal/manager/run.go
@@ -60,6 +60,7 @@ type Manager struct {
diagnosticsServer mo.Option[diagnostics.Server]
stopAnonymousReports func()
diagnosticsCollector mo.Option[*diagnostics.Collector]
diagnosticsHandler mo.Option[*diagnostics.HTTPHandler]
}

// New configures the controller manager; call Start to run it.
@@ -498,15 +499,17 @@ func (m *Manager) setupDiagnostics(
if c.EnableConfigDumps {
diagnosticsCollector := diagnostics.NewCollector(logger, c)
m.diagnosticsCollector = mo.Some(diagnosticsCollector)
configDiagnosticsHandler := diagnostics.NewConfigDiagnosticsHTTPHandler(diagnosticsCollector, c.DumpSensitiveConfig)
serverOpts = append(serverOpts, diagnostics.WithConfigDiagnostics(configDiagnosticsHandler))
m.diagnosticsHandler = mo.Some(diagnostics.NewConfigDiagnosticsHTTPHandler(diagnosticsCollector, c.DumpSensitiveConfig))
serverOpts = append(serverOpts, diagnostics.WithConfigDiagnostics(m.diagnosticsHandler.MustGet()))
}

m.diagnosticsServer = mo.Some(diagnostics.NewServer(logger, diagnostics.ServerConfig{
ProfilingEnabled: c.EnableProfiling,
DumpSensitiveConfig: c.DumpSensitiveConfig,
ListenerPort: c.DiagnosticServerPort,
}, serverOpts...))
if !c.DisableRunningDiagnosticsServer {
m.diagnosticsServer = mo.Some(diagnostics.NewServer(logger, diagnostics.ServerConfig{
ProfilingEnabled: c.EnableProfiling,
DumpSensitiveConfig: c.DumpSensitiveConfig,
ListenerPort: c.DiagnosticServerPort,
}, serverOpts...))
}

// If diagnosticsCollector is set, it means that config dumps are enabled and we should return a diagnostics.Client.
if dc, ok := m.diagnosticsCollector.Get(); ok {
@@ -566,3 +569,11 @@ func (m *Manager) IsReady() error {
}
return nil
}

// DiagnosticsHandler returns the diagnostics HTTP handler if it's enabled in the configuration. Otherwise, it returns nil.
func (m *Manager) DiagnosticsHandler() http.Handler {
if h, ok := m.diagnosticsHandler.Get(); ok {
return h
}
return nil
}
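
The net effect of the run.go change: the config-dump collector and its HTTP handler are now created independently of the built-in diagnostics server, which can be switched off entirely. A minimal configuration sketch using only fields from this commit (how the Config reaches manager construction is elided, as it isn't shown here):

	cfg := managercfg.Config{
		// Create the diagnostics collector and HTTP handler...
		EnableConfigDumps:   true,
		DumpSensitiveConfig: false,
		// ...but skip running the per-instance diagnostics server; the handler
		// is instead retrieved via Manager.DiagnosticsHandler() and served
		// elsewhere (e.g. by the shared multi-instance server added below).
		DisableRunningDiagnosticsServer: true,
	}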
9 changes: 5 additions & 4 deletions pkg/manager/config/config.go
@@ -114,10 +114,11 @@ type Config struct {
AdmissionServer AdmissionServerConfig

// Diagnostics and performance
EnableProfiling bool
EnableConfigDumps bool
DumpSensitiveConfig bool
DiagnosticServerPort int
EnableProfiling bool
EnableConfigDumps bool
DumpSensitiveConfig bool
DiagnosticServerPort int
DisableRunningDiagnosticsServer bool // TODO(czeslavo): instead of this toggle, move the server out of the internal.Manager

// Feature Gates
FeatureGates FeatureGates
6 changes: 6 additions & 0 deletions pkg/manager/manager.go
@@ -3,6 +3,7 @@ package manager
import (
"context"
"fmt"
"net/http"

"github.com/go-logr/logr"

@@ -55,3 +56,8 @@ func (m *Manager) ID() ID {
func (m *Manager) Config() managercfg.Config {
return m.config
}

// DiagnosticsHandler returns the diagnostics handler of the manager if available. Otherwise, it returns nil.
func (m *Manager) DiagnosticsHandler() http.Handler {
return m.manager.DiagnosticsHandler()
}
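
This accessor is what makes the embedding scenario work: a host application that disabled the built-in server can mount the handler on its own mux. A hedged sketch (the mount path is illustrative, and the manager construction isn't shown in this commit):

	// mgr is a *manager.Manager configured with EnableConfigDumps: true and
	// DisableRunningDiagnosticsServer: true.
	mux := http.NewServeMux()
	if h := mgr.DiagnosticsHandler(); h != nil { // nil when config dumps are disabled
		// Strip the mount prefix before delegating, mirroring what the
		// multi-instance DiagnosticsServer below does with "/{instanceID}/debug/config".
		mux.Handle("/debug/config/", http.StripPrefix("/debug/config", h))
	}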
90 changes: 90 additions & 0 deletions pkg/manager/multiinstance/diagnostics.go
@@ -0,0 +1,90 @@
package multiinstance

import (
"context"
"errors"
"fmt"
"net/http"
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/kong/kubernetes-ingress-controller/v3/pkg/manager"
)

const (
// DiagnosticsServerReadHeaderTimeout is the maximum duration the diagnostics server waits for request headers to be read.
DiagnosticsServerReadHeaderTimeout = 5 * time.Second
)

// DiagnosticsServer is a server that provides diagnostics information for multiple instances managed by the manager.
// Each instance exposes its own diagnostics endpoints under the `/{instanceID}/debug/config/` prefix. On every call to
// RegisterInstance or UnregisterInstance, the server rebuilds its mux to include the latest set of handlers.
type DiagnosticsServer struct {
listenerPort int
handlers map[manager.ID]http.Handler

mux *http.ServeMux
muxLock sync.Mutex
}

// NewDiagnosticsServer creates a DiagnosticsServer that will listen on the given port once started.
func NewDiagnosticsServer(listenerPort int) *DiagnosticsServer {
return &DiagnosticsServer{
listenerPort: listenerPort,
handlers: make(map[manager.ID]http.Handler),
mux: http.NewServeMux(),
}
}

// Start starts the diagnostics server and blocks until the given context is canceled.
func (s *DiagnosticsServer) Start(ctx context.Context) error {
errg, ctx := errgroup.WithContext(ctx)
server := http.Server{
Addr: fmt.Sprintf(":%d", s.listenerPort),
Handler: s,
ReadHeaderTimeout: DiagnosticsServerReadHeaderTimeout,
}
errg.Go(func() error {
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
})
errg.Go(func() error {
// Shut the server down gracefully once the context is canceled.
<-ctx.Done()
return server.Shutdown(context.Background())
})
return errg.Wait()
}

// ServeHTTP implements http.Handler by dispatching to the server's current mux.
func (s *DiagnosticsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Snapshot the mux under the lock, but serve without holding it so that a
// slow request does not block (un)registering instances.
s.muxLock.Lock()
mux := s.mux
s.muxLock.Unlock()

mux.ServeHTTP(w, r)
}

// RegisterInstance registers a new instance's diagnostics handler with the diagnostics server.
func (s *DiagnosticsServer) RegisterInstance(instanceID manager.ID, instanceDiagnosticsHandler http.Handler) {
s.muxLock.Lock()
defer s.muxLock.Unlock()

s.handlers[instanceID] = instanceDiagnosticsHandler
s.rebuildMux()
}

// UnregisterInstance unregisters an instance from the diagnostics server.
func (s *DiagnosticsServer) UnregisterInstance(instanceID manager.ID) {
s.muxLock.Lock()
defer s.muxLock.Unlock()

delete(s.handlers, instanceID)
s.rebuildMux()
}

// rebuildMux rebuilds the mux with the current handlers. It should be called with the muxLock held.
func (s *DiagnosticsServer) rebuildMux() {
s.mux = http.NewServeMux()
for instanceID, handler := range s.handlers {
// It's possible an instance doesn't have a diagnostics handler. Handle that gracefully.
if handler == nil {
continue
}

prefix := fmt.Sprintf("/%s/debug/config", instanceID)
s.mux.Handle(fmt.Sprintf("%s/", prefix), http.StripPrefix(prefix, handler))
}
}
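
To make the routing contract concrete, here is a hedged usage sketch (the instance ID handling is simplified; how manager.ID values are minted isn't part of this file):

	package main

	import (
		"context"
		"fmt"
		"net/http"

		"github.com/kong/kubernetes-ingress-controller/v3/pkg/manager"
		"github.com/kong/kubernetes-ingress-controller/v3/pkg/manager/multiinstance"
	)

	func main() {
		ds := multiinstance.NewDiagnosticsServer(10256)

		// Stand-in for instance.DiagnosticsHandler(); by the time this handler
		// runs, the "/{instanceID}/debug/config" prefix has been stripped.
		h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintf(w, "instance handler saw path %q\n", r.URL.Path)
		})

		var id manager.ID // zero value, for illustration only
		ds.RegisterInstance(id, h)

		// GET http://localhost:10256/<instanceID>/debug/config/... now reaches h.
		_ = ds.Start(context.Background())
	}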
8 changes: 8 additions & 0 deletions pkg/manager/multiinstance/instance.go
@@ -2,6 +2,7 @@ package multiinstance

import (
"context"
"net/http"
"sync"

"github.com/go-logr/logr"
@@ -53,6 +54,13 @@ func (i *instance) Run(ctx context.Context) {
}
}

// DiagnosticsHandler returns an HTTP handler that exposes diagnostics information for this instance. It can return
// nil if the instance does not expose diagnostics information.
func (i *instance) DiagnosticsHandler() http.Handler {
return i.in.DiagnosticsHandler()
}

// IsReady returns an error if the instance is not ready.
func (i *instance) IsReady() error {
return i.in.IsReady()
}
50 changes: 43 additions & 7 deletions pkg/manager/multiinstance/manager.go
@@ -2,6 +2,7 @@ package multiinstance

import (
"context"
"net/http"
"sync"

"github.com/go-logr/logr"
@@ -21,30 +22,55 @@ type ManagerInstance interface {
ID() manager.ID
Run(context.Context) error
IsReady() error
DiagnosticsHandler() http.Handler
}

// DiagnosticsExposer exposes diagnostics data of manager.Manager instances.
type DiagnosticsExposer interface {
// RegisterInstance registers a new manager.Manager instance with the diagnostics exposer.
RegisterInstance(manager.ID, http.Handler)

// TODO(czeslavo): expose a getter for the diagnostics server and handle its lifecycle.
// UnregisterInstance unregisters a manager.Manager instance from the diagnostics exposer.
UnregisterInstance(manager.ID)
}

// Manager is able to dynamically run multiple instances of manager.Manager and manage their lifecycle.
// It is responsible for things like:
// - Making sure there's only one instance of a manager.Manager with a given ID.
// - Starting and stopping manager.Manager instances as needed.
// - Exposing a common diagnostics server for all manager.Manager instances.
// - Registering instances' diagnostic handlers in a DiagnosticsExposer when configured.
type Manager struct {
logger logr.Logger

instances map[manager.ID]*instance
instancesLock sync.RWMutex
schedulingQueue chan manager.ID
instances map[manager.ID]*instance
instancesLock sync.RWMutex
schedulingQueue chan manager.ID
diagnosticsExposer DiagnosticsExposer
}

// ManagerOption is a functional option that can be used to configure a new multi-instance manager.
type ManagerOption func(*Manager)

// WithDiagnosticsExposer configures the multi-instance manager to register its instances'
// diagnostics handlers with the given exposer.
func WithDiagnosticsExposer(exposer DiagnosticsExposer) ManagerOption {
return func(m *Manager) {
m.diagnosticsExposer = exposer
}
}

// NewManager creates a new multi-instance manager.
func NewManager(logger logr.Logger) *Manager {
return &Manager{
func NewManager(logger logr.Logger, opts ...ManagerOption) *Manager {
m := &Manager{
logger: logger,
instances: make(map[manager.ID]*instance),
schedulingQueue: make(chan manager.ID, SchedulingQueueSize),
}

for _, opt := range opts {
opt(m)
}

return m
}

// Run starts the multi-instance manager and blocks until the context is canceled. It should only be called once.
@@ -93,6 +119,11 @@ func (m *Manager) StopInstance(instanceID manager.ID) error {
return NewInstanceNotFoundError(instanceID)
}

// If diagnostics are enabled, unregister the instance from the diagnostics exposer.
if m.diagnosticsExposer != nil {
m.diagnosticsExposer.UnregisterInstance(instanceID)
}

// Send a signal to the instance to stop and let the running goroutine handle the cleanup.
in.Stop()

@@ -125,6 +156,11 @@ func (m *Manager) runInstance(ctx context.Context, instanceID manager.ID) {
m.logger.Info("Starting instance", "instanceID", instanceID)
go in.Run(ctx)

// If diagnostics are enabled, register the instance with the diagnostics exposer.
if m.diagnosticsExposer != nil {
m.diagnosticsExposer.RegisterInstance(instanceID, in.DiagnosticsHandler())
}

// Wait for the instance to stop or the parent context to be done.
select {
case <-in.StopChannel():
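Putting it together, the multi-instance manager and the shared diagnostics server are wired up via the new option. A hedged end-to-end sketch (only Run, StopInstance, and the option appear in this commit's hunks; the call that schedules instances is assumed):

	package main

	import (
		"context"

		"github.com/go-logr/logr"

		"github.com/kong/kubernetes-ingress-controller/v3/pkg/manager/multiinstance"
	)

	func main() {
		ctx := context.Background()

		// One diagnostics server shared by all instances (port is arbitrary here).
		ds := multiinstance.NewDiagnosticsServer(10256)

		// DiagnosticsServer satisfies DiagnosticsExposer, so each instance's
		// handler is registered when it starts and unregistered when it stops.
		m := multiinstance.NewManager(logr.Discard(), multiinstance.WithDiagnosticsExposer(ds))

		go func() { _ = ds.Start(ctx) }() // lifecycle handling is still a TODO in this commit

		// Schedule instances through the manager's scheduling API (not shown in
		// this commit's hunks), then block until the context is canceled.
		_ = m.Run(ctx)
	}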
