From 63dc9a8f0f4c1d63df974f2a0a2246ac163fd455 Mon Sep 17 00:00:00 2001 From: starptech Date: Tue, 26 Nov 2024 15:38:54 +0100 Subject: [PATCH 1/3] chore: ensure metric store per graph mux --- router/core/context.go | 14 ++++ router/core/graph_server.go | 54 +++++++-------- router/pkg/metric/metric_store.go | 107 ++++++++++++++++++++++++------ 3 files changed, 127 insertions(+), 48 deletions(-) diff --git a/router/core/context.go b/router/core/context.go index 4d9a4e2f09..147d486f44 100644 --- a/router/core/context.go +++ b/router/core/context.go @@ -204,6 +204,20 @@ func (r *requestTelemetryAttributes) addCommonAttribute(vals ...attribute.KeyVal r.traceAttrs = append(r.traceAttrs, vals...) } +func (r *requestTelemetryAttributes) addCommonTraceAttribute(vals ...attribute.KeyValue) { + if !r.traceEnabled { + return + } + r.traceAttrs = append(r.traceAttrs, vals...) +} + +func (r *requestTelemetryAttributes) addCommonMetricAttribute(vals ...attribute.KeyValue) { + if !r.metricsEnabled { + return + } + r.metricAttrs = append(r.metricAttrs, vals...) +} + func (r *requestTelemetryAttributes) addMetricAttribute(vals ...attribute.KeyValue) { if !r.metricsEnabled { return diff --git a/router/core/graph_server.go b/router/core/graph_server.go index 1db270849b..17ade5b907 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -80,8 +80,6 @@ type ( publicKey *ecdsa.PublicKey executionTransport *http.Transport baseOtelAttributes []attribute.KeyValue - runtimeMetrics *rmetric.RuntimeMetrics - metricStore rmetric.Store baseRouterConfigVersion string mux *chi.Mux // inFlightRequests is used to track the number of requests currently being processed @@ -100,7 +98,6 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC cancelFunc: cancel, Config: &r.Config, websocketStats: r.WebsocketStats, - metricStore: rmetric.NewNoopMetrics(), executionTransport: newHTTPTransport(r.subgraphTransportOptions, proxy), playgroundHandler: r.playgroundHandler, baseRouterConfigVersion: routerConfig.GetVersion(), @@ -274,9 +271,11 @@ type graphMux struct { validationCache *ristretto.Cache[uint64, bool] operationHashCache *ristretto.Cache[uint64, string] accessLogsFileLogger *logging.BufferedLogger + runtimeMetrics *rmetric.RuntimeMetrics + metricStore rmetric.Store } -func (s *graphMux) Shutdown(_ context.Context) error { +func (s *graphMux) Shutdown(ctx context.Context) error { var err error @@ -304,6 +303,18 @@ func (s *graphMux) Shutdown(_ context.Context) error { } } + if s.metricStore != nil { + if aErr := s.metricStore.Shutdown(ctx); aErr != nil { + err = errors.Join(err, aErr) + } + } + + if s.runtimeMetrics != nil { + if aErr := s.runtimeMetrics.Shutdown(); aErr != nil { + err = errors.Join(err, aErr) + } + } + return err } @@ -316,7 +327,9 @@ func (s *graphServer) buildGraphMux(ctx context.Context, engineConfig *nodev1.EngineConfiguration, configSubgraphs []*nodev1.Subgraph) (*graphMux, error) { - gm := &graphMux{} + gm := &graphMux{ + metricStore: rmetric.NewNoopMetrics(), + } httpRouter := chi.NewRouter() @@ -331,19 +344,19 @@ func (s *graphServer) buildGraphMux(ctx context.Context, if s.metricConfig.OpenTelemetry.RouterRuntime { // Create runtime metrics exported to OTEL - s.runtimeMetrics = rmetric.NewRuntimeMetrics( + gm.runtimeMetrics = rmetric.NewRuntimeMetrics( s.logger, s.otlpMeterProvider, // We track runtime metrics with base router config version // even when we have multiple feature flags append([]attribute.KeyValue{ otel.WgRouterConfigVersion.String(s.baseRouterConfigVersion), - }, s.baseOtelAttributes...), + }, baseOtelAttributes...), s.processStartTime, ) // Start runtime metrics - if err := s.runtimeMetrics.Start(); err != nil { + if err := gm.runtimeMetrics.Start(); err != nil { return nil, err } } @@ -356,6 +369,7 @@ func (s *graphServer) buildGraphMux(ctx context.Context, m, err := rmetric.NewStore( rmetric.WithPromMeterProvider(s.promMeterProvider), rmetric.WithOtlpMeterProvider(s.otlpMeterProvider), + rmetric.WithBaseAttributes(baseOtelAttributes), rmetric.WithLogger(s.logger), rmetric.WithProcessStartTime(s.processStartTime), ) @@ -363,7 +377,7 @@ func (s *graphServer) buildGraphMux(ctx context.Context, return nil, fmt.Errorf("failed to create metric handler: %w", err) } - s.metricStore = m + gm.metricStore = m } subgraphs, err := configureSubgraphOverwrites( @@ -479,7 +493,7 @@ func (s *graphServer) buildGraphMux(ctx context.Context, } metrics := NewRouterMetrics(&routerMetricsConfig{ - metrics: s.metricStore, + metrics: gm.metricStore, gqlMetricsExporter: s.gqlMetricsExporter, exportEnabled: s.graphqlMetricsConfig.Enabled, routerConfigVersion: routerConfigVersion, @@ -566,7 +580,7 @@ func (s *graphServer) buildGraphMux(ctx context.Context, reqContext := getRequestContext(r.Context()) - reqContext.telemetry.addCommonAttribute(baseOtelAttributes...) + reqContext.telemetry.addCommonTraceAttribute(baseOtelAttributes...) if commonAttrRequestMapper != nil { reqContext.telemetry.addCommonAttribute(commonAttrRequestMapper(r)...) @@ -669,7 +683,7 @@ func (s *graphServer) buildGraphMux(ctx context.Context, RequestTimeout: s.subgraphTransportOptions.RequestTimeout, PreHandlers: s.preOriginHandlers, PostHandlers: s.postOriginHandlers, - MetricStore: s.metricStore, + MetricStore: gm.metricStore, RetryOptions: retrytransport.RetryOptions{ Enabled: s.retryOptions.Enabled, MaxRetryCount: s.retryOptions.MaxRetryCount, @@ -737,7 +751,7 @@ func (s *graphServer) buildGraphMux(ctx context.Context, TracerProvider: s.tracerProvider, Authorizer: NewCosmoAuthorizer(authorizerOptions), SubgraphErrorPropagation: s.subgraphErrorPropagation, - EngineLoaderHooks: NewEngineRequestHooks(s.metricStore), + EngineLoaderHooks: NewEngineRequestHooks(gm.metricStore), } if s.redisClient != nil { @@ -1011,20 +1025,6 @@ func (s *graphServer) Shutdown(ctx context.Context) error { ctx = newCtx } - if s.metricStore != nil { - if err := s.metricStore.Shutdown(ctx); err != nil { - s.logger.Error("Failed to shutdown metric store", zap.Error(err)) - finalErr = errors.Join(finalErr, err) - } - } - - if s.runtimeMetrics != nil { - if err := s.runtimeMetrics.Shutdown(); err != nil { - s.logger.Error("Failed to shutdown runtime metrics", zap.Error(err)) - finalErr = errors.Join(finalErr, err) - } - } - if s.pubSubProviders != nil { s.logger.Debug("Shutting down pubsub providers") diff --git a/router/pkg/metric/metric_store.go b/router/pkg/metric/metric_store.go index dff06d340b..105dc80c55 100644 --- a/router/pkg/metric/metric_store.go +++ b/router/pkg/metric/metric_store.go @@ -77,6 +77,8 @@ type ( processStartTime time.Time otlpRequestMetrics Provider promRequestMetrics Provider + baseAttributes []attribute.KeyValue + baseAttributesOpt otelmetric.MeasurementOption logger *zap.Logger } @@ -118,6 +120,8 @@ func NewStore(opts ...Option) (Store, error) { opt(h) } + h.baseAttributesOpt = otelmetric.WithAttributes(h.baseAttributes...) + // Create OTLP metrics exported to OTEL oltpMetrics, err := NewOtlpMetricStore(h.logger, h.otelMeterProvider) if err != nil { @@ -140,16 +144,24 @@ func NewStore(opts ...Option) (Store, error) { func (h *Metrics) MeasureInFlight(ctx context.Context, sliceAttr []attribute.KeyValue, opt otelmetric.AddOption) func() { handlers := make([]func(), 0, 2) + opts := []otelmetric.AddOption{h.baseAttributesOpt, opt} + + // Explode for prometheus metrics + if len(sliceAttr) == 0 { - handlers = append(handlers, h.promRequestMetrics.MeasureInFlight(ctx, opt)) + handlers = append(handlers, h.promRequestMetrics.MeasureInFlight(ctx, opts...)) } else { explodeAddInstrument(ctx, sliceAttr, func(ctx context.Context, newOpts ...otelmetric.AddOption) { - newOpts = append(newOpts, opt) + newOpts = append(newOpts, opts...) handlers = append(handlers, h.promRequestMetrics.MeasureInFlight(ctx, newOpts...)) }) } - handlers = append(handlers, h.otlpRequestMetrics.MeasureInFlight(ctx, otelmetric.WithAttributes(sliceAttr...), opt)) + // OTEL metrics + + opts = append(opts, otelmetric.WithAttributes(sliceAttr...)) + + handlers = append(handlers, h.otlpRequestMetrics.MeasureInFlight(ctx, opts...)) return func() { for _, h := range handlers { @@ -159,88 +171,135 @@ func (h *Metrics) MeasureInFlight(ctx context.Context, sliceAttr []attribute.Key } func (h *Metrics) MeasureRequestCount(ctx context.Context, sliceAttr []attribute.KeyValue, opt otelmetric.AddOption) { + opts := []otelmetric.AddOption{h.baseAttributesOpt, opt} + + // Explode for prometheus metrics + if len(sliceAttr) == 0 { - h.promRequestMetrics.MeasureRequestCount(ctx, opt) + h.promRequestMetrics.MeasureRequestCount(ctx, opts...) } else { explodeAddInstrument(ctx, sliceAttr, func(ctx context.Context, newOpts ...otelmetric.AddOption) { - newOpts = append(newOpts, opt) + newOpts = append(newOpts, opts...) h.promRequestMetrics.MeasureRequestCount(ctx, newOpts...) }) } - h.otlpRequestMetrics.MeasureRequestCount(ctx, otelmetric.WithAttributes(sliceAttr...), opt) + // OTEL metrics + + opts = append(opts, otelmetric.WithAttributes(sliceAttr...)) + + h.otlpRequestMetrics.MeasureRequestCount(ctx, opts...) } func (h *Metrics) MeasureRequestSize(ctx context.Context, contentLength int64, sliceAttr []attribute.KeyValue, opt otelmetric.AddOption) { + opts := []otelmetric.AddOption{h.baseAttributesOpt, opt} + + // Explode for prometheus metrics + if len(sliceAttr) == 0 { - h.promRequestMetrics.MeasureRequestSize(ctx, contentLength, opt) + h.promRequestMetrics.MeasureRequestSize(ctx, contentLength, opts...) } else { explodeAddInstrument(ctx, sliceAttr, func(ctx context.Context, newOpts ...otelmetric.AddOption) { - newOpts = append(newOpts, opt) + newOpts = append(newOpts, opts...) h.promRequestMetrics.MeasureRequestSize(ctx, contentLength, newOpts...) }) } - h.otlpRequestMetrics.MeasureRequestSize(ctx, contentLength, otelmetric.WithAttributes(sliceAttr...), opt) + // OTEL metrics + + opts = append(opts, otelmetric.WithAttributes(sliceAttr...)) + + h.otlpRequestMetrics.MeasureRequestSize(ctx, contentLength, opts...) } func (h *Metrics) MeasureResponseSize(ctx context.Context, size int64, sliceAttr []attribute.KeyValue, opt otelmetric.AddOption) { + opts := []otelmetric.AddOption{h.baseAttributesOpt, opt} + + // Explode for prometheus metrics + if len(sliceAttr) == 0 { - h.promRequestMetrics.MeasureResponseSize(ctx, size, opt) + h.promRequestMetrics.MeasureResponseSize(ctx, size, opts...) } else { explodeAddInstrument(ctx, sliceAttr, func(ctx context.Context, newOpts ...otelmetric.AddOption) { - newOpts = append(newOpts, opt) + newOpts = append(newOpts, opts...) h.promRequestMetrics.MeasureResponseSize(ctx, size, newOpts...) }) } - h.otlpRequestMetrics.MeasureResponseSize(ctx, size, otelmetric.WithAttributes(sliceAttr...), opt) + // OTEL metrics + + opts = append(opts, otelmetric.WithAttributes(sliceAttr...)) + + h.otlpRequestMetrics.MeasureResponseSize(ctx, size, opts...) } func (h *Metrics) MeasureLatency(ctx context.Context, latency time.Duration, sliceAttr []attribute.KeyValue, opt otelmetric.RecordOption) { + opts := []otelmetric.RecordOption{h.baseAttributesOpt, opt} // Use floating point division here for higher precision (instead of Millisecond method). latencyTime := float64(latency) / float64(time.Millisecond) + // Explode for prometheus metrics + if len(sliceAttr) == 0 { - h.promRequestMetrics.MeasureLatency(ctx, latencyTime, opt) + h.promRequestMetrics.MeasureLatency(ctx, latencyTime, opts...) } else { explodeRecordInstrument(ctx, sliceAttr, func(ctx context.Context, newOpts ...otelmetric.RecordOption) { - newOpts = append(newOpts, opt) + newOpts = append(newOpts, opts...) h.promRequestMetrics.MeasureLatency(ctx, latencyTime, newOpts...) }) } - h.otlpRequestMetrics.MeasureLatency(ctx, latencyTime, otelmetric.WithAttributes(sliceAttr...), opt) + // OTEL metrics + + opts = append(opts, otelmetric.WithAttributes(sliceAttr...)) + + h.otlpRequestMetrics.MeasureLatency(ctx, latencyTime, opts...) } func (h *Metrics) MeasureRequestError(ctx context.Context, sliceAttr []attribute.KeyValue, opt otelmetric.AddOption) { + opts := []otelmetric.AddOption{h.baseAttributesOpt, opt} + + // Explode for prometheus metrics + if len(sliceAttr) == 0 { - h.promRequestMetrics.MeasureRequestError(ctx, opt) + h.promRequestMetrics.MeasureRequestError(ctx, opts...) } else { explodeAddInstrument(ctx, sliceAttr, func(ctx context.Context, newOpts ...otelmetric.AddOption) { - newOpts = append(newOpts, opt) + newOpts = append(newOpts, opts...) h.promRequestMetrics.MeasureRequestError(ctx, newOpts...) }) } - h.otlpRequestMetrics.MeasureRequestError(ctx, otelmetric.WithAttributes(sliceAttr...), opt) + // OTEL metrics + + opts = append(opts, otelmetric.WithAttributes(sliceAttr...)) + + h.otlpRequestMetrics.MeasureRequestError(ctx, opts...) } func (h *Metrics) MeasureOperationPlanningTime(ctx context.Context, planningTime time.Duration, sliceAttr []attribute.KeyValue, opt otelmetric.RecordOption) { + opts := []otelmetric.RecordOption{h.baseAttributesOpt, opt} + // Use floating point division here for higher precision (instead of Millisecond method). elapsedTime := float64(planningTime) / float64(time.Millisecond) + // Explode for prometheus metrics + if len(sliceAttr) == 0 { - h.promRequestMetrics.MeasureOperationPlanningTime(ctx, elapsedTime, opt) + h.promRequestMetrics.MeasureOperationPlanningTime(ctx, elapsedTime, opts...) } else { explodeRecordInstrument(ctx, sliceAttr, func(ctx context.Context, newOpts ...otelmetric.RecordOption) { - newOpts = append(newOpts, opt) + newOpts = append(newOpts, opts...) h.promRequestMetrics.MeasureOperationPlanningTime(ctx, elapsedTime, newOpts...) }) } - h.otlpRequestMetrics.MeasureOperationPlanningTime(ctx, elapsedTime, otelmetric.WithAttributes(sliceAttr...), opt) + // OTEL metrics + + opts = append(opts, otelmetric.WithAttributes(sliceAttr...)) + + h.otlpRequestMetrics.MeasureOperationPlanningTime(ctx, elapsedTime, opts...) } // Flush flushes the metrics to the backend synchronously. @@ -282,6 +341,12 @@ func WithOtlpMeterProvider(otelMeterProvider *metric.MeterProvider) Option { } } +func WithBaseAttributes(baseAttributes []attribute.KeyValue) Option { + return func(h *Metrics) { + h.baseAttributes = baseAttributes + } +} + func WithPromMeterProvider(promMeterProvider *metric.MeterProvider) Option { return func(h *Metrics) { h.promMeterProvider = promMeterProvider From d9744659334f41e8572cf1acf576c0ce2ade9c78 Mon Sep 17 00:00:00 2001 From: starptech Date: Tue, 26 Nov 2024 15:44:03 +0100 Subject: [PATCH 2/3] chore: remove unused function --- router/core/context.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/router/core/context.go b/router/core/context.go index 147d486f44..087b299859 100644 --- a/router/core/context.go +++ b/router/core/context.go @@ -211,13 +211,6 @@ func (r *requestTelemetryAttributes) addCommonTraceAttribute(vals ...attribute.K r.traceAttrs = append(r.traceAttrs, vals...) } -func (r *requestTelemetryAttributes) addCommonMetricAttribute(vals ...attribute.KeyValue) { - if !r.metricsEnabled { - return - } - r.metricAttrs = append(r.metricAttrs, vals...) -} - func (r *requestTelemetryAttributes) addMetricAttribute(vals ...attribute.KeyValue) { if !r.metricsEnabled { return From ba958af13aa56bfb30801786b390a6ba90fd13c0 Mon Sep 17 00:00:00 2001 From: starptech Date: Tue, 26 Nov 2024 19:20:28 +0100 Subject: [PATCH 3/3] chore: remove runtime metric from graph mux, add tests --- router-tests/telemetry_test.go | 394 ++++++++++++++++++ router-tests/testenv/testenv.go | 3 +- router/core/graph_server.go | 51 ++- ...e_metrics.go => router_runtime_metrics.go} | 40 +- 4 files changed, 442 insertions(+), 46 deletions(-) rename router/pkg/metric/{runtime_metrics.go => router_runtime_metrics.go} (97%) diff --git a/router-tests/telemetry_test.go b/router-tests/telemetry_test.go index 8726466c93..c50430cf4d 100644 --- a/router-tests/telemetry_test.go +++ b/router-tests/telemetry_test.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "regexp" + "runtime" "testing" "github.com/stretchr/testify/require" @@ -24,6 +25,381 @@ import ( "github.com/wundergraph/cosmo/router/pkg/trace/tracetest" ) +func TestRuntimeTelemetry(t *testing.T) { + t.Parallel() + + const employeesIDData = `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":10},{"id":11},{"id":12}]}}` + + t.Run("Trace unnamed GraphQL operation and validate all runtime metrics / including a feature graph", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + exporter := tracetest.NewInMemoryExporter(t) + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + MetricReader: metricReader, + EnableRuntimeMetrics: true, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query { employees { id } }`, + }) + require.JSONEq(t, employeesIDData, res.Body) + + rm := metricdata.ResourceMetrics{} + err := metricReader.Collect(context.Background(), &rm) + require.NoError(t, err) + + require.Len(t, rm.ScopeMetrics, 2) + + // Runtime metrics + + sm := getMetricScopeByName(rm.ScopeMetrics, "cosmo.router.runtime") + require.NotNil(t, sm) + + require.Len(t, sm.Metrics, 15) + + runtimeScope := getMetricScopeByName(rm.ScopeMetrics, "cosmo.router.runtime") + require.NotNil(t, runtimeScope) + + runtimeUptimeMetric := metricdata.Metrics{ + Name: "runtime.uptime", + Description: "Seconds since application was initialized", + Unit: "s", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, runtimeUptimeMetric, *getMetricByName(runtimeScope, "runtime.uptime"), metricdatatest.IgnoreTimestamp()) + + processCpuUsageMetric := metricdata.Metrics{ + Name: "process.cpu.usage", + Description: "Total CPU usage of this process in percentage of host total CPU capacity", + Unit: "percent", + Data: metricdata.Gauge[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processCpuUsageMetric, *getMetricByName(runtimeScope, "process.cpu.usage"), metricdatatest.IgnoreTimestamp()) + + serverUptimeMetric := metricdata.Metrics{ + Name: "server.uptime", + Description: "Seconds since the server started. Resets between router config changes.", + Unit: "s", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, serverUptimeMetric, *getMetricByName(runtimeScope, "server.uptime"), metricdatatest.IgnoreTimestamp()) + + processRuntimeGoMemHeapAllocMetric := metricdata.Metrics{ + Name: "process.runtime.go.mem.heap_alloc", + Description: "Bytes of allocated heap objects", + Unit: "By", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoMemHeapAllocMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_alloc"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoMemHeapIdleMetric := metricdata.Metrics{ + Name: "process.runtime.go.mem.heap_idle", + Description: "Bytes in idle (unused) spans", + Unit: "By", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoMemHeapIdleMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_idle"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoMemHeapInUseMetric := metricdata.Metrics{ + Name: "process.runtime.go.mem.heap_inuse", + Description: "Bytes in in-use spans", + Unit: "By", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoMemHeapInUseMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_inuse"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoMemHeapObjectsMetric := metricdata.Metrics{ + Name: "process.runtime.go.mem.heap_objects", + Description: "Number of allocated heap objects", + Unit: "", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoMemHeapObjectsMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_objects"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoMemHeapReleasedMetric := metricdata.Metrics{ + Name: "process.runtime.go.mem.heap_released", + Description: "Bytes of idle spans whose physical memory has been returned to the OS", + Unit: "By", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoMemHeapReleasedMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_released"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoMemHeapSysMetric := metricdata.Metrics{ + Name: "process.runtime.go.mem.heap_sys", + Description: "Bytes of heap memory obtained from the OS", + Unit: "By", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoMemHeapSysMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.heap_sys"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoMemLiveObjectsMetric := metricdata.Metrics{ + Name: "process.runtime.go.mem.live_objects", + Description: "Number of live objects is the number of cumulative Mallocs - Frees", + Unit: "", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoMemLiveObjectsMetric, *getMetricByName(runtimeScope, "process.runtime.go.mem.live_objects"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoGcCountMetric := metricdata.Metrics{ + Name: "process.runtime.go.gc.count", + Description: "Number of completed garbage collection cycles", + Unit: "", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoGcCountMetric, *getMetricByName(runtimeScope, "process.runtime.go.gc.count"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoGoRoutinesCountMetric := metricdata.Metrics{ + Name: "process.runtime.go.goroutines.count", + Description: "Number of goroutines that currently exist", + Unit: "", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoGoRoutinesCountMetric, *getMetricByName(runtimeScope, "process.runtime.go.goroutines.count"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoInfoMetric := metricdata.Metrics{ + Name: "process.runtime.go.info", + Description: "Information about the Go runtime environment", + Unit: "", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + attribute.String("version", runtime.Version()), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoInfoMetric, *getMetricByName(runtimeScope, "process.runtime.go.info"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoGcPauseTotalMetric := metricdata.Metrics{ + Name: "process.runtime.go.gc.pause_total", + Description: "Cumulative nanoseconds in GC stop-the-world pauses since the program started", + Unit: "ns", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + otel.WgRouterClusterName.String(""), + otel.WgFederatedGraphID.String("graph"), + otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain()), + otel.WgRouterVersion.String("dev"), + ), + Value: 0, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoGcPauseTotalMetric, *getMetricByName(runtimeScope, "process.runtime.go.gc.pause_total"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + + processRuntimeGoGcPauseMetric := metricdata.Metrics{ + Name: "process.runtime.go.gc.pause", + Description: "Amount of nanoseconds in GC stop-the-world pauses", + Unit: "ns", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + {}, + }, + }, + } + + metricdatatest.AssertEqual(t, processRuntimeGoGcPauseMetric, *getMetricByName(runtimeScope, "process.runtime.go.gc.pause"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + }) + }) +} + func TestTelemetry(t *testing.T) { t.Parallel() @@ -3635,3 +4011,21 @@ func assertAttributeNotInSet(t *testing.T, set attribute.Set, attr attribute.Key _, ok := set.Value(attr.Key) require.False(t, ok) } + +func getMetricByName(scopeMetric *metricdata.ScopeMetrics, name string) *metricdata.Metrics { + for _, m := range scopeMetric.Metrics { + if m.Name == name { + return &m + } + } + return nil +} + +func getMetricScopeByName(metrics []metricdata.ScopeMetrics, name string) *metricdata.ScopeMetrics { + for _, m := range metrics { + if m.Scope.Name == name { + return &m + } + } + return nil +} diff --git a/router-tests/testenv/testenv.go b/router-tests/testenv/testenv.go index 413fb87428..7ea78c160e 100644 --- a/router-tests/testenv/testenv.go +++ b/router-tests/testenv/testenv.go @@ -146,6 +146,7 @@ type Config struct { Logger *zap.Logger AccessLogger *zap.Logger AccessLogFields []config.CustomAttribute + EnableRuntimeMetrics bool } type SubgraphsConfig struct { @@ -766,7 +767,7 @@ func configureRouter(listenerAddr string, testConfig *Config, routerConfig *node }, OTLP: config.MetricsOTLP{ Enabled: true, - RouterRuntime: false, + RouterRuntime: testConfig.EnableRuntimeMetrics, ExcludeMetrics: testConfig.MetricExclusions.ExcludedOTLPMetrics, ExcludeMetricLabels: testConfig.MetricExclusions.ExcludedOTLPMetricLabels, }, diff --git a/router/core/graph_server.go b/router/core/graph_server.go index 17ade5b907..7d399c92d2 100644 --- a/router/core/graph_server.go +++ b/router/core/graph_server.go @@ -86,6 +86,7 @@ type ( // does not include websocket (hijacked) connections inFlightRequests *atomic.Uint64 graphMuxes []*graphMux + runtimeMetrics *rmetric.RuntimeMetrics } ) @@ -124,6 +125,23 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC s.baseOtelAttributes = baseOtelAttributes + if s.metricConfig.OpenTelemetry.RouterRuntime { + s.runtimeMetrics = rmetric.NewRuntimeMetrics( + s.logger, + s.otlpMeterProvider, + // We track runtime metrics with base router config version + append([]attribute.KeyValue{ + otel.WgRouterConfigVersion.String(s.baseRouterConfigVersion), + }, baseOtelAttributes...), + s.processStartTime, + ) + + // Start runtime metrics + if err := s.runtimeMetrics.Start(); err != nil { + return nil, err + } + } + if s.registrationInfo != nil { publicKey, err := jwt.ParseECPublicKeyFromPEM([]byte(s.registrationInfo.GetGraphPublicKey())) if err != nil { @@ -271,7 +289,6 @@ type graphMux struct { validationCache *ristretto.Cache[uint64, bool] operationHashCache *ristretto.Cache[uint64, string] accessLogsFileLogger *logging.BufferedLogger - runtimeMetrics *rmetric.RuntimeMetrics metricStore rmetric.Store } @@ -309,12 +326,6 @@ func (s *graphMux) Shutdown(ctx context.Context) error { } } - if s.runtimeMetrics != nil { - if aErr := s.runtimeMetrics.Shutdown(); aErr != nil { - err = errors.Join(err, aErr) - } - } - return err } @@ -342,25 +353,6 @@ func (s *graphServer) buildGraphMux(ctx context.Context, baseOtelAttributes = append(baseOtelAttributes, otel.WgFeatureFlag.String(featureFlagName)) } - if s.metricConfig.OpenTelemetry.RouterRuntime { - // Create runtime metrics exported to OTEL - gm.runtimeMetrics = rmetric.NewRuntimeMetrics( - s.logger, - s.otlpMeterProvider, - // We track runtime metrics with base router config version - // even when we have multiple feature flags - append([]attribute.KeyValue{ - otel.WgRouterConfigVersion.String(s.baseRouterConfigVersion), - }, baseOtelAttributes...), - s.processStartTime, - ) - - // Start runtime metrics - if err := gm.runtimeMetrics.Start(); err != nil { - return nil, err - } - } - metricsEnabled := s.metricConfig.IsEnabled() traceEnabled := s.traceConfig.Enabled @@ -1025,6 +1017,13 @@ func (s *graphServer) Shutdown(ctx context.Context) error { ctx = newCtx } + if s.runtimeMetrics != nil { + if err := s.runtimeMetrics.Shutdown(); err != nil { + s.logger.Error("Failed to shutdown runtime metrics", zap.Error(err)) + finalErr = errors.Join(finalErr, err) + } + } + if s.pubSubProviders != nil { s.logger.Debug("Shutting down pubsub providers") diff --git a/router/pkg/metric/runtime_metrics.go b/router/pkg/metric/router_runtime_metrics.go similarity index 97% rename from router/pkg/metric/runtime_metrics.go rename to router/pkg/metric/router_runtime_metrics.go index f920fca32b..91a0115477 100644 --- a/router/pkg/metric/runtime_metrics.go +++ b/router/pkg/metric/router_runtime_metrics.go @@ -30,8 +30,10 @@ type RuntimeMetrics struct { logger *zap.Logger } +// NewRuntimeMetrics creates a new instance of RuntimeMetrics. Runtime metrics are metrics that are collected from the Go and process runtime. +// These metrics are shared across feature flags. func NewRuntimeMetrics(logger *zap.Logger, meterProvider *metric.MeterProvider, baseAttributes []attribute.KeyValue, processStartTime time.Time) *RuntimeMetrics { - // Used to export metrics to OpenTelemetry backend. + // Calling meter with the same name and version will return the same instance of the meter. meter := meterProvider.Meter(cosmoRouterRuntimeMeterName, otelmetric.WithInstrumentationVersion(cosmoRouterRuntimeMeterVersion), ) @@ -81,15 +83,6 @@ func (r *RuntimeMetrics) Start() error { return err } - serverUptime, err := r.meter.Int64ObservableCounter( - "server.uptime", - otelmetric.WithUnit("s"), - otelmetric.WithDescription("Seconds since the server started. Resets between router config changes."), - ) - if err != nil { - return err - } - if processCPUUsage, err = r.meter.Float64ObservableGauge( "process.cpu.usage", otelmetric.WithUnit("percent"), @@ -196,13 +189,22 @@ func (r *RuntimeMetrics) Start() error { return err } - now := time.Now() + serverUptime, err := r.meter.Int64ObservableCounter( + "server.uptime", + otelmetric.WithUnit("s"), + otelmetric.WithDescription("Seconds since the server started. Resets between router config changes."), + ) + if err != nil { + return err + } p, err := process.NewProcess(int32(os.Getpid())) if err != nil { return err } + now := time.Now() + rc, err := r.meter.RegisterCallback( func(ctx context.Context, o otelmetric.Observer) error { lock.Lock() @@ -222,13 +224,6 @@ func (r *RuntimeMetrics) Start() error { ) } - /* - * Process uptime - */ - o.ObserveInt64(runtimeUptime, int64(time.Since(r.processStartTime).Seconds()), - otelmetric.WithAttributes(r.baseAttributes...), - ) - /** * Server uptime. Everytime the store is reloaded, the server uptime is reset. */ @@ -238,6 +233,13 @@ func (r *RuntimeMetrics) Start() error { otelmetric.WithAttributes(r.baseAttributes...), ) + /* + * Process uptime + */ + o.ObserveInt64(runtimeUptime, int64(time.Since(r.processStartTime).Seconds()), + otelmetric.WithAttributes(r.baseAttributes...), + ) + /** * Go runtime metrics */ @@ -288,8 +290,8 @@ func (r *RuntimeMetrics) Start() error { pauseTotalNs, processCPUUsage, - serverUptime, runtimeUptime, + serverUptime, ) if err != nil {