Skip to content

Commit

Permalink
refactor: bpfSupportedMetrics usage in NewStats()
Browse files Browse the repository at this point in the history
Remove the need to pass bpfSupportedMetrics through
n layers of function calls that lead to NewStats()
by adding a reasonable default set of bpfmetrics
that the exporter will/can then override when it's
started.

Signed-off-by: Maryam Tahhan <[email protected]>
  • Loading branch information
maryamtahhan committed Oct 17, 2024
1 parent 3c5beae commit 7ea948d
Show file tree
Hide file tree
Showing 25 changed files with 64 additions and 96 deletions.
13 changes: 2 additions & 11 deletions pkg/bpf/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type exporter struct {

func NewExporter() (Exporter, error) {
e := &exporter{
enabledHardwareCounters: sets.New[string](),
enabledSoftwareCounters: sets.New[string](),
enabledHardwareCounters: sets.New[string](config.BPFHwCounters()...),
enabledSoftwareCounters: sets.New[string](config.BPFSwCounters()...),
}
err := e.attach()
if err != nil {
Expand Down Expand Up @@ -110,7 +110,6 @@ func (e *exporter) attach() error {
if err != nil {
return fmt.Errorf("error attaching sched_switch tracepoint: %v", err)
}
e.enabledSoftwareCounters[config.CPUTime] = struct{}{}

if config.ExposeIRQCounterMetrics() {
e.irqLink, err = link.AttachTracing(link.TracingOptions{
Expand All @@ -120,9 +119,6 @@ func (e *exporter) attach() error {
if err != nil {
return fmt.Errorf("could not attach irq/softirq_entry: %w", err)
}
e.enabledSoftwareCounters[config.IRQNetTXLabel] = struct{}{}
e.enabledSoftwareCounters[config.IRQNetRXLabel] = struct{}{}
e.enabledSoftwareCounters[config.IRQBlockLabel] = struct{}{}
}

group := "writeback"
Expand All @@ -143,8 +139,6 @@ func (e *exporter) attach() error {
})
if err != nil {
klog.Warningf("failed to attach fentry/mark_page_accessed: %v. Kepler will not collect page cache read events. This will affect the DRAM power model estimation on VMs.", err)
} else if !e.enabledSoftwareCounters.Has(config.PageCacheHit) {
e.enabledSoftwareCounters[config.PageCacheHit] = struct{}{}
}

// Return early if hardware counters are not enabled
Expand All @@ -162,9 +156,6 @@ func (e *exporter) attach() error {
if err != nil {
return nil
}
e.enabledHardwareCounters[config.CPUCycle] = struct{}{}
e.enabledHardwareCounters[config.CPUInstruction] = struct{}{}
e.enabledHardwareCounters[config.CacheMiss] = struct{}{}

return nil
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/bpf/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ func DefaultSupportedMetrics() SupportedMetrics {
}

func defaultHardwareCounters() sets.Set[string] {
return sets.New(config.CPUCycle, config.CPUInstruction, config.CacheMiss)
return sets.New(config.BPFHwCounters()...)
}

func defaultSoftwareCounters() sets.Set[string] {
swCounters := sets.New(config.CPUTime, config.PageCacheHit)
if config.ExposeIRQCounterMetrics() {
swCounters.Insert(config.IRQNetTXLabel, config.IRQNetRXLabel, config.IRQBlockLabel)
}
swCounters := sets.New(config.BPFSwCounters()...)
return swCounters
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/collector/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Collector struct {
func NewCollector(bpfExporter bpf.Exporter) *Collector {
bpfSupportedMetrics := bpfExporter.SupportedMetrics()
c := &Collector{
NodeStats: *stats.NewNodeStats(bpfSupportedMetrics),
NodeStats: *stats.NewNodeStats(),
ContainerStats: map[string]*stats.ContainerStats{},
ProcessStats: map[uint64]*stats.ProcessStats{},
VMStats: map[string]*stats.VMStats{},
Expand All @@ -78,8 +78,7 @@ func (c *Collector) Initialize() error {
// For local estimator, there is endpoint provided, thus we should let
// model component decide whether/how to init
model.CreatePowerEstimatorModels(
stats.GetProcessFeatureNames(c.bpfSupportedMetrics),
c.bpfSupportedMetrics,
stats.GetProcessFeatureNames(),
)

return nil
Expand Down Expand Up @@ -161,7 +160,7 @@ func (c *Collector) updateProcessResourceUtilizationMetrics(wg *sync.WaitGroup)
resourceBpf.UpdateProcessBPFMetrics(c.bpfExporter, c.ProcessStats)
if config.EnabledGPU() {
if acc.GetRegistry().ActiveAcceleratorByType(acc.GPU) != nil {
accelerator.UpdateProcessGPUUtilizationMetrics(c.ProcessStats, c.bpfSupportedMetrics)
accelerator.UpdateProcessGPUUtilizationMetrics(c.ProcessStats)
}
}
}
Expand Down Expand Up @@ -192,7 +191,7 @@ func (c *Collector) AggregateProcessResourceUtilizationMetrics() {
if config.IsExposeVMStatsEnabled() {
if process.VMID != "" {
if _, ok := c.VMStats[process.VMID]; !ok {
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfSupportedMetrics)
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID)
}
c.VMStats[process.VMID].ResourceUsage[metricName].AddDeltaStat(id, delta)
foundVM[process.VMID] = true
Expand Down Expand Up @@ -277,7 +276,7 @@ func (c *Collector) AggregateProcessEnergyUtilizationMetrics() {
if config.IsExposeVMStatsEnabled() {
if process.VMID != "" {
if _, ok := c.VMStats[process.VMID]; !ok {
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfSupportedMetrics)
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID)
}
c.VMStats[process.VMID].EnergyUsage[metricName].AddDeltaStat(id, delta)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/metric_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ var _ = Describe("Test Collector Unit", func() {
bpfExporter := bpf.NewMockExporter(bpf.DefaultSupportedMetrics())
metricCollector := newMockCollector(bpfExporter)
// The default estimator model is the ratio
bpfSupportedMetrics := bpfExporter.SupportedMetrics()
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), bpfSupportedMetrics)
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames())
// update container and node metrics
metricCollector.UpdateProcessEnergyUtilizationMetrics()
metricCollector.AggregateProcessEnergyUtilizationMetrics()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"os"
"time"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/cgroup"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats"
"github.com/sustainable-computing-io/kepler/pkg/config"
Expand All @@ -43,7 +42,7 @@ var (
)

// UpdateProcessGPUUtilizationMetrics reads the GPU metrics of each process using the GPU
func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessStats, bpfSupportedMetrics bpf.SupportedMetrics) {
func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessStats) {
if gpu := acc.GetRegistry().ActiveAcceleratorByType(acc.GPU); gpu != nil {
d := gpu.Device()
migDevices := d.DeviceInstances()
Expand All @@ -54,17 +53,17 @@ func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessSt
for _, migDevice := range migDevices[_device.(dev.GPUDevice).ID] {
// device.ID is equal to migDevice.ParentID
// we add the process metrics with the parent GPU ID, so that the Ratio power model will use this data to split the GPU power among the process
addGPUUtilizationToProcessStats(d, processStats, migDevice.(dev.GPUDevice), migDevice.(dev.GPUDevice).ParentID, bpfSupportedMetrics)
addGPUUtilizationToProcessStats(d, processStats, migDevice.(dev.GPUDevice), migDevice.(dev.GPUDevice).ParentID)
}
} else {
addGPUUtilizationToProcessStats(d, processStats, _device.(dev.GPUDevice), _device.(dev.GPUDevice).ID, bpfSupportedMetrics)
addGPUUtilizationToProcessStats(d, processStats, _device.(dev.GPUDevice), _device.(dev.GPUDevice).ID)
}
}
}
lastUtilizationTimestamp = time.Now()
}

func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*stats.ProcessStats, d dev.GPUDevice, gpuID int, bpfSupportedMetrics bpf.SupportedMetrics) {
func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*stats.ProcessStats, d dev.GPUDevice, gpuID int) {
var err error
var processesUtilization map[uint32]any

Expand Down Expand Up @@ -97,7 +96,7 @@ func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*sta
}
}
}
processStats[uintPid] = stats.NewProcessStats(uintPid, uint64(0), containerID, vmID, command, bpfSupportedMetrics)
processStats[uintPid] = stats.NewProcessStats(uintPid, uint64(0), containerID, vmID, command)
}
gpuName := fmt.Sprintf("%d", gpuID) // GPU ID or Parent GPU ID for MIG slices
processStats[uintPid].ResourceUsage[config.GPUComputeUtilization].AddDeltaStat(gpuName, uint64(processUtilization.(dev.GPUProcessUtilizationSample).ComputeUtil))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func UpdateProcessBPFMetrics(bpfExporter bpf.Exporter, processStats map[uint64]*
var ok bool
var pStat *stats.ProcessStats
if pStat, ok = processStats[mapKey]; !ok {
pStat = stats.NewProcessStats(mapKey, ct.CgroupId, containerID, vmID, process, bpfSupportedMetrics)
pStat = stats.NewProcessStats(mapKey, ct.CgroupId, containerID, vmID, process)
processStats[mapKey] = pStat
} else if pStat.Command == "" {
pStat.Command = comm
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/stats/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func benchmarkNtesting(b *testing.B, processNumber int) {
metricCollector.AggregateProcessResourceUtilizationMetrics()

// The default estimator model is the ratio
bpfSupportedMetrics := bpf.DefaultSupportedMetrics()
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), bpfSupportedMetrics)
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames())

// update container and node metrics
b.ReportAllocs()
Expand Down
6 changes: 2 additions & 4 deletions pkg/collector/stats/container_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package stats

import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
)

type ContainerStats struct {
Expand All @@ -33,9 +31,9 @@ type ContainerStats struct {
}

// NewContainerStats creates a new ContainerStats instance
func NewContainerStats(containerName, podName, podNamespace, containerID string, bpfSupportedMetrics bpf.SupportedMetrics) *ContainerStats {
func NewContainerStats(containerName, podName, podNamespace, containerID string) *ContainerStats {
c := &ContainerStats{
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
PIDS: make(map[uint64]bool),
ContainerID: containerID,
PodName: podName,
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/stats/container_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package stats
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
)

var _ = Describe("Test Container Metric", func() {

It("Test ResetDeltaValues", func() {
SetMockedCollectorMetrics()
c := NewContainerStats("containerA", "podA", "test", "containerIDA", bpf.DefaultSupportedMetrics())
c := NewContainerStats("containerA", "podA", "test", "containerIDA")
c.ResourceUsage[config.CPUCycle].SetDeltaStat(MockedSocketID, 30000)
c.ResourceUsage[config.CPUInstruction].SetDeltaStat(MockedSocketID, 30000)
c.ResourceUsage[config.CacheMiss].SetDeltaStat(MockedSocketID, 30000)
Expand Down
5 changes: 2 additions & 3 deletions pkg/collector/stats/node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package stats
import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
"github.com/sustainable-computing-io/kepler/pkg/node"
acc "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator"
Expand All @@ -36,9 +35,9 @@ type NodeStats struct {
nodeInfo node.Node
}

func NewNodeStats(bpfSupportedMetrics bpf.SupportedMetrics) *NodeStats {
func NewNodeStats() *NodeStats {
return &NodeStats{
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
IdleResUtilization: map[string]uint64{},
nodeInfo: node.NewNodeInfo(),
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/collector/stats/process_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package stats

import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
)

type ProcessStats struct {
Expand All @@ -33,14 +31,14 @@ type ProcessStats struct {
}

// NewProcessStats creates a new ProcessStats instance
func NewProcessStats(pid, cGroupID uint64, containerID, vmID, command string, bpfSupportedMetrics bpf.SupportedMetrics) *ProcessStats {
func NewProcessStats(pid, cGroupID uint64, containerID, vmID, command string) *ProcessStats {
p := &ProcessStats{
PID: pid,
CGroupID: cGroupID,
ContainerID: containerID,
VMID: vmID,
Command: command,
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
}
return p
}
Expand Down
21 changes: 10 additions & 11 deletions pkg/collector/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,12 @@ import (
"fmt"
"strings"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats/types"
"github.com/sustainable-computing-io/kepler/pkg/config"
acc "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator"
"k8s.io/klog/v2"
)

var defaultBPFMetrics = []string{
config.CPUCycle, config.CPURefCycle, config.CPUInstruction, config.CacheMiss, config.CPUTime,
config.PageCacheHit, config.IRQNetTXLabel, config.IRQNetRXLabel, config.IRQBlockLabel,
}

// metricSets stores different sets of metrics for energy and resource usage.
type metricSets struct {
absEnergyMetrics []string
Expand All @@ -40,7 +34,6 @@ type metricSets struct {
bpfMetrics []string
}

// Stats stores resource and energy usage statistics.
type Stats struct {
ResourceUsage map[string]types.UInt64StatCollection
EnergyUsage map[string]types.UInt64StatCollection
Expand All @@ -62,12 +55,12 @@ func newMetricSets() *metricSets {
config.IdleEnergyInCore, config.IdleEnergyInDRAM, config.IdleEnergyInUnCore, config.IdleEnergyInPkg,
config.IdleEnergyInGPU, config.IdleEnergyInOther, config.IdleEnergyInPlatform,
},
bpfMetrics: defaultBPFMetrics,
bpfMetrics: AvailableBPFMetrics(),
}
}

// NewStats creates a new Stats instance
func NewStats(bpfSupportedMetrics bpf.SupportedMetrics) *Stats {
func NewStats() *Stats {
stats := &Stats{
ResourceUsage: make(map[string]types.UInt64StatCollection),
EnergyUsage: make(map[string]types.UInt64StatCollection),
Expand All @@ -82,8 +75,9 @@ func NewStats(bpfSupportedMetrics bpf.SupportedMetrics) *Stats {
stats.EnergyUsage[metricName] = types.NewUInt64StatCollection()
}

// Initialize the resource utilization metrics in the map.
for _, metricName := range stats.BPFMetrics() {
// initialize the resource utilization metrics in the map
resMetrics := append([]string{}, AvailableBPFMetrics()...)
for _, metricName := range resMetrics {
stats.ResourceUsage[metricName] = types.NewUInt64StatCollection()
}

Expand Down Expand Up @@ -286,3 +280,8 @@ func (s *Stats) IdleEnergyMetrics() []string {
func (s *Stats) BPFMetrics() []string {
return s.availableMetrics.bpfMetrics
}

func AvailableBPFMetrics() []string {
metrics := append(config.BPFHwCounters(), config.BPFSwCounters()...)
return metrics
}
4 changes: 1 addition & 3 deletions pkg/collector/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@ package stats
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
)

var _ = Describe("Stats", func() {
It("Test InitAvailableParamAndMetrics", func() {
config.GetConfig()
config.SetEnabledHardwareCounterMetrics(false)
supportedMetrics := bpf.DefaultSupportedMetrics()
exp := []string{}
Expect(len(GetProcessFeatureNames(supportedMetrics)) >= len(exp)).To(BeTrue())
Expect(len(GetProcessFeatureNames()) >= len(exp)).To(BeTrue())
})
})
5 changes: 2 additions & 3 deletions pkg/collector/stats/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package stats
import (
"strconv"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
acc "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -54,7 +53,7 @@ func createMockedProcessMetric(idx int) *ProcessStats {
vmID := "vm" + strconv.Itoa(idx)
command := "command" + strconv.Itoa(idx)
uintPid := uint64(idx)
processMetrics := NewProcessStats(uintPid, uintPid, containerID, vmID, command, bpf.DefaultSupportedMetrics())
processMetrics := NewProcessStats(uintPid, uintPid, containerID, vmID, command)
// counter - attacher package
processMetrics.ResourceUsage[config.CPUCycle].SetDeltaStat(MockedSocketID, 30000)
processMetrics.ResourceUsage[config.CPUInstruction].SetDeltaStat(MockedSocketID, 30000)
Expand All @@ -66,7 +65,7 @@ func createMockedProcessMetric(idx int) *ProcessStats {

// CreateMockedNodeStats creates a node metric with power consumption and add the process resource utilization
func CreateMockedNodeStats() NodeStats {
nodeMetrics := NewNodeStats(bpf.DefaultSupportedMetrics())
nodeMetrics := NewNodeStats()
// add power metrics
// add first values to be the idle power
nodeMetrics.EnergyUsage[config.AbsEnergyInPkg].SetDeltaStat(MockedSocketID, 5000) // mili joules
Expand Down
Loading

0 comments on commit 7ea948d

Please sign in to comment.