diff --git a/lib/service/service.go b/lib/service/service.go index 52f0c3e110f49..5e877bd188570 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -53,6 +53,7 @@ import ( "github.com/gravitational/roundtrip" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -470,6 +471,15 @@ type TeleportProcess struct { // resolver is used to identify the reverse tunnel address when connecting via // the proxy. resolver reversetunnelclient.Resolver + + // metricRegistry is the prometheus metric registry for the process. + // Every teleport service that wants to register metrics should use this + // instead of the global prometheus.DefaultRegisterer to avoid registration + // conflicts. + // + // Both the metricsRegistry and the default global registry are gathered by + // Telepeort's metric service. + metricsRegistry *prometheus.Registry } type keyPairKey struct { @@ -906,6 +916,15 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { "pid", fmt.Sprintf("%v.%v", os.Getpid(), processID), ) + // Use the custom metrics registry if specified, else create a new one. + // We must create the registry in NewTeleport, as opposed to ApplyConfig(), + // because some tests are running multiple Teleport instances from the same + // config. + metricsRegistry := cfg.MetricsRegistry + if metricsRegistry == nil { + metricsRegistry = prometheus.NewRegistry() + } + // If FIPS mode was requested make sure binary is build against BoringCrypto. if cfg.FIPS { if !modules.GetModules().IsBoringBinary() { @@ -1086,6 +1105,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { keyPairs: make(map[keyPairKey]KeyPair), cloudLabels: cloudLabels, TracingProvider: tracing.NoopProvider(), + metricsRegistry: metricsRegistry, } process.registerExpectedServices(cfg) @@ -3346,11 +3366,23 @@ func (process *TeleportProcess) initUploaderService() error { return nil } +// promHTTPLogAdapter adapts a slog.Logger into a promhttp.Logger. +type promHTTPLogAdapter struct { + ctx context.Context + *slog.Logger +} + +// Println implements the promhttp.Logger interface. +func (l promHTTPLogAdapter) Println(v ...interface{}) { + //nolint:sloglint // msg cannot be constant + l.ErrorContext(l.ctx, fmt.Sprint(v...)) +} + // initMetricsService starts the metrics service currently serving metrics for // prometheus consumption func (process *TeleportProcess) initMetricsService() error { mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) + mux.Handle("/metrics", process.newMetricsHandler()) logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentMetrics, process.id)) @@ -3435,6 +3467,33 @@ func (process *TeleportProcess) initMetricsService() error { return nil } +// newMetricsHandler creates a new metrics handler serving metrics both from the global prometheus registry and the +// in-process one. +func (process *TeleportProcess) newMetricsHandler() http.Handler { + // We gather metrics both from the in-process registry (preferred metrics registration method) + // and the global registry (used by some Teleport services and many dependencies). + gatherers := prometheus.Gatherers{ + process.metricsRegistry, + prometheus.DefaultGatherer, + } + + metricsHandler := promhttp.InstrumentMetricHandler( + process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{ + // Errors can happen if metrics are registered with identical names in both the local and the global registry. + // In this case, we log the error but continue collecting metrics. The first collected metric will win + // (the one from the local metrics registry takes precedence). + // As we move more things to the local registry, especially in other tools like tbot, we will have less + // conflicts in tests. + ErrorHandling: promhttp.ContinueOnError, + ErrorLog: promHTTPLogAdapter{ + ctx: process.ExitContext(), + Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentMetrics), + }, + }), + ) + return metricsHandler +} + // initDiagnosticService starts diagnostic service currently serving healthz // and prometheus endpoints func (process *TeleportProcess) initDiagnosticService() error { @@ -3444,7 +3503,7 @@ func (process *TeleportProcess) initDiagnosticService() error { // metrics will otherwise be served by the metrics service if it's enabled // in the config. if !process.Config.Metrics.Enabled { - mux.Handle("/metrics", promhttp.Handler()) + mux.Handle("/metrics", process.newMetricsHandler()) } if process.Config.Debug { diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 2010734d95318..411c64e9ce4c4 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -23,9 +23,12 @@ import ( "crypto/tls" "errors" "fmt" + "github.com/sirupsen/logrus" + "io" "log/slog" "net" "net/http" + "net/url" "os" "path/filepath" "strings" @@ -39,7 +42,8 @@ import ( "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/crypto/ssh" "google.golang.org/grpc" @@ -1755,6 +1759,152 @@ func TestInstanceMetadata(t *testing.T) { } } +// TestMetricsService tests that the optional metrics service exposes +// metrics from both the in-process and global metrics registry. When the +// service is disabled, metrics are served by the diagnostics service +// (tested in TestMetricsInDiagnosticsService). +func TestMetricsService(t *testing.T) { + t.Parallel() + // Test setup: create a listener for the metrics server, get its file descriptor. + + // Note: this code is copied from integrations/helpers/NewListenerOn() to avoid including helpers in a production + // build and avoid a cyclic dependency. + metricsListener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, metricsListener.Close()) + }) + require.IsType(t, &net.TCPListener{}, metricsListener) + metricsListenerFile, err := metricsListener.(*net.TCPListener).File() + require.NoError(t, err) + + // Test setup: create a new teleport process + dataDir := makeTempDir(t) + cfg := servicecfg.MakeDefaultConfig() + cfg.DataDir = dataDir + cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}) + cfg.Auth.Enabled = true + cfg.Proxy.Enabled = false + cfg.SSH.Enabled = false + cfg.DebugService.Enabled = false + cfg.Auth.StorageConfig.Params["path"] = dataDir + cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} + cfg.Metrics.Enabled = true + + // Configure the metrics server to use the listener we previously created. + cfg.Metrics.ListenAddr = &utils.NetAddr{AddrNetwork: "tcp", Addr: metricsListener.Addr().String()} + cfg.FileDescriptors = []*servicecfg.FileDescriptor{ + {Type: string(ListenerMetrics), Address: metricsListener.Addr().String(), File: metricsListenerFile}, + } + + // Create and start the Teleport service. + process, err := NewTeleport(cfg) + require.NoError(t, err) + require.NoError(t, process.Start()) + t.Cleanup(func() { + assert.NoError(t, process.Close()) + assert.NoError(t, process.Wait()) + }) + + // Test setup: create our test metrics. + nonce := strings.ReplaceAll(uuid.NewString(), "-", "") + localMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "local_metric_" + nonce, + }) + globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "global_metric_" + nonce, + }) + require.NoError(t, process.metricsRegistry.Register(localMetric)) + require.NoError(t, prometheus.Register(globalMetric)) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + t.Cleanup(cancel) + _, err = process.WaitForEvent(ctx, MetricsReady) + require.NoError(t, err) + + // Test execution: get metrics and check the tests metrics are here. + metricsURL, err := url.Parse("http://" + metricsListener.Addr().String()) + require.NoError(t, err) + metricsURL.Path = "/metrics" + resp, err := http.Get(metricsURL.String()) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + + // Test validation: check that the metrics server served both the local and global registry. + require.Contains(t, string(body), "local_metric_"+nonce) + require.Contains(t, string(body), "global_metric_"+nonce) +} + +// TestMetricsInDiagnosticsService tests that the diagnostics service exposes +// metrics from both the in-process and global metrics registry when the metrics +// service is disabled. +func TestMetricsInDiagnosticsService(t *testing.T) { + t.Parallel() + // Test setup: create a new teleport process + dataDir := makeTempDir(t) + cfg := servicecfg.MakeDefaultConfig() + cfg.DataDir = dataDir + cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}) + cfg.Auth.Enabled = true + cfg.Proxy.Enabled = false + cfg.SSH.Enabled = false + cfg.DebugService.Enabled = false + cfg.Auth.StorageConfig.Params["path"] = dataDir + cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} + cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} + + // Test setup: Create and start the Teleport service. + process, err := NewTeleport(cfg) + require.NoError(t, err) + require.NoError(t, process.Start()) + t.Cleanup(func() { + assert.NoError(t, process.Close()) + assert.NoError(t, process.Wait()) + }) + + // Test setup: create our test metrics. + nonce := strings.ReplaceAll(uuid.NewString(), "-", "") + localMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "local_metric_" + nonce, + }) + globalMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "global_metric_" + nonce, + }) + require.NoError(t, process.metricsRegistry.Register(localMetric)) + require.NoError(t, prometheus.Register(globalMetric)) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + t.Cleanup(cancel) + _, err = process.WaitForEvent(ctx, TeleportReadyEvent) + require.NoError(t, err) + + // Test execution: query the metrics endpoint and check the tests metrics are here. + diagAddr, err := process.DiagnosticAddr() + require.NoError(t, err) + metricsURL, err := url.Parse("http://" + diagAddr.String()) + require.NoError(t, err) + metricsURL.Path = "/metrics" + resp, err := http.Get(metricsURL.String()) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + + // Test validation: check that the metrics server served both the local and global registry. + require.Contains(t, string(body), "local_metric_"+nonce) + require.Contains(t, string(body), "global_metric_"+nonce) +} + // makeTempDir makes a temp dir with a shorter name than t.TempDir() in order to // avoid https://github.com/golang/go/issues/62614. func makeTempDir(t *testing.T) string { diff --git a/lib/service/servicecfg/config.go b/lib/service/servicecfg/config.go index b756dc0359ca9..5ea33e78f0032 100644 --- a/lib/service/servicecfg/config.go +++ b/lib/service/servicecfg/config.go @@ -33,6 +33,7 @@ import ( "github.com/ghodss/yaml" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" + "github.com/prometheus/client_golang/prometheus" "github.com/sashabaranov/go-openai" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" @@ -263,6 +264,12 @@ type Config struct { // AccessGraph represents AccessGraph server config AccessGraph AccessGraphConfig + // MetricsRegistry is the prometheus metrics registry used by the Teleport process to register its metrics. + // As of today, not every Teleport metric is registered against this registry. Some Teleport services + // and Teleport dependencies are using the global registry. + // Both the MetricsRegistry and the default global registry are gathered by Teleport's metric service. + MetricsRegistry *prometheus.Registry + // token is either the token needed to join the auth server, or a path pointing to a file // that contains the token //