From e5d683a82be5110482bd27e95319349d893871b6 Mon Sep 17 00:00:00 2001 From: Stephen Bailey Date: Mon, 26 Aug 2024 16:50:06 -0700 Subject: [PATCH 1/5] desi_use_reservation for prods --- bin/desi_use_reservation | 202 ++++++++++++++++++++++++++++++++++ py/desispec/workflow/queue.py | 28 ++--- 2 files changed, 216 insertions(+), 14 deletions(-) create mode 100755 bin/desi_use_reservation diff --git a/bin/desi_use_reservation b/bin/desi_use_reservation new file mode 100755 index 000000000..eb874b304 --- /dev/null +++ b/bin/desi_use_reservation @@ -0,0 +1,202 @@ +#!/usr/bin/env python + +""" +Utility for moving jobs from the regular queue into a reservation, with limits +""" + +import os, sys +import numpy as np +import subprocess +import json +import time + +from desispec.workflow.queue import get_jobs_in_queue +from desiutil.log import get_logger + +def get_reservation_info(name): + """ + Return dictionary of reservation info from "scontrol show res NAME" + """ + log = get_logger() + + cmd = f"scontrol show res {name} --json" + log.info(f'Getting reservation info with: {cmd}') + resinfo = json.loads(subprocess.check_output(cmd.split())) + + if len(resinfo['reservations']) == 0: + log.critical(f'reservation {reservation} not found') + sys.exit(1) + elif len(resinfo['reservations']) > 1: + log.critical(f'"{cmd}" returned info for more than one reservation') + sys.exit(2) + + resinfo = resinfo['reservations'][0] + assert resinfo['name'] == name + + return resinfo + +def use_reservation(name=None, resinfo=None, extra_nodes=0, dry_run=False): + """ + Move jobs from regular queue into a reservation + + Options: + name (str): name of the reservation + resinfo (dict): dictionary of reservation info from get_reservation_info + extra_nodes (int): over-fill the reservation by this many jobs + dry_run (bool): if True, print scontrol commands but don't move jobs + + Must provide name or pre-cached resinfo=get_reservation_info(name). + + This will move eligible jobs from the regular queue into the + requested reservation, up to the reservation size + extra_nodes. + It auto-detects CPU vs. GPU reservations, and prioritizes what type + of jobs are most important to move first (e.g. psfnight jobs because + those block other jobs more than a single arc job does). + + It does not move jobs that are still waiting on dependencies so that + they don't fill up a spot in the reservation without being able to run. + """ + + ## NOTE: + ## "scontrol show res kibo26_cpu --json" returns a "partition" that + ## seems to match the "PARTITION" of "squeue -u desi -o '%i,%P,%v,%j,%u,%t,%M,%D,%R'" + ## for jobs that are in the regular queue, but jobs that are in the reservation have + ## and squeue reported "PARTITION" of "resv", not the parition of the reservation... + + log = get_logger() + + if resinfo is None: + resinfo = get_reservation_info(name) + + if name is None: + name = resinfo['name'] + + ressize = resinfo['node_count'] + + #- job types for CPU and GPU, in the order they should be considered for the reservation + cpujobs = ['linkcal', 'ccdcalib', 'psfnight', 'arc'] + gpujobs = ['nightlyflat', 'flat', 'ztile', 'tilenight', 'zpix'] + + #- which regular queue partition is eligible for this reservation? + regular_partition = resinfo['partition'] + + #- Determine CPU vs. GPU reservation + #- NOTE: some NERSC Perlmutter-specific hardcoding here + if resinfo['partition'].startswith('gpu'): + restype = 'GPU' + jobtypes = gpujobs + elif resinfo['partition'].startswith('regular'): + restype = 'CPU' + jobtypes = cpujobs + else: + log.critical(f'Unrecognized reservation type partition={resinfo["partition"]}') + sys.exit(3) + + #- Get currently running jobs and filter to those that are eligible for this reservation + jobs = get_jobs_in_queue() + + jobs_in_reservation = jobs[jobs['RESERVATION'] == name] + + eligible_for_reservation = jobs['RESERVATION'] == 'null' + eligible_for_reservation &= jobs['PARTITION'] == regular_partition + + #- TBD: only move jobs that are currently eligible to run, not those waiting for dependencies + eligible_for_reservation &= jobs['ST'] == 'PD' + eligible_for_reservation &= jobs['NODELISTREASON'] != 'Dependency' + + jobs_eligible = jobs[eligible_for_reservation] + + njobs_in_reservation = len(jobs_in_reservation) + njobnodes_in_reservation = np.sum(jobs_in_reservation['NODES']) + + njobs_eligible = len(jobs_eligible) + njobnodes_eligible = np.sum(jobs_eligible['NODES']) + + njobnodes_to_add = max(0, ressize + extra_nodes - njobnodes_in_reservation) + njobnodes_to_add = min(njobnodes_to_add, njobnodes_eligible) + + log.info(f'At {time.asctime()}, {name} ({ressize} nodes) has {njobs_in_reservation} jobs using {njobnodes_in_reservation} nodes') + log.info(f'{njobs_eligible} {restype} jobs using {njobnodes_eligible} nodes are eligible to be moved into the reservation') + + if njobnodes_to_add == 0: + log.info('Reservation full, no need to add more jobs at this time') + return + + log.info(f'Adding jobs to use {njobnodes_to_add} additional nodes') + jobnodes_added = 0 + jobids = list() + for jobtype in jobtypes: + ii = np.char.startswith(jobs_eligible['NAME'], jobtype) + for row in jobs_eligible[ii]: + jobname = row['NAME'] + log.info(f'Move {jobname} to {name}') + jobnodes_added += row['NODES'] + jobids.append(row['JOBID']) + + if jobnodes_added >= njobnodes_to_add: + break + + if jobnodes_added >= njobnodes_to_add: + break + + if len(jobids) > 0: + if dry_run: + log.info('Dry run mode; will print what to do but not actually run the commands') + else: + log.info('Running scontrol commands') + + #- Update queue in batches of batch_size jobs + batch_size = 10 + for i in range(0, len(jobids), batch_size): + jobids_csv = ','.join([str(x) for x in jobids[i:i+batch_size]]) + cmd = f'scontrol update ReservationName={name} JobID={jobids_csv}' + if dry_run: + #- Purposefully print, not log, to make it easier to cut-and-paste + print(cmd) + else: + log.info(cmd) + subprocess.run(cmd.split()) + +#-------------------------------------------------------------------- + +def main(): + import argparse + import datetime + p = argparse.ArgumentParser() + p.add_argument('-r', '--reservation', required=True, help="batch reservation name") + p.add_argument('-n', '--extra-nodes', type=int, default=0, + help="Add jobs for this number of additional nodes beyond reservation size") + p.add_argument('--sleep', type=int, help="Sleep this number of minutes between checks") + p.add_argument('--until', type=str, help="Keep running until this YEAR-MM-DDThh:mm(:ss) time") + p.add_argument('--dry-run', action="store_true", help="Print what to do, but don't actually move jobs to reservation") + args = p.parse_args() + + log = get_logger() + + if args.until is not None: + datetime_until = datetime.datetime.fromisoformat(args.until) + log.info(f'Will keep checking until {args.until}') + + #- arg.until implies sleeping in a loop, so make sure that is set + if args.sleep is None: + args.sleep = 5 + else: + datetime_until = None + + + #- Cache reservation information so that we don't have to look it up every loop + resinfo = get_reservation_info(args.reservation) + + #- Move jobs into the reservation; optionally sleep, repeat + while (datetime_until is None) or datetime.datetime.now() < datetime_until: + use_reservation(resinfo=resinfo, extra_nodes=args.extra_nodes, dry_run=args.dry_run) + if args.sleep is None: + break + else: + log.info(f'Sleeping {args.sleep} minutes before next check') + time.sleep(60*args.sleep) + + log.info(f'Done checking at {time.asctime()}') + +if __name__ == '__main__': + main() diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py index 95e500726..a51796fb0 100644 --- a/py/desispec/workflow/queue.py +++ b/py/desispec/workflow/queue.py @@ -498,7 +498,7 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0): Returns ------- Table - Table with the columns JOBID, PARTITION, NAME, USER, ST, TIME, NODES, + Table with the columns JOBID, PARTITION, RESERVATION, NAME, USER, ST, TIME, NODES, NODELIST(REASON) for the specified user. """ log = get_logger() @@ -508,29 +508,29 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0): else: user = 'desi' - cmd = f'squeue -u {user} -o "%i,%P,%j,%u,%t,%M,%D,%R"' + cmd = f'squeue -u {user} -o "%i,%P,%v,%j,%u,%t,%M,%D,%R"' cmd_as_list = cmd.split() if dry_run_level > 0: log.info("Dry run, would have otherwise queried Slurm with the" +f" following: {' '.join(cmd_as_list)}") - string = 'JOBID,PARTITION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)' - string += f"27650097,cron,scron_ar,{user},PD,0:00,1,(BeginTime)" - string += f"27650100,cron,scron_nh,{user},PD,0:00,1,(BeginTime)" - string += f"27650098,cron,scron_up,{user},PD,0:00,1,(BeginTime)" - string += f"29078887,gpu_ss11,tilenight-20230413-24315,{user},PD,0:00,1,(Priority)" - string += f"29078892,gpu_ss11,tilenight-20230413-21158,{user},PD,0:00,1,(Priority)" - string += f"29079325,gpu_ss11,tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)" - string += f"29079322,gpu_ss11,ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)" - string += f"29078883,gpu_ss11,tilenight-20230413-21187,{user},R,10:18,1,nid003960" - string += f"29079242,regular_milan_ss11,arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)" - string += f"29079246,regular_milan_ss11,arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)" + string = 'JOBID,PARTITION,RESERVATION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)' + string += f"27650097,cron,(null),scron_ar,{user},PD,0:00,1,(BeginTime)" + string += f"27650100,cron,(null),scron_nh,{user},PD,0:00,1,(BeginTime)" + string += f"27650098,cron,(null),scron_up,{user},PD,0:00,1,(BeginTime)" + string += f"29078887,gpu_ss11,(null),tilenight-20230413-24315,{user},PD,0:00,1,(Priority)" + string += f"29078892,gpu_ss11,(null),tilenight-20230413-21158,{user},PD,0:00,1,(Priority)" + string += f"29079325,gpu_ss11,(null),tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)" + string += f"29079322,gpu_ss11,(null),ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)" + string += f"29078883,gpu_ss11,(null),tilenight-20230413-21187,{user},R,10:18,1,nid003960" + string += f"29079242,regular_milan_ss11,(null),arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)" + string += f"29079246,regular_milan_ss11,(null),arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)" # create command to run to exercise subprocess -> stdout parsing cmd = 'echo ' + string cmd_as_list = ['echo', string] else: - log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}") + log.info(f"Querying jobs in queue with: {' '.join(cmd_as_list)}") #- sacct sometimes fails; try several times before giving up max_attempts = 3 From 95d079b2d0f1c0396f875346c0926ba133979434 Mon Sep 17 00:00:00 2001 From: Stephen Bailey Date: Mon, 26 Aug 2024 16:57:03 -0700 Subject: [PATCH 2/5] irony: crash-inducing typo in log message for different crash condition --- bin/desi_use_reservation | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/desi_use_reservation b/bin/desi_use_reservation index eb874b304..8bcd4d007 100755 --- a/bin/desi_use_reservation +++ b/bin/desi_use_reservation @@ -24,7 +24,7 @@ def get_reservation_info(name): resinfo = json.loads(subprocess.check_output(cmd.split())) if len(resinfo['reservations']) == 0: - log.critical(f'reservation {reservation} not found') + log.critical(f'reservation {name} not found') sys.exit(1) elif len(resinfo['reservations']) > 1: log.critical(f'"{cmd}" returned info for more than one reservation') From 20529f539177a706785a3ed475866d4d617ff078 Mon Sep 17 00:00:00 2001 From: Stephen Bailey Date: Tue, 27 Aug 2024 08:56:57 -0700 Subject: [PATCH 3/5] sort to prioritize earlier nights --- bin/desi_use_reservation | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/bin/desi_use_reservation b/bin/desi_use_reservation index 8bcd4d007..b4cb475da 100755 --- a/bin/desi_use_reservation +++ b/bin/desi_use_reservation @@ -95,23 +95,32 @@ def use_reservation(name=None, resinfo=None, extra_nodes=0, dry_run=False): #- Get currently running jobs and filter to those that are eligible for this reservation jobs = get_jobs_in_queue() + #- Sort by name so that earlier nights are prioritized over later nights + jobs.sort('NAME') + + #- Filter which jobs are in reservation vs. eligible to move into reservation jobs_in_reservation = jobs[jobs['RESERVATION'] == name] eligible_for_reservation = jobs['RESERVATION'] == 'null' eligible_for_reservation &= jobs['PARTITION'] == regular_partition - #- TBD: only move jobs that are currently eligible to run, not those waiting for dependencies + #- Only move jobs that are currently eligible to run, not those waiting for dependencies + #- so that we don't fill reservation with jobs that can't run eligible_for_reservation &= jobs['ST'] == 'PD' eligible_for_reservation &= jobs['NODELISTREASON'] != 'Dependency' - jobs_eligible = jobs[eligible_for_reservation] + #- Counting jobs and nodes in and out of the reservation njobs_in_reservation = len(jobs_in_reservation) njobnodes_in_reservation = np.sum(jobs_in_reservation['NODES']) njobs_eligible = len(jobs_eligible) njobnodes_eligible = np.sum(jobs_eligible['NODES']) + if njobs_eligible == 0: + log.info('No available jobs to add') + return + njobnodes_to_add = max(0, ressize + extra_nodes - njobnodes_in_reservation) njobnodes_to_add = min(njobnodes_to_add, njobnodes_eligible) From a8b0479d3b411c65dca1c0e906384d67edc8c98e Mon Sep 17 00:00:00 2001 From: Stephen Bailey Date: Tue, 27 Aug 2024 14:24:12 -0700 Subject: [PATCH 4/5] limit flat vs. tilenight,ztile imbalance --- bin/desi_use_reservation | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/bin/desi_use_reservation b/bin/desi_use_reservation index b4cb475da..d8bff094d 100755 --- a/bin/desi_use_reservation +++ b/bin/desi_use_reservation @@ -110,6 +110,22 @@ def use_reservation(name=None, resinfo=None, extra_nodes=0, dry_run=False): eligible_for_reservation &= jobs['NODELISTREASON'] != 'Dependency' jobs_eligible = jobs[eligible_for_reservation] + #- if there are 20x more tilenight or ztile jobs eligible than flats, + #- move them to second priority after fiberflatnight + num_eligible_tilenight = np.sum(np.char.startswith(jobs_eligible['NAME'], 'tilenight')) + num_eligible_ztile = np.sum(np.char.startswith(jobs_eligible['NAME'], 'ztile')) + num_eligible_flat = np.sum(np.char.startswith(jobs_eligible['NAME'], 'flat')) + + if num_eligible_tilenight > 0 and num_eligible_flat > 0 and num_eligible_tilenight > 20*num_eligible_flat: + log.info(f'{num_eligible_tilenight} tilenight jobs >> {num_eligible_flat} flat jobs; prioritizing tilenight') + jobtypes.remove('tilenight') + jobtypes.insert(1, 'tilenight') + + if num_eligible_ztile > 0 and num_eligible_flat > 0 and num_eligible_ztile > 20*num_eligible_flat: + log.info(f'{num_eligible_ztile} ztile jobs >> {num_eligible_flat} flat jobs; prioritizing ztile') + jobtypes.remove('ztile') + jobtypes.insert(1, 'ztile') + #- Counting jobs and nodes in and out of the reservation njobs_in_reservation = len(jobs_in_reservation) njobnodes_in_reservation = np.sum(jobs_in_reservation['NODES']) @@ -117,16 +133,16 @@ def use_reservation(name=None, resinfo=None, extra_nodes=0, dry_run=False): njobs_eligible = len(jobs_eligible) njobnodes_eligible = np.sum(jobs_eligible['NODES']) - if njobs_eligible == 0: - log.info('No available jobs to add') - return - njobnodes_to_add = max(0, ressize + extra_nodes - njobnodes_in_reservation) njobnodes_to_add = min(njobnodes_to_add, njobnodes_eligible) log.info(f'At {time.asctime()}, {name} ({ressize} nodes) has {njobs_in_reservation} jobs using {njobnodes_in_reservation} nodes') log.info(f'{njobs_eligible} {restype} jobs using {njobnodes_eligible} nodes are eligible to be moved into the reservation') + if njobs_eligible == 0: + log.info('No available jobs to add') + return + if njobnodes_to_add == 0: log.info('Reservation full, no need to add more jobs at this time') return From 61c000e69385b77ec9e087b33c16a7179b6e48ba Mon Sep 17 00:00:00 2001 From: Stephen Bailey Date: Tue, 27 Aug 2024 16:05:58 -0700 Subject: [PATCH 5/5] robustness checks --- bin/desi_use_reservation | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/bin/desi_use_reservation b/bin/desi_use_reservation index d8bff094d..5ca421b36 100755 --- a/bin/desi_use_reservation +++ b/bin/desi_use_reservation @@ -65,6 +65,11 @@ def use_reservation(name=None, resinfo=None, extra_nodes=0, dry_run=False): log = get_logger() + if resinfo is None and name is None: + msg = 'Must provide either name or resinfo' + log.critical(msg) + raise ValueError(msg) + if resinfo is None: resinfo = get_reservation_info(name) @@ -180,7 +185,11 @@ def use_reservation(name=None, resinfo=None, extra_nodes=0, dry_run=False): print(cmd) else: log.info(cmd) - subprocess.run(cmd.split()) + try: + subprocess.run(cmd.split(), check=True) + except subprocess.CalledProcessError as err: + log.error(str(err)) + log.warning('Continuing anyway') #--------------------------------------------------------------------