Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorporate Zeusd for CPU and DRAM monitoring in ZeusMonitor #150

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b75e892
Add ZeusdRAPLCPU class
michahn01 Jan 18, 2025
9959712
Change ZeusdRAPLCPU to make requests to Zeusd
michahn01 Jan 20, 2025
bf67e56
Change RAPLCPUs to use ZeusdRAPLCPU if ZEUSD_SOCK_PATH is set
michahn01 Jan 20, 2025
c57c02a
Add pathlib import
michahn01 Jan 22, 2025
c9440f3
Add handler to Zeusd for checking if CPU supports DRAM energy
michahn01 Jan 22, 2025
465c88e
Fix return type errors
michahn01 Jan 23, 2025
863d802
Fix API response shape
michahn01 Jan 23, 2025
9dff236
Fix errors in rapl.py
michahn01 Jan 23, 2025
f6b0dd1
Add docstrings and minor cleanups
michahn01 Jan 24, 2025
557f6c7
Add newline at end of file
michahn01 Jan 26, 2025
db2d474
Add test for supportsDramEnergy route
michahn01 Jan 26, 2025
ce89581
Add missing import
michahn01 Jan 26, 2025
5e57954
Apply linting
michahn01 Jan 26, 2025
ea2d8ee
Fix ruff issues
michahn01 Jan 27, 2025
97d9f0c
Remove incorrect assumption about cpu_energy_uj None possibility
michahn01 Jan 28, 2025
552fafa
Rename endpoint to follow consistent casing
michahn01 Jan 28, 2025
4ea715c
Rename DramResponse to DramAvailabilityResponse
michahn01 Jan 28, 2025
71189a8
Merge imports for Zeus exceptions
michahn01 Jan 28, 2025
4cbe58f
Fix incorrect path name in test
michahn01 Jan 28, 2025
eb601b8
Remove cpu_id from skip list
michahn01 Feb 2, 2025
8927f7b
Update ZeusdRAPLCPU to cache DRAM availability in constructor
michahn01 Feb 2, 2025
b928837
Update docstring violating ruff check
michahn01 Feb 2, 2025
edc250d
Fix linting
michahn01 Feb 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 97 additions & 2 deletions zeus/device/cpu/rapl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import os
import time
import warnings
from pathlib import Path
from functools import lru_cache
from glob import glob
from multiprocessing.sharedctypes import Synchronized
from typing import Sequence

import httpx

import zeus.device.cpu.common as cpu_common
from zeus.device.cpu.common import CpuDramMeasurement
from zeus.device.exception import ZeusBaseCPUError
from zeus.device.exception import ZeusBaseCPUError, ZeusdError
from zeus.utils.logging import get_logger

logger = get_logger(name=__name__)
Expand Down Expand Up @@ -261,6 +264,78 @@ def supportsGetDramEnergyConsumption(self) -> bool:
return self.dram is not None


class ZeusdRAPLCPU(RAPLCPU):
    """RAPLCPU variant that reads energy counters through the Zeus daemon (zeusd).

    The parent RAPLCPU class requires root privileges to interface with RAPL.
    This subclass replaces the parent's methods with HTTP requests sent to zeusd
    over a Unix domain socket; the daemon talks to RAPL on the caller's behalf, so
    the calling process does not need root privileges to monitor CPU and DRAM
    energy consumption.

    See [here](https://ml.energy/zeus/getting_started/#system-privileges)
    for details on system privileges required.
    """

    def __init__(
        self,
        cpu_index: int,
        zeusd_sock_path: str = "/var/run/zeusd.sock",
    ) -> None:
        """Set up the zeusd client for one CPU and cache its DRAM availability.

        Args:
            cpu_index: Index of the CPU this object represents.
            zeusd_sock_path: Path to the Unix domain socket zeusd listens on.

        Raises:
            ZeusdError: If the DRAM availability query to zeusd fails.
        """
        self.cpu_index = cpu_index

        # All requests go over the Unix domain socket; the host part of the URL
        # is only a placeholder.
        transport = httpx.HTTPTransport(uds=zeusd_sock_path)
        self._client = httpx.Client(transport=transport)
        self._url_prefix = f"http://zeusd/cpu/{cpu_index}"

        # Queried once here so later calls do not need a round trip to zeusd.
        self.dram_available = self._supportsGetDramEnergyConsumption()

    def _supportsGetDramEnergyConsumption(self) -> bool:
        """Ask zeusd whether this CPU supports DRAM energy monitoring.

        Raises:
            ZeusdError: If zeusd answers with a non-200 status, or the response
                carries no `dram_available` value.
        """
        response = self._client.get(f"{self._url_prefix}/supports_dram_energy")
        if response.status_code != 200:
            raise ZeusdError(
                f"Failed to get whether DRAM energy is supported: {response.text}"
            )

        availability = response.json().get("dram_available")
        if availability is None:
            raise ZeusdError("Failed to get whether DRAM energy is supported.")
        return availability

    def getTotalEnergyConsumption(self) -> CpuDramMeasurement:
        """Fetch the CPU (and, when reported, DRAM) energy reading from zeusd. Units: mJ.

        Raises:
            ZeusdError: If zeusd answers with a non-200 status, or DRAM energy was
                expected to be available but the response has no DRAM reading.
        """
        request_body = {"cpu": True, "dram": True}
        response = self._client.post(
            f"{self._url_prefix}/get_index_energy",
            json=request_body,
        )
        if response.status_code != 200:
            raise ZeusdError(f"Failed to get total energy consumption: {response.text}")

        payload = response.json()
        # zeusd reports microjoules; convert to millijoules.
        cpu_mj = payload["cpu_energy_uj"] / 1000

        dram_uj = payload.get("dram_energy_uj")
        if dram_uj is not None:
            return CpuDramMeasurement(cpu_mj=cpu_mj, dram_mj=dram_uj / 1000)

        # A missing DRAM reading is only acceptable when DRAM monitoring is unsupported.
        if self.dram_available:
            raise ZeusdError(
                "DRAM energy should be available but no measurement was found"
            )
        return CpuDramMeasurement(cpu_mj=cpu_mj, dram_mj=None)

    def supportsGetDramEnergyConsumption(self) -> bool:
        """Return the DRAM availability flag cached when this object was constructed."""
        return self.dram_available


class RAPLCPUs(cpu_common.CPUs):
"""RAPL CPU Manager object, containing individual RAPLCPU objects, abstracting RAPL calls and handling related exceptions."""

Expand All @@ -281,13 +356,33 @@ def _init_cpus(self) -> None:
"""Initialize all Intel CPUs."""
self._cpus = []

cpu_indices = []

def sort_key(dir):
return int(dir.split(":")[1])

for dir in sorted(glob(f"{self.rapl_dir}/intel-rapl:*"), key=sort_key):
parts = dir.split(":")
if len(parts) > 1 and parts[1].isdigit():
self._cpus.append(RAPLCPU(int(parts[1]), self.rapl_dir))
cpu_indices.append(int(parts[1]))

# If `ZEUSD_SOCK_PATH` is set, always use ZeusdRAPLCPU
if (sock_path := os.environ.get("ZEUSD_SOCK_PATH")) is not None:
if not Path(sock_path).exists():
raise ZeusdError(
f"ZEUSD_SOCK_PATH points to non-existent file: {sock_path}"
)
if not Path(sock_path).is_socket():
raise ZeusdError(f"ZEUSD_SOCK_PATH is not a socket: {sock_path}")
if not os.access(sock_path, os.W_OK):
raise ZeusdError(f"ZEUSD_SOCK_PATH is not writable: {sock_path}")
self._cpus = [
ZeusdRAPLCPU(cpu_index, sock_path) for cpu_index in cpu_indices
]
else:
self._cpus = [
RAPLCPU(cpu_index, self.rapl_dir) for cpu_index in cpu_indices
]

def __del__(self) -> None:
"""Shuts down the Intel CPU monitoring."""
Expand Down
38 changes: 30 additions & 8 deletions zeusd/src/devices/cpu/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ pub struct RaplResponse {
pub dram_energy_uj: Option<u64>,
}

/// Response payload stating whether a CPU supports DRAM energy measurement.
///
/// Produced when handling `CpuCommand::SupportsDramEnergy`.
#[derive(Serialize, Deserialize, Debug)]
pub struct DramAvailabilityResponse {
/// `true` when the device reports that DRAM energy is available.
pub dram_available: bool,
}

/// Unified response type returned by CPU commands.
///
/// The enum is `#[serde(untagged)]`, so it serializes to exactly the JSON of the
/// wrapped struct, with no variant tag.
///
/// Variant order matters when *deserializing*: serde tries untagged variants in
/// declaration order and keeps the first one that matches. `Dram` is declared
/// first because [`DramAvailabilityResponse`] has a required `dram_available`
/// field and therefore only matches its own payload. The [`RaplResponse`] fields
/// used in this module are all `Option`s, so (unless that struct denies unknown
/// fields) it would accept any JSON object and would shadow `Dram` if it were
/// listed first.
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum CpuResponse {
    /// Answer to a DRAM availability query.
    Dram(DramAvailabilityResponse),
    /// CPU and/or DRAM energy measurement.
    Rapl(RaplResponse),
}

pub trait CpuManager {
/// Get the number of CPUs available.
fn device_count() -> Result<usize, ZeusdError>;
Expand All @@ -55,7 +68,7 @@ pub trait CpuManager {

pub type CpuCommandRequest = (
CpuCommand,
Option<Sender<Result<RaplResponse, ZeusdError>>>,
Option<Sender<Result<CpuResponse, ZeusdError>>>,
Instant,
Span,
);
Expand Down Expand Up @@ -89,7 +102,7 @@ impl CpuManagementTasks {
cpu_id: usize,
command: CpuCommand,
request_start_time: Instant,
) -> Result<RaplResponse, ZeusdError> {
) -> Result<CpuResponse, ZeusdError> {
if cpu_id >= self.senders.len() {
return Err(ZeusdError::CpuNotFoundError(cpu_id));
}
Expand Down Expand Up @@ -128,6 +141,8 @@ impl CpuManagementTasks {
pub enum CpuCommand {
/// Get the CPU and DRAM energy measurement for the CPU index
GetIndexEnergy { cpu: bool, dram: bool },
/// Return if the specified CPU supports DRAM energy measurement
SupportsDramEnergy,
/// Stop the monitoring task for CPU and DRAM if they have been started.
StopMonitoring,
}
Expand Down Expand Up @@ -156,7 +171,7 @@ impl CpuCommand {
&self,
device: &mut T,
_request_arrival_time: Instant,
) -> Result<RaplResponse, ZeusdError>
) -> Result<CpuResponse, ZeusdError>
where
T: CpuManager,
{
Expand All @@ -172,17 +187,24 @@ impl CpuCommand {
} else {
None
};
Ok(RaplResponse {
// Wrap the RaplResponse in CpuResponse::Rapl
Ok(CpuResponse::Rapl(RaplResponse {
cpu_energy_uj,
dram_energy_uj,
})
}))
}
Self::SupportsDramEnergy => {
// Wrap the DramAvailabilityResponse in CpuResponse::Dram
Ok(CpuResponse::Dram(DramAvailabilityResponse {
dram_available: device.is_dram_available(),
}))
}
Self::StopMonitoring {} => {
Self::StopMonitoring => {
device.stop_monitoring();
Ok(RaplResponse {
Ok(CpuResponse::Rapl(RaplResponse {
cpu_energy_uj: Some(0),
dram_energy_uj: Some(0),
})
}))
}
}
}
Expand Down
25 changes: 24 additions & 1 deletion zeusd/src/routes/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl From<GetIndexEnergy> for CpuCommand {

#[actix_web::post("/{cpu_id}/get_index_energy")]
#[tracing::instrument(
skip(cpu_id, request, _device_tasks),
skip(request, _device_tasks),
fields(
cpu_id = %cpu_id,
cpu = %request.cpu,
Expand All @@ -48,6 +48,29 @@ async fn get_index_energy_handler(
Ok(HttpResponse::Ok().json(measurement))
}

/// Handle `GET /{cpu_id}/supports_dram_energy`.
///
/// Sends [`CpuCommand::SupportsDramEnergy`] to the management task of the CPU
/// named in the path and returns the task's answer as the JSON body of a
/// `200 OK` response.
///
/// # Errors
///
/// Propagates any [`ZeusdError`] returned while sending the command.
#[actix_web::get("/{cpu_id}/supports_dram_energy")]
#[tracing::instrument(
    skip(_device_tasks),
    fields(
        cpu_id = %cpu_id,
    )
)]
async fn supports_dram_energy_handler(
    cpu_id: web::Path<usize>,
    _device_tasks: web::Data<CpuManagementTasks>,
) -> Result<HttpResponse, ZeusdError> {
    // Record the arrival time before doing anything else.
    let request_start_time = Instant::now();
    tracing::info!("Received request");

    let availability = _device_tasks
        .send_command_blocking(
            cpu_id.into_inner(),
            CpuCommand::SupportsDramEnergy,
            request_start_time,
        )
        .await?;

    Ok(HttpResponse::Ok().json(availability))
}

/// Register every CPU route handler on the given service configuration.
pub fn cpu_routes(cfg: &mut web::ServiceConfig) {
    cfg.service(get_index_energy_handler)
        .service(supports_dram_energy_handler);
}
19 changes: 19 additions & 0 deletions zeusd/tests/cpu.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod helpers;

use zeusd::devices::cpu::DramAvailabilityResponse;
use zeusd::devices::cpu::RaplResponse;
use zeusd::routes::cpu::GetIndexEnergy;

Expand Down Expand Up @@ -154,3 +155,21 @@ async fn test_invalid_requests() {
.expect("Failed to send request");
assert_eq!(resp.status(), 400);
}

/// The `supports_dram_energy` route answers `200 OK` with a body that
/// deserializes into [`DramAvailabilityResponse`] and reports DRAM as available
/// for CPU 0 of the test application.
#[tokio::test]
async fn test_supports_dram_energy() {
    let app = TestApp::start().await;
    let url = format!("http://127.0.0.1:{}/cpu/0/supports_dram_energy", app.port);
    let client = reqwest::Client::new();

    let resp = client
        .get(url)
        .send()
        .await
        .expect("Failed to send request");
    assert_eq!(resp.status(), 200);

    // Read the body separately so a failed read and a failed parse give distinct messages.
    let body = resp.text().await.expect("Failed to read response body");
    let dram_response: DramAvailabilityResponse =
        serde_json::from_str(&body).expect("Failed to deserialize response body");
    assert!(dram_response.dram_available);
}
2 changes: 1 addition & 1 deletion zeusd/tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl_zeusd_request_cpu!(GetIndexEnergy);
/// A test application that starts a server over TCP and provides helper methods
/// for sending requests and fetching what happened to the fake GPUs.
pub struct TestApp {
port: u16,
pub port: u16,
observers: Vec<TestGpuObserver>,
cpu_injectors: Vec<TestCpuInjector>,
}
Expand Down