diff --git a/bin/desi_use_reservation b/bin/desi_use_reservation
new file mode 100755
index 000000000..eb874b304
--- /dev/null
+++ b/bin/desi_use_reservation
@@ -0,0 +1,202 @@
+#!/usr/bin/env python
+
+"""
+Utility for moving jobs from the regular queue into a reservation, with limits
+"""
+
+import os, sys
+import numpy as np
+import subprocess
+import json
+import time
+
+from desispec.workflow.queue import get_jobs_in_queue
+from desiutil.log import get_logger
+
+def get_reservation_info(name):
+    """
+    Return dictionary of reservation info from "scontrol show res NAME"
+    """
+    log = get_logger()
+
+    cmd = f"scontrol show res {name} --json"
+    log.info(f'Getting reservation info with: {cmd}')
+    resinfo = json.loads(subprocess.check_output(cmd.split()))
+
+    if len(resinfo['reservations']) == 0:
+        #- BUGFIX: previously interpolated undefined name `reservation`,
+        #- which raised NameError instead of logging and exiting cleanly
+        log.critical(f'reservation {name} not found')
+        sys.exit(1)
+    elif len(resinfo['reservations']) > 1:
+        log.critical(f'"{cmd}" returned info for more than one reservation')
+        sys.exit(2)
+
+    resinfo = resinfo['reservations'][0]
+    assert resinfo['name'] == name
+
+    return resinfo
+
+def use_reservation(name=None, resinfo=None, extra_nodes=0, dry_run=False):
+    """
+    Move jobs from regular queue into a reservation
+
+    Options:
+        name (str): name of the reservation
+        resinfo (dict): dictionary of reservation info from get_reservation_info
+        extra_nodes (int): over-fill the reservation by this many nodes
+        dry_run (bool): if True, print scontrol commands but don't move jobs
+
+    Must provide name or pre-cached resinfo=get_reservation_info(name).
+
+    This will move eligible jobs from the regular queue into the
+    requested reservation, up to the reservation size + extra_nodes.
+    It auto-detects CPU vs. GPU reservations, and prioritizes what type
+    of jobs are most important to move first (e.g. psfnight jobs because
+    those block other jobs more than a single arc job does).
+
+    It does not move jobs that are still waiting on dependencies so that
+    they don't fill up a spot in the reservation without being able to run.
+    """
+
+    ## NOTE:
+    ## "scontrol show res kibo26_cpu --json" returns a "partition" that
+    ## seems to match the "PARTITION" of "squeue -u desi -o '%i,%P,%v,%j,%u,%t,%M,%D,%R'"
+    ## for jobs that are in the regular queue, but jobs that are in the reservation have
+    ## an squeue-reported "PARTITION" of "resv", not the partition of the reservation...
+
+    log = get_logger()
+
+    if resinfo is None:
+        resinfo = get_reservation_info(name)
+
+    if name is None:
+        name = resinfo['name']
+
+    ressize = resinfo['node_count']
+
+    #- job types for CPU and GPU, in the order they should be considered for the reservation
+    cpujobs = ['linkcal', 'ccdcalib', 'psfnight', 'arc']
+    gpujobs = ['nightlyflat', 'flat', 'ztile', 'tilenight', 'zpix']
+
+    #- which regular queue partition is eligible for this reservation?
+    regular_partition = resinfo['partition']
+
+    #- Determine CPU vs. GPU reservation
+    #- NOTE: some NERSC Perlmutter-specific hardcoding here
+    if resinfo['partition'].startswith('gpu'):
+        restype = 'GPU'
+        jobtypes = gpujobs
+    elif resinfo['partition'].startswith('regular'):
+        restype = 'CPU'
+        jobtypes = cpujobs
+    else:
+        log.critical(f'Unrecognized reservation type partition={resinfo["partition"]}')
+        sys.exit(3)
+
+    #- Get currently running jobs and filter to those that are eligible for this reservation
+    jobs = get_jobs_in_queue()
+
+    jobs_in_reservation = jobs[jobs['RESERVATION'] == name]
+
+    eligible_for_reservation = jobs['RESERVATION'] == 'null'
+    eligible_for_reservation &= jobs['PARTITION'] == regular_partition
+
+    #- TBD: only move jobs that are currently eligible to run, not those waiting for dependencies
+    eligible_for_reservation &= jobs['ST'] == 'PD'
+    eligible_for_reservation &= jobs['NODELISTREASON'] != 'Dependency'
+
+    jobs_eligible = jobs[eligible_for_reservation]
+
+    njobs_in_reservation = len(jobs_in_reservation)
+    njobnodes_in_reservation = np.sum(jobs_in_reservation['NODES'])
+
+    njobs_eligible = len(jobs_eligible)
+    njobnodes_eligible = np.sum(jobs_eligible['NODES'])
+
+    njobnodes_to_add = max(0, ressize + extra_nodes - njobnodes_in_reservation)
+    njobnodes_to_add = min(njobnodes_to_add, njobnodes_eligible)
+
+    log.info(f'At {time.asctime()}, {name} ({ressize} nodes) has {njobs_in_reservation} jobs using {njobnodes_in_reservation} nodes')
+    log.info(f'{njobs_eligible} {restype} jobs using {njobnodes_eligible} nodes are eligible to be moved into the reservation')
+
+    if njobnodes_to_add == 0:
+        log.info('Reservation full, no need to add more jobs at this time')
+        return
+
+    log.info(f'Adding jobs to use {njobnodes_to_add} additional nodes')
+    jobnodes_added = 0
+    jobids = list()
+    for jobtype in jobtypes:
+        ii = np.char.startswith(jobs_eligible['NAME'], jobtype)
+        for row in jobs_eligible[ii]:
+            jobname = row['NAME']
+            log.info(f'Move {jobname} to {name}')
+            jobnodes_added += row['NODES']
+            jobids.append(row['JOBID'])
+
+            if jobnodes_added >= njobnodes_to_add:
+                break
+
+        if jobnodes_added >= njobnodes_to_add:
+            break
+
+    if len(jobids) > 0:
+        if dry_run:
+            log.info('Dry run mode; will print what to do but not actually run the commands')
+        else:
+            log.info('Running scontrol commands')
+
+        #- Update queue in batches of batch_size jobs
+        batch_size = 10
+        for i in range(0, len(jobids), batch_size):
+            jobids_csv = ','.join([str(x) for x in jobids[i:i+batch_size]])
+            cmd = f'scontrol update ReservationName={name} JobID={jobids_csv}'
+            if dry_run:
+                #- Purposefully print, not log, to make it easier to cut-and-paste
+                print(cmd)
+            else:
+                log.info(cmd)
+                subprocess.run(cmd.split())
+
+#--------------------------------------------------------------------
+
+def main():
+    import argparse
+    import datetime
+    p = argparse.ArgumentParser()
+    p.add_argument('-r', '--reservation', required=True, help="batch reservation name")
+    p.add_argument('-n', '--extra-nodes', type=int, default=0,
+                   help="Add jobs for this number of additional nodes beyond reservation size")
+    p.add_argument('--sleep', type=int, help="Sleep this number of minutes between checks")
+    p.add_argument('--until', type=str, help="Keep running until this YEAR-MM-DDThh:mm(:ss) time")
+    p.add_argument('--dry-run', action="store_true", help="Print what to do, but don't actually move jobs to reservation")
+    args = p.parse_args()
+
+    log = get_logger()
+
+    if args.until is not None:
+        datetime_until = datetime.datetime.fromisoformat(args.until)
+        log.info(f'Will keep checking until {args.until}')
+
+        #- arg.until implies sleeping in a loop, so make sure that is set
+        if args.sleep is None:
+            args.sleep = 5
+    else:
+        datetime_until = None
+
+
+    #- Cache reservation information so that we don't have to look it up every loop
+    resinfo = get_reservation_info(args.reservation)
+
+    #- Move jobs into the reservation; optionally sleep, repeat
+    while (datetime_until is None) or datetime.datetime.now() < datetime_until:
+        use_reservation(resinfo=resinfo, extra_nodes=args.extra_nodes, dry_run=args.dry_run)
+        if args.sleep is None:
+            break
+        else:
+            log.info(f'Sleeping {args.sleep} minutes before next check')
+            time.sleep(60*args.sleep)
+
+    log.info(f'Done checking at {time.asctime()}')
+
+if __name__ == '__main__':
+    main()
diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py
index 95e500726..a51796fb0 100644
--- a/py/desispec/workflow/queue.py
+++ b/py/desispec/workflow/queue.py
@@ -498,7 +498,7 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
     Returns
     -------
     Table
-        Table with the columns JOBID, PARTITION, NAME, USER, ST, TIME, NODES,
+        Table with the columns JOBID, PARTITION, RESERVATION, NAME, USER, ST, TIME, NODES,
         NODELIST(REASON) for the specified user.
     """
     log = get_logger()
@@ -508,29 +508,29 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
     else:
         user = 'desi'
 
-    cmd = f'squeue -u {user} -o "%i,%P,%j,%u,%t,%M,%D,%R"'
+    cmd = f'squeue -u {user} -o "%i,%P,%v,%j,%u,%t,%M,%D,%R"'
     cmd_as_list = cmd.split()
 
     if dry_run_level > 0:
         log.info("Dry run, would have otherwise queried Slurm with the"
                  +f" following: {' '.join(cmd_as_list)}")
-        string = 'JOBID,PARTITION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)'
-        string += f"27650097,cron,scron_ar,{user},PD,0:00,1,(BeginTime)"
-        string += f"27650100,cron,scron_nh,{user},PD,0:00,1,(BeginTime)"
-        string += f"27650098,cron,scron_up,{user},PD,0:00,1,(BeginTime)"
-        string += f"29078887,gpu_ss11,tilenight-20230413-24315,{user},PD,0:00,1,(Priority)"
-        string += f"29078892,gpu_ss11,tilenight-20230413-21158,{user},PD,0:00,1,(Priority)"
-        string += f"29079325,gpu_ss11,tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)"
-        string += f"29079322,gpu_ss11,ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)"
-        string += f"29078883,gpu_ss11,tilenight-20230413-21187,{user},R,10:18,1,nid003960"
-        string += f"29079242,regular_milan_ss11,arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)"
-        string += f"29079246,regular_milan_ss11,arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)"
+        string = 'JOBID,PARTITION,RESERVATION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)'
+        string += f"27650097,cron,(null),scron_ar,{user},PD,0:00,1,(BeginTime)"
+        string += f"27650100,cron,(null),scron_nh,{user},PD,0:00,1,(BeginTime)"
+        string += f"27650098,cron,(null),scron_up,{user},PD,0:00,1,(BeginTime)"
+        string += f"29078887,gpu_ss11,(null),tilenight-20230413-24315,{user},PD,0:00,1,(Priority)"
+        string += f"29078892,gpu_ss11,(null),tilenight-20230413-21158,{user},PD,0:00,1,(Priority)"
+        string += f"29079325,gpu_ss11,(null),tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)"
+        string += f"29079322,gpu_ss11,(null),ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)"
+        string += f"29078883,gpu_ss11,(null),tilenight-20230413-21187,{user},R,10:18,1,nid003960"
+        string += f"29079242,regular_milan_ss11,(null),arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)"
+        string += f"29079246,regular_milan_ss11,(null),arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)"
         # create command to run to exercise subprocess -> stdout parsing
         cmd = 'echo ' + string
         cmd_as_list = ['echo', string]
     else:
-        log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}")
+        log.info(f"Querying jobs in queue with: {' '.join(cmd_as_list)}")
 
     #- sacct sometimes fails; try several times before giving up
     max_attempts = 3