diff --git a/README.md b/README.md index 73e8b83..96a594f 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,80 @@ # mjobs -A little tool to make it easier to inspect LSF jobs. - -```shell -Usage: mjobs [-h] [-q QUEUE] [-u USER] [-r] [-a] [-d] [-G USER_GROUP] [-g GROUP] [-m HOSTS] [-p] [-e] [-f FILTER] [-t] [-nh] [--bkill] [job_id ...] - -bjobs but a bit nicer - -Positional Arguments: - job_id Specifies the jobs or job arrays that bjobs displays. - -Optional Arguments: - -h, --help show this help message and exit - -q QUEUE Displays jobs in the specified queue - -u USER Displays jobs in the specified user - -r Displays running jobs. - -a Displays information about jobs in all states, including jobs that finished recently. - -d Displays information about jobs that finished recently. - -G USER_GROUP Displays jobs associated with the specified user group. - -g GROUP Displays information about jobs attached to the specified job group. - -m HOSTS Displays jobs dispatched to the specified hosts. - -p Displays pending jobs, together with the pending reasons that caused each job not to be dispatched during the last dispatch turn. - -e Add the execution josts, output file and error file to the table. - -f FILTER Filter the jobs using the specified regex on the job name or pending reason. - -t No fancy table, a good ol' tsv - -nh Don't print the table header, useful to pipe the tsv output - --bkill Terminate found or filtered jobs with bkill. - -``` - -## Example - -![Alt text](images/mjobs-example.png?raw=true "mjobs example") +A little tool to make it easier to inspect [IBM Spectrum LSF](https://www.ibm.com/products/hpc-workload-management) (or just LSF) and [Slurm](https://slurm.schedmd.com/) jobs. + +The program will auto detect [bjobs](https://www.ibm.com/docs/en/spectrum-lsf/10.1.0?topic=bjobs-options) under [LSF](https://www.ibm.com/products/hpc-workload-management), or [squeue](https://slurm.schedmd.com/squeue.html) in [Slurm](https://slurm.schedmd.com/). 
+ +## IBM LSF + +mjobs does not support all the options of bjobs, only a subset. It also adds the `-f FILTER`, `--bkill`, `-ts` and `-nh` options. + +
[mbc@codon-login-02 ~]$ ./mjobs -h
+Usage: mjobs [-h] [-f FILTER] [-ts] [-nh] [-q QUEUE] [-u USER] [-r] [-a] [-d] [-G USER_GROUP] [-g GROUP] [-m HOSTS] [-p] [-e] [--bkill] [job_id ...]
+
+Just like bjobs but a bit nicer
+
+Positional Arguments:
+  job_id         Specifies the jobs or job arrays that bjobs displays.
+
+Optional Arguments:
+  -h, --help     show this help message and exit
+  -f FILTER      Filter the jobs using the specified regex on the job name or pending reason.
+  -ts, --tsv     No fancy table, a good ol' tsv
+  -nh            Don't print the table header, useful to pipe the tsv output
+  -q QUEUE       Displays jobs in the specified queue
+  -u USER        Displays jobs in the specified user
+  -r             Displays running jobs.
+  -a             Displays information about jobs in all states, including jobs that finished recently.
+  -d             Displays information about jobs that finished recently.
+  -G USER_GROUP  Displays jobs associated with the specified user group.
+  -g GROUP       Displays information about jobs attached to the specified job group.
+  -m HOSTS       Displays jobs dispatched to the specified hosts.
+  -p             Displays pending jobs, together with the pending reasons that caused each job not to be dispatched during the last dispatch turn.
+  -e             Add the execution hosts, output file and error file to the table.
+  --bkill        Terminate found or filtered jobs with bkill.
+ +### Example + +
[mbc@codon-login-02 ~]$ mjobs -u emgpr_wgs -f SRR12480298_mgnify_analysis
+                                                                LSF jobs for emgpr_wgs                                                                 
+┏━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓
+┃    JobId  Status  JobName                      JobGroup  User       Queue       Submit Time   Start Time    Finish Time     Pending reason ┃
+┡━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩
+│ 87346050 │ RUN   SRR12480298_mgnify_analysis │   ----   │ emgpr_wgs │ production │ Feb 25 08:00 │ Feb 25 08:03 │ Feb 28 08:03 L │      ----      │
+└──────────┴────────┴─────────────────────────────┴──────────┴───────────┴────────────┴──────────────┴──────────────┴────────────────┴────────────────┘
+
+ +## Slurm + +mjobs does not support all the options of squeue, only a subset. It also adds the `-f FILTER`, `-ts` and `-nh` options. + +
[mbc@codon-slurm-login-01 ~]$ ./mjobs -h
+Usage: mjobs [-h] [-f FILTER] [-ts] [-nh] [-p PARTITION] [-u USER]
+             [-t {pending,running,suspended,completed,cancelled,failed,timeout,node_fail,preempted,boot_fail,deadline,out_of_memory,completing,configuring,resizing,resv_del_hold,requeued,requeue_fed,requeue_hold,revoked,signaling,special_exit,stage_out,stopped} [{pending,running,suspended,completed,cancelled,failed,timeout,node_fail,preempted,boot_fail,deadline,out_of_memory,completing,configuring,resizing,resv_del_hold,requeued,requeue_fed,requeue_hold,revoked,signaling,special_exit,stage_out,stopped} ...]]
+             [-w NODELIST [NODELIST ...]] [-e]
+             [job_id ...]
+
+Just like squeue but a bit nicer
+
+Positional Arguments:
+  job_id                Specifies the jobs or job arrays that squeue displays.
+
+Optional Arguments:
+  -h, --help            show this help message and exit
+  -f FILTER             Filter the jobs using the specified regex on the job name or pending reason.
+  -ts, --tsv            No fancy table, a good ol' tsv
+  -nh                   Don't print the table header, useful to pipe the tsv output
+  -p, --partition PARTITION
+                        Specify the partitions of the jobs or steps to view. Accepts a comma separated list of partition names.
+  -u, --user USER       Request jobs or job steps from a comma separated list of users. The list can consist of user names or user id numbers. Performance of the command can be measurably improved for systems with large numbers of jobs when a
+                        single user is specified.
+  -t, --states {pending,running,suspended,completed,cancelled,failed,timeout,node_fail,preempted,boot_fail,deadline,out_of_memory,completing,configuring,resizing,resv_del_hold,requeued,requeue_fed,requeue_hold,revoked,signaling,special_exit,stage_out,stopped} [{pending,running,suspended,completed,cancelled,failed,timeout,node_fail,preempted,boot_fail,deadline,out_of_memory,completing,configuring,resizing,resv_del_hold,requeued,requeue_fed,requeue_hold,revoked,signaling,special_exit,stage_out,stopped} ...]
+                        Specify the states of jobs to view. Accepts a comma separated list of state names or 'all'. If 'all' is specified then jobs of all states will be reported. If no state is specified then pending, running, and completing jobs
+                        are reported. See the JOB STATE CODES section below for a list of valid states. Both extended and compact forms are valid. Note the <state_list> supplied is case insensitive ('pending' and 'PENDING' are equivalent).
+  -w, --nodelist NODELIST [NODELIST ...]
+                        Report only on jobs allocated to the specified node or list of nodes. This may either be the NodeName or NodeHostname as defined in slurm.conf(5) in the event that they differ. A node_name of localhost is mapped to the
+                        current host name.
+  -e                    Add the execution nodes, stdoutput file and stderror file to the table.
# Installation - bundle the app diff --git a/Taskfile.yml b/Taskfile.yml index 68729d7..5bdf4e7 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -4,3 +4,4 @@ tasks: build: cmds: - pyinstaller mjobs/main.py --onefile --clean --name mjobs + - scp dist/mjobs codon:~/ diff --git a/images/mjobs-example.png b/images/mjobs-example.png deleted file mode 100644 index 5ceac1f..0000000 Binary files a/images/mjobs-example.png and /dev/null differ diff --git a/mjobs/base.py b/mjobs/base.py new file mode 100644 index 0000000..ebd010d --- /dev/null +++ b/mjobs/base.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2021-2024 - Martin Beracochea +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import csv +import sys +from abc import ABC + +from rich.console import Console +from rich.table import Table +from rich_argparse import RichHelpFormatter + + +class Base(ABC): + def __init__(self, console: Console, error_console: Console) -> None: + """console is the default one to print content + error_console will be used to print error messages + """ + self.console = console + self.error_console = console + super().__init__() + + def get_args(self, implementation_name: str): + """Base arguments that every implementation should support""" + parser = argparse.ArgumentParser( + description=f"Just like {implementation_name} but a bit nicer", + formatter_class=RichHelpFormatter, + ) + parser.add_argument( + "-f", + dest="filter", + required=False, + help="Filter the jobs using the specified regex on the job name or pending reason.", + ) + parser.add_argument( + "-ts", + "--tsv", + dest="tsv", + action="store_true", + help="No fancy table, a good ol' tsv", + ) + parser.add_argument( + "-nh", + dest="no_header", + action="store_true", + help="Don't print the table header, useful to pipe the tsv output", + ) + return parser + + def render( + self, + title: str, + columns: list[dict[str, any]], + rows: list[dict[str, any]], + ): + """Render the jobs""" + if self.args.tsv: + # print with no styles + writer = csv.writer(sys.stdout, delimiter="\t") + if not self.args.no_header: + writer.writerow([c.get("header") for c in columns]) + writer.writerows(rows) + else: + # print the fancy table + table = Table( + title=title, show_lines=True, show_header=not self.args.no_header + ) + for col in columns: + table.add_column(**col) + for row in rows: + table.add_row(*row) + self.console.print(table) + + def main(self): + """Main execution point. 
+ Should handle the logic to get jobs, build the table and any other features""" + pass diff --git a/mjobs/lsf.py b/mjobs/lsf.py index 879bbcc..c9669ee 100644 --- a/mjobs/lsf.py +++ b/mjobs/lsf.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# Copyright 2019-2021 - Martin Beracochea +# Copyright 2021-2024 - Martin Beracochea # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. @@ -14,63 +14,295 @@ # See the License for the specific language governing permissions and # limitations under the License. +import getpass import json +import re +import sys from subprocess import check_output from typing import Optional +from rich.console import Console +from rich.text import Text -def _parse_bjobs(bjobs_output_str): - """Parse records from bjobs json type output. - This snippet comes from: https://github.com/DataBiosphere/toil/blob/master/src/toil/batchSystems/lsf.py - :param bjobs_output_str: stdout of bjobs json type output - :return: list with the jobs - """ - bjobs_dict = None - bjobs_records = None - # Handle Cannot connect to LSF. Please wait ... type messages - dict_start = bjobs_output_str.find("{") - dict_end = bjobs_output_str.rfind("}") - if dict_start != -1 and dict_end != -1: - bjobs_output = bjobs_output_str[dict_start : (dict_end + 1)] - bjobs_dict = json.loads(bjobs_output) - return bjobs_dict["RECORDS"] - if bjobs_records is None: - raise ValueError(f"Could not find bjobs output json in: {bjobs_output_str}") - return [] - - -def get_jobs(job_ids: Optional[list[int]] = None, lsf_args: Optional[list[str]] = None): - """bjobs command, it uses the json output and includes [stat, name and jobid]. - Any other parameters in lsf_args will be included in the call to bjobs. 
- """ - fields = [ - "stat", - "name", - "jobid", - "job_group", - "user", - "queue", - "submit_time", - "start_time", - "finish_time", - "exec_host", - "command", - "exit_reason", - "exit_code", - "error_file", - "output_file", - "pend_reason", - ] - args = ["bjobs", "-json", "-o", " ".join(fields)] - if lsf_args: - args.extend(list(map(str, lsf_args))) - if job_ids: - args.extend(list(map(str, job_ids))) - bjobs_output = check_output(args, universal_newlines=True) - jobs = _parse_bjobs(bjobs_output) - return jobs - - -def bkill(job: int) -> None: - args = ["bkill", str(job)] - return check_output(args, universal_newlines=True) +from mjobs.base import Base + + +class LSF(Base): + def __init__(self, console: Console, error_console: Console): + super().__init__(console, error_console) + + def status_style(self, job_entry) -> Text: + if job_entry["STAT"] == "RUN": + return Text(job_entry["STAT"], style="bold green") + elif job_entry["STAT"] == "PEND": + return Text(job_entry["STAT"], style="dark_orange") + elif job_entry["STAT"] == "DONE": + return Text(job_entry["STAT"], style="honeydew2") + return Text(job_entry["STAT"], style="grey93") + + def get_args(self): + parser = super().get_args("bjobs") + parser.add_argument( + dest="job_id", + help="Specifies the jobs or job arrays that bjobs displays.", + nargs="*", + ) + parser.add_argument( + "-q", + dest="queue", + required=False, + help="Displays jobs in the specified queue", + ) + parser.add_argument( + "-u", + dest="user", + required=False, + help="Displays jobs in the specified user", + ) + parser.add_argument( + "-r", dest="run", action="store_true", help="Displays running jobs." 
+ ) + parser.add_argument( + "-a", + dest="all", + action="store_true", + help="Displays information about jobs in all states, including jobs that finished recently.", + ) + parser.add_argument( + "-d", + dest="recent", + action="store_true", + help="Displays information about jobs that finished recently.", + ) + parser.add_argument( + "-G", + dest="user_group", + required=False, + help="Displays jobs associated with the specified user group.", + ) + parser.add_argument( + "-g", + dest="group", + required=False, + help="Displays information about jobs attached to the specified job group.", + ) + parser.add_argument( + "-m", + dest="hosts", + required=False, + help="Displays jobs dispatched to the specified hosts.", + ) + parser.add_argument( + "-p", + dest="pend", + action="store_true", + help=( + "Displays pending jobs, together with the pending reasons that caused each job " + "not to be dispatched during the last dispatch turn." + ), + ) + parser.add_argument( + "-e", + dest="extended", + action="store_true", + help="Add the execution hosts, output file and error file to the table.", + ) + parser.add_argument( + "--bkill", + dest="bkill", + action="store_true", + help="Terminate found or filtered jobs with bkill.", + ) + self.args = parser.parse_args() + return parser + + def parse_bjobs(self, bjobs_output_str): + """Parse records from bjobs json type output. + This snippet comes from: https://github.com/DataBiosphere/toil/blob/master/src/toil/batchSystems/lsf.py + :param bjobs_output_str: stdout of bjobs json type output + :return: list with the jobs + """ + bjobs_dict = None + bjobs_records = None + # Handle Cannot connect to LSF. Please wait ... 
type messages + dict_start = bjobs_output_str.find("{") + dict_end = bjobs_output_str.rfind("}") + if dict_start != -1 and dict_end != -1: + bjobs_output = bjobs_output_str[dict_start : (dict_end + 1)] + bjobs_dict = json.loads(bjobs_output) + return bjobs_dict["RECORDS"] + if bjobs_records is None: + raise ValueError(f"Could not find bjobs output json in: {bjobs_output_str}") + return [] + + def get_jobs( + self, job_ids: Optional[list[int]] = None, lsf_args: Optional[list[str]] = None + ): + """bjobs command, it uses the json output and includes [stat, name and jobid]. + Any other parameters in lsf_args will be included in the call to bjobs. + """ + fields = [ + "stat", + "name", + "jobid", + "job_group", + "user", + "queue", + "submit_time", + "start_time", + "finish_time", + "exec_host", + "command", + "exit_reason", + "exit_code", + "error_file", + "output_file", + "pend_reason", + ] + args = ["bjobs", "-json", "-o", " ".join(fields)] + if lsf_args: + args.extend(list(map(str, lsf_args))) + if job_ids: + args.extend(list(map(str, job_ids))) + bjobs_output = check_output(args, universal_newlines=True) + jobs = self.parse_bjobs(bjobs_output) + return jobs + + def bkill(self, job: int) -> None: + args = ["bkill", str(job)] + return check_output(args, universal_newlines=True) + + def main(self): + """Main execution point, should contain all the code to handle the LSF implementation""" + + self.get_args() + + jobs = [] + lsf_args = [] + + if self.args.user: + lsf_args.extend(["-u", self.args.user]) + if self.args.queue: + lsf_args.extend(["-q", self.args.queue]) + if self.args.run: + lsf_args.extend(["-r"]) + if self.args.all: + lsf_args.extend(["-a"]) + if self.args.recent: + lsf_args.extend(["-d"]) + if self.args.user_group: + lsf_args.extend(["-G", self.args.user_group]) + if self.args.group: + lsf_args.extend(["-g", self.args.group]) + if self.args.hosts: + lsf_args.extend(["-m", self.args.hosts]) + if self.args.pend: + lsf_args.extend(["-p"]) + + try: + status 
= self.console.status("Getting jobs from LSF...") + if not self.args.tsv: + status.start() + + jobs = self.get_jobs(self.args.job_id, lsf_args) + + if not self.args.tsv: + status.stop() + + except Exception: + if not self.args.tsv: + status.stop() + self.console.print_exception() + + if self.args.filter: + filter_regex = re.compile(self.args.filter) + jobs = list( + filter( + lambda j: filter_regex.search(j["JOB_NAME"]) + or filter_regex.search(j["PEND_REASON"]), + jobs, + ) + ) + + if not jobs: + self.console.print(Text("No jobs.", style="bold white", justify="left")) + sys.exit(0) + + title = f"LSF jobs for {self.args.user or getpass.getuser()}" + if self.args.queue: + title += f" on queue {self.args.queue}" + if self.args.hosts: + title += f" running on hosts {self.args.hosts}" + + cols = [ + {"header": "JobId", "justify": "right"}, + {"header": "Status"}, + {"header": "JobName", "overflow": "fold"}, + {"header": "JobGroup"}, + {"header": "User"}, + {"header": "Queue"}, + {"header": "Submit Time"}, + {"header": "Start Time"}, + {"header": "Finish Time"}, + {"header": "Pending reason"}, + ] + if self.args.extended: + cols.append({"header": "Exec. 
Host"}) + cols.append({"header": "Error File", "overflow": "fold"}) + cols.append({"header": "Output File", "overflow": "fold"}) + + rows = [] + + for job in sorted(jobs, key=lambda j: j["JOBID"]): + if "ERROR" in job: + self.error_console.log( + f"The job {job['JOBID']} has an error: {job['ERROR']}" + ) + continue + job_name = Text(job["JOB_NAME"]) + pending_reason = Text(job["PEND_REASON"]) or Text("----", justify="center") + if self.args.filter: + job_name.highlight_regex(rf"{self.args.filter}", "bold red") + pending_reason.highlight_regex(rf"{self.args.filter}", "bold red") + + row = [ + job["JOBID"], + self.status_style(job), + job_name, + job["JOB_GROUP"] or Text("----", justify="center"), + job["USER"], + job["QUEUE"], + job["SUBMIT_TIME"], + job["START_TIME"], + job["FINISH_TIME"], + pending_reason, + ] + if self.args.extended: + row.extend( + [ + Text(job["EXEC_HOST"], overflow="fold"), + job["ERROR_FILE"], + job["OUTPUT_FILE"], + ] + ) + + rows.append(row) + + self.render(title=title, columns=cols, rows=rows) + + if self.args.bkill: + self.console.rule() + self.console.print( + Text("Running bkill for each job..."), + style="bold white", + justify="center", + ) + for job in jobs: + job_id = job["JOBID"] + try: + lsf_bkill_output = self.bkill(job_id) + self.console.print(lsf_bkill_output.replace("\n", "")) + except Exception: + self.error_console.print( + Text(f"bkill for {job_id} failed"), style="bold red" + ) diff --git a/mjobs/main.py b/mjobs/main.py index 3afb486..acdaabb 100644 --- a/mjobs/main.py +++ b/mjobs/main.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# Copyright 2019-2021 - Martin Beracochea +# Copyright 2019-2024 - Martin Beracochea # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. @@ -14,256 +14,25 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import argparse -import csv -import getpass -import re +import shutil import sys from rich.console import Console -from rich.table import Table -from rich.text import Text -from rich_argparse import RichHelpFormatter - -from mjobs import lsf - - -def _status_style(job_entry) -> Text: - if job_entry["STAT"] == "RUN": - return Text(job_entry["STAT"], style="bold green") - elif job_entry["STAT"] == "PEND": - return Text(job_entry["STAT"], style="dark_orange") - elif job_entry["STAT"] == "DONE": - return Text(job_entry["STAT"], style="honeydew2") - return Text(job_entry["STAT"], style="grey93") - - -def _get_args(): - parser = argparse.ArgumentParser( - description="bjobs but a bit nicer", formatter_class=RichHelpFormatter - ) - parser.add_argument( - dest="job_id", - help="Specifies the jobs or job arrays that bjobs displays.", - nargs="*", - ) - parser.add_argument( - "-q", dest="queue", required=False, help="Displays jobs in the specified queue" - ) - parser.add_argument( - "-u", dest="user", required=False, help="Displays jobs in the specified user" - ) - parser.add_argument( - "-r", dest="run", action="store_true", help="Displays running jobs." 
- ) - parser.add_argument( - "-a", - dest="all", - action="store_true", - help="Displays information about jobs in all states, including jobs that finished recently.", - ) - parser.add_argument( - "-d", - dest="recent", - action="store_true", - help="Displays information about jobs that finished recently.", - ) - parser.add_argument( - "-G", - dest="user_group", - required=False, - help="Displays jobs associated with the specified user group.", - ) - parser.add_argument( - "-g", - dest="group", - required=False, - help="Displays information about jobs attached to the specified job group.", - ) - parser.add_argument( - "-m", - dest="hosts", - required=False, - help="Displays jobs dispatched to the specified hosts.", - ) - parser.add_argument( - "-p", - dest="pend", - action="store_true", - help=( - "Displays pending jobs, together with the pending reasons that caused each job " - "not to be dispatched during the last dispatch turn." - ), - ) - parser.add_argument( - "-e", - dest="extended", - action="store_true", - help="Add the execution josts, output file and error file to the table.", - ) - parser.add_argument( - "-f", - dest="filter", - required=False, - help="Filter the jobs using the specified regex on the job name or pending reason.", - ) - parser.add_argument( - "-t", - dest="tsv", - action="store_true", - help="No fancy table, a good ol' tsv", - ) - parser.add_argument( - "-nh", - dest="no_header", - action="store_true", - help="Don't print the table header, useful to pipe the tsv output", - ) - parser.add_argument( - "--bkill", - dest="bkill", - action="store_true", - help="Terminate found or filtered jobs with bkill.", - ) - return parser.parse_args() +from mjobs.lsf import LSF +from mjobs.slurm import Slurm if __name__ == "__main__": - args = _get_args() console = Console() - - jobs = [] - lsf_args = [] - - if args.user: - lsf_args.extend(["-u", args.user]) - if args.queue: - lsf_args.extend(["-q", args.queue]) - if args.run: - lsf_args.extend(["-r"]) - 
if args.all: - lsf_args.extend(["-a"]) - if args.recent: - lsf_args.extend(["-d"]) - if args.user_group: - lsf_args.extend(["-G", args.user_group]) - if args.group: - lsf_args.extend(["-g", args.group]) - if args.hosts: - lsf_args.extend(["-m", args.hosts]) - if args.pend: - lsf_args.extend(["-p"]) - - try: - status = console.status("Getting jobs from LSF...") - if not args.tsv: - status.start() - - jobs = lsf.get_jobs(args.job_id, lsf_args) - - if not args.tsv: - status.stop() - - except Exception: - console.print_exception() - - if args.filter: - filter_regex = re.compile(args.filter) - jobs = list( - filter( - lambda j: filter_regex.search(j["JOB_NAME"]) - or filter_regex.search(j["PEND_REASON"]), - jobs, - ) - ) - - if not jobs: - console.print(Text("No jobs.", style="bold white", justify="left")) - sys.exit(0) - - title = f"LSF jobs for {args.user or getpass.getuser()}" - if args.queue: - title += f" on queue {args.queue}" - if args.hosts: - title += f" running on hosts {args.hosts}" - - cols = [ - {"header": "JobId", "justify": "right"}, - {"header": "Status"}, - {"header": "JobName", "overflow": "fold"}, - {"header": "JobGroup"}, - {"header": "User"}, - {"header": "Queue"}, - {"header": "Submit Time"}, - {"header": "Start Time"}, - {"header": "Finish Time"}, - {"header": "Pending reason"}, - ] - if args.extended: - cols.append({"header": "Exec. 
Host"}) - cols.append({"header": "Error File", "overflow": "fold"}) - cols.append({"header": "Output File", "overflow": "fold"}) - - rows = [] - - for job in sorted(jobs, key=lambda j: j["JOBID"]): - if "ERROR" in job: - console.print(f"The job {job['JOBID']} has an error: {job['ERROR']}") - continue - job_name = Text(job["JOB_NAME"]) - pending_reason = Text(job["PEND_REASON"]) or Text("----", justify="center") - if args.filter: - job_name.highlight_regex(rf"{args.filter}", "bold red") - pending_reason.highlight_regex(rf"{args.filter}", "bold red") - - row = [ - job["JOBID"], - _status_style(job), - job_name, - job["JOB_GROUP"] or Text("----", justify="center"), - job["USER"], - job["QUEUE"], - job["SUBMIT_TIME"], - job["START_TIME"], - job["FINISH_TIME"], - pending_reason, - ] - if args.extended: - row.extend( - [ - Text(job["EXEC_HOST"], overflow="fold"), - job["ERROR_FILE"], - job["OUTPUT_FILE"], - ] - ) - - rows.append(row) - - if args.tsv: - # print with no styles - writer = csv.writer(sys.stdout, delimiter="\t") - if not args.no_header: - writer.writerow([c.get("header") for c in cols]) - writer.writerows(rows) + error_console = Console(stderr=True, style="bold red") + + if shutil.which("bjobs"): + lsf = LSF(console, error_console) + lsf.main() + elif shutil.which("squeue"): + slurm = Slurm(console, error_console) + slurm.main() else: - # print the fancy table - table = Table(title=title, show_lines=True, show_header=not args.no_header) - for col in cols: - table.add_column(**col) - for row in rows: - table.add_row(*row) - console.print(table) - - if args.bkill: - console.rule() - console.print( - Text("Running bkill for each job..."), style="bold white", justify="center" - ) - for job in jobs: - job_id = job["JOBID"] - try: - lsf_bkill_output = lsf.bkill(job_id) - console.print(lsf_bkill_output.replace("\n", "")) - except Exception: - console.print(Text(f"bkill for {job_id} failed"), style="bold red") + error_console.log("I can't find bjobs or slurm... 
so, I can't do anything.") + sys.exit(1) diff --git a/mjobs/slurm.py b/mjobs/slurm.py new file mode 100644 index 0000000..4251989 --- /dev/null +++ b/mjobs/slurm.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright 2021-2024 - Martin Beracochea +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import getpass +import json +import re +import sys +from subprocess import CalledProcessError, check_output +from typing import Optional + +from rich.console import Console +from rich.text import Text + +from mjobs.base import Base + + +class Slurm(Base): + def __init__(self, console: Console, error_console: Console): + super().__init__(console, error_console) + + def status_style(self, job_entry) -> Text: + colours = { + "RUNNING": "bold green", + "PENDING": "dark_orange", + "COMPLETED": "honeydew2", + "FAILED": "red", + } + return Text( + job_entry["job_state"], style=colours.get(job_entry["job_state"], "grey93") + ) + + def get_args(self): + JOB_STATES = [ + "pending", + "running", + "suspended", + "completed", + "cancelled", + "failed", + "timeout", + "node_fail", + "preempted", + "boot_fail", + "deadline", + "out_of_memory", + "completing", + "configuring", + "resizing", + "resv_del_hold", + "requeued", + "requeue_fed", + "requeue_hold", + "revoked", + "signaling", + "special_exit", + "stage_out", + "stopped", + ] + parser = super().get_args("squeue") + parser.add_argument( + dest="job_id", + help="Specifies the jobs or job 
arrays that squeue displays.", + nargs="*", + ) + parser.add_argument( + "-p", + "--partition", + dest="partition", + required=False, + help="""Specify the partitions of the jobs or steps to view. + Accepts a comma separated list of partition names.""", + ) + parser.add_argument( + "-u", + "--user", + dest="user", + required=False, + help="""Request jobs or job steps from a comma separated list of users. + The list can consist of user names or user id numbers. + Performance of the command can be measurably improved for systems with + large numbers of jobs when a single user is specified. + """, + ) + parser.add_argument( + "-t", + "--states", + dest="states", + choices=JOB_STATES, + nargs="+", + help="""Specify the states of jobs to view. Accepts a comma separated list of state names or 'all'. + If 'all' is specified then jobs of all states will be reported. + If no state is specified then pending, running, and completing jobs are reported. + See the JOB STATE CODES section below for a list of valid states. + Both extended and compact forms are valid. + Note the supplied is case insensitive ('pending' and 'PENDING' are equivalent). + """, + ) + parser.add_argument( + "-w", + "--nodelist", + dest="nodelist", + nargs="+", + help="""Report only on jobs allocated to the specified node or list of nodes. + This may either be the NodeName or NodeHostname as defined in slurm.conf(5) in the event that they differ. + A node_name of localhost is mapped to the current host name. 
+ """, + ) + parser.add_argument( + "-e", + dest="extended", + action="store_true", + help="Add the execution nodes, stdoutput file and stderror file to the table.", + ) + self.args = parser.parse_args() + return parser + + def get_jobs( + self, job_ids: Optional[list[int]] = None, args: Optional[list[str]] = None + ): + """Call squeue to obtain the jobs, it uses the json output.""" + squeue_args = ["squeue", "--json"] + if args: + squeue_args.extend(list(map(str, args))) + if job_ids: + squeue_args.extend(["--jobs", ",".join(list(map(str, job_ids)))]) + try: + squeue_output = check_output(squeue_args, universal_newlines=True) + return json.loads(squeue_output).get("jobs", []) + except CalledProcessError as ex: + self.error_console.log( + f"squeue call failed. Arguments: {' '.join(squeue_args)}. Error {ex.output}" + ) + raise ex + + def convert_unix_timestamp(self, timestamp: Optional[int]) -> str: + if timestamp is None: + return f"Invalid timestamp. {timestamp}" + try: + dt_object = datetime.datetime.utcfromtimestamp(timestamp) + return str(dt_object) + except (TypeError, ValueError): + return f"Invalid timestamp. 
{timestamp}" + + def main(self): + """Main execution point, should contain all the code to handle the Slurm implementation""" + + self.get_args() + + jobs = [] + extra_args = [] + + if self.args.user: + extra_args.extend(["-u", self.args.user]) + if self.args.partition: + extra_args.extend(["-p", self.args.partition]) + for state in self.args.states or []: + extra_args.extend(["-t", state]) + for node in self.args.nodelist or []: + extra_args.extend(["-w", node]) + + try: + status = self.console.status("Getting jobs from Slurm...") + if not self.args.tsv: + status.start() + + jobs = self.get_jobs(self.args.job_id, extra_args) + + if not self.args.tsv: + status.stop() + + except CalledProcessError: + # This is handled by get_jobs + if not self.args.tsv: + status.stop() + sys.exit(1) + except Exception: + if not self.args.tsv: + status.stop() + self.console.print_exception() + + if self.args.filter: + filter_regex = re.compile(self.args.filter) + jobs = list( + filter( + lambda j: filter_regex.search(j["command"]) + or filter_regex.search(j["command"]), + jobs, + ) + ) + + if not jobs: + self.console.print(Text("No jobs.", style="bold white", justify="left")) + sys.exit(0) + + title = f"Slurm jobs for {self.args.user or getpass.getuser()}" + if self.args.partition: + title += f" on partition {self.args.partition}" + if self.args.nodelist: + title += f" running on hosts {self.args.nodelist}" + + cols = [ + {"header": "JobId", "justify": "right"}, + {"header": "Status"}, + {"header": "JobName", "overflow": "fold"}, + {"header": "User"}, + {"header": "Partition"}, + {"header": "Submit Time"}, + {"header": "Start Time"}, + {"header": "End Time"}, + # {"header": "Pending reason"}, // TODO: implement, I don't know how this looks like in the json + ] + if self.args.extended: + cols.append({"header": "Nodes"}) + cols.append({"header": "StdOut"}) + cols.append({"header": "StdErr"}) + + rows = [] + + for job in sorted(jobs, key=lambda j: j["job_id"]): + job_name = 
Text(job["name"]) + if self.args.filter: + job_name.highlight_regex(rf"{self.args.filter}", "bold red") + + row = [ + str(job["job_id"]), + self.status_style(job), + job_name, + job["user_name"], + job["partition"], + self.convert_unix_timestamp(job["submit_time"]), + self.convert_unix_timestamp(job["start_time"]), + self.convert_unix_timestamp(job["end_time"]), + # pending_reason, + ] + if self.args.extended: + row.extend( + [ + Text(job["nodes"], overflow="fold"), + job["standard_error"], + job["standard_output"], + ] + ) + + rows.append(row) + + self.render(title=title, columns=cols, rows=rows) diff --git a/pyproject.toml b/pyproject.toml index bbf6588..940bc22 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,9 +1,9 @@ [tool.ruff] -line-length = 110 +line-length = 120 [tool.isort] profile = "black" [tool.back] -line-length = 110 +line-length = 120 experimental-string-processing = true diff --git a/requirements-dev.txt b/requirements-dev.txt index ac2bc8f..3a7d095 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,6 +1,6 @@ -r requirements.txt pyinstaller==5.9.0 -black==23.3.0 -isort==5.12.0 -ruff==0.0.261 -pre-commit==3.1.1 +black==24.2.0 +isort==5.13.2 +ruff==0.2.2 +pre-commit==3.6.2 diff --git a/requirements.txt b/requirements.txt index 1d61bde..944635d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -rich==13.3.3 -rich_argparse==1.1.0 +rich==13.7.0 +rich-argparse==1.4.0