From c91caf696e479734214657800fe186817ce4b0e6 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Sun, 28 Jul 2024 07:24:59 -0400 Subject: [PATCH] Move client scraping and metric aggregation into a separate thread. --- receiver/dcgmreceiver/client.go | 7 +- receiver/dcgmreceiver/scraper.go | 514 ++++++++++++---------- receiver/dcgmreceiver/scraper_gpu_test.go | 10 +- receiver/dcgmreceiver/util.go | 43 ++ 4 files changed, 350 insertions(+), 224 deletions(-) diff --git a/receiver/dcgmreceiver/client.go b/receiver/dcgmreceiver/client.go index a806643ea..93e17c24e 100644 --- a/receiver/dcgmreceiver/client.go +++ b/receiver/dcgmreceiver/client.go @@ -20,6 +20,7 @@ package dcgmreceiver import ( "errors" "fmt" + "sync" "time" "github.com/NVIDIA/go-dcgm/pkg/dcgm" @@ -65,6 +66,7 @@ type dcgmMetric struct { var dcgmInit = func(args ...string) (func(), error) { return dcgm.Init(dcgm.Standalone, args...) } +var dcgmInitMutex sync.Mutex var dcgmGetLatestValuesForFields = dcgm.GetLatestValuesForFields @@ -126,7 +128,10 @@ func newClient(settings *dcgmClientSettings, logger *zap.Logger) (*dcgmClient, e // only if the connection is initialized successfully without error func initializeDcgm(endpoint string, logger *zap.Logger) (func(), error) { isSocket := "0" - dcgmCleanup, err := dcgmInit(endpoint, isSocket) + dcgmInitMutex.Lock() + dcgmInitFunc := dcgmInit + dcgmInitMutex.Unlock() + dcgmCleanup, err := dcgmInitFunc(endpoint, isSocket) if err != nil { msg := fmt.Sprintf("Unable to connect to DCGM daemon at %s on %v; Is the DCGM daemon running?", endpoint, err) logger.Sugar().Warn(msg) diff --git a/receiver/dcgmreceiver/scraper.go b/receiver/dcgmreceiver/scraper.go index 3fb6a3d2d..593427d13 100644 --- a/receiver/dcgmreceiver/scraper.go +++ b/receiver/dcgmreceiver/scraper.go @@ -18,11 +18,10 @@ package dcgmreceiver import ( - "cmp" "context" "errors" "fmt" - "slices" + "sync" "time" "go.opentelemetry.io/collector/component" @@ -34,39 +33,16 @@ import ( ) type dcgmScraper struct { - config *Config - settings receiver.CreateSettings - client *dcgmClient - mb *metadata.MetricsBuilder - // Aggregate cumulative values. - aggregates struct { - energyConsumption struct { - total *defaultMap[uint, *cumulativeTracker[int64]] - fallback *defaultMap[uint, *rateIntegrator[float64]] // ...from power usage rate. - } - pcieTotal struct { - tx *defaultMap[uint, *rateIntegrator[int64]] // ...from pcie tx. - rx *defaultMap[uint, *rateIntegrator[int64]] // ...from pcie rx. - } - nvlinkTotal struct { - tx *defaultMap[uint, *rateIntegrator[int64]] // ...from nvlink tx. - rx *defaultMap[uint, *rateIntegrator[int64]] // ...from nvlink rx. - } - throttleDuration struct { - powerViolation *defaultMap[uint, *cumulativeTracker[int64]] - thermalViolation *defaultMap[uint, *cumulativeTracker[int64]] - syncBoostViolation *defaultMap[uint, *cumulativeTracker[int64]] - boardLimitViolation *defaultMap[uint, *cumulativeTracker[int64]] - lowUtilViolation *defaultMap[uint, *cumulativeTracker[int64]] - reliabilityViolation *defaultMap[uint, *cumulativeTracker[int64]] - totalAppClocksViolation *defaultMap[uint, *cumulativeTracker[int64]] - totalBaseClocksViolation *defaultMap[uint, *cumulativeTracker[int64]] - } - eccTotal struct { - sbe *defaultMap[uint, *cumulativeTracker[int64]] - dbe *defaultMap[uint, *cumulativeTracker[int64]] - } - } + config *Config + settings receiver.CreateSettings + client *dcgmClient + stopClientPolling chan bool + mb *metadata.MetricsBuilder + // Resource set. + devices map[uint]bool + // Value trackers. 
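+	// Keyed by DCGM field name, then by GPU index; guarded by mu.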
+	aggregates map[string]*defaultMap[uint, typedMetricTracker]
+	mu         sync.Mutex
 }
 
 func newDcgmScraper(config *Config, settings receiver.CreateSettings) *dcgmScraper {
@@ -114,39 +90,145 @@ func newCumulativeTracker[V int64 | float64]() *cumulativeTracker[V] {
 	return ct
 }
 
+func newGaugeTracker[V int64 | float64]() *gaugeTracker[V] {
+	gt := new(gaugeTracker[V])
+	gt.Reset()
+	return gt
+}
+
+// makeTypedMetricTracker adapts a typed tracker factory; each call to the
+// returned function builds a fresh tracker, so per-GPU state is never shared.
+func makeTypedMetricTracker[V int64 | float64, T metricTracker[V]](f func() T) func() typedMetricTracker {
+	return func() typedMetricTracker {
+		var tmt typedMetricTracker
+		var m interface{} = f()
+		switch v := m.(type) {
+		case metricTracker[int64]:
+			tmt.i64 = v
+		case metricTracker[float64]:
+			tmt.f64 = v
+		}
+		return tmt
+	}
+}
+
+func newTypedMetricTrackerMap[V int64 | float64, T metricTracker[V]](f func() T) *defaultMap[uint, typedMetricTracker] {
+	return newDefaultMap[uint](makeTypedMetricTracker(f))
+}
+
 func (s *dcgmScraper) start(_ context.Context, _ component.Host) error {
 	startTime := pcommon.NewTimestampFromTime(time.Now())
 	mbConfig := metadata.DefaultMetricsBuilderConfig()
 	mbConfig.Metrics = s.config.Metrics
 	s.mb = metadata.NewMetricsBuilder(
 		mbConfig, s.settings, metadata.WithStartTime(startTime))
-	s.aggregates.energyConsumption.total = newDefaultMap[uint](newCumulativeTracker[int64])
-	s.aggregates.energyConsumption.fallback = newDefaultMap[uint](newRateIntegrator[float64])
-	s.aggregates.pcieTotal.tx = newDefaultMap[uint](newRateIntegrator[int64])
-	s.aggregates.pcieTotal.rx = newDefaultMap[uint](newRateIntegrator[int64])
-	s.aggregates.nvlinkTotal.tx = newDefaultMap[uint](newRateIntegrator[int64])
-	s.aggregates.nvlinkTotal.rx = newDefaultMap[uint](newRateIntegrator[int64])
-	s.aggregates.throttleDuration.powerViolation = newDefaultMap[uint](newCumulativeTracker[int64])
-	s.aggregates.throttleDuration.thermalViolation = newDefaultMap[uint](newCumulativeTracker[int64])
-	s.aggregates.throttleDuration.syncBoostViolation = newDefaultMap[uint](newCumulativeTracker[int64])
-	s.aggregates.throttleDuration.boardLimitViolation = newDefaultMap[uint](newCumulativeTracker[int64])
-	s.aggregates.throttleDuration.lowUtilViolation = newDefaultMap[uint](newCumulativeTracker[int64])
-	s.aggregates.throttleDuration.reliabilityViolation = newDefaultMap[uint](newCumulativeTracker[int64])
-	s.aggregates.throttleDuration.totalAppClocksViolation = newDefaultMap[uint](newCumulativeTracker[int64])
-	s.aggregates.throttleDuration.totalBaseClocksViolation = newDefaultMap[uint](newCumulativeTracker[int64])
-	s.aggregates.eccTotal.sbe = newDefaultMap[uint](newCumulativeTracker[int64])
-	s.aggregates.eccTotal.dbe = newDefaultMap[uint](newCumulativeTracker[int64])
+	s.aggregates = map[string]*defaultMap[uint, typedMetricTracker]{
+		"DCGM_FI_PROF_GR_ENGINE_ACTIVE":   newTypedMetricTrackerMap(newGaugeTracker[float64]),
+		"DCGM_FI_DEV_GPU_UTIL":            newTypedMetricTrackerMap(newGaugeTracker[int64]),
+		"DCGM_FI_PROF_SM_ACTIVE":          newTypedMetricTrackerMap(newGaugeTracker[float64]),
+		"DCGM_FI_PROF_SM_OCCUPANCY":       newTypedMetricTrackerMap(newGaugeTracker[float64]),
+		"DCGM_FI_PROF_PIPE_TENSOR_ACTIVE": newTypedMetricTrackerMap(newGaugeTracker[float64]),
+		"DCGM_FI_PROF_PIPE_FP64_ACTIVE":   newTypedMetricTrackerMap(newGaugeTracker[float64]),
+		"DCGM_FI_PROF_PIPE_FP32_ACTIVE":   newTypedMetricTrackerMap(newGaugeTracker[float64]),
+		"DCGM_FI_PROF_PIPE_FP16_ACTIVE":   newTypedMetricTrackerMap(newGaugeTracker[float64]),
+		"DCGM_FI_DEV_ENC_UTIL":            newTypedMetricTrackerMap(newGaugeTracker[int64]),
+		"DCGM_FI_DEV_DEC_UTIL":
newTypedMetricTrackerMap(newGaugeTracker[int64]),
+		"DCGM_FI_DEV_FB_FREE":                     newTypedMetricTrackerMap(newGaugeTracker[int64]),
+		"DCGM_FI_DEV_FB_USED":                     newTypedMetricTrackerMap(newGaugeTracker[int64]),
+		"DCGM_FI_DEV_FB_RESERVED":                 newTypedMetricTrackerMap(newGaugeTracker[int64]),
+		"DCGM_FI_PROF_DRAM_ACTIVE":                newTypedMetricTrackerMap(newGaugeTracker[float64]),
+		"DCGM_FI_DEV_MEM_COPY_UTIL":               newTypedMetricTrackerMap(newGaugeTracker[int64]),
+		"DCGM_FI_PROF_PCIE_TX_BYTES":              newTypedMetricTrackerMap(newRateIntegrator[int64]),
+		"DCGM_FI_PROF_PCIE_RX_BYTES":              newTypedMetricTrackerMap(newRateIntegrator[int64]),
+		"DCGM_FI_PROF_NVLINK_TX_BYTES":            newTypedMetricTrackerMap(newRateIntegrator[int64]),
+		"DCGM_FI_PROF_NVLINK_RX_BYTES":            newTypedMetricTrackerMap(newRateIntegrator[int64]),
+		"DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION":    newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+		"DCGM_FI_DEV_POWER_USAGE":                 newTypedMetricTrackerMap(newRateIntegrator[float64]),
+		"DCGM_FI_DEV_GPU_TEMP":                    newTypedMetricTrackerMap(newGaugeTracker[int64]),
+		"DCGM_FI_DEV_SM_CLOCK":                    newTypedMetricTrackerMap(newGaugeTracker[int64]),
+		"DCGM_FI_DEV_POWER_VIOLATION":             newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+		"DCGM_FI_DEV_THERMAL_VIOLATION":           newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+		"DCGM_FI_DEV_SYNC_BOOST_VIOLATION":        newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+		"DCGM_FI_DEV_BOARD_LIMIT_VIOLATION":       newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+		"DCGM_FI_DEV_LOW_UTIL_VIOLATION":          newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+		"DCGM_FI_DEV_RELIABILITY_VIOLATION":       newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+		"DCGM_FI_DEV_TOTAL_APP_CLOCKS_VIOLATION":  newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+		"DCGM_FI_DEV_TOTAL_BASE_CLOCKS_VIOLATION": newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+		"DCGM_FI_DEV_ECC_SBE_VOL_TOTAL":           newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+		"DCGM_FI_DEV_ECC_DBE_VOL_TOTAL":           newTypedMetricTrackerMap(newCumulativeTracker[int64]),
+	}
+
+	err := s.initClient()
+	if err != nil {
+		return err
+	}
+	s.stopClientPolling = make(chan bool)
+	go s.clientPoller(10 * time.Second) // TODO
 
 	return nil
 }
 
 func (s *dcgmScraper) stop(_ context.Context) error {
+L:
+	for i := 0; i < 2; i++ {
+		select {
+		// Doing this in a blocking fashion causes hangs.
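+		// A non-blocking send is used instead: if the poller is not currently receiving, fall through to the default case.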
+ case s.stopClientPolling <- true: + break L + default: + s.settings.Logger.Sugar().Debug("Stop signal ignored; trying again") + } + } if s.client != nil { s.client.cleanup() } return nil } +func (s *dcgmScraper) collectClientMetrics() error { + err := s.initClient() + if err != nil || s.client == nil { + return err + } + + s.settings.Logger.Sugar().Debug("Client created, collecting metrics") + + deviceMetrics, err := s.client.collectDeviceMetrics() + if err != nil { + s.settings.Logger.Sugar().Warnf("Metrics not collected; err=%v", err) + return err + } + s.settings.Logger.Sugar().Debugf("Metrics collected: %d", len(deviceMetrics)) + + s.mu.Lock() + defer s.mu.Unlock() + for gpuIndex, gpuMetrics := range deviceMetrics { + s.settings.Logger.Sugar().Debugf("Got %d metrics for %d: %v", len(gpuMetrics), gpuIndex, gpuMetrics) + s.devices[gpuIndex] = true + for _, metric := range gpuMetrics { + tmt := s.aggregates[metric.name].Get(gpuIndex) + switch v := metric.value.(type) { + case int64: + tmt.i64.Update(metric.timestamp, v) + case float64: + tmt.f64.Update(metric.timestamp, v) + } + } + } + return nil +} + +func (s *dcgmScraper) clientPoller(interval time.Duration) { + _ = s.collectClientMetrics() + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + _ = s.collectClientMetrics() + case <-s.stopClientPolling: + s.settings.Logger.Sugar().Info("Stopping client poller") + return + } + } +} + func discoverRequestedFields(config *Config) []string { requestedFields := []string{} if config.Metrics.GpuDcgmUtilization.Enabled { @@ -220,188 +302,176 @@ func discoverRequestedFields(config *Config) []string { return requestedFields } -func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) { - err := s.initClient() - if err != nil || s.client == nil { - return s.mb.Emit(), err - } +type point struct { + timestamp int64 + value interface{} +} - s.settings.Logger.Sugar().Debug("Client created, collecting metrics") - deviceMetrics, err := s.client.collectDeviceMetrics() - if err != nil { - s.settings.Logger.Sugar().Warnf("Metrics not collected; err=%v", err) - return s.mb.Emit(), err +func (s *dcgmScraper) snapshotClientMetrics() map[uint]map[string]point { + s.mu.Lock() + defer s.mu.Unlock() + metrics := make(map[uint]map[string]point) + for gpuIndex := range s.devices { + perDevice := make(map[string]point) + metrics[gpuIndex] = perDevice + // We have to iterate over all metrics for each device. This is not ideal. 
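+			// Skip metrics that have no tracker for this device yet.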
+ for name, points := range s.aggregates { + if tmt, ok := points.TryGet(gpuIndex); ok { + switch { + case tmt.i64 != nil: + ts, v := tmt.i64.Value() + perDevice[name] = point{timestamp: ts, value: v} + case tmt.f64 != nil: + ts, v := tmt.f64.Value() + perDevice[name] = point{timestamp: ts, value: v} + } + } + } } - s.settings.Logger.Sugar().Debugf("Metrics collected: %d", len(deviceMetrics)) + return metrics +} - now := pcommon.NewTimestampFromTime(time.Now()) - for gpuIndex, gpuMetrics := range deviceMetrics { - metricsByName := make(map[string][]dcgmMetric) - for _, metric := range gpuMetrics { - metricsByName[metric.name] = append(metricsByName[metric.name], metric) - } - s.settings.Logger.Sugar().Debugf("Got %d unique metrics: %v", len(metricsByName), metricsByName) - metrics := make(map[string]dcgmMetric) - for name, points := range metricsByName { - slices.SortStableFunc(points, func(a, b dcgmMetric) int { - return cmp.Compare(a.timestamp, b.timestamp) - }) - metrics[name] = points[len(points)-1] - } +func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) { + clientMetrics := s.snapshotClientMetrics() + for gpuIndex, metrics := range clientMetrics { rb := s.mb.NewResourceBuilder() rb.SetGpuNumber(fmt.Sprintf("%d", gpuIndex)) rb.SetGpuUUID(s.client.getDeviceUUID(gpuIndex)) rb.SetGpuModel(s.client.getDeviceModelName(gpuIndex)) gpuResource := rb.Emit() - if metric, ok := metrics["DCGM_FI_PROF_GR_ENGINE_ACTIVE"]; ok { - s.mb.RecordGpuDcgmUtilizationDataPoint(now, metric.asFloat64()) - } else if metric, ok := metrics["DCGM_FI_DEV_GPU_UTIL"]; ok { // fallback - gpuUtil := float64(metric.asInt64()) / 100.0 /* normalize */ - s.mb.RecordGpuDcgmUtilizationDataPoint(now, gpuUtil) - } - if metric, ok := metrics["DCGM_FI_PROF_SM_ACTIVE"]; ok { - s.mb.RecordGpuDcgmSmUtilizationDataPoint(now, metric.asFloat64()) - } - if metric, ok := metrics["DCGM_FI_PROF_SM_OCCUPANCY"]; ok { - s.mb.RecordGpuDcgmSmOccupancyDataPoint(now, metric.asFloat64()) - } - if metric, ok := metrics["DCGM_FI_PROF_PIPE_TENSOR_ACTIVE"]; ok { - s.mb.RecordGpuDcgmPipeUtilizationDataPoint(now, metric.asFloat64(), metadata.AttributeGpuPipeTensor) - } - if metric, ok := metrics["DCGM_FI_PROF_PIPE_FP64_ACTIVE"]; ok { - s.mb.RecordGpuDcgmPipeUtilizationDataPoint(now, metric.asFloat64(), metadata.AttributeGpuPipeFp64) - } - if metric, ok := metrics["DCGM_FI_PROF_PIPE_FP32_ACTIVE"]; ok { - s.mb.RecordGpuDcgmPipeUtilizationDataPoint(now, metric.asFloat64(), metadata.AttributeGpuPipeFp32) - } - if metric, ok := metrics["DCGM_FI_PROF_PIPE_FP16_ACTIVE"]; ok { - s.mb.RecordGpuDcgmPipeUtilizationDataPoint(now, metric.asFloat64(), metadata.AttributeGpuPipeFp16) - } - if metric, ok := metrics["DCGM_FI_DEV_ENC_UTIL"]; ok { - encUtil := float64(metric.asInt64()) / 100.0 /* normalize */ - s.mb.RecordGpuDcgmCodecEncoderUtilizationDataPoint(now, encUtil) - } - if metric, ok := metrics["DCGM_FI_DEV_DEC_UTIL"]; ok { - decUtil := float64(metric.asInt64()) / 100.0 /* normalize */ - s.mb.RecordGpuDcgmCodecDecoderUtilizationDataPoint(now, decUtil) - } - if metric, ok := metrics["DCGM_FI_DEV_FB_FREE"]; ok { - bytesFree := 1e6 * metric.asInt64() /* MBy to By */ - s.mb.RecordGpuDcgmMemoryBytesUsedDataPoint(now, bytesFree, metadata.AttributeGpuMemoryStateFree) - } - if metric, ok := metrics["DCGM_FI_DEV_FB_USED"]; ok { - bytesUsed := 1e6 * metric.asInt64() /* MBy to By */ - s.mb.RecordGpuDcgmMemoryBytesUsedDataPoint(now, bytesUsed, metadata.AttributeGpuMemoryStateUsed) - } - if metric, ok := metrics["DCGM_FI_DEV_FB_RESERVED"]; ok { - 
bytesReserved := 1e6 * metric.asInt64() /* MBy to By */ - s.mb.RecordGpuDcgmMemoryBytesUsedDataPoint(now, bytesReserved, metadata.AttributeGpuMemoryStateReserved) - } - if metric, ok := metrics["DCGM_FI_PROF_DRAM_ACTIVE"]; ok { - s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(now, metric.asFloat64()) - } else if metric, ok := metrics["DCGM_FI_DEV_MEM_COPY_UTIL"]; ok { // fallback - memCopyUtil := float64(metric.asInt64()) / 100.0 /* normalize */ - s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(now, memCopyUtil) - } - if metric, ok := metrics["DCGM_FI_PROF_PCIE_TX_BYTES"]; ok { - s.aggregates.pcieTotal.tx.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, pcieTx := s.aggregates.pcieTotal.tx.Get(gpuIndex).Value() - s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieTx, metadata.AttributeNetworkIoDirectionTransmit) - } - if metric, ok := metrics["DCGM_FI_PROF_PCIE_RX_BYTES"]; ok { - s.aggregates.pcieTotal.rx.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, pcieRx := s.aggregates.pcieTotal.rx.Get(gpuIndex).Value() - s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieRx, metadata.AttributeNetworkIoDirectionReceive) - } - if metric, ok := metrics["DCGM_FI_PROF_NVLINK_TX_BYTES"]; ok { - s.aggregates.nvlinkTotal.tx.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, nvlinkTx := s.aggregates.nvlinkTotal.tx.Get(gpuIndex).Value() - s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkTx, metadata.AttributeNetworkIoDirectionTransmit) - } - if metric, ok := metrics["DCGM_FI_PROF_NVLINK_RX_BYTES"]; ok { - s.aggregates.nvlinkTotal.rx.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, nvlinkRx := s.aggregates.nvlinkTotal.rx.Get(gpuIndex).Value() - s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkRx, metadata.AttributeNetworkIoDirectionReceive) - } - if metric, ok := metrics["DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION"]; ok { - s.aggregates.energyConsumption.total.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, value := s.aggregates.energyConsumption.total.Get(gpuIndex).Value() - energyUsed := float64(value) / 1e3 /* mJ to J */ - s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed) - } else if metric, ok := metrics["DCGM_FI_DEV_POWER_USAGE"]; ok { // fallback - s.aggregates.energyConsumption.fallback.Get(gpuIndex).Update(metric.timestamp, metric.asFloat64()) - _, energyUsed := s.aggregates.energyConsumption.fallback.Get(gpuIndex).Value() - s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed) - } - if metric, ok := metrics["DCGM_FI_DEV_GPU_TEMP"]; ok { - s.mb.RecordGpuDcgmTemperatureDataPoint(now, float64(metric.asInt64())) - } - if metric, ok := metrics["DCGM_FI_DEV_SM_CLOCK"]; ok { - clockFreq := 1e6 * float64(metric.asInt64()) /* MHz to Hz */ - s.mb.RecordGpuDcgmClockFrequencyDataPoint(now, clockFreq) - } - if metric, ok := metrics["DCGM_FI_DEV_POWER_VIOLATION"]; ok { - s.aggregates.throttleDuration.powerViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, value := s.aggregates.throttleDuration.powerViolation.Get(gpuIndex).Value() - violationTime := float64(value) / 1e6 /* us to s */ - s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationPower) - } - if metric, ok := metrics["DCGM_FI_DEV_THERMAL_VIOLATION"]; ok { - s.aggregates.throttleDuration.thermalViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, value := s.aggregates.throttleDuration.thermalViolation.Get(gpuIndex).Value() - violationTime := float64(value) / 1e6 /* us to s */ - 
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationThermal) - } - if metric, ok := metrics["DCGM_FI_DEV_SYNC_BOOST_VIOLATION"]; ok { - s.aggregates.throttleDuration.syncBoostViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, value := s.aggregates.throttleDuration.syncBoostViolation.Get(gpuIndex).Value() - violationTime := float64(value) / 1e6 /* us to s */ - s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationSyncBoost) - } - if metric, ok := metrics["DCGM_FI_DEV_BOARD_LIMIT_VIOLATION"]; ok { - s.aggregates.throttleDuration.boardLimitViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, value := s.aggregates.throttleDuration.boardLimitViolation.Get(gpuIndex).Value() - violationTime := float64(value) / 1e6 /* us to s */ - s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationBoardLimit) - } - if metric, ok := metrics["DCGM_FI_DEV_LOW_UTIL_VIOLATION"]; ok { - s.aggregates.throttleDuration.lowUtilViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, value := s.aggregates.throttleDuration.lowUtilViolation.Get(gpuIndex).Value() - violationTime := float64(value) / 1e6 /* us to s */ - s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationLowUtil) - } - if metric, ok := metrics["DCGM_FI_DEV_RELIABILITY_VIOLATION"]; ok { - s.aggregates.throttleDuration.reliabilityViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, value := s.aggregates.throttleDuration.reliabilityViolation.Get(gpuIndex).Value() - violationTime := float64(value) / 1e6 /* us to s */ - s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationReliability) - } - if metric, ok := metrics["DCGM_FI_DEV_TOTAL_APP_CLOCKS_VIOLATION"]; ok { - s.aggregates.throttleDuration.totalAppClocksViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, value := s.aggregates.throttleDuration.totalAppClocksViolation.Get(gpuIndex).Value() - violationTime := float64(value) / 1e6 /* us to s */ - s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationAppClock) - } - if metric, ok := metrics["DCGM_FI_DEV_TOTAL_BASE_CLOCKS_VIOLATION"]; ok { - s.aggregates.throttleDuration.totalBaseClocksViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, value := s.aggregates.throttleDuration.totalBaseClocksViolation.Get(gpuIndex).Value() - violationTime := float64(value) / 1e6 /* us to s */ - s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationBaseClock) - } - if metric, ok := metrics["DCGM_FI_DEV_ECC_SBE_VOL_TOTAL"]; ok { - s.aggregates.eccTotal.sbe.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, sbeErrors := s.aggregates.eccTotal.sbe.Get(gpuIndex).Value() - s.mb.RecordGpuDcgmEccErrorsDataPoint(now, sbeErrors, metadata.AttributeGpuErrorTypeSbe) - } - if metric, ok := metrics["DCGM_FI_DEV_ECC_DBE_VOL_TOTAL"]; ok { - s.aggregates.eccTotal.dbe.Get(gpuIndex).Update(metric.timestamp, metric.asInt64()) - _, dbeErrors := s.aggregates.eccTotal.dbe.Get(gpuIndex).Value() - s.mb.RecordGpuDcgmEccErrorsDataPoint(now, dbeErrors, metadata.AttributeGpuErrorTypeDbe) + if p, ok := metrics["DCGM_FI_PROF_GR_ENGINE_ACTIVE"]; ok { + utilization := p.value.(float64) + 
s.mb.RecordGpuDcgmUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), utilization) + } else if p, ok := metrics["DCGM_FI_DEV_GPU_UTIL"]; ok { // fallback + utilization := float64(p.value.(int64)) / 100.0 /* normalize */ + s.mb.RecordGpuDcgmUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), utilization) + } + if p, ok := metrics["DCGM_FI_PROF_SM_ACTIVE"]; ok { + smActive := p.value.(float64) + s.mb.RecordGpuDcgmSmUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), smActive) + } + if p, ok := metrics["DCGM_FI_PROF_SM_OCCUPANCY"]; ok { + smOccupancy := p.value.(float64) + s.mb.RecordGpuDcgmSmOccupancyDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), smOccupancy) + } + if p, ok := metrics["DCGM_FI_PROF_PIPE_TENSOR_ACTIVE"]; ok { + pipeUtil := p.value.(float64) + s.mb.RecordGpuDcgmPipeUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), pipeUtil, metadata.AttributeGpuPipeTensor) + } + if p, ok := metrics["DCGM_FI_PROF_PIPE_FP64_ACTIVE"]; ok { + pipeUtil := p.value.(float64) + s.mb.RecordGpuDcgmPipeUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), pipeUtil, metadata.AttributeGpuPipeFp64) + } + if p, ok := metrics["DCGM_FI_PROF_PIPE_FP32_ACTIVE"]; ok { + pipeUtil := p.value.(float64) + s.mb.RecordGpuDcgmPipeUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), pipeUtil, metadata.AttributeGpuPipeFp32) + } + if p, ok := metrics["DCGM_FI_PROF_PIPE_FP16_ACTIVE"]; ok { + pipeUtil := p.value.(float64) + s.mb.RecordGpuDcgmPipeUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), pipeUtil, metadata.AttributeGpuPipeFp16) + } + if p, ok := metrics["DCGM_FI_DEV_ENC_UTIL"]; ok { + encUtil := float64(p.value.(int64)) / 100.0 /* normalize */ + s.mb.RecordGpuDcgmCodecEncoderUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), encUtil) + } + if p, ok := metrics["DCGM_FI_DEV_DEC_UTIL"]; ok { + decUtil := float64(p.value.(int64)) / 100.0 /* normalize */ + s.mb.RecordGpuDcgmCodecDecoderUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), decUtil) + } + if p, ok := metrics["DCGM_FI_DEV_FB_FREE"]; ok { + bytesFree := 1e6 * p.value.(int64) /* MBy to By */ + s.mb.RecordGpuDcgmMemoryBytesUsedDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), bytesFree, metadata.AttributeGpuMemoryStateFree) + } + if p, ok := metrics["DCGM_FI_DEV_FB_USED"]; ok { + bytesUsed := 1e6 * p.value.(int64) /* MBy to By */ + s.mb.RecordGpuDcgmMemoryBytesUsedDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), bytesUsed, metadata.AttributeGpuMemoryStateUsed) + } + if p, ok := metrics["DCGM_FI_DEV_FB_RESERVED"]; ok { + bytesReserved := 1e6 * p.value.(int64) /* MBy to By */ + s.mb.RecordGpuDcgmMemoryBytesUsedDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), bytesReserved, metadata.AttributeGpuMemoryStateReserved) + } + if p, ok := metrics["DCGM_FI_PROF_DRAM_ACTIVE"]; ok { + memCopyUtil := p.value.(float64) + s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), memCopyUtil) + } else if p, ok := metrics["DCGM_FI_DEV_MEM_COPY_UTIL"]; ok { // fallback + memCopyUtil := float64(p.value.(int64)) / 100.0 /* normalize */ + s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), memCopyUtil) + } + if p, ok := 
metrics["DCGM_FI_PROF_PCIE_TX_BYTES"]; ok { + pcieTx := p.value.(int64) + s.mb.RecordGpuDcgmPcieIoDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), pcieTx, metadata.AttributeNetworkIoDirectionTransmit) + } + if p, ok := metrics["DCGM_FI_PROF_PCIE_RX_BYTES"]; ok { + pcieRx := p.value.(int64) + s.mb.RecordGpuDcgmPcieIoDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), pcieRx, metadata.AttributeNetworkIoDirectionReceive) + } + if p, ok := metrics["DCGM_FI_PROF_NVLINK_TX_BYTES"]; ok { + nvlinkTx := p.value.(int64) + s.mb.RecordGpuDcgmNvlinkIoDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), nvlinkTx, metadata.AttributeNetworkIoDirectionTransmit) + } + if p, ok := metrics["DCGM_FI_PROF_NVLINK_RX_BYTES"]; ok { + nvlinkRx := p.value.(int64) + s.mb.RecordGpuDcgmNvlinkIoDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), nvlinkRx, metadata.AttributeNetworkIoDirectionReceive) + } + if p, ok := metrics["DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION"]; ok { + energyUsed := float64(p.value.(int64)) / 1e3 /* mJ to J */ + s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), energyUsed) + } else if p, ok := metrics["DCGM_FI_DEV_POWER_USAGE"]; ok { // fallback + energyUsed := p.value.(float64) + s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), energyUsed) + } + if p, ok := metrics["DCGM_FI_DEV_GPU_TEMP"]; ok { + temperature := float64(p.value.(int64)) + s.mb.RecordGpuDcgmTemperatureDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), temperature) + } + if p, ok := metrics["DCGM_FI_DEV_SM_CLOCK"]; ok { + clockFreq := 1e6 * float64(p.value.(int64)) /* MHz to Hz */ + s.mb.RecordGpuDcgmClockFrequencyDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), clockFreq) + } + if p, ok := metrics["DCGM_FI_DEV_POWER_VIOLATION"]; ok { + violationTime := float64(p.value.(int64)) / 1e6 /* us to s */ + s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), violationTime, metadata.AttributeGpuClockViolationPower) + } + if p, ok := metrics["DCGM_FI_DEV_THERMAL_VIOLATION"]; ok { + violationTime := float64(p.value.(int64)) / 1e6 /* us to s */ + s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), violationTime, metadata.AttributeGpuClockViolationThermal) + } + if p, ok := metrics["DCGM_FI_DEV_SYNC_BOOST_VIOLATION"]; ok { + violationTime := float64(p.value.(int64)) / 1e6 /* us to s */ + s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), violationTime, metadata.AttributeGpuClockViolationSyncBoost) + } + if p, ok := metrics["DCGM_FI_DEV_BOARD_LIMIT_VIOLATION"]; ok { + violationTime := float64(p.value.(int64)) / 1e6 /* us to s */ + s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), violationTime, metadata.AttributeGpuClockViolationBoardLimit) + } + if p, ok := metrics["DCGM_FI_DEV_LOW_UTIL_VIOLATION"]; ok { + violationTime := float64(p.value.(int64)) / 1e6 /* us to s */ + s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), violationTime, metadata.AttributeGpuClockViolationLowUtil) + } + if p, ok := metrics["DCGM_FI_DEV_RELIABILITY_VIOLATION"]; ok { + violationTime := float64(p.value.(int64)) / 1e6 /* us to s */ + 
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), violationTime, metadata.AttributeGpuClockViolationReliability) + } + if p, ok := metrics["DCGM_FI_DEV_TOTAL_APP_CLOCKS_VIOLATION"]; ok { + violationTime := float64(p.value.(int64)) / 1e6 /* us to s */ + s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), violationTime, metadata.AttributeGpuClockViolationAppClock) + } + if p, ok := metrics["DCGM_FI_DEV_TOTAL_BASE_CLOCKS_VIOLATION"]; ok { + violationTime := float64(p.value.(int64)) / 1e6 /* us to s */ + s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), violationTime, metadata.AttributeGpuClockViolationBaseClock) + } + if p, ok := metrics["DCGM_FI_DEV_ECC_SBE_VOL_TOTAL"]; ok { + sbeErrors := p.value.(int64) + s.mb.RecordGpuDcgmEccErrorsDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), sbeErrors, metadata.AttributeGpuErrorTypeSbe) + } + if p, ok := metrics["DCGM_FI_DEV_ECC_DBE_VOL_TOTAL"]; ok { + dbeErrors := p.value.(int64) + s.mb.RecordGpuDcgmEccErrorsDataPoint(pcommon.NewTimestampFromTime(time.UnixMicro(p.timestamp)), dbeErrors, metadata.AttributeGpuErrorTypeDbe) } // TODO: XID errors. - // s.mb.RecordGpuDcgmXidErrorsDataPoint(now, metric.asInt64(), xid) + // s.mb.RecordGpuDcgmXidErrorsDataPoint(now, p.value.(int64), xid) s.mb.EmitForResource(metadata.WithResource(gpuResource)) } - return s.mb.Emit(), err + return s.mb.Emit(), nil } diff --git a/receiver/dcgmreceiver/scraper_gpu_test.go b/receiver/dcgmreceiver/scraper_gpu_test.go index 55f6c1aca..8fa635e0a 100644 --- a/receiver/dcgmreceiver/scraper_gpu_test.go +++ b/receiver/dcgmreceiver/scraper_gpu_test.go @@ -58,11 +58,17 @@ func TestScrapeWithGpuPresent(t *testing.T) { } func TestScrapeWithDelayedDcgmService(t *testing.T) { + dcgmInitMutex.Lock() realDcgmInit := dcgmInit - defer func() { dcgmInit = realDcgmInit }() dcgmInit = func(args ...string) (func(), error) { return nil, fmt.Errorf("No DCGM client library *OR* No DCGM connection") } + dcgmInitMutex.Unlock() + defer func() { + dcgmInitMutex.Lock() + dcgmInit = realDcgmInit + dcgmInitMutex.Unlock() + }() var settings receiver.CreateSettings settings.Logger = zaptest.NewLogger(t) @@ -83,7 +89,9 @@ func TestScrapeWithDelayedDcgmService(t *testing.T) { assert.Equal(t, 0, metrics.MetricCount()) // Simulate DCGM becomes available + dcgmInitMutex.Lock() dcgmInit = realDcgmInit + dcgmInitMutex.Unlock() metrics, err = scraper.scrape(context.Background()) assert.NoError(t, err) diff --git a/receiver/dcgmreceiver/util.go b/receiver/dcgmreceiver/util.go index c3cf19883..174b4e77a 100644 --- a/receiver/dcgmreceiver/util.go +++ b/receiver/dcgmreceiver/util.go @@ -24,6 +24,26 @@ import ( "github.com/NVIDIA/go-dcgm/pkg/dcgm" ) +type metricTracker[V int64 | float64] interface { + Reset() + Update(int64, V) + Value() (int64, V) +} + +type typedMetricTracker struct { + i64 metricTracker[int64] + f64 metricTracker[float64] +} + +func (tmt typedMetricTracker) Reset() { + if tmt.i64 != nil { + tmt.i64.Reset() + } + if tmt.f64 != nil { + tmt.f64.Reset() + } +} + var nowUnixMicro = func() int64 { return time.Now().UnixNano() / 1e3 } // rateIntegrator converts timestamped values that represent rates into @@ -115,6 +135,29 @@ func (i *cumulativeTracker[V]) Baseline() (int64, V) { return i.baseTimestamp, i.baseline } +// gaugeTracker records gauge values as they arrive. 
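+// Only the most recent (timestamp, value) pair is kept; Update ignores samples that are not newer than the last one seen.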
+type gaugeTracker[V int64 | float64] struct { + lastTimestamp int64 + lastValue V // the value seen at lastTimestamp. +} + +func (i *gaugeTracker[V]) Reset() { + i.lastTimestamp = nowUnixMicro() + i.lastValue = V(0) +} + +func (i *gaugeTracker[V]) Update(ts int64, v V) { + // Drop stale points. + if ts <= i.lastTimestamp { + return + } + i.lastTimestamp, i.lastValue = ts, v +} + +func (i *gaugeTracker[V]) Value() (int64, V) { + return i.lastTimestamp, i.lastValue +} + var ( errBlankValue = fmt.Errorf("unspecified blank value") errDataNotFound = fmt.Errorf("data not found")