Skip to content

Commit

Permalink
refactor: bpfSupportedMetrics usage in NewStats()
Browse files Browse the repository at this point in the history
Remove the need to pass bpfSupportedMetrics through
n layers of function calls that lead to NewStats()
by adding a reasonable default set of bpfmetrics
that the exporter will/can then override when it's
started.

Signed-off-by: Maryam Tahhan <[email protected]>
  • Loading branch information
maryamtahhan committed Jul 31, 2024
1 parent 9d09059 commit 862315a
Show file tree
Hide file tree
Showing 23 changed files with 79 additions and 101 deletions.
3 changes: 0 additions & 3 deletions cmd/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"time"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats"
"github.com/sustainable-computing-io/kepler/pkg/config"
"github.com/sustainable-computing-io/kepler/pkg/manager"
"github.com/sustainable-computing-io/kepler/pkg/metrics"
Expand Down Expand Up @@ -133,8 +132,6 @@ func main() {
platform.InitPowerImpl()
defer platform.StopPower()

stats.InitAvailableParamAndMetrics()

if config.EnabledGPU {
r := accelerator.GetRegistry()
if a, err := accelerator.New(accelerator.GPU, true); err == nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/bpf/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/jaypipes/ghw"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats"
"github.com/sustainable-computing-io/kepler/pkg/config"
"golang.org/x/exp/maps"
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -75,6 +77,7 @@ func NewExporter() (Exporter, error) {
if err != nil {
e.Detach()
}
e.registerBPFStats()
return e, err
}

Expand All @@ -85,6 +88,11 @@ func (e *exporter) SupportedMetrics() SupportedMetrics {
}
}

func (e *exporter) registerBPFStats() {
counters := append(append([]string{}, maps.Keys(e.enabledHardwareCounters)...), maps.Keys(e.enabledSoftwareCounters)...)
stats.RegisterBPFStats(counters)
}

func (e *exporter) attach() error {
// Remove resource limits for kernels <5.11.
if err := rlimit.RemoveMemlock(); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/bpf/test_utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package bpf

import (
"github.com/sustainable-computing-io/kepler/pkg/collector/stats"
"github.com/sustainable-computing-io/kepler/pkg/config"
"golang.org/x/exp/maps"
"k8s.io/apimachinery/pkg/util/sets"
)

Expand Down Expand Up @@ -47,6 +49,11 @@ func (m *mockExporter) SupportedMetrics() SupportedMetrics {
}
}

func (m *mockExporter) RegisterBPFStats() {
counters := append(append([]string{}, maps.Keys(m.hardwareCounters)...), maps.Keys(m.hardwareCounters)...)
stats.RegisterBPFStats(counters)
}

func (m *mockExporter) Detach() {}

func (m *mockExporter) CollectProcesses() (ProcessMetricsCollection, error) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/collector/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Collector struct {
func NewCollector(bpfExporter bpf.Exporter) *Collector {
bpfSupportedMetrics := bpfExporter.SupportedMetrics()
c := &Collector{
NodeStats: *stats.NewNodeStats(bpfSupportedMetrics),
NodeStats: *stats.NewNodeStats(),
ContainerStats: map[string]*stats.ContainerStats{},
ProcessStats: map[uint64]*stats.ProcessStats{},
VMStats: map[string]*stats.VMStats{},
Expand All @@ -78,7 +78,7 @@ func (c *Collector) Initialize() error {
// For local estimator, there is endpoint provided, thus we should let
// model component decide whether/how to init
model.CreatePowerEstimatorModels(
stats.GetProcessFeatureNames(c.bpfSupportedMetrics),
stats.GetProcessFeatureNames(),
stats.NodeMetadataFeatureNames,
stats.NodeMetadataFeatureValues,
c.bpfSupportedMetrics,
Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *Collector) updateProcessResourceUtilizationMetrics(wg *sync.WaitGroup)
resourceBpf.UpdateProcessBPFMetrics(c.bpfExporter, c.ProcessStats)
if config.EnabledGPU {
if acc.GetRegistry().ActiveAcceleratorByType(acc.GPU) != nil {
accelerator.UpdateProcessGPUUtilizationMetrics(c.ProcessStats, c.bpfSupportedMetrics)
accelerator.UpdateProcessGPUUtilizationMetrics(c.ProcessStats)
}
}
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *Collector) AggregateProcessResourceUtilizationMetrics() {
if config.IsExposeVMStatsEnabled() {
if process.VMID != "" {
if _, ok := c.VMStats[process.VMID]; !ok {
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfSupportedMetrics)
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID)
}
c.VMStats[process.VMID].ResourceUsage[metricName].AddDeltaStat(id, delta)
foundVM[process.VMID] = true
Expand Down Expand Up @@ -279,7 +279,7 @@ func (c *Collector) AggregateProcessEnergyUtilizationMetrics() {
if config.IsExposeVMStatsEnabled() {
if process.VMID != "" {
if _, ok := c.VMStats[process.VMID]; !ok {
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfSupportedMetrics)
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID)
}
c.VMStats[process.VMID].EnergyUsage[metricName].AddDeltaStat(id, delta)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/metric_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ var _ = Describe("Test Collector Unit", func() {
bpfExporter := bpf.NewMockExporter(bpf.DefaultSupportedMetrics())
metricCollector := newMockCollector(bpfExporter)
// The default estimator model is the ratio
bpfSupportedMetrics := bpfExporter.SupportedMetrics()
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, bpfSupportedMetrics)
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(), stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, bpf.DefaultSupportedMetrics())
// update container and node metrics
metricCollector.UpdateProcessEnergyUtilizationMetrics()
metricCollector.AggregateProcessEnergyUtilizationMetrics()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"os"
"time"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/cgroup"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats"
"github.com/sustainable-computing-io/kepler/pkg/config"
Expand All @@ -43,7 +42,7 @@ var (
)

// UpdateProcessGPUUtilizationMetrics reads the GPU metrics of each process using the GPU
func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessStats, bpfSupportedMetrics bpf.SupportedMetrics) {
func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessStats) {
if gpu := acc.GetRegistry().ActiveAcceleratorByType(acc.GPU); gpu != nil {
d := gpu.Device()
migDevices := d.DeviceInstances()
Expand All @@ -54,17 +53,17 @@ func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessSt
for _, migDevice := range migDevices[_device.(dev.GPUDevice).ID] {
// device.ID is equal to migDevice.ParentID
// we add the process metrics with the parent GPU ID, so that the Ratio power model will use this data to split the GPU power among the process
addGPUUtilizationToProcessStats(d, processStats, migDevice.(dev.GPUDevice), migDevice.(dev.GPUDevice).ParentID, bpfSupportedMetrics)
addGPUUtilizationToProcessStats(d, processStats, migDevice.(dev.GPUDevice), migDevice.(dev.GPUDevice).ParentID)
}
} else {
addGPUUtilizationToProcessStats(d, processStats, _device.(dev.GPUDevice), _device.(dev.GPUDevice).ID, bpfSupportedMetrics)
addGPUUtilizationToProcessStats(d, processStats, _device.(dev.GPUDevice), _device.(dev.GPUDevice).ID)
}
}
}
lastUtilizationTimestamp = time.Now()
}

func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*stats.ProcessStats, d dev.GPUDevice, gpuID int, bpfSupportedMetrics bpf.SupportedMetrics) {
func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*stats.ProcessStats, d dev.GPUDevice, gpuID int) {
var err error
var processesUtilization map[uint32]any

Expand Down Expand Up @@ -97,7 +96,7 @@ func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*sta
}
}
}
processStats[uintPid] = stats.NewProcessStats(uintPid, uint64(0), containerID, vmID, command, bpfSupportedMetrics)
processStats[uintPid] = stats.NewProcessStats(uintPid, uint64(0), containerID, vmID, command)
}
gpuName := fmt.Sprintf("%d", gpuID) // GPU ID or Parent GPU ID for MIG slices
processStats[uintPid].ResourceUsage[config.GPUComputeUtilization].AddDeltaStat(gpuName, uint64(processUtilization.(dev.GPUProcessUtilizationSample).ComputeUtil))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func UpdateProcessBPFMetrics(bpfExporter bpf.Exporter, processStats map[uint64]*
var ok bool
var pStat *stats.ProcessStats
if pStat, ok = processStats[mapKey]; !ok {
pStat = stats.NewProcessStats(ct.Pid, ct.CGroupID, containerID, vmID, processComm, bpfSupportedMetrics)
pStat = stats.NewProcessStats(ct.Pid, ct.CGroupID, containerID, vmID, processComm)
processStats[mapKey] = pStat
} else if pStat.Command == "" {
pStat.Command = processComm
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/stats/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func benchmarkNtesting(b *testing.B, processNumber int) {
metricCollector.AggregateProcessResourceUtilizationMetrics()

// The default estimator model is the ratio
bpfSupportedMetrics := bpf.DefaultSupportedMetrics()
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, bpfSupportedMetrics)
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(), stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, bpf.DefaultSupportedMetrics())

// update container and node metrics
b.ReportAllocs()
Expand Down
6 changes: 2 additions & 4 deletions pkg/collector/stats/container_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package stats

import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
)

type ContainerStats struct {
Expand All @@ -33,9 +31,9 @@ type ContainerStats struct {
}

// NewContainerStats creates a new ContainerStats instance
func NewContainerStats(containerName, podName, podNamespace, containerID string, bpfSupportedMetrics bpf.SupportedMetrics) *ContainerStats {
func NewContainerStats(containerName, podName, podNamespace, containerID string) *ContainerStats {
c := &ContainerStats{
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
PIDS: make(map[uint64]bool),
ContainerID: containerID,
PodName: podName,
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/stats/container_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package stats
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
)

var _ = Describe("Test Container Metric", func() {

It("Test ResetDeltaValues", func() {
SetMockedCollectorMetrics()
c := NewContainerStats("containerA", "podA", "test", "containerIDA", bpf.DefaultSupportedMetrics())
c := NewContainerStats("containerA", "podA", "test", "containerIDA")
c.ResourceUsage[config.CPUCycle].SetDeltaStat(MockedSocketID, 30000)
c.ResourceUsage[config.CPUInstruction].SetDeltaStat(MockedSocketID, 30000)
c.ResourceUsage[config.CacheMiss].SetDeltaStat(MockedSocketID, 30000)
Expand Down
5 changes: 2 additions & 3 deletions pkg/collector/stats/node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package stats
import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
acc "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator"
"github.com/sustainable-computing-io/kepler/pkg/utils"
Expand All @@ -43,9 +42,9 @@ type NodeStats struct {
IdleResUtilization map[string]uint64
}

func NewNodeStats(bpfSupportedMetrics bpf.SupportedMetrics) *NodeStats {
func NewNodeStats() *NodeStats {
return &NodeStats{
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
IdleResUtilization: map[string]uint64{},
}
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/collector/stats/process_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package stats

import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
)

type ProcessStats struct {
Expand All @@ -33,14 +31,14 @@ type ProcessStats struct {
}

// NewProcessStats creates a new ProcessStats instance
func NewProcessStats(pid, cGroupID uint64, containerID, vmID, command string, bpfSupportedMetrics bpf.SupportedMetrics) *ProcessStats {
func NewProcessStats(pid, cGroupID uint64, containerID, vmID, command string) *ProcessStats {
p := &ProcessStats{
PID: pid,
CGroupID: cGroupID,
ContainerID: containerID,
VMID: vmID,
Command: command,
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
}
return p
}
Expand Down
32 changes: 21 additions & 11 deletions pkg/collector/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,35 @@ import (
"fmt"
"strings"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats/types"
"github.com/sustainable-computing-io/kepler/pkg/config"
acc "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator"
"k8s.io/klog/v2"
)

var (
// Default metric sets. In addition, each metric set is used by the model
// package to estimate different power usage metrics.
// AvailableAbsEnergyMetrics holds a list of absolute energy metrics
AvailableAbsEnergyMetrics []string
AvailableAbsEnergyMetrics = []string{
config.AbsEnergyInCore, config.AbsEnergyInDRAM, config.AbsEnergyInUnCore, config.AbsEnergyInPkg,
config.AbsEnergyInGPU, config.AbsEnergyInOther, config.AbsEnergyInPlatform,
}
// AvailableDynEnergyMetrics holds a list of dynamic energy metrics
AvailableDynEnergyMetrics []string
AvailableDynEnergyMetrics = []string{
config.DynEnergyInCore, config.DynEnergyInDRAM, config.DynEnergyInUnCore, config.DynEnergyInPkg,
config.DynEnergyInGPU, config.DynEnergyInOther, config.DynEnergyInPlatform,
}
// AvailableIdleEnergyMetrics holds a list of idle energy metrics
AvailableIdleEnergyMetrics []string
AvailableIdleEnergyMetrics = []string{
config.IdleEnergyInCore, config.IdleEnergyInDRAM, config.IdleEnergyInUnCore, config.IdleEnergyInPkg,
config.IdleEnergyInGPU, config.IdleEnergyInOther, config.IdleEnergyInPlatform,
}
// AvailableBPFMetrics holds a list of reasonable default bpf metrics
AvailableBPFMetrics = []string{
config.CPUCycle, config.CPURefCycle, config.CPUInstruction, config.CacheMiss, config.CPUTime,
config.PageCacheHit, config.IRQNetTXLabel, config.IRQNetRXLabel, config.IRQBlockLabel,
}
)

type Stats struct {
Expand All @@ -42,7 +57,7 @@ type Stats struct {
}

// NewStats creates a new Stats instance
func NewStats(bpfSupportedMetrics bpf.SupportedMetrics) *Stats {
func NewStats() *Stats {
m := &Stats{
ResourceUsage: make(map[string]types.UInt64StatCollection),
EnergyUsage: make(map[string]types.UInt64StatCollection),
Expand All @@ -59,12 +74,7 @@ func NewStats(bpfSupportedMetrics bpf.SupportedMetrics) *Stats {

// initialize the resource utilization metrics in the map
resMetrics := []string{}
for metricName := range bpfSupportedMetrics.HardwareCounters {
resMetrics = append(resMetrics, metricName)
}
for metricName := range bpfSupportedMetrics.SoftwareCounters {
resMetrics = append(resMetrics, metricName)
}
resMetrics = append(resMetrics, AvailableBPFMetrics...)
for _, metricName := range resMetrics {
m.ResourceUsage[metricName] = types.NewUInt64StatCollection()
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/collector/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ package stats
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
)

var _ = Describe("Stats", func() {
It("Test InitAvailableParamAndMetrics", func() {
config.ExposeHardwareCounterMetrics = false
supportedMetrics := bpf.DefaultSupportedMetrics()
InitAvailableParamAndMetrics()
exp := []string{}
Expect(len(GetProcessFeatureNames(supportedMetrics)) >= len(exp)).To(BeTrue())
Expect(len(GetProcessFeatureNames()) >= len(exp)).To(BeTrue())
})
})
Loading

0 comments on commit 862315a

Please sign in to comment.