diff --git a/CHANGELOG.md b/CHANGELOG.md index c2a78603..19f33635 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,12 +7,67 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased on the [24.5.x](https://github.com/PySlurm/pyslurm/tree/24.5.x) branch +### Added + - New Classes to interact with Database Associations (WIP) - `pyslurm.db.Association` - `pyslurm.db.Associations` - New Classes to interact with Database QoS (WIP) - `pyslurm.db.QualityOfService` - `pyslurm.db.QualitiesOfService` +- Added `stats` attribute to both `pyslurm.Job`, `pyslurm.Jobs` and + `pyslurm.db.Jobs` +- Added `pids` attribute to `pyslurm.Job` which contains Process-IDs of the Job + organized by node-name +- Added `load_stats` method to `pyslurm.Job` and `pyslurm.Jobs` classes. + Together with the `stats` and `pids` attributes mentioned above, it is now + possible to fetch live statistics (like sstat) +- Switch to link with `libslurmfull.so` instead of `libslurm.so`<br> + This change really has no impact from a user perspective. Everything will + keep working the same, except that Slurms more internal library + `libslurmfull.so` is linked with (which is located alongside the plugins + inside the `slurm` directory, which itself is next to `libslurm.so`)<br> + Why the change? Because it will likely make development easier. It allows + access to more functions that might be needed in some places, without + completely having to implement them on our own. Implementing the + live-statistics feature, so basically `sstat`, is for example not possible + with `libslurm.so` <br> + You can keep providing the directory where `libslurm.so` resided as + `$SLURM_LIB_DIR` to pyslurm, and it will automatically find `libslurmfull.so` + from there. + +### Fixed + +- Fixed `total_cpu_time`, `system_cpu_time` and `user_cpu_time` not getting + calculated correctly for Job statistics +- Actually make sure that `avg_cpu_time`, `min_cpu_time`, `total_cpu_time`, + `system_cpu_time` and `user_cpu_time` are integers, not float. + +### Changed + +- Breaking: rename `cpu_time` to `elapsed_cpu_time` in `pyslurm.Job` and + `pyslurm.Jobs` classes +- Breaking: removed the following attributes from `pyslurm.db.Jobs`:<br> + * `consumed_energy` + * `disk_read` + * `disk_write` + * `page_faults` + * `resident_memory` + * `virtual_memory` + * `elapsed_cpu_time` + * `total_cpu_time` + * `user_cpu_time` + * `system_cpu_time` +- The removed attributes above are now all available within the `stats` + attribute, which is of type `pyslurm.db.JobStatistics` +- Renamed the original class of `pyslurm.db.JobStatistics` to + `pyslurm.db.JobStepStatistics`.<br> + All this class contains is really mostly applicable only to Steps, but + doesn't fully apply at the Job Level.<br> + Therefore, the new `pyslurm.db.JobStatistics` class only contains all + statistics that make sense at the Job-level. +- return `1` as a value for the `cpus` attribute in `pyslurm.db.Job` when there + is no value set from Slurm's side. ## [24.5.0](https://github.com/PySlurm/pyslurm/releases/tag/v24.5.0) - 2024-11-16 diff --git a/pyslurm/__init__.py b/pyslurm/__init__.py index 4d3a5101..8d61e436 100644 --- a/pyslurm/__init__.py +++ b/pyslurm/__init__.py @@ -4,10 +4,10 @@ """ from __future__ import absolute_import -import ctypes +import os import sys -sys.setdlopenflags(sys.getdlopenflags() | ctypes.RTLD_GLOBAL) +sys.setdlopenflags(sys.getdlopenflags() | os.RTLD_GLOBAL | os.RTLD_DEEPBIND ) # Initialize slurm api from pyslurm.api import slurm_init, slurm_fini diff --git a/pyslurm/core/job/job.pxd b/pyslurm/core/job/job.pxd index 15d44be9..aae445a7 100644 --- a/pyslurm/core/job/job.pxd +++ b/pyslurm/core/job/job.pxd @@ -25,6 +25,7 @@ from pyslurm.utils cimport cstr, ctime from pyslurm.utils.uint cimport * from pyslurm.utils.ctime cimport time_t +from pyslurm.db.stats cimport JobStatistics from libc.string cimport memcpy, memset from libc.stdint cimport uint8_t, uint16_t, uint32_t, uint64_t, int64_t from libc.stdlib cimport free @@ -77,16 +78,16 @@ cdef class Jobs(MultiClusterMap): Attributes: memory (int): - Total amount of memory for all Jobs in this collection, in - Mebibytes + Total amount of memory requested for all Jobs in this collection, + in Mebibytes cpus (int): - Total amount of cpus for all Jobs in this collection. + Total amount of cpus requested for all Jobs in this collection. ntasks (int): - Total amount of tasks for all Jobs in this collection. - cpu_time (int): + Total amount of tasks requested for all Jobs in this collection. + elapsed_cpu_time (int): Total amount of CPU-Time used by all the Jobs in the collection. This is the result of multiplying the run_time with the amount of - cpus for each job. + cpus requested for each job. frozen (bool): If this is set to True and the `reload()` method is called, then *ONLY* Jobs that already exist in this collection will be @@ -95,6 +96,10 @@ cdef class Jobs(MultiClusterMap): Slurm controllers memory will not be removed either. The default is False, so old jobs will be removed, and new Jobs will be added - basically the same behaviour as doing Jobs.load(). + stats (JobStatistics): + Real-time statistics of all Jobs in this collection. + Before you can access the stats data for this, you have to call + the `load_stats` method on this collection. """ cdef: job_info_msg_t *info @@ -102,6 +107,7 @@ cdef class Jobs(MultiClusterMap): cdef public: frozen + JobStatistics stats cdef class Job: @@ -119,6 +125,14 @@ cdef class Job: Before you can access the Steps data for a Job, you have to call the `reload()` method of a Job instance or the `load_steps()` method of a Jobs collection. + stats (JobStatistics): + Real-time statistics of a Job. + Before you can access the stats data for a Job, you have to call + the `load_stats` method of a Job instance or the Jobs collection. + pids (dict[str, list]): + Current Process-IDs of the Job, organized by node name. + Before you can access the pids data for a Job, you have to call + the `load_stats` method of a Job instance or the Jobs collection. name (str): Name of the Job id (int): @@ -362,17 +376,20 @@ cdef class Job: Whether this Job is a cronjob. cronjob_time (str): The time specification for the Cronjob. - cpu_time (int): + elapsed_cpu_time (int): Amount of CPU-Time used by the Job so far. This is the result of multiplying the run_time with the amount of - cpus. + cpus requested. """ cdef: slurm_job_info_t *ptr dict passwd dict groups - cdef public JobSteps steps + cdef public: + JobSteps steps + JobStatistics stats + dict pids cdef _calc_run_time(self) diff --git a/pyslurm/core/job/job.pyx b/pyslurm/core/job/job.pyx index 68821b75..fc0663d8 100644 --- a/pyslurm/core/job/job.pyx +++ b/pyslurm/core/job/job.pyx @@ -63,6 +63,7 @@ cdef class Jobs(MultiClusterMap): def __init__(self, jobs=None, frozen=False): self.frozen = frozen + self.stats = JobStatistics() super().__init__(data=jobs, typ="Jobs", val_type=Job, @@ -161,8 +162,7 @@ cdef class Jobs(MultiClusterMap): Pending Jobs will be ignored, since they don't have any Steps yet. Raises: - RPCError: When retrieving the Job information for all the Steps - failed. + RPCError: When retrieving the information for all the Steps failed. """ cdef dict steps = JobSteps.load_all() for job in self.values(): @@ -170,6 +170,40 @@ cdef class Jobs(MultiClusterMap): if jid in steps: job.steps = steps[jid] + def load_stats(self): + """Load realtime stats for this collection of Jobs. + + This function additionally fills in the `stats` attribute for all Jobs + in the collection, and also populates its own `stats` attribute. + Implicitly calls `load_steps()`. + + !!! note + + Pending Jobs will be ignored, since they don't have any Stats yet. + + Returns: + (JobStatistics): The statistics of this job collection. + + Raises: + RPCError: When retrieving the stats for all the Jobs failed. + + Examples: + >>> import pyslurm + >>> jobs = pyslurm.Jobs.load() + >>> stats = jobs.load_stats() + >>> + >>> # Print the CPU Time Used + >>> print(stats.total_cpu_time) + """ + self.load_steps() + stats = JobStatistics() + for job in self.values(): + job.load_stats() + stats.add(job.stats) + + self.stats = stats + return self.stats + @property def memory(self): return xcollections.sum_property(self, Job.memory) @@ -183,7 +217,7 @@ cdef class Jobs(MultiClusterMap): return xcollections.sum_property(self, Job.ntasks) @property - def cpu_time(self): + def elapsed_cpu_time(self): return xcollections.sum_property(self, Job.cpu_time) @@ -199,6 +233,8 @@ cdef class Job: self.groups = {} cstr.fmalloc(&self.ptr.cluster, LOCAL_CLUSTER) self.steps = JobSteps() + self.stats = JobStatistics() + self.pids = {} def _alloc_impl(self): if not self.ptr: @@ -225,7 +261,7 @@ cdef class Job: !!! note If the Job is not pending, the related Job steps will also be - loaded. + loaded. Job statistics are however not loaded automatically. Args: job_id (int): @@ -276,6 +312,8 @@ cdef class Job: wrap.passwd = {} wrap.groups = {} wrap.steps = JobSteps.__new__(JobSteps) + wrap.stats = JobStatistics() + wrap.pids = {} memcpy(wrap.ptr, in_ptr, sizeof(slurm_job_info_t)) return wrap @@ -297,6 +335,8 @@ cdef class Job: """ cdef dict out = instance_to_dict(self) out["steps"] = self.steps.to_dict() + out["stats"] = self.stats.to_dict() + out["pids"] = self.pids return out def send_signal(self, signal, steps="children", hurry=False): @@ -516,6 +556,49 @@ cdef class Job: """ verify_rpc(slurm_notify_job(self.id, msg)) + def load_stats(self): + """Load realtime statistics for a Job and its steps. + + Calling this function returns the Job statistics, and additionally + populates the `stats` and `pids` attribute of the instance. + + Returns: + (JobStatistics): The statistics of the job. + + Raises: + RPCError: When receiving the Statistics was not successful. + + Examples: + >>> import pyslurm + >>> job = pyslurm.Job.load(9999) + >>> stats = job.load_stats() + >>> + >>> # Print the CPU Time Used + >>> print(stats.total_cpu_time) + >>> + >>> # Print the Process-IDs for the whole Job, organized by hostname + >>> print(job.pids) + """ + if not self.steps: + job = Job.load(self.id) + self.steps = job.steps + + all_pids = {} + for step in self.steps.values(): + step.load_stats() + self.stats._sum_steps(step.stats) + + for node, pids in step.pids.items(): + if node not in all_pids: + all_pids[node] = [] + + all_pids[node].extend(pids) + + self.stats.elapsed_cpu_time = self.run_time * self.cpus + + self.pids = all_pids + return self.stats + def get_batch_script(self): """Return the content of the script for a Batch-Job. @@ -1186,7 +1269,7 @@ cdef class Job: return cstr.to_unicode(self.ptr.cronspec) @property - def cpu_time(self): + def elapsed_cpu_time(self): return self.cpus * self.run_time @property diff --git a/pyslurm/core/job/stats.pxd b/pyslurm/core/job/stats.pxd new file mode 100644 index 00000000..bb397ebe --- /dev/null +++ b/pyslurm/core/job/stats.pxd @@ -0,0 +1,152 @@ +######################################################################### +# job/stats.pxd - interface to retrieve slurm job realtime stats +######################################################################### +# Copyright (C) 2024 Toni Harzendorf <toni.harzendorf@gmail.com> +# +######################################################################### +# Note: Some struct definitions have been taken directly from various parts of +# the Slurm source code, and just translated to Cython-Syntax. The structs are +# appropriately annotated with the respective Copyright notices, and a link to +# the source-code. + +# Slurm is licensed under the GNU General Public License. For the full text of +# Slurm's License, please see here: pyslurm/slurm/SLURM_LICENSE + +######################################################################### +# This file is part of PySlurm +# +# PySlurm is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. + +# PySlurm is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with PySlurm; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# cython: c_string_type=unicode, c_string_encoding=default +# cython: language_level=3 + +from libc.stdint cimport uint8_t, uint16_t, uint32_t, uint64_t +from libc.string cimport memcpy, memset +from posix.unistd cimport pid_t +from pyslurm cimport slurm +from pyslurm.slurm cimport ( + slurm_job_step_stat, + slurmdb_step_rec_t, + slurmdb_stats_t, + slurmdb_free_slurmdb_stats_members, + job_step_stat_t, + job_step_stat_response_msg_t, + slurm_job_step_stat_response_msg_free, + jobacctinfo_t, + list_t, + xfree, + try_xmalloc, +) +from pyslurm.utils cimport cstr, ctime +from pyslurm.utils.uint cimport * +from pyslurm.utils.ctime cimport time_t +from pyslurm.db.util cimport SlurmList, SlurmListItem +from pyslurm.db.stats cimport JobStepStatistics +from pyslurm.core.job.step cimport JobStep + +cdef load_single(JobStep step) + +# The real definition for this is too long, including too many other types that +# we don't have directly access to. +cdef extern from *: + """ + typedef struct stepd_step_rec_s stepd_step_rec_t; \ + """ + ctypedef struct stepd_step_rec_t + + +# https://github.com/SchedMD/slurm/blob/slurm-24-05-3-1/src/interfaces/jobacct_gather.h#L75 +# Copyright (C) 2003 The Regents of the University of California. +# Copyright (C) 2005 Hewlett-Packard Development Company, L.P. +ctypedef struct jobacct_id_t: + uint32_t taskid + uint32_t nodeid + stepd_step_rec_t *step + + +# https://github.com/SchedMD/slurm/blob/slurm-24-05-3-1/src/interfaces/jobacct_gather.h#L81 +# Copyright (C) 2003 The Regents of the University of California. +# Copyright (C) 2005 Hewlett-Packard Development Company, L.P. +ctypedef struct jobacctinfo: + pid_t pid + uint64_t sys_cpu_sec + uint32_t sys_cpu_usec + uint64_t user_cpu_sec + uint32_t user_cpu_usec + uint32_t act_cpufreq + slurm.acct_gather_energy_t energy + double last_total_cputime + double this_sampled_cputime + uint32_t current_weighted_freq + uint32_t current_weighted_power + uint32_t tres_count + uint32_t *tres_ids + list_t *tres_list + uint64_t *tres_usage_in_max + uint64_t *tres_usage_in_max_nodeid + uint64_t *tres_usage_in_max_taskid + uint64_t *tres_usage_in_min + uint64_t *tres_usage_in_min_nodeid + uint64_t *tres_usage_in_min_taskid + uint64_t *tres_usage_in_tot + uint64_t *tres_usage_out_max + uint64_t *tres_usage_out_max_nodeid + uint64_t *tres_usage_out_max_taskid + uint64_t *tres_usage_out_min + uint64_t *tres_usage_out_min_nodeid + uint64_t *tres_usage_out_min_taskid + uint64_t *tres_usage_out_tot + + jobacct_id_t id + int dataset_id + + double last_tres_usage_in_tot + double last_tres_usage_out_tot + time_t cur_time + time_t last_time + + +# https://github.com/SchedMD/slurm/blob/slurm-24-05-3-1/src/slurmctld/locks.h#L97 +# Copyright (C) 2002 The Regents of the University of California. +ctypedef enum lock_level_t: + NO_LOCK + READ_LOCK + WRITE_LOCK + + +# https://github.com/SchedMD/slurm/blob/slurm-24-05-3-1/src/common/assoc_mgr.h#L71 +# Copyright (C) 2004-2007 The Regents of the University of California. +# Copyright (C) 2008 Lawrence Livermore National Security. +ctypedef struct assoc_mgr_lock_t: + lock_level_t assoc + lock_level_t file + lock_level_t qos + lock_level_t res + lock_level_t tres + lock_level_t user + lock_level_t wckey + + +cdef extern jobacctinfo_t *jobacctinfo_create(jobacct_id_t *jobacct_id) +cdef extern void jobacctinfo_destroy(void *object) +cdef extern void jobacctinfo_aggregate(jobacctinfo_t *dest, jobacctinfo_t *src) +cdef extern void jobacctinfo_2_stats(slurmdb_stats_t *stats, jobacctinfo_t *jobacct) + +cdef extern list_t* assoc_mgr_tres_list +cdef extern void assoc_mgr_lock(assoc_mgr_lock_t *locks) +cdef extern void assoc_mgr_unlock(assoc_mgr_lock_t *locks) +cdef extern int assoc_mgr_post_tres_list(list_t *new_list) + +cdef extern char *slurmdb_ave_tres_usage(char *tres_string, int tasks); diff --git a/pyslurm/core/job/stats.pyx b/pyslurm/core/job/stats.pyx new file mode 100644 index 00000000..39bc0b36 --- /dev/null +++ b/pyslurm/core/job/stats.pyx @@ -0,0 +1,137 @@ +######################################################################### +# job/stats.pyx - interface to retrieve slurm job realtime stats +######################################################################### +# Copyright (C) 2024 Toni Harzendorf <toni.harzendorf@gmail.com> +# Copyright (C) 2008 Lawrence Livermore National Security. +# +######################################################################### +# The main logic for this file's code was taken from: +# https://github.com/SchedMD/slurm/blob/42a05b1bb4a7719944ee26adc6bc5d73e2d36823/src/sstat/sstat.c#L109 +# +# The code has been modified a bit and translated to Cython syntax. Slurm's +# sstat was originally written by Morris Jette <jette1@llnl.gov> +# +# Slurm is licensed under the GNU General Public License. For the full text of +# Slurm's License, please see here: pyslurm/slurm/SLURM_LICENSE +# +######################################################################### +# This file is part of PySlurm +# +# PySlurm is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. + +# PySlurm is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with PySlurm; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# cython: c_string_type=unicode, c_string_encoding=default +# cython: language_level=3 + +from pyslurm.core.error import verify_rpc +from pyslurm.utils.helpers import nodelist_to_range_str + + +cdef load_single(JobStep step): + cdef: + # jobacctinfo_t is the opaque data type provided in slurm.h + # jobacctinfo is the actual (partial) re-definition of the jobacctinfo + # type + # + # This is because we need to have access to jobacct.tres_list, + # otherwise we cannot call jobacctinfo_aggregate. Also we want to have + # the values of user_cpu_sec and sys_cpu_sec. + jobacctinfo_t *total_jobacct = NULL + jobacctinfo *stat_jobacct = NULL + + job_step_stat_t *step_stat = NULL + job_step_stat_response_msg_t *stat_resp = NULL + assoc_mgr_lock_t locks + slurmdb_step_rec_t db_step + SlurmList stats_list + SlurmListItem stat_list_ptr + char *usage_tmp = NULL + int rc = slurm.SLURM_SUCCESS + int ntasks = 0 + list nodes = [] + + rc = slurm_job_step_stat(&step.ptr.step_id, NULL, + step.ptr.start_protocol_ver, &stat_resp) + if rc != slurm.SLURM_SUCCESS: + slurm_job_step_stat_response_msg_free(stat_resp) + if rc == slurm.ESLURM_INVALID_JOB_ID: + return None + else: + verify_rpc(rc) + + memset(&db_step, 0, sizeof(slurmdb_step_rec_t)) + memset(&db_step.stats, 0, sizeof(slurmdb_stats_t)) + + stats_list = SlurmList.wrap(stat_resp.stats_list, owned=False) + for stat_list_ptr in stats_list: + step_stat = <job_step_stat_t*>stat_list_ptr.data + # Casting jobacctinfo_t to jobacctinfo... hoping this is sane to do + stat_jobacct = <jobacctinfo*>step_stat.jobacct + + if not step_stat.step_pids or not step_stat.step_pids.node_name: + continue + + node = cstr.to_unicode(step_stat.step_pids.node_name) + if step_stat.step_pids.pid_cnt > 0: + for i in range(step_stat.step_pids.pid_cnt): + if node not in step.pids: + step.pids[node] = [] + + step.pids[node].append(step_stat.step_pids.pid[i]) + + nodes.append(node) + ntasks += step_stat.num_tasks + if step_stat.jobacct: + if not assoc_mgr_tres_list and stat_jobacct.tres_list: + locks.tres = WRITE_LOCK + assoc_mgr_lock(&locks) + assoc_mgr_post_tres_list(stat_jobacct.tres_list) + assoc_mgr_unlock(&locks) + stat_jobacct.tres_list = NULL + + if not total_jobacct: + total_jobacct = jobacctinfo_create(NULL) + + jobacctinfo_aggregate(total_jobacct, step_stat.jobacct) + + db_step.user_cpu_sec += stat_jobacct.user_cpu_sec + db_step.user_cpu_usec += stat_jobacct.user_cpu_usec + db_step.sys_cpu_sec += stat_jobacct.sys_cpu_sec + db_step.sys_cpu_usec += stat_jobacct.sys_cpu_usec + + if total_jobacct: + jobacctinfo_2_stats(&db_step.stats, total_jobacct) + jobacctinfo_destroy(total_jobacct) + + if ntasks: + db_step.stats.act_cpufreq /= <double>ntasks + + usage_tmp = db_step.stats.tres_usage_in_ave + db_step.stats.tres_usage_in_ave = slurmdb_ave_tres_usage(usage_tmp, ntasks) + xfree(usage_tmp) + + usage_tmp = db_step.stats.tres_usage_out_ave + db_step.stats.tres_usage_out_ave = slurmdb_ave_tres_usage(usage_tmp, ntasks) + xfree(usage_tmp) + + step.stats = JobStepStatistics.from_ptr( + &db_step, + nodes, + step.alloc_cpus if step.alloc_cpus else 0, + step.run_time if step.run_time else 0, + is_live=True, + ) + + slurm_job_step_stat_response_msg_free(stat_resp) + slurmdb_free_slurmdb_stats_members(&db_step.stats) diff --git a/pyslurm/core/job/step.pxd b/pyslurm/core/job/step.pxd index 92d7fcad..7bea049b 100644 --- a/pyslurm/core/job/step.pxd +++ b/pyslurm/core/job/step.pxd @@ -47,6 +47,8 @@ from pyslurm.utils cimport cstr, ctime from pyslurm.utils.uint cimport * from pyslurm.utils.ctime cimport time_t from pyslurm.core.job.task_dist cimport TaskDistribution +from pyslurm.db.stats cimport JobStepStatistics +from pyslurm.core.job cimport stats cdef class JobSteps(dict): @@ -64,7 +66,7 @@ cdef class JobSteps(dict): @staticmethod cdef JobSteps _load_single(Job job) cdef dict _load_data(self, uint32_t job_id, int flags) - + cdef class JobStep: """A Slurm Jobstep @@ -80,6 +82,14 @@ cdef class JobStep: Time limit in Minutes for this step. Attributes: + stats (JobStepStatistics): + Real-time statistics of a Step. + Before you can access the stats data for a Step, you have to call + the `load_stats` method of a Step instance or the Jobs collection. + pids (dict[str, list]): + Current Process-IDs of the Step, organized by node name. Before you + can access the pids data, you have to call the `load_stats` method + of a Srep instance or the Jobs collection. id (Union[str, int]): The id for this step. job_id (int): @@ -136,5 +146,9 @@ cdef class JobStep: job_step_info_t *ptr step_update_request_msg_t *umsg + cdef public: + JobStepStatistics stats + dict pids + @staticmethod cdef JobStep from_ptr(job_step_info_t *in_ptr) diff --git a/pyslurm/core/job/step.pyx b/pyslurm/core/job/step.pyx index 812d4f1d..35047566 100644 --- a/pyslurm/core/job/step.pyx +++ b/pyslurm/core/job/step.pyx @@ -157,6 +157,8 @@ cdef class JobStep: self._alloc_impl() self.job_id = job_id.id if isinstance(job_id, Job) else job_id self.id = step_id + self.stats = JobStepStatistics() + self.pids = {} cstr.fmalloc(&self.ptr.cluster, LOCAL_CLUSTER) # Initialize attributes, if any were provided @@ -251,9 +253,38 @@ cdef class JobStep: cdef JobStep from_ptr(job_step_info_t *in_ptr): cdef JobStep wrap = JobStep.__new__(JobStep) wrap._alloc_info() + wrap.stats = JobStepStatistics() + wrap.pids = {} memcpy(wrap.ptr, in_ptr, sizeof(job_step_info_t)) return wrap + def load_stats(self): + """Load realtime stats for this Step. + + Calling this function returns the live statistics of the step, and + additionally populates the `stats` and `pids` attribute of the + instance. + + Returns: + (JobStepStatistics): The statistics of the Step. + + Raises: + RPCError: When retrieving the stats for the Step failed. + + Examples: + >>> import pyslurm + >>> step = pyslurm.JobStep.load(9999, 1) + >>> stats = step.load_stats() + >>> + >>> # Print the CPU Time Used + >>> print(stats.total_cpu_time) + >>> + >>> # Print the Process-IDs for the Step, organized by hostname + >>> print(step.pids) + """ + stats.load_single(self) + return self.stats + def send_signal(self, signal): """Send a signal to a running Job step. @@ -338,6 +369,8 @@ cdef class JobStep: if dist: out["distribution"] = dist.to_dict() + out["stats"] = self.stats.to_dict() + out["pids"] = self.pids return out @property @@ -434,11 +467,11 @@ cdef class JobStep: @property def alloc_cpus(self): - return u32_parse(self.ptr.num_cpus) + return u32_parse(self.ptr.num_cpus, on_noval=1) @property def ntasks(self): - return u32_parse(self.ptr.num_tasks) + return u32_parse(self.ptr.num_tasks, on_noval=1) @property def distribution(self): diff --git a/pyslurm/db/__init__.py b/pyslurm/db/__init__.py index acd36a40..f563aff0 100644 --- a/pyslurm/db/__init__.py +++ b/pyslurm/db/__init__.py @@ -21,7 +21,7 @@ from .connection import Connection from .step import JobStep, JobSteps -from .stats import JobStatistics +from .stats import JobStatistics, JobStepStatistics from .job import ( Job, Jobs, diff --git a/pyslurm/db/job.pxd b/pyslurm/db/job.pxd index e0848e0a..0622b57c 100644 --- a/pyslurm/db/job.pxd +++ b/pyslurm/db/job.pxd @@ -166,46 +166,17 @@ cdef class Jobs(MultiClusterMap): Jobs to initialize this collection with. Attributes: - consumed_energy (int): - Total amount of energy consumed, in joules. - disk_read (int): - Total amount of bytes read. - disk_write (int): - Total amount of bytes written. - page_faults (int): - Total amount of page faults. - resident_memory (int): - Total Resident Set Size (RSS) used in bytes. - virtual_memory (int): - Total Virtual Memory Size (VSZ) used in bytes. - elapsed_cpu_time (int): - Total amount of time used (Elapsed time * cpu count) in seconds. - This is not the real CPU-Efficiency, but rather the total amount - of cpu-time the CPUs were occupied for. - total_cpu_time (int): - Sum of `user_cpu_time` and `system_cpu_time`, in seconds - user_cpu_time (int): - Total amount of Time spent in user space, in seconds - system_cpu_time (int): - Total amount of Time spent in kernel space, in seconds + stats (pyslurm.db.JobStatistics): + Utilization statistics of this Job Collection cpus (int): - Total amount of cpus. + Total amount of cpus requested. nodes (int): - Total amount of nodes. + Total amount of nodes requested. memory (int): Total amount of requested memory in Mebibytes. """ cdef public: - consumed_energy - disk_read - disk_write - page_faults - resident_memory - virtual_memory - elapsed_cpu_time - total_cpu_time - user_cpu_time - system_cpu_time + stats cpus nodes memory @@ -256,7 +227,7 @@ cdef class Job: association_id (int): ID of the Association this job runs in. block_id (str): - Name of the block used (for BlueGene Systems) + Name of the block used (for BlueGene Systems) cluster (str): Cluster this Job belongs to constraints (str): diff --git a/pyslurm/db/job.pyx b/pyslurm/db/job.pyx index 0b081a0e..66bb8afa 100644 --- a/pyslurm/db/job.pyx +++ b/pyslurm/db/job.pyx @@ -29,10 +29,6 @@ from typing import Any from pyslurm.utils.uint import * from pyslurm.settings import LOCAL_CLUSTER from pyslurm import xcollections -from pyslurm.db.stats import ( - reset_stats_for_job_collection, - add_stats_to_job_collection, -) from pyslurm.utils.ctime import ( date_to_timestamp, timestr_to_mins, @@ -71,7 +67,7 @@ cdef class JobFilter: self.ptr = <slurmdb_job_cond_t*>try_xmalloc(sizeof(slurmdb_job_cond_t)) if not self.ptr: raise MemoryError("xmalloc failed for slurmdb_job_cond_t") - + self.ptr.db_flags = slurm.SLURMDB_JOB_FLAG_NOTSET self.ptr.flags |= slurm.JOBCOND_FLAG_NO_TRUNC @@ -119,15 +115,15 @@ cdef class JobFilter: def _parse_state(self): # TODO: implement return None - + def _create(self): self._alloc() cdef: slurmdb_job_cond_t *ptr = self.ptr slurm_selected_step_t *selected_step - ptr.usage_start = date_to_timestamp(self.start_time) - ptr.usage_end = date_to_timestamp(self.end_time) + ptr.usage_start = date_to_timestamp(self.start_time) + ptr.usage_end = date_to_timestamp(self.end_time) ptr.cpus_min = u32(self.cpus, on_noval=0) ptr.cpus_max = u32(self.max_cpus, on_noval=0) ptr.nodes_min = u32(self.nodes, on_noval=0) @@ -153,7 +149,7 @@ cdef class JobFilter: if self.truncate_time: ptr.flags &= ~slurm.JOBCOND_FLAG_NO_TRUNC - + if self.ids: # These are only allowed by the slurmdbd when specific jobs are # requested. @@ -283,34 +279,38 @@ cdef class Jobs(MultiClusterMap): job = Job.from_ptr(<slurmdb_job_rec_t*>job_ptr.data) job.qos_data = qos_data job._create_steps() - job.stats = JobStatistics.from_job_steps(job) + job.stats = JobStatistics.from_steps(job.steps) + + elapsed = job.elapsed_time if job.elapsed_time else 0 + cpus = job.cpus if job.cpus else 1 + job.stats.elapsed_cpu_time = elapsed * cpus cluster = job.cluster if cluster not in out.data: out.data[cluster] = {} out[cluster][job.id] = job - add_stats_to_job_collection(out, job.stats) - out.cpus += job.cpus - out.nodes += job.num_nodes - out.memory += job.memory + out._add_stats(job) return out def _reset_stats(self): - reset_stats_for_job_collection(self) + self.stats = JobStatistics() self.cpus = 0 self.nodes = 0 self.memory = 0 + def _add_stats(self, job): + self.stats.add(job.stats) + self.cpus += job.cpus + self.nodes += job.num_nodes + self.memory += job.memory + def calc_stats(self): """(Re)Calculate Statistics for the Job Collection.""" self._reset_stats() for job in self.values(): - add_stats_to_job_collection(self, job.stats) - self.cpus += job.cpus - self.nodes += job.num_nodes - self.memory += job.memory + self._add_stats(job) @staticmethod def modify(db_filter, Job changes, db_connection=None): @@ -353,7 +353,7 @@ cdef class Jobs(MultiClusterMap): In its simplest form, you can do something like this: >>> import pyslurm - >>> + >>> >>> db_filter = pyslurm.db.JobFilter(ids=[9999]) >>> changes = pyslurm.db.Job(comment="A comment for the job") >>> modified_jobs = pyslurm.db.Jobs.modify(db_filter, changes) @@ -366,7 +366,7 @@ cdef class Jobs(MultiClusterMap): connection object: >>> import pyslurm - >>> + >>> >>> db_conn = pyslurm.db.Connection.open() >>> db_filter = pyslurm.db.JobFilter(ids=[9999]) >>> changes = pyslurm.db.Job(comment="A comment for the job") @@ -433,7 +433,7 @@ cdef class Jobs(MultiClusterMap): else: # Autodetects the last slurm error raise RPCError() - + if not db_connection: # Autocommit if no connection was explicitly specified. conn.commit() @@ -528,7 +528,7 @@ cdef class Job: SlurmList step_list SlurmListItem step_ptr - step_list = SlurmList.wrap(self.ptr.steps, owned=False) + step_list = SlurmList.wrap(self.ptr.steps, owned=False) for step_ptr in SlurmList.iter_and_pop(step_list): step = JobStep.from_ptr(<slurmdb_step_rec_t*>step_ptr.data) self.steps[step.id] = step @@ -593,7 +593,7 @@ cdef class Job: @property def num_nodes(self): - val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str, + val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str, slurm.TRES_NODE) if val is not None: # Job is already running and has nodes allocated @@ -601,7 +601,7 @@ cdef class Job: else: # Job is still pending, so we return the number of requested nodes # instead. - val = TrackableResources.find_count_in_str(self.ptr.tres_req_str, + val = TrackableResources.find_count_in_str(self.ptr.tres_req_str, slurm.TRES_NODE) return val @@ -622,7 +622,7 @@ cdef class Job: task_str = cstr.to_unicode(self.ptr.array_task_str) if not task_str: return None - + if "%" in task_str: # We don't want this % character and everything after it # in here, so remove it. @@ -730,7 +730,7 @@ cdef class Job: return cstr.to_unicode(self.ptr.jobname) # uint32_t lft - + @property def mcs_label(self): return cstr.to_unicode(self.ptr.mcs_label) @@ -757,7 +757,7 @@ cdef class Job: @property def cpus(self): - val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str, + val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str, slurm.TRES_CPU) if val is not None: # Job is already running and has cpus allocated @@ -765,11 +765,11 @@ cdef class Job: else: # Job is still pending, so we return the number of requested cpus # instead. - return u32_parse(self.ptr.req_cpus, on_noval=0, zero_is_noval=False) + return u32_parse(self.ptr.req_cpus, on_noval=1) @property def memory(self): - val = TrackableResources.find_count_in_str(self.ptr.tres_req_str, + val = TrackableResources.find_count_in_str(self.ptr.tres_req_str, slurm.TRES_MEM) return val diff --git a/pyslurm/db/stats.pxd b/pyslurm/db/stats.pxd index 9004402b..71faca88 100644 --- a/pyslurm/db/stats.pxd +++ b/pyslurm/db/stats.pxd @@ -26,6 +26,7 @@ from pyslurm cimport slurm from pyslurm.slurm cimport ( try_xmalloc, slurmdb_stats_t, + slurmdb_step_rec_t, slurmdb_job_rec_t, ) from pyslurm.db.tres cimport TrackableResources @@ -33,9 +34,48 @@ from pyslurm.db.step cimport JobStep, JobSteps from pyslurm.db.job cimport Job from pyslurm.utils cimport cstr - cdef class JobStatistics: - """Statistics for a Slurm Job or Step. + """Statistics for a Slurm Job or Job Collection. + + Attributes: + total_cpu_time (int): + Sum of user_cpu_time and system_cpu_time, in seconds + user_cpu_time (int): + Total amount of time spent in user space, in seconds + system_cpu_time (int): + Total amount of time spent in kernel space, in seconds + consumed_energy (int): + Total amount of energy consumed, in joules + elapsed_cpu_time (int): + Total amount of time used(Elapsed time * cpu count) in seconds. + This is not the real CPU-Efficiency, but rather the total amount + of cpu-time the CPUs were occupied for. + disk_read (int): + Total amount of bytes read. + disk_write (int): + Total amount of bytes written. + page_faults (int): + Total amount of page faults. + resident_memory (int): + Total Resident Set Size (RSS) used in bytes. + virtual_memory (int): + Total Virtual Memory Size (VSZ) used in bytes. + """ + cdef public: + total_cpu_time + user_cpu_time + system_cpu_time + elapsed_cpu_time + consumed_energy + disk_read + disk_write + page_faults + resident_memory + virtual_memory + + +cdef class JobStepStatistics: + """Statistics for a Slurm JobStep. !!! note @@ -105,8 +145,6 @@ cdef class JobStatistics: system_cpu_time (int): Amount of Time spent in kernel space, in seconds """ - cdef slurmdb_job_rec_t *job - cdef public: consumed_energy elapsed_cpu_time @@ -140,8 +178,8 @@ cdef class JobStatistics: system_cpu_time @staticmethod - cdef JobStatistics from_job_steps(Job job) + cdef JobStepStatistics from_step(JobStep step) @staticmethod - cdef JobStatistics from_step(JobStep step) + cdef JobStepStatistics from_ptr(slurmdb_step_rec_t *step, list nodes, cpus=*, elapsed_time=*, is_live=*) diff --git a/pyslurm/db/stats.pyx b/pyslurm/db/stats.pyx index c2da1145..c3379f7d 100644 --- a/pyslurm/db/stats.pyx +++ b/pyslurm/db/stats.pyx @@ -28,33 +28,52 @@ from pyslurm.utils.helpers import ( ) -def reset_stats_for_job_collection(jobs): - jobs.consumed_energy = 0 - jobs.disk_read = 0 - jobs.disk_write = 0 - jobs.page_faults = 0 - jobs.resident_memory = 0 - jobs.virtual_memory = 0 - jobs.elapsed_cpu_time = 0 - jobs.total_cpu_time = 0 - jobs.user_cpu_time = 0 - jobs.system_cpu_time = 0 - - -def add_stats_to_job_collection(jobs, JobStatistics js): - jobs.consumed_energy += js.consumed_energy - jobs.disk_read += js.avg_disk_read - jobs.disk_write += js.avg_disk_write - jobs.page_faults += js.avg_page_faults - jobs.resident_memory += js.avg_resident_memory - jobs.virtual_memory += js.avg_virtual_memory - jobs.elapsed_cpu_time += js.elapsed_cpu_time - jobs.total_cpu_time += js.total_cpu_time - jobs.user_cpu_time += js.user_cpu_time - jobs.system_cpu_time += js.system_cpu_time +cdef class JobStatistics: + def __init__(self): + for attr, val in instance_to_dict(self).items(): + setattr(self, attr, 0) -cdef class JobStatistics: + def to_dict(self): + return instance_to_dict(self) + + @staticmethod + def from_steps(steps): + cdef JobStatistics total_stats = JobStatistics() + for step in steps.values(): + total_stats._sum_steps(step.stats) + + return total_stats + + def _sum_steps(self, src): + self.consumed_energy += src.consumed_energy + self.disk_read += src.avg_disk_read + self.disk_write += src.avg_disk_write + self.page_faults += src.avg_page_faults + self.total_cpu_time += src.total_cpu_time + self.user_cpu_time += src.user_cpu_time + self.system_cpu_time += src.system_cpu_time + + if src.max_resident_memory > self.resident_memory: + self.resident_memory = src.max_resident_memory + + if src.max_virtual_memory > self.resident_memory: + self.virtual_memory = src.max_virtual_memory + + def add(self, src): + self.consumed_energy += src.consumed_energy + self.disk_read += src.disk_read + self.disk_write += src.disk_write + self.page_faults += src.page_faults + self.total_cpu_time += src.total_cpu_time + self.user_cpu_time += src.user_cpu_time + self.system_cpu_time += src.system_cpu_time + self.resident_memory += src.resident_memory + self.virtual_memory += src.virtual_memory + self.elapsed_cpu_time += src.elapsed_cpu_time + + +cdef class JobStepStatistics: def __init__(self): for attr, val in instance_to_dict(self).items(): @@ -77,41 +96,32 @@ cdef class JobStatistics: return instance_to_dict(self) @staticmethod - cdef JobStatistics from_job_steps(Job job): - cdef JobStatistics job_stats = JobStatistics() - - for step in job.steps.values(): - job_stats._add_base_stats(step.stats) - - job_stats._sum_cpu_time(job) - - step_count = len(job.steps) - if step_count: - job_stats.avg_cpu_frequency /= step_count - - return job_stats + cdef JobStepStatistics from_step(JobStep step): + return JobStepStatistics.from_ptr( + step.ptr, + nodelist_from_range_str(cstr.to_unicode(step.ptr.nodes)), + step.cpus if step.cpus else 0, + step.elapsed_time if step.elapsed_time else 0, + is_live=False, + ) @staticmethod - cdef JobStatistics from_step(JobStep step): - cdef JobStatistics wrap = JobStatistics() - if not &step.ptr.stats: + cdef JobStepStatistics from_ptr(slurmdb_step_rec_t *step, list nodes, cpus=0, elapsed_time=0, is_live=False): + cdef JobStepStatistics wrap = JobStepStatistics() + if not step: return wrap cdef: - list nodes = nodelist_from_range_str( - cstr.to_unicode(step.ptr.nodes)) cpu_time_adj = 1000 - slurmdb_stats_t *ptr = &step.ptr.stats + slurmdb_stats_t *ptr = &step.stats if ptr.consumed_energy != slurm.NO_VAL64: wrap.consumed_energy = ptr.consumed_energy - wrap.avg_cpu_time = TrackableResources.find_count_in_str( - ptr.tres_usage_in_ave, slurm.TRES_CPU) / cpu_time_adj + wrap.avg_cpu_time = int(TrackableResources.find_count_in_str( + ptr.tres_usage_in_ave, slurm.TRES_CPU) / cpu_time_adj) - elapsed = step.elapsed_time if step.elapsed_time else 0 - cpus = step.cpus if step.cpus else 0 - wrap.elapsed_cpu_time = elapsed * cpus + wrap.elapsed_cpu_time = elapsed_time * cpus ave_freq = int(ptr.act_cpufreq) if ave_freq != slurm.NO_VAL: @@ -127,7 +137,7 @@ cdef class JobStatistics: ptr.tres_usage_in_ave, slurm.TRES_MEM) wrap.avg_virtual_memory = TrackableResources.find_count_in_str( ptr.tres_usage_in_ave, slurm.TRES_VMEM) - + wrap.max_disk_read = TrackableResources.find_count_in_str( ptr.tres_usage_in_max, slurm.TRES_FS_DISK) max_disk_read_nodeid = TrackableResources.find_count_in_str( @@ -156,15 +166,27 @@ cdef class JobStatistics: wrap.max_virtual_memory_task = TrackableResources.find_count_in_str( ptr.tres_usage_in_max_taskid, slurm.TRES_VMEM) - wrap.min_cpu_time = TrackableResources.find_count_in_str( - ptr.tres_usage_in_min, slurm.TRES_CPU) / cpu_time_adj + wrap.min_cpu_time = int(TrackableResources.find_count_in_str( + ptr.tres_usage_in_min, slurm.TRES_CPU) / cpu_time_adj) min_cpu_time_nodeid = TrackableResources.find_count_in_str( ptr.tres_usage_in_min_nodeid, slurm.TRES_CPU) wrap.min_cpu_time_task = TrackableResources.find_count_in_str( ptr.tres_usage_in_min_taskid, slurm.TRES_CPU) - wrap.total_cpu_time = TrackableResources.find_count_in_str( - ptr.tres_usage_in_tot, slurm.TRES_CPU) + # The Total CPU-Time extracted here is only used for live-stats. + # sacct does not use it from the tres_usage_in_tot string, but instead + # the tot_cpu_sec value from the step pointer directly, so do that too. + if is_live: + wrap.total_cpu_time = int(TrackableResources.find_count_in_str( + ptr.tres_usage_in_tot, slurm.TRES_CPU) / cpu_time_adj) + elif step.tot_cpu_sec != slurm.NO_VAL64: + wrap.total_cpu_time += step.tot_cpu_sec + + if step.user_cpu_sec != slurm.NO_VAL64: + wrap.user_cpu_time += step.user_cpu_sec + + if step.sys_cpu_sec != slurm.NO_VAL64: + wrap.system_cpu_time += step.sys_cpu_sec if nodes: wrap.max_disk_write_node = nodes[max_disk_write_nodeid] @@ -173,64 +195,4 @@ cdef class JobStatistics: wrap.max_virtual_memory_node = nodes[max_virtual_memory_nodeid] wrap.min_cpu_time_node = nodes[min_cpu_time_nodeid] - if step.ptr.user_cpu_sec != slurm.NO_VAL64: - wrap.user_cpu_time = step.ptr.user_cpu_sec - - if step.ptr.sys_cpu_sec != slurm.NO_VAL64: - wrap.system_cpu_time = step.ptr.sys_cpu_sec - return wrap - - def _add_base_stats(self, JobStatistics src): - self.consumed_energy += src.consumed_energy - self.avg_cpu_time += src.avg_cpu_time - self.avg_cpu_frequency += src.avg_cpu_frequency - self.avg_disk_read += src.avg_disk_read - self.avg_disk_write += src.avg_disk_write - self.avg_page_faults += src.avg_page_faults - - if src.max_disk_read >= self.max_disk_read: - self.max_disk_read = src.max_disk_read - self.max_disk_read_node = src.max_disk_read_node - self.max_disk_read_task = src.max_disk_read_task - - if src.max_disk_write >= self.max_disk_write: - self.max_disk_write = src.max_disk_write - self.max_disk_write_node = src.max_disk_write_node - self.max_disk_write_task = src.max_disk_write_task - - if src.max_page_faults >= self.max_page_faults: - self.max_page_faults = src.max_page_faults - self.max_page_faults_node = src.max_page_faults_node - self.max_page_faults_task = src.max_page_faults_task - - if src.max_resident_memory >= self.max_resident_memory: - self.max_resident_memory = src.max_resident_memory - self.max_resident_memory_node = src.max_resident_memory_node - self.max_resident_memory_task = src.max_resident_memory_task - self.avg_resident_memory = self.max_resident_memory - - if src.max_virtual_memory >= self.max_virtual_memory: - self.max_virtual_memory = src.max_virtual_memory - self.max_virtual_memory_node = src.max_virtual_memory_node - self.max_virtual_memory_task = src.max_virtual_memory_task - self.avg_virtual_memory = self.max_virtual_memory - - if src.min_cpu_time >= self.min_cpu_time: - self.min_cpu_time = src.min_cpu_time - self.min_cpu_time_node = src.min_cpu_time_node - self.min_cpu_time_task = src.min_cpu_time_task - - def _sum_cpu_time(self, Job job): - if job.ptr.tot_cpu_sec != slurm.NO_VAL64: - self.total_cpu_time += job.ptr.tot_cpu_sec - - if job.ptr.user_cpu_sec != slurm.NO_VAL64: - self.user_cpu_time += job.ptr.user_cpu_sec - - if job.ptr.sys_cpu_sec != slurm.NO_VAL64: - self.system_cpu_time += job.ptr.sys_cpu_sec - - elapsed = job.elapsed_time if job.elapsed_time else 0 - cpus = job.cpus if job.cpus else 0 - self.elapsed_cpu_time += elapsed * cpus diff --git a/pyslurm/db/step.pxd b/pyslurm/db/step.pxd index 906be912..6b9466ce 100644 --- a/pyslurm/db/step.pxd +++ b/pyslurm/db/step.pxd @@ -39,7 +39,7 @@ from pyslurm.slurm cimport ( from pyslurm.db.util cimport SlurmList, SlurmListItem from pyslurm.db.connection cimport Connection from pyslurm.utils cimport cstr -from pyslurm.db.stats cimport JobStatistics +from pyslurm.db.stats cimport JobStepStatistics from pyslurm.db.tres cimport TrackableResources, TrackableResource @@ -52,7 +52,7 @@ cdef class JobStep: """A Slurm Database JobStep. Attributes: - stats (pyslurm.db.JobStatistics): + stats (pyslurm.db.JobStepStatistics): Utilization statistics for this Step num_nodes (int): Amount of nodes this Step has allocated @@ -96,7 +96,7 @@ cdef class JobStep: Amount of seconds the Step was suspended """ cdef slurmdb_step_rec_t *ptr - cdef public JobStatistics stats + cdef public JobStepStatistics stats @staticmethod cdef JobStep from_ptr(slurmdb_step_rec_t *step) diff --git a/pyslurm/db/step.pyx b/pyslurm/db/step.pyx index 2d71ca73..0faf4d79 100644 --- a/pyslurm/db/step.pyx +++ b/pyslurm/db/step.pyx @@ -65,7 +65,7 @@ cdef class JobStep: cdef JobStep from_ptr(slurmdb_step_rec_t *step): cdef JobStep wrap = JobStep.__new__(JobStep) wrap.ptr = step - wrap.stats = JobStatistics.from_step(wrap) + wrap.stats = JobStepStatistics.from_step(wrap) return wrap def to_dict(self): @@ -110,7 +110,7 @@ cdef class JobStep: @property def memory(self): - val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str, + val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str, slurm.TRES_MEM) return val diff --git a/pyslurm/xcollections.pyx b/pyslurm/xcollections.pyx index a0ce2e6b..4e99eaf5 100644 --- a/pyslurm/xcollections.pyx +++ b/pyslurm/xcollections.pyx @@ -21,7 +21,7 @@ # # cython: c_string_type=unicode, c_string_encoding=default # cython: language_level=3 -"""Custom Collection utilities""" +"""Custom Collection utilities""" from pyslurm.settings import LOCAL_CLUSTER import json @@ -158,7 +158,7 @@ class ItemsView(BaseView): """ return MCItemsView(self._mcm) - + class MCItemsView(BaseView): """A Multi-Cluster Items View. @@ -240,7 +240,7 @@ cdef class MultiClusterMap: cluster, key = item else: cluster, key = self._get_cluster(), item - + return cluster, key def _check_val_type(self, item): @@ -450,12 +450,18 @@ cdef class MultiClusterMap: if not self.data: return '{}' + return json.dumps(self.to_dict(multi_cluster=multi_cluster)) + + def to_dict(self, multi_cluster=False): + if not self.data: + return {} + data = multi_dict_recursive(self) if multi_cluster: - return json.dumps(data) + return data else: cluster = self._get_cluster() - return json.dumps(data[cluster]) + return data[cluster] def keys(self): """Return a View of all the Keys in this collection @@ -476,7 +482,7 @@ cdef class MultiClusterMap: ... print(cluster, key) """ return KeysView(self) - + def items(self): """Return a View of all the Values in this collection @@ -556,12 +562,12 @@ cdef class MultiClusterMap: item = self.get(key, default=default) if item is default or item == default: return default - + cluster = item.cluster del self.data[cluster][key] if not self.data[cluster]: del self.data[cluster] - + return item def update(self, data={}, **kwargs): @@ -591,7 +597,7 @@ def multi_reload(cur, frozen=True): for cluster, item in new.keys().with_cluster(): if (cluster, item) not in cur.keys().with_cluster(): cur[cluster][item] = new[cluster][item] - + return cur diff --git a/setup.py b/setup.py index 77e767cd..48c2c14d 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ CYTHON_VERSION_MIN = "0.29.37" # Keep in sync with pyproject.toml -SLURM_LIB = "libslurm" +SLURM_LIB = "libslurmfull" TOPDIR = Path(__file__).parent PYTHON_MIN_REQUIRED = (3, 6) @@ -101,6 +101,8 @@ class SlurmConfig(): def __init__(self): # Assume some defaults here self._lib_dir = Path("/usr/lib64") + self._lib = None + self._lib_dir_search_paths = [] self.inc_dir = Path("/usr/include") self._version = None @@ -110,13 +112,17 @@ def _find_hdr(self, name): raise RuntimeError(f"Cannot locate {name} in {self.inc_full_dir}") return hdr - def _find_lib(self, lib_dir): + def _search_lib(self, lib_dir): + if self._lib: + return + lib = lib_dir / f"{SLURM_LIB}.so" if not lib.exists(): - raise RuntimeError(f"Cannot locate Slurm library in {lib_dir}") - - print(f"Found {SLURM_LIB} library in {lib}") - return lib_dir + self._lib_dir_search_paths.append(str(lib_dir)) + else: + print(f"Found slurm library: {lib}") + self._lib = lib + self._lib_dir = lib_dir @property def lib_dir(self): @@ -124,11 +130,14 @@ def lib_dir(self): @lib_dir.setter def lib_dir(self, path): - lib_dir = Path(path) - if SLURM_LIB == "libslurmfull": - lib_dir /= "slurm" - - self._lib_dir = self._find_lib(lib_dir) + self._search_lib(path) + self._search_lib(path / "slurm") + self._search_lib(path / "slurm-wlm") + + if not self._lib: + searched = "\n- ".join(self._lib_dir_search_paths) + raise RuntimeError("Cannot locate Slurm library. Searched paths: " + f"\n- {searched}") @property def inc_full_dir(self): diff --git a/tests/integration/test_job.py b/tests/integration/test_job.py index 8c9d4750..a6e0c6e4 100644 --- a/tests/integration/test_job.py +++ b/tests/integration/test_job.py @@ -32,6 +32,7 @@ JobSubmitDescription, RPCError, ) +from pyslurm.db import JobStatistics, JobStepStatistics def test_parse_all(submit_job): @@ -171,6 +172,27 @@ def test_load_steps(submit_job): assert job.steps.get("batch") +def test_load_stats(submit_job): + job = submit_job() + util.wait(100) + + job = Job.load(job.id) + job.load_stats() + + assert job.state == "RUNNING" + assert job.stats + assert isinstance(job.stats, JobStatistics) + assert job.stats.elapsed_cpu_time > 0 + assert job.stats.resident_memory > 0 + + for step in job.steps.values(): + assert step.stats + assert step.state == "RUNNING" + assert isinstance(step.stats, JobStepStatistics) + assert step.stats.avg_resident_memory > 0 + assert step.stats.elapsed_cpu_time > 0 + + def test_to_json(submit_job): job_list = [submit_job() for i in range(3)] util.wait()