feat: add support for live job statistics, like sstat (#355)
* add code to support live job stats retrieval
* switch linking to libslurmfull
  - will make a few things easier, without so many function reimplementations
  - is required to implement getting live-performance stats of jobs in the first place
  - nothing changes from a user perspective

* fix [user|system|total]_cpu_time not being calculated correctly
* add tests for live job statistics
* xcollections: add to_dict method
* separate Job and Step statistics
* core.job: rename cpu_time to elapsed_cpu_time
* db.job: return 1 for cpus() when the job is still pending
* stats: fix logic how collection/step stats are added up
* setup: enhanced search for slurm lib
* stats: make sure cpu time values are int
* update CHANGELOG
tazend authored Dec 17, 2024
1 parent e60e506 commit aefb458
Showing 18 changed files with 729 additions and 230 deletions.
55 changes: 55 additions & 0 deletions CHANGELOG.md
@@ -7,12 +7,67 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased on the [24.5.x](https://github.com/PySlurm/pyslurm/tree/24.5.x) branch

### Added

- New Classes to interact with Database Associations (WIP)
- `pyslurm.db.Association`
- `pyslurm.db.Associations`
- New Classes to interact with Database QoS (WIP)
- `pyslurm.db.QualityOfService`
- `pyslurm.db.QualitiesOfService`
- Added `stats` attribute to `pyslurm.Job`, `pyslurm.Jobs` and
  `pyslurm.db.Jobs`
- Added `pids` attribute to `pyslurm.Job`, which contains the Process-IDs of
  the Job, organized by node name
- Added `load_stats` method to the `pyslurm.Job` and `pyslurm.Jobs` classes.
  Together with the `stats` and `pids` attributes mentioned above, it is now
  possible to fetch live statistics (like `sstat` does)
- Switch to linking with `libslurmfull.so` instead of `libslurm.so`<br>
  This change has no impact from a user perspective. Everything keeps working
  the same, except that Slurm's more internal library `libslurmfull.so` is now
  linked with (it is located alongside the plugins inside the `slurm`
  directory, which itself sits next to `libslurm.so`)<br>
  Why the change? It will likely make development easier. It allows access to
  more functions that might be needed in some places, without having to
  completely reimplement them on our own. Implementing the live-statistics
  feature (basically `sstat`), for example, is not possible with
  `libslurm.so`<br>
  You can keep providing the directory where `libslurm.so` resides as
  `$SLURM_LIB_DIR` to pyslurm, and it will automatically find
  `libslurmfull.so` from there.
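A minimal sketch of the lookup this entry describes, assuming the directory layout stated above (the helper name `find_libslurmfull` is made up for illustration and is not pyslurm's actual setup code):

```python
import os

def find_libslurmfull(slurm_lib_dir):
    """Locate libslurmfull.so given the directory that holds libslurm.so.

    Per the layout described above, libslurmfull.so sits alongside the
    plugins in a "slurm" subdirectory next to libslurm.so.
    """
    candidate = os.path.join(slurm_lib_dir, "slurm", "libslurmfull.so")
    return candidate if os.path.isfile(candidate) else None
```

This mirrors why `$SLURM_LIB_DIR` can stay pointed at the `libslurm.so` directory: the full library is found relative to it.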

### Fixed

- Fixed `total_cpu_time`, `system_cpu_time` and `user_cpu_time` not getting
calculated correctly for Job statistics
- Ensure that `avg_cpu_time`, `min_cpu_time`, `total_cpu_time`,
  `system_cpu_time` and `user_cpu_time` are integers, not floats.

### Changed

- Breaking: rename `cpu_time` to `elapsed_cpu_time` in `pyslurm.Job` and
`pyslurm.Jobs` classes
- Breaking: removed the following attributes from `pyslurm.db.Jobs`:<br>
* `consumed_energy`
* `disk_read`
* `disk_write`
* `page_faults`
* `resident_memory`
* `virtual_memory`
* `elapsed_cpu_time`
* `total_cpu_time`
* `user_cpu_time`
* `system_cpu_time`
- The removed attributes above are now all available within the `stats`
attribute, which is of type `pyslurm.db.JobStatistics`
- Renamed the original class of `pyslurm.db.JobStatistics` to
  `pyslurm.db.JobStepStatistics`.<br>
  Almost everything that class contains really only applies to Steps and
  doesn't fully make sense at the Job level.<br>
  The new `pyslurm.db.JobStatistics` class therefore only contains statistics
  that make sense at the Job level.
- Return `1` for the `cpus` attribute in `pyslurm.db.Job` when there is no
  value set on Slurm's side (for example when the Job is still pending).
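For code that must run against both old and new releases, the `cpu_time` to `elapsed_cpu_time` rename above can be bridged with a small compatibility shim (a sketch; only the two attribute names come from this changelog):

```python
def get_elapsed_cpu_time(job):
    # Newer releases (this changelog): Job.elapsed_cpu_time
    # Older releases: Job.cpu_time
    if hasattr(job, "elapsed_cpu_time"):
        return job.elapsed_cpu_time
    return job.cpu_time
```

The shim works with any object exposing either attribute, so it can be dropped in without pinning a pyslurm version.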

## [24.5.0](https://github.com/PySlurm/pyslurm/releases/tag/v24.5.0) - 2024-11-16

4 changes: 2 additions & 2 deletions pyslurm/__init__.py
@@ -4,10 +4,10 @@
"""
from __future__ import absolute_import

-import ctypes
+import os
import sys

-sys.setdlopenflags(sys.getdlopenflags() | ctypes.RTLD_GLOBAL)
+sys.setdlopenflags(sys.getdlopenflags() | os.RTLD_GLOBAL | os.RTLD_DEEPBIND)

# Initialize slurm api
from pyslurm.api import slurm_init, slurm_fini
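A standalone sketch of what the flag change in this file does (glibc/Linux only: `os.RTLD_DEEPBIND` does not exist on every platform):

```python
import os
import sys

# RTLD_GLOBAL makes the symbols of a loaded extension visible to libraries
# loaded afterwards (e.g. Slurm plugins resolving symbols from libslurmfull);
# RTLD_DEEPBIND (glibc-specific) makes a loaded library prefer its own
# symbol definitions over same-named symbols already in the process.
flags = sys.getdlopenflags()
sys.setdlopenflags(flags | os.RTLD_GLOBAL | os.RTLD_DEEPBIND)
```

The flags take effect for every subsequent `import` of a C extension in the process, which is why this runs at the very top of `pyslurm/__init__.py`.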
35 changes: 26 additions & 9 deletions pyslurm/core/job/job.pxd
@@ -25,6 +25,7 @@
from pyslurm.utils cimport cstr, ctime
from pyslurm.utils.uint cimport *
from pyslurm.utils.ctime cimport time_t
from pyslurm.db.stats cimport JobStatistics
from libc.string cimport memcpy, memset
from libc.stdint cimport uint8_t, uint16_t, uint32_t, uint64_t, int64_t
from libc.stdlib cimport free
@@ -77,16 +78,16 @@ cdef class Jobs(MultiClusterMap):
Attributes:
    memory (int):
-       Total amount of memory for all Jobs in this collection, in
-       Mebibytes
+       Total amount of memory requested for all Jobs in this collection,
+       in Mebibytes
    cpus (int):
-       Total amount of cpus for all Jobs in this collection.
+       Total amount of cpus requested for all Jobs in this collection.
    ntasks (int):
-       Total amount of tasks for all Jobs in this collection.
-   cpu_time (int):
+       Total amount of tasks requested for all Jobs in this collection.
+   elapsed_cpu_time (int):
        Total amount of CPU-Time used by all the Jobs in the collection.
        This is the result of multiplying the run_time with the amount of
-       cpus for each job.
+       cpus requested for each job.
frozen (bool):
If this is set to True and the `reload()` method is called, then
*ONLY* Jobs that already exist in this collection will be
@@ -95,13 +96,18 @@
Slurm controllers memory will not be removed either.
The default is False, so old jobs will be removed, and new Jobs
will be added - basically the same behaviour as doing Jobs.load().
stats (JobStatistics):
Real-time statistics of all Jobs in this collection.
Before you can access the stats data for this, you have to call
the `load_stats` method on this collection.
"""
cdef:
job_info_msg_t *info
slurm_job_info_t tmp_info

cdef public:
frozen
JobStatistics stats


cdef class Job:
Expand All @@ -119,6 +125,14 @@ cdef class Job:
Before you can access the Steps data for a Job, you have to call
the `reload()` method of a Job instance or the `load_steps()`
method of a Jobs collection.
stats (JobStatistics):
Real-time statistics of a Job.
Before you can access the stats data for a Job, you have to call
the `load_stats` method of a Job instance or the Jobs collection.
pids (dict[str, list]):
Current Process-IDs of the Job, organized by node name.
Before you can access the pids data for a Job, you have to call
the `load_stats` method of a Job instance or the Jobs collection.
name (str):
Name of the Job
id (int):
@@ -362,17 +376,20 @@ cdef class Job:
Whether this Job is a cronjob.
cronjob_time (str):
The time specification for the Cronjob.
-   cpu_time (int):
+   elapsed_cpu_time (int):
        Amount of CPU-Time used by the Job so far.
        This is the result of multiplying the run_time with the amount of
-       cpus.
+       cpus requested.
"""
cdef:
slurm_job_info_t *ptr
dict passwd
dict groups

-   cdef public JobSteps steps
+   cdef public:
+       JobSteps steps
+       JobStatistics stats
+       dict pids

cdef _calc_run_time(self)

93 changes: 88 additions & 5 deletions pyslurm/core/job/job.pyx
@@ -63,6 +63,7 @@ cdef class Jobs(MultiClusterMap):

def __init__(self, jobs=None, frozen=False):
self.frozen = frozen
self.stats = JobStatistics()
super().__init__(data=jobs,
typ="Jobs",
val_type=Job,
@@ -161,15 +162,48 @@
Pending Jobs will be ignored, since they don't have any Steps yet.

Raises:
-   RPCError: When retrieving the Job information for all the Steps
-       failed.
+   RPCError: When retrieving the information for all the Steps failed.
"""
cdef dict steps = JobSteps.load_all()
for job in self.values():
jid = job.id
if jid in steps:
job.steps = steps[jid]

def load_stats(self):
"""Load realtime stats for this collection of Jobs.

This function additionally fills in the `stats` attribute for all Jobs
in the collection, and also populates its own `stats` attribute.
Implicitly calls `load_steps()`.

!!! note
    Pending Jobs will be ignored, since they don't have any Stats yet.

Returns:
    (JobStatistics): The statistics of this job collection.

Raises:
    RPCError: When retrieving the stats for all the Jobs failed.

Examples:
>>> import pyslurm
>>> jobs = pyslurm.Jobs.load()
>>> stats = jobs.load_stats()
>>>
>>> # Print the CPU Time Used
>>> print(stats.total_cpu_time)
"""
self.load_steps()
stats = JobStatistics()
for job in self.values():
job.load_stats()
stats.add(job.stats)

self.stats = stats
return self.stats
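The summing loop in `load_stats` boils down to the following pattern, shown here with a simplified stand-in class rather than the real `pyslurm.db.JobStatistics`:

```python
class Stats:
    """Simplified stand-in for pyslurm.db.JobStatistics."""

    def __init__(self, total_cpu_time=0):
        self.total_cpu_time = total_cpu_time

    def add(self, other):
        # accumulate one job's stats into the collection-wide total
        self.total_cpu_time += other.total_cpu_time

per_job = [Stats(10), Stats(32)]
collection = Stats()
for s in per_job:
    collection.add(s)
print(collection.total_cpu_time)  # 42
```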

@property
def memory(self):
return xcollections.sum_property(self, Job.memory)
@@ -183,7 +217,7 @@
return xcollections.sum_property(self, Job.ntasks)

@property
-   def cpu_time(self):
+   def elapsed_cpu_time(self):
return xcollections.sum_property(self, Job.cpu_time)


@@ -199,6 +233,8 @@ cdef class Job:
self.groups = {}
cstr.fmalloc(&self.ptr.cluster, LOCAL_CLUSTER)
self.steps = JobSteps()
self.stats = JobStatistics()
self.pids = {}

def _alloc_impl(self):
if not self.ptr:
@@ -225,7 +261,7 @@
!!! note
    If the Job is not pending, the related Job steps will also be
-   loaded.
+   loaded. Job statistics are however not loaded automatically.
Args:
job_id (int):
@@ -276,6 +312,8 @@
wrap.passwd = {}
wrap.groups = {}
wrap.steps = JobSteps.__new__(JobSteps)
wrap.stats = JobStatistics()
wrap.pids = {}
memcpy(wrap.ptr, in_ptr, sizeof(slurm_job_info_t))
return wrap

@@ -297,6 +335,8 @@
"""
cdef dict out = instance_to_dict(self)
out["steps"] = self.steps.to_dict()
out["stats"] = self.stats.to_dict()
out["pids"] = self.pids
return out

def send_signal(self, signal, steps="children", hurry=False):
@@ -516,6 +556,49 @@
"""
verify_rpc(slurm_notify_job(self.id, msg))

def load_stats(self):
"""Load realtime statistics for a Job and its steps.

Calling this function returns the Job statistics, and additionally
populates the `stats` and `pids` attributes of the instance.

Returns:
    (JobStatistics): The statistics of the job.

Raises:
    RPCError: When retrieving the statistics was not successful.

Examples:
>>> import pyslurm
>>> job = pyslurm.Job.load(9999)
>>> stats = job.load_stats()
>>>
>>> # Print the CPU Time Used
>>> print(stats.total_cpu_time)
>>>
>>> # Print the Process-IDs for the whole Job, organized by hostname
>>> print(job.pids)
"""
if not self.steps:
job = Job.load(self.id)
self.steps = job.steps

all_pids = {}
for step in self.steps.values():
step.load_stats()
self.stats._sum_steps(step.stats)

for node, pids in step.pids.items():
if node not in all_pids:
all_pids[node] = []

all_pids[node].extend(pids)

self.stats.elapsed_cpu_time = self.run_time * self.cpus

self.pids = all_pids
return self.stats
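The per-node PID merge in the loop above follows this pattern (sample PIDs and node names invented for illustration):

```python
# Each step reports PIDs keyed by node name; the Job-level view simply
# concatenates them per node (sample data, not real Slurm output).
step_pids = [
    {"node01": [111, 112], "node02": [201]},
    {"node01": [113]},
]

all_pids = {}
for pids_by_node in step_pids:
    for node, pids in pids_by_node.items():
        all_pids.setdefault(node, []).extend(pids)

print(all_pids)  # {'node01': [111, 112, 113], 'node02': [201]}
```

`setdefault` replaces the explicit `if node not in all_pids` check from the diff; the behaviour is the same.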

def get_batch_script(self):
"""Return the content of the script for a Batch-Job.
@@ -1186,7 +1269,7 @@
return cstr.to_unicode(self.ptr.cronspec)

@property
-   def cpu_time(self):
+   def elapsed_cpu_time(self):
return self.cpus * self.run_time

@property
