Skip to content

Commit

Permalink
refactor: bpfSupportedMetrics usage in NewStats()
Browse files Browse the repository at this point in the history
Remove the need to pass bpfSupportedMetrics through
n layers of function calls that lead to NewStats()
by adding a reasonable default set of BPF metrics
that the exporter can then override when it is
started.

Signed-off-by: Maryam Tahhan <[email protected]>
  • Loading branch information
maryamtahhan committed Sep 9, 2024
1 parent 214961f commit 718d171
Show file tree
Hide file tree
Showing 26 changed files with 74 additions and 130 deletions.
3 changes: 0 additions & 3 deletions cmd/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/build"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats"
"github.com/sustainable-computing-io/kepler/pkg/config"
"github.com/sustainable-computing-io/kepler/pkg/manager"
"github.com/sustainable-computing-io/kepler/pkg/metrics"
Expand Down Expand Up @@ -147,8 +146,6 @@ func main() {
platform.InitPowerImpl()
defer platform.StopPower()

stats.InitAvailableParamAndMetrics()

if config.EnabledGPU() {
r := accelerator.GetRegistry()
if a, err := accelerator.New(accelerator.GPU, true); err == nil {
Expand Down
13 changes: 2 additions & 11 deletions pkg/bpf/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type exporter struct {

func NewExporter() (Exporter, error) {
e := &exporter{
enabledHardwareCounters: sets.New[string](),
enabledSoftwareCounters: sets.New[string](),
enabledHardwareCounters: sets.New[string](config.BPFHwCounters...),
enabledSoftwareCounters: sets.New[string](config.BPFSwCounters...),
}
err := e.attach()
if err != nil {
Expand Down Expand Up @@ -110,7 +110,6 @@ func (e *exporter) attach() error {
if err != nil {
return fmt.Errorf("error attaching sched_switch tracepoint: %v", err)
}
e.enabledSoftwareCounters[config.CPUTime] = struct{}{}

if config.ExposeIRQCounterMetrics() {
e.irqLink, err = link.AttachTracing(link.TracingOptions{
Expand All @@ -120,9 +119,6 @@ func (e *exporter) attach() error {
if err != nil {
return fmt.Errorf("could not attach irq/softirq_entry: %w", err)
}
e.enabledSoftwareCounters[config.IRQNetTXLabel] = struct{}{}
e.enabledSoftwareCounters[config.IRQNetRXLabel] = struct{}{}
e.enabledSoftwareCounters[config.IRQBlockLabel] = struct{}{}
}

group := "writeback"
Expand All @@ -143,8 +139,6 @@ func (e *exporter) attach() error {
})
if err != nil {
klog.Warningf("failed to attach fentry/mark_page_accessed: %v. Kepler will not collect page cache read events. This will affect the DRAM power model estimation on VMs.", err)
} else if !e.enabledSoftwareCounters.Has(config.PageCacheHit) {
e.enabledSoftwareCounters[config.PageCacheHit] = struct{}{}
}

// Return early if hardware counters are not enabled
Expand All @@ -162,9 +156,6 @@ func (e *exporter) attach() error {
if err != nil {
return nil
}
e.enabledHardwareCounters[config.CPUCycle] = struct{}{}
e.enabledHardwareCounters[config.CPUInstruction] = struct{}{}
e.enabledHardwareCounters[config.CacheMiss] = struct{}{}

return nil
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/bpf/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ func DefaultSupportedMetrics() SupportedMetrics {
}

func defaultHardwareCounters() sets.Set[string] {
return sets.New(config.CPUCycle, config.CPUInstruction, config.CacheMiss)
return sets.New(config.BPFHwCounters...)
}

func defaultSoftwareCounters() sets.Set[string] {
swCounters := sets.New(config.CPUTime, config.PageCacheHit)
if config.ExposeIRQCounterMetrics() {
swCounters.Insert(config.IRQNetTXLabel, config.IRQNetRXLabel, config.IRQBlockLabel)
}
swCounters := sets.New(config.BPFSwCounters...)
return swCounters
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/collector/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Collector struct {
func NewCollector(bpfExporter bpf.Exporter) *Collector {
bpfSupportedMetrics := bpfExporter.SupportedMetrics()
c := &Collector{
NodeStats: *stats.NewNodeStats(bpfSupportedMetrics),
NodeStats: *stats.NewNodeStats(),
ContainerStats: map[string]*stats.ContainerStats{},
ProcessStats: map[uint64]*stats.ProcessStats{},
VMStats: map[string]*stats.VMStats{},
Expand All @@ -78,10 +78,9 @@ func (c *Collector) Initialize() error {
// For local estimator, there is endpoint provided, thus we should let
// model component decide whether/how to init
model.CreatePowerEstimatorModels(
stats.GetProcessFeatureNames(c.bpfSupportedMetrics),
stats.GetProcessFeatureNames(),
stats.NodeMetadataFeatureNames(),
stats.NodeMetadataFeatureValues(),
c.bpfSupportedMetrics,
)

return nil
Expand Down Expand Up @@ -163,7 +162,7 @@ func (c *Collector) updateProcessResourceUtilizationMetrics(wg *sync.WaitGroup)
resourceBpf.UpdateProcessBPFMetrics(c.bpfExporter, c.ProcessStats)
if config.EnabledGPU() {
if acc.GetRegistry().ActiveAcceleratorByType(acc.GPU) != nil {
accelerator.UpdateProcessGPUUtilizationMetrics(c.ProcessStats, c.bpfSupportedMetrics)
accelerator.UpdateProcessGPUUtilizationMetrics(c.ProcessStats)
}
}
}
Expand Down Expand Up @@ -194,7 +193,7 @@ func (c *Collector) AggregateProcessResourceUtilizationMetrics() {
if config.IsExposeVMStatsEnabled() {
if process.VMID != "" {
if _, ok := c.VMStats[process.VMID]; !ok {
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfSupportedMetrics)
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID)
}
c.VMStats[process.VMID].ResourceUsage[metricName].AddDeltaStat(id, delta)
foundVM[process.VMID] = true
Expand Down Expand Up @@ -279,7 +278,7 @@ func (c *Collector) AggregateProcessEnergyUtilizationMetrics() {
if config.IsExposeVMStatsEnabled() {
if process.VMID != "" {
if _, ok := c.VMStats[process.VMID]; !ok {
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfSupportedMetrics)
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID)
}
c.VMStats[process.VMID].EnergyUsage[metricName].AddDeltaStat(id, delta)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/metric_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ var _ = Describe("Test Collector Unit", func() {
bpfExporter := bpf.NewMockExporter(bpf.DefaultSupportedMetrics())
metricCollector := newMockCollector(bpfExporter)
// The default estimator model is the ratio
bpfSupportedMetrics := bpfExporter.SupportedMetrics()
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), stats.NodeMetadataFeatureNames(), stats.NodeMetadataFeatureValues(), bpfSupportedMetrics)
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(), stats.NodeMetadataFeatureNames(), stats.NodeMetadataFeatureValues())
// update container and node metrics
metricCollector.UpdateProcessEnergyUtilizationMetrics()
metricCollector.AggregateProcessEnergyUtilizationMetrics()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"os"
"time"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/cgroup"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats"
"github.com/sustainable-computing-io/kepler/pkg/config"
Expand All @@ -43,7 +42,7 @@ var (
)

// UpdateProcessGPUUtilizationMetrics reads the GPU metrics of each process using the GPU
func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessStats, bpfSupportedMetrics bpf.SupportedMetrics) {
func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessStats) {
if gpu := acc.GetRegistry().ActiveAcceleratorByType(acc.GPU); gpu != nil {
d := gpu.Device()
migDevices := d.DeviceInstances()
Expand All @@ -54,17 +53,17 @@ func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessSt
for _, migDevice := range migDevices[_device.(dev.GPUDevice).ID] {
// device.ID is equal to migDevice.ParentID
// we add the process metrics with the parent GPU ID, so that the Ratio power model will use this data to split the GPU power among the process
addGPUUtilizationToProcessStats(d, processStats, migDevice.(dev.GPUDevice), migDevice.(dev.GPUDevice).ParentID, bpfSupportedMetrics)
addGPUUtilizationToProcessStats(d, processStats, migDevice.(dev.GPUDevice), migDevice.(dev.GPUDevice).ParentID)
}
} else {
addGPUUtilizationToProcessStats(d, processStats, _device.(dev.GPUDevice), _device.(dev.GPUDevice).ID, bpfSupportedMetrics)
addGPUUtilizationToProcessStats(d, processStats, _device.(dev.GPUDevice), _device.(dev.GPUDevice).ID)
}
}
}
lastUtilizationTimestamp = time.Now()
}

func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*stats.ProcessStats, d dev.GPUDevice, gpuID int, bpfSupportedMetrics bpf.SupportedMetrics) {
func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*stats.ProcessStats, d dev.GPUDevice, gpuID int) {
var err error
var processesUtilization map[uint32]any

Expand Down Expand Up @@ -97,7 +96,7 @@ func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*sta
}
}
}
processStats[uintPid] = stats.NewProcessStats(uintPid, uint64(0), containerID, vmID, command, bpfSupportedMetrics)
processStats[uintPid] = stats.NewProcessStats(uintPid, uint64(0), containerID, vmID, command)
}
gpuName := fmt.Sprintf("%d", gpuID) // GPU ID or Parent GPU ID for MIG slices
processStats[uintPid].ResourceUsage[config.GPUComputeUtilization].AddDeltaStat(gpuName, uint64(processUtilization.(dev.GPUProcessUtilizationSample).ComputeUtil))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func UpdateProcessBPFMetrics(bpfExporter bpf.Exporter, processStats map[uint64]*
var ok bool
var pStat *stats.ProcessStats
if pStat, ok = processStats[mapKey]; !ok {
pStat = stats.NewProcessStats(ct.Pid, ct.CgroupId, containerID, vmID, comm, bpfSupportedMetrics)
pStat = stats.NewProcessStats(ct.Pid, ct.CgroupId, containerID, vmID, comm)
processStats[mapKey] = pStat
} else if pStat.Command == "" {
pStat.Command = comm
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/stats/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func benchmarkNtesting(b *testing.B, processNumber int) {
metricCollector.AggregateProcessResourceUtilizationMetrics()

// The default estimator model is the ratio
bpfSupportedMetrics := bpf.DefaultSupportedMetrics()
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), stats.NodeMetadataFeatureNames(), stats.NodeMetadataFeatureValues(), bpfSupportedMetrics)
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(), stats.NodeMetadataFeatureNames(), stats.NodeMetadataFeatureValues())

// update container and node metrics
b.ReportAllocs()
Expand Down
6 changes: 2 additions & 4 deletions pkg/collector/stats/container_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package stats

import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
)

type ContainerStats struct {
Expand All @@ -33,9 +31,9 @@ type ContainerStats struct {
}

// NewContainerStats creates a new ContainerStats instance
func NewContainerStats(containerName, podName, podNamespace, containerID string, bpfSupportedMetrics bpf.SupportedMetrics) *ContainerStats {
func NewContainerStats(containerName, podName, podNamespace, containerID string) *ContainerStats {
c := &ContainerStats{
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
PIDS: make(map[uint64]bool),
ContainerID: containerID,
PodName: podName,
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/stats/container_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package stats
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
)

var _ = Describe("Test Container Metric", func() {

It("Test ResetDeltaValues", func() {
SetMockedCollectorMetrics()
c := NewContainerStats("containerA", "podA", "test", "containerIDA", bpf.DefaultSupportedMetrics())
c := NewContainerStats("containerA", "podA", "test", "containerIDA")
c.ResourceUsage[config.CPUCycle].SetDeltaStat(MockedSocketID, 30000)
c.ResourceUsage[config.CPUInstruction].SetDeltaStat(MockedSocketID, 30000)
c.ResourceUsage[config.CacheMiss].SetDeltaStat(MockedSocketID, 30000)
Expand Down
5 changes: 2 additions & 3 deletions pkg/collector/stats/node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package stats
import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
acc "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator"
"github.com/sustainable-computing-io/kepler/pkg/utils"
Expand Down Expand Up @@ -52,9 +51,9 @@ func NodeMetadataFeatureValues() []string {
return []string{NodeCPUArchitecture()}
}

func NewNodeStats(bpfSupportedMetrics bpf.SupportedMetrics) *NodeStats {
func NewNodeStats() *NodeStats {
return &NodeStats{
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
IdleResUtilization: map[string]uint64{},
}
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/collector/stats/process_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package stats

import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
)

type ProcessStats struct {
Expand All @@ -33,14 +31,14 @@ type ProcessStats struct {
}

// NewProcessStats creates a new ProcessStats instance
func NewProcessStats(pid, cGroupID uint64, containerID, vmID, command string, bpfSupportedMetrics bpf.SupportedMetrics) *ProcessStats {
func NewProcessStats(pid, cGroupID uint64, containerID, vmID, command string) *ProcessStats {
p := &ProcessStats{
PID: pid,
CGroupID: cGroupID,
ContainerID: containerID,
VMID: vmID,
Command: command,
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
}
return p
}
Expand Down
29 changes: 17 additions & 12 deletions pkg/collector/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,31 @@ import (
"fmt"
"strings"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats/types"
"github.com/sustainable-computing-io/kepler/pkg/config"
acc "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator"
"k8s.io/klog/v2"
)

var (
// Default metric sets. In addition, each metric set is used by the model
// package to estimate different power usage metrics.
// AvailableAbsEnergyMetrics holds a list of absolute energy metrics
AvailableAbsEnergyMetrics []string
AvailableAbsEnergyMetrics = []string{
config.AbsEnergyInCore, config.AbsEnergyInDRAM, config.AbsEnergyInUnCore, config.AbsEnergyInPkg,
config.AbsEnergyInGPU, config.AbsEnergyInOther, config.AbsEnergyInPlatform,
}
// AvailableDynEnergyMetrics holds a list of dynamic energy metrics
AvailableDynEnergyMetrics []string
AvailableDynEnergyMetrics = []string{
config.DynEnergyInCore, config.DynEnergyInDRAM, config.DynEnergyInUnCore, config.DynEnergyInPkg,
config.DynEnergyInGPU, config.DynEnergyInOther, config.DynEnergyInPlatform,
}
// AvailableIdleEnergyMetrics holds a list of idle energy metrics
AvailableIdleEnergyMetrics []string
AvailableIdleEnergyMetrics = []string{
config.IdleEnergyInCore, config.IdleEnergyInDRAM, config.IdleEnergyInUnCore, config.IdleEnergyInPkg,
config.IdleEnergyInGPU, config.IdleEnergyInOther, config.IdleEnergyInPlatform,
}
AvailableBPFMetrics = append(config.BPFHwCounters, config.BPFSwCounters...)
)

type Stats struct {
Expand All @@ -42,7 +53,7 @@ type Stats struct {
}

// NewStats creates a new Stats instance
func NewStats(bpfSupportedMetrics bpf.SupportedMetrics) *Stats {
func NewStats() *Stats {
m := &Stats{
ResourceUsage: make(map[string]types.UInt64StatCollection),
EnergyUsage: make(map[string]types.UInt64StatCollection),
Expand All @@ -58,13 +69,7 @@ func NewStats(bpfSupportedMetrics bpf.SupportedMetrics) *Stats {
}

// initialize the resource utilization metrics in the map
resMetrics := []string{}
for metricName := range bpfSupportedMetrics.HardwareCounters {
resMetrics = append(resMetrics, metricName)
}
for metricName := range bpfSupportedMetrics.SoftwareCounters {
resMetrics = append(resMetrics, metricName)
}
resMetrics := append([]string{}, AvailableBPFMetrics...)
for _, metricName := range resMetrics {
m.ResourceUsage[metricName] = types.NewUInt64StatCollection()
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/collector/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ package stats
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
)

var _ = Describe("Stats", func() {
It("Test InitAvailableParamAndMetrics", func() {
config.GetConfig()
config.SetEnabledHardwareCounterMetrics(false)
supportedMetrics := bpf.DefaultSupportedMetrics()
InitAvailableParamAndMetrics()
exp := []string{}
Expect(len(GetProcessFeatureNames(supportedMetrics)) >= len(exp)).To(BeTrue())
Expect(len(GetProcessFeatureNames()) >= len(exp)).To(BeTrue())
})
})
Loading

0 comments on commit 718d171

Please sign in to comment.