desi_use_reservation for prods
Stephen Bailey authored and committed Aug 26, 2024
1 parent 019d6fa commit e5d683a
Showing 2 changed files with 216 additions and 14 deletions.
202 changes: 202 additions & 0 deletions bin/desi_use_reservation
@@ -0,0 +1,202 @@
#!/usr/bin/env python

"""
Utility for moving jobs from the regular queue into a reservation, with limits
"""

import os, sys
import numpy as np
import subprocess
import json
import time

from desispec.workflow.queue import get_jobs_in_queue
from desiutil.log import get_logger

def get_reservation_info(name):
    """
    Return dictionary of reservation info from "scontrol show res NAME"
    """
    log = get_logger()

    cmd = f"scontrol show res {name} --json"
    log.info(f'Getting reservation info with: {cmd}')
    resinfo = json.loads(subprocess.check_output(cmd.split()))

    if len(resinfo['reservations']) == 0:
        log.critical(f'reservation {name} not found')
        sys.exit(1)
    elif len(resinfo['reservations']) > 1:
        log.critical(f'"{cmd}" returned info for more than one reservation')
        sys.exit(2)

    resinfo = resinfo['reservations'][0]
    assert resinfo['name'] == name

    return resinfo
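#- Illustrative note (not part of this commit): based on the fields used in this
#- script, "scontrol show res NAME --json" returns JSON shaped roughly like
#-     {"reservations": [{"name": "kibo26_cpu", "node_count": 10,
#-                        "partition": "regular_milan_ss11", ...}]}
#- where the node count shown here is a made-up example value;
#- get_reservation_info() returns the single inner dictionary.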

def use_reservation(name=None, resinfo=None, extra_nodes=0, dry_run=False):
    """
    Move jobs from the regular queue into a reservation

    Options:
        name (str): name of the reservation
        resinfo (dict): dictionary of reservation info from get_reservation_info
        extra_nodes (int): over-fill the reservation by this many nodes
        dry_run (bool): if True, print scontrol commands but don't move jobs

    Must provide name or pre-cached resinfo=get_reservation_info(name).

    This will move eligible jobs from the regular queue into the
    requested reservation, up to the reservation size + extra_nodes.
    It auto-detects CPU vs. GPU reservations and prioritizes which job
    types to move first (e.g. psfnight jobs, because those block other
    jobs more than a single arc job does).
    It does not move jobs that are still waiting on dependencies, so that
    they don't fill up a spot in the reservation without being able to run.
    """

    ## NOTE:
    ## "scontrol show res kibo26_cpu --json" returns a "partition" that
    ## seems to match the "PARTITION" of "squeue -u desi -o '%i,%P,%v,%j,%u,%t,%M,%D,%R'"
    ## for jobs that are in the regular queue, but jobs that are in the reservation
    ## have a squeue-reported "PARTITION" of "resv", not the partition of the reservation.

    log = get_logger()

    if resinfo is None:
        resinfo = get_reservation_info(name)

    if name is None:
        name = resinfo['name']

    ressize = resinfo['node_count']

    #- job types for CPU and GPU, in the order they should be considered for the reservation
    cpujobs = ['linkcal', 'ccdcalib', 'psfnight', 'arc']
    gpujobs = ['nightlyflat', 'flat', 'ztile', 'tilenight', 'zpix']

    #- which regular queue partition is eligible for this reservation?
    regular_partition = resinfo['partition']

    #- Determine CPU vs. GPU reservation
    #- NOTE: some NERSC Perlmutter-specific hardcoding here
    if resinfo['partition'].startswith('gpu'):
        restype = 'GPU'
        jobtypes = gpujobs
    elif resinfo['partition'].startswith('regular'):
        restype = 'CPU'
        jobtypes = cpujobs
    else:
        log.critical(f'Unrecognized reservation type partition={resinfo["partition"]}')
        sys.exit(3)

    #- Get currently running jobs and filter to those that are eligible for this reservation
    jobs = get_jobs_in_queue()

    jobs_in_reservation = jobs[jobs['RESERVATION'] == name]

    eligible_for_reservation = jobs['RESERVATION'] == 'null'
    eligible_for_reservation &= jobs['PARTITION'] == regular_partition

    #- Only move jobs that are currently eligible to run, not those still waiting on dependencies
    eligible_for_reservation &= jobs['ST'] == 'PD'
    eligible_for_reservation &= jobs['NODELISTREASON'] != 'Dependency'

    jobs_eligible = jobs[eligible_for_reservation]

    njobs_in_reservation = len(jobs_in_reservation)
    njobnodes_in_reservation = np.sum(jobs_in_reservation['NODES'])

    njobs_eligible = len(jobs_eligible)
    njobnodes_eligible = np.sum(jobs_eligible['NODES'])

    njobnodes_to_add = max(0, ressize + extra_nodes - njobnodes_in_reservation)
    njobnodes_to_add = min(njobnodes_to_add, njobnodes_eligible)

    log.info(f'At {time.asctime()}, {name} ({ressize} nodes) has {njobs_in_reservation} jobs using {njobnodes_in_reservation} nodes')
    log.info(f'{njobs_eligible} {restype} jobs using {njobnodes_eligible} nodes are eligible to be moved into the reservation')

    if njobnodes_to_add == 0:
        log.info('Reservation full, no need to add more jobs at this time')
        return

    log.info(f'Adding jobs to use {njobnodes_to_add} additional nodes')
    jobnodes_added = 0
    jobids = list()
    for jobtype in jobtypes:
        ii = np.char.startswith(jobs_eligible['NAME'], jobtype)
        for row in jobs_eligible[ii]:
            jobname = row['NAME']
            log.info(f'Move {jobname} to {name}')
            jobnodes_added += row['NODES']
            jobids.append(row['JOBID'])

            if jobnodes_added >= njobnodes_to_add:
                break

        if jobnodes_added >= njobnodes_to_add:
            break

    if len(jobids) > 0:
        if dry_run:
            log.info('Dry run mode; will print what to do but not actually run the commands')
        else:
            log.info('Running scontrol commands')

        #- Update queue in batches of batch_size jobs
        batch_size = 10
        for i in range(0, len(jobids), batch_size):
            jobids_csv = ','.join([str(x) for x in jobids[i:i+batch_size]])
            cmd = f'scontrol update ReservationName={name} JobID={jobids_csv}'
            if dry_run:
                #- Purposefully print, not log, to make it easier to cut-and-paste
                print(cmd)
            else:
                log.info(cmd)
                subprocess.run(cmd.split())
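#- Worked example (illustrative numbers, not from a real queue): for a 10-node
#- reservation with extra_nodes=2 whose current jobs occupy 7 nodes,
#- njobnodes_to_add = max(0, 10 + 2 - 7) = 5, which is then capped by the
#- total number of nodes requested by eligible pending jobs.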

#--------------------------------------------------------------------

def main():
    import argparse
    import datetime
    p = argparse.ArgumentParser()
    p.add_argument('-r', '--reservation', required=True, help="batch reservation name")
    p.add_argument('-n', '--extra-nodes', type=int, default=0,
                   help="Add jobs for this number of additional nodes beyond reservation size")
    p.add_argument('--sleep', type=int, help="Sleep this number of minutes between checks")
    p.add_argument('--until', type=str, help="Keep running until this YEAR-MM-DDThh:mm(:ss) time")
    p.add_argument('--dry-run', action="store_true", help="Print what to do, but don't actually move jobs to reservation")
    args = p.parse_args()

    log = get_logger()

    if args.until is not None:
        datetime_until = datetime.datetime.fromisoformat(args.until)
        log.info(f'Will keep checking until {args.until}')

        #- args.until implies sleeping in a loop, so make sure that is set
        if args.sleep is None:
            args.sleep = 5
    else:
        datetime_until = None

    #- Cache reservation information so that we don't have to look it up every loop
    resinfo = get_reservation_info(args.reservation)

    #- Move jobs into the reservation; optionally sleep, repeat
    while (datetime_until is None) or datetime.datetime.now() < datetime_until:
        use_reservation(resinfo=resinfo, extra_nodes=args.extra_nodes, dry_run=args.dry_run)
        if args.sleep is None:
            break
        else:
            log.info(f'Sleeping {args.sleep} minutes before next check')
            time.sleep(60*args.sleep)

    log.info(f'Done checking at {time.asctime()}')

if __name__ == '__main__':
    main()
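For reference, a hypothetical invocation based on the argparse options above (the reservation name is taken from the NOTE comment in the script; the other values are made up):

    desi_use_reservation -r kibo26_cpu --extra-nodes 2 --sleep 10 --until 2024-08-27T08:00 --dry-run

With --dry-run the script prints the scontrol update commands instead of running them; with --until it loops, sleeping --sleep minutes between checks.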
28 changes: 14 additions & 14 deletions py/desispec/workflow/queue.py
@@ -498,7 +498,7 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
     Returns
     -------
     Table
-        Table with the columns JOBID, PARTITION, NAME, USER, ST, TIME, NODES,
+        Table with the columns JOBID, PARTITION, RESERVATION, NAME, USER, ST, TIME, NODES,
         NODELIST(REASON) for the specified user.
     """
     log = get_logger()
@@ -508,29 +508,29 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
     else:
         user = 'desi'

-    cmd = f'squeue -u {user} -o "%i,%P,%j,%u,%t,%M,%D,%R"'
+    cmd = f'squeue -u {user} -o "%i,%P,%v,%j,%u,%t,%M,%D,%R"'
     cmd_as_list = cmd.split()

     if dry_run_level > 0:
         log.info("Dry run, would have otherwise queried Slurm with the"
                  +f" following: {' '.join(cmd_as_list)}")
-        string = 'JOBID,PARTITION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)'
-        string += f"27650097,cron,scron_ar,{user},PD,0:00,1,(BeginTime)"
-        string += f"27650100,cron,scron_nh,{user},PD,0:00,1,(BeginTime)"
-        string += f"27650098,cron,scron_up,{user},PD,0:00,1,(BeginTime)"
-        string += f"29078887,gpu_ss11,tilenight-20230413-24315,{user},PD,0:00,1,(Priority)"
-        string += f"29078892,gpu_ss11,tilenight-20230413-21158,{user},PD,0:00,1,(Priority)"
-        string += f"29079325,gpu_ss11,tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)"
-        string += f"29079322,gpu_ss11,ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)"
-        string += f"29078883,gpu_ss11,tilenight-20230413-21187,{user},R,10:18,1,nid003960"
-        string += f"29079242,regular_milan_ss11,arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)"
-        string += f"29079246,regular_milan_ss11,arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)"
+        string = 'JOBID,PARTITION,RESERVATION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)'
+        string += f"27650097,cron,(null),scron_ar,{user},PD,0:00,1,(BeginTime)"
+        string += f"27650100,cron,(null),scron_nh,{user},PD,0:00,1,(BeginTime)"
+        string += f"27650098,cron,(null),scron_up,{user},PD,0:00,1,(BeginTime)"
+        string += f"29078887,gpu_ss11,(null),tilenight-20230413-24315,{user},PD,0:00,1,(Priority)"
+        string += f"29078892,gpu_ss11,(null),tilenight-20230413-21158,{user},PD,0:00,1,(Priority)"
+        string += f"29079325,gpu_ss11,(null),tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)"
+        string += f"29079322,gpu_ss11,(null),ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)"
+        string += f"29078883,gpu_ss11,(null),tilenight-20230413-21187,{user},R,10:18,1,nid003960"
+        string += f"29079242,regular_milan_ss11,(null),arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)"
+        string += f"29079246,regular_milan_ss11,(null),arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)"

         # create command to run to exercise subprocess -> stdout parsing
         cmd = 'echo ' + string
         cmd_as_list = ['echo', string]
     else:
-        log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}")
+        log.info(f"Querying jobs in queue with: {' '.join(cmd_as_list)}")

     #- sacct sometimes fails; try several times before giving up
     max_attempts = 3
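The sample rows above show the new RESERVATION column reported by %v. As a rough, hypothetical sketch of how such comma-separated squeue output could be turned into a table and filtered the way bin/desi_use_reservation expects (the real parsing lives in get_jobs_in_queue(), and the paren-stripping helper below is an assumption, not part of desispec):

import subprocess
from astropy.table import Table

def strip_parens(value):
    #- Hypothetical helper: strips the parentheses that squeue puts around
    #- values like "(null)" and "(Priority)", and around NODELIST(REASON)
    return value.replace('(', '').replace(')', '')

cmd = ['squeue', '-u', 'desi', '-o', '%i,%P,%v,%j,%u,%t,%M,%D,%R']
lines = subprocess.check_output(cmd).decode().strip().split('\n')
names = [strip_parens(col) for col in lines[0].split(',')]   # NODELIST(REASON) -> NODELISTREASON
rows = [[strip_parens(val) for val in line.split(',')] for line in lines[1:]]
jobs = Table(rows=rows, names=names)

#- Jobs outside any reservation report "(null)" for %v, so after stripping
#- parentheses they can be selected the same way desi_use_reservation does:
pending_unreserved = jobs[(jobs['RESERVATION'] == 'null') & (jobs['ST'] == 'PD')]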
