Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: tidy use of bpfSupportedMetrics for NewStats() #1629

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions pkg/bpf/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type exporter struct {

func NewExporter() (Exporter, error) {
e := &exporter{
enabledHardwareCounters: sets.New[string](),
enabledSoftwareCounters: sets.New[string](),
enabledHardwareCounters: sets.New[string](config.BPFHwCounters()...),
enabledSoftwareCounters: sets.New[string](config.BPFSwCounters()...),
}
err := e.attach()
if err != nil {
Expand Down Expand Up @@ -110,7 +110,6 @@ func (e *exporter) attach() error {
if err != nil {
return fmt.Errorf("error attaching sched_switch tracepoint: %v", err)
}
e.enabledSoftwareCounters[config.CPUTime] = struct{}{}

if config.ExposeIRQCounterMetrics() {
e.irqLink, err = link.AttachTracing(link.TracingOptions{
Expand All @@ -120,9 +119,6 @@ func (e *exporter) attach() error {
if err != nil {
return fmt.Errorf("could not attach irq/softirq_entry: %w", err)
}
e.enabledSoftwareCounters[config.IRQNetTXLabel] = struct{}{}
e.enabledSoftwareCounters[config.IRQNetRXLabel] = struct{}{}
e.enabledSoftwareCounters[config.IRQBlockLabel] = struct{}{}
}

group := "writeback"
Expand All @@ -143,8 +139,6 @@ func (e *exporter) attach() error {
})
if err != nil {
klog.Warningf("failed to attach fentry/mark_page_accessed: %v. Kepler will not collect page cache read events. This will affect the DRAM power model estimation on VMs.", err)
} else if !e.enabledSoftwareCounters.Has(config.PageCacheHit) {
e.enabledSoftwareCounters[config.PageCacheHit] = struct{}{}
}

// Return early if hardware counters are not enabled
Expand All @@ -162,9 +156,6 @@ func (e *exporter) attach() error {
if err != nil {
return nil
}
e.enabledHardwareCounters[config.CPUCycle] = struct{}{}
e.enabledHardwareCounters[config.CPUInstruction] = struct{}{}
e.enabledHardwareCounters[config.CacheMiss] = struct{}{}

return nil
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/bpf/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ func DefaultSupportedMetrics() SupportedMetrics {
}

func defaultHardwareCounters() sets.Set[string] {
return sets.New(config.CPUCycle, config.CPUInstruction, config.CacheMiss)
return sets.New(config.BPFHwCounters()...)
}

func defaultSoftwareCounters() sets.Set[string] {
swCounters := sets.New(config.CPUTime, config.PageCacheHit)
if config.ExposeIRQCounterMetrics() {
swCounters.Insert(config.IRQNetTXLabel, config.IRQNetRXLabel, config.IRQBlockLabel)
}
swCounters := sets.New(config.BPFSwCounters()...)
return swCounters
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/collector/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Collector struct {
func NewCollector(bpfExporter bpf.Exporter) *Collector {
bpfSupportedMetrics := bpfExporter.SupportedMetrics()
c := &Collector{
NodeStats: *stats.NewNodeStats(bpfSupportedMetrics),
NodeStats: *stats.NewNodeStats(),
ContainerStats: map[string]*stats.ContainerStats{},
ProcessStats: map[uint64]*stats.ProcessStats{},
VMStats: map[string]*stats.VMStats{},
Expand All @@ -78,8 +78,7 @@ func (c *Collector) Initialize() error {
// For local estimator, there is endpoint provided, thus we should let
// model component decide whether/how to init
model.CreatePowerEstimatorModels(
stats.GetProcessFeatureNames(c.bpfSupportedMetrics),
c.bpfSupportedMetrics,
stats.GetProcessFeatureNames(),
)

return nil
Expand Down Expand Up @@ -161,7 +160,7 @@ func (c *Collector) updateProcessResourceUtilizationMetrics(wg *sync.WaitGroup)
resourceBpf.UpdateProcessBPFMetrics(c.bpfExporter, c.ProcessStats)
if config.EnabledGPU() {
if acc.GetRegistry().ActiveAcceleratorByType(acc.GPU) != nil {
accelerator.UpdateProcessGPUUtilizationMetrics(c.ProcessStats, c.bpfSupportedMetrics)
accelerator.UpdateProcessGPUUtilizationMetrics(c.ProcessStats)
}
}
}
Expand Down Expand Up @@ -192,7 +191,7 @@ func (c *Collector) AggregateProcessResourceUtilizationMetrics() {
if config.IsExposeVMStatsEnabled() {
if process.VMID != "" {
if _, ok := c.VMStats[process.VMID]; !ok {
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfSupportedMetrics)
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID)
}
c.VMStats[process.VMID].ResourceUsage[metricName].AddDeltaStat(id, delta)
foundVM[process.VMID] = true
Expand Down Expand Up @@ -277,7 +276,7 @@ func (c *Collector) AggregateProcessEnergyUtilizationMetrics() {
if config.IsExposeVMStatsEnabled() {
if process.VMID != "" {
if _, ok := c.VMStats[process.VMID]; !ok {
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfSupportedMetrics)
c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID)
}
c.VMStats[process.VMID].EnergyUsage[metricName].AddDeltaStat(id, delta)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/metric_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ var _ = Describe("Test Collector Unit", func() {
bpfExporter := bpf.NewMockExporter(bpf.DefaultSupportedMetrics())
metricCollector := newMockCollector(bpfExporter)
// The default estimator model is the ratio
bpfSupportedMetrics := bpfExporter.SupportedMetrics()
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), bpfSupportedMetrics)
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames())
// update container and node metrics
metricCollector.UpdateProcessEnergyUtilizationMetrics()
metricCollector.AggregateProcessEnergyUtilizationMetrics()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"os"
"time"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/cgroup"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats"
"github.com/sustainable-computing-io/kepler/pkg/config"
Expand All @@ -43,7 +42,7 @@ var (
)

// UpdateProcessGPUUtilizationMetrics reads the GPU metrics of each process using the GPU
func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessStats, bpfSupportedMetrics bpf.SupportedMetrics) {
func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessStats) {
if gpu := acc.GetRegistry().ActiveAcceleratorByType(acc.GPU); gpu != nil {
d := gpu.Device()
migDevices := d.DeviceInstances()
Expand All @@ -54,17 +53,17 @@ func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessSt
for _, migDevice := range migDevices[_device.(dev.GPUDevice).ID] {
// device.ID is equal to migDevice.ParentID
// we add the process metrics with the parent GPU ID, so that the Ratio power model will use this data to split the GPU power among the process
addGPUUtilizationToProcessStats(d, processStats, migDevice.(dev.GPUDevice), migDevice.(dev.GPUDevice).ParentID, bpfSupportedMetrics)
addGPUUtilizationToProcessStats(d, processStats, migDevice.(dev.GPUDevice), migDevice.(dev.GPUDevice).ParentID)
}
} else {
addGPUUtilizationToProcessStats(d, processStats, _device.(dev.GPUDevice), _device.(dev.GPUDevice).ID, bpfSupportedMetrics)
addGPUUtilizationToProcessStats(d, processStats, _device.(dev.GPUDevice), _device.(dev.GPUDevice).ID)
}
}
}
lastUtilizationTimestamp = time.Now()
}

func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*stats.ProcessStats, d dev.GPUDevice, gpuID int, bpfSupportedMetrics bpf.SupportedMetrics) {
func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*stats.ProcessStats, d dev.GPUDevice, gpuID int) {
var err error
var processesUtilization map[uint32]any

Expand Down Expand Up @@ -97,7 +96,7 @@ func addGPUUtilizationToProcessStats(ai dev.Device, processStats map[uint64]*sta
}
}
}
processStats[uintPid] = stats.NewProcessStats(uintPid, uint64(0), containerID, vmID, command, bpfSupportedMetrics)
processStats[uintPid] = stats.NewProcessStats(uintPid, uint64(0), containerID, vmID, command)
}
gpuName := fmt.Sprintf("%d", gpuID) // GPU ID or Parent GPU ID for MIG slices
processStats[uintPid].ResourceUsage[config.GPUComputeUtilization].AddDeltaStat(gpuName, uint64(processUtilization.(dev.GPUProcessUtilizationSample).ComputeUtil))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func UpdateProcessBPFMetrics(bpfExporter bpf.Exporter, processStats map[uint64]*
var ok bool
var pStat *stats.ProcessStats
if pStat, ok = processStats[mapKey]; !ok {
pStat = stats.NewProcessStats(mapKey, ct.CgroupId, containerID, vmID, process, bpfSupportedMetrics)
pStat = stats.NewProcessStats(mapKey, ct.CgroupId, containerID, vmID, process)
processStats[mapKey] = pStat
} else if pStat.Command == "" {
pStat.Command = comm
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/stats/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func benchmarkNtesting(b *testing.B, processNumber int) {
metricCollector.AggregateProcessResourceUtilizationMetrics()

// The default estimator model is the ratio
bpfSupportedMetrics := bpf.DefaultSupportedMetrics()
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), bpfSupportedMetrics)
model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames())

// update container and node metrics
b.ReportAllocs()
Expand Down
6 changes: 2 additions & 4 deletions pkg/collector/stats/container_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package stats

import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
)

type ContainerStats struct {
Expand All @@ -33,9 +31,9 @@ type ContainerStats struct {
}

// NewContainerStats creates a new ContainerStats instance
func NewContainerStats(containerName, podName, podNamespace, containerID string, bpfSupportedMetrics bpf.SupportedMetrics) *ContainerStats {
func NewContainerStats(containerName, podName, podNamespace, containerID string) *ContainerStats {
c := &ContainerStats{
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
PIDS: make(map[uint64]bool),
ContainerID: containerID,
PodName: podName,
Expand Down
3 changes: 1 addition & 2 deletions pkg/collector/stats/container_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package stats
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
)

var _ = Describe("Test Container Metric", func() {

It("Test ResetDeltaValues", func() {
SetMockedCollectorMetrics()
c := NewContainerStats("containerA", "podA", "test", "containerIDA", bpf.DefaultSupportedMetrics())
c := NewContainerStats("containerA", "podA", "test", "containerIDA")
c.ResourceUsage[config.CPUCycle].SetDeltaStat(MockedSocketID, 30000)
c.ResourceUsage[config.CPUInstruction].SetDeltaStat(MockedSocketID, 30000)
c.ResourceUsage[config.CacheMiss].SetDeltaStat(MockedSocketID, 30000)
Expand Down
5 changes: 2 additions & 3 deletions pkg/collector/stats/node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package stats
import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
"github.com/sustainable-computing-io/kepler/pkg/node"
acc "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator"
Expand All @@ -36,9 +35,9 @@ type NodeStats struct {
nodeInfo node.Node
}

func NewNodeStats(bpfSupportedMetrics bpf.SupportedMetrics) *NodeStats {
func NewNodeStats() *NodeStats {
return &NodeStats{
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
IdleResUtilization: map[string]uint64{},
nodeInfo: node.NewNodeInfo(),
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/collector/stats/process_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package stats

import (
"fmt"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
)

type ProcessStats struct {
Expand All @@ -33,14 +31,14 @@ type ProcessStats struct {
}

// NewProcessStats creates a new ProcessStats instance
func NewProcessStats(pid, cGroupID uint64, containerID, vmID, command string, bpfSupportedMetrics bpf.SupportedMetrics) *ProcessStats {
func NewProcessStats(pid, cGroupID uint64, containerID, vmID, command string) *ProcessStats {
p := &ProcessStats{
PID: pid,
CGroupID: cGroupID,
ContainerID: containerID,
VMID: vmID,
Command: command,
Stats: *NewStats(bpfSupportedMetrics),
Stats: *NewStats(),
}
return p
}
Expand Down
21 changes: 10 additions & 11 deletions pkg/collector/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,12 @@ import (
"fmt"
"strings"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/collector/stats/types"
"github.com/sustainable-computing-io/kepler/pkg/config"
acc "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator"
"k8s.io/klog/v2"
)

var defaultBPFMetrics = []string{
config.CPUCycle, config.CPURefCycle, config.CPUInstruction, config.CacheMiss, config.CPUTime,
config.PageCacheHit, config.IRQNetTXLabel, config.IRQNetRXLabel, config.IRQBlockLabel,
}

// metricSets stores different sets of metrics for energy and resource usage.
type metricSets struct {
absEnergyMetrics []string
Expand All @@ -40,7 +34,6 @@ type metricSets struct {
bpfMetrics []string
}

// Stats stores resource and energy usage statistics.
type Stats struct {
ResourceUsage map[string]types.UInt64StatCollection
EnergyUsage map[string]types.UInt64StatCollection
Expand All @@ -62,12 +55,12 @@ func newMetricSets() *metricSets {
config.IdleEnergyInCore, config.IdleEnergyInDRAM, config.IdleEnergyInUnCore, config.IdleEnergyInPkg,
config.IdleEnergyInGPU, config.IdleEnergyInOther, config.IdleEnergyInPlatform,
},
bpfMetrics: defaultBPFMetrics,
bpfMetrics: AvailableBPFMetrics(),
}
}

// NewStats creates a new Stats instance
func NewStats(bpfSupportedMetrics bpf.SupportedMetrics) *Stats {
func NewStats() *Stats {
stats := &Stats{
ResourceUsage: make(map[string]types.UInt64StatCollection),
EnergyUsage: make(map[string]types.UInt64StatCollection),
Expand All @@ -82,8 +75,9 @@ func NewStats(bpfSupportedMetrics bpf.SupportedMetrics) *Stats {
stats.EnergyUsage[metricName] = types.NewUInt64StatCollection()
}

// Initialize the resource utilization metrics in the map.
for _, metricName := range stats.BPFMetrics() {
// initialize the resource utilization metrics in the map
resMetrics := append([]string{}, AvailableBPFMetrics()...)
for _, metricName := range resMetrics {
stats.ResourceUsage[metricName] = types.NewUInt64StatCollection()
}

Expand Down Expand Up @@ -286,3 +280,8 @@ func (s *Stats) IdleEnergyMetrics() []string {
func (s *Stats) BPFMetrics() []string {
return s.availableMetrics.bpfMetrics
}

func AvailableBPFMetrics() []string {
metrics := append(config.BPFHwCounters(), config.BPFSwCounters()...)
return metrics
}
4 changes: 1 addition & 3 deletions pkg/collector/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@ package stats
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
)

var _ = Describe("Stats", func() {
It("Test InitAvailableParamAndMetrics", func() {
config.GetConfig()
config.SetEnabledHardwareCounterMetrics(false)
supportedMetrics := bpf.DefaultSupportedMetrics()
exp := []string{}
Expect(len(GetProcessFeatureNames(supportedMetrics)) >= len(exp)).To(BeTrue())
Expect(len(GetProcessFeatureNames()) >= len(exp)).To(BeTrue())
})
})
5 changes: 2 additions & 3 deletions pkg/collector/stats/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package stats
import (
"strconv"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
"github.com/sustainable-computing-io/kepler/pkg/config"
acc "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -54,7 +53,7 @@ func createMockedProcessMetric(idx int) *ProcessStats {
vmID := "vm" + strconv.Itoa(idx)
command := "command" + strconv.Itoa(idx)
uintPid := uint64(idx)
processMetrics := NewProcessStats(uintPid, uintPid, containerID, vmID, command, bpf.DefaultSupportedMetrics())
processMetrics := NewProcessStats(uintPid, uintPid, containerID, vmID, command)
// counter - attacher package
processMetrics.ResourceUsage[config.CPUCycle].SetDeltaStat(MockedSocketID, 30000)
processMetrics.ResourceUsage[config.CPUInstruction].SetDeltaStat(MockedSocketID, 30000)
Expand All @@ -66,7 +65,7 @@ func createMockedProcessMetric(idx int) *ProcessStats {

// CreateMockedNodeStats creates a node metric with power consumption and add the process resource utilization
func CreateMockedNodeStats() NodeStats {
nodeMetrics := NewNodeStats(bpf.DefaultSupportedMetrics())
nodeMetrics := NewNodeStats()
// add power metrics
// add first values to be the idle power
nodeMetrics.EnergyUsage[config.AbsEnergyInPkg].SetDeltaStat(MockedSocketID, 5000) // mili joules
Expand Down
Loading
Loading