desi_use_reservation for prods #2346

Merged Aug 27, 2024 · 5 commits (changes shown from 3 commits)
bin/desi_use_reservation (211 additions, 0 deletions)
@@ -0,0 +1,211 @@
#!/usr/bin/env python

"""
Utility for moving jobs from the regular queue into a reservation, with limits
"""

import os, sys
import numpy as np
import subprocess
import json
import time

from desispec.workflow.queue import get_jobs_in_queue
from desiutil.log import get_logger

def get_reservation_info(name):
"""
Return dictionary of reservation info from "scontrol show res NAME"
"""
log = get_logger()

cmd = f"scontrol show res {name} --json"
log.info(f'Getting reservation info with: {cmd}')
resinfo = json.loads(subprocess.check_output(cmd.split()))

if len(resinfo['reservations']) == 0:
log.critical(f'reservation {name} not found')
sys.exit(1)
elif len(resinfo['reservations']) > 1:
log.critical(f'"{cmd}" returned info for more than one reservation')
sys.exit(2)

resinfo = resinfo['reservations'][0]
assert resinfo['name'] == name

return resinfo

def use_reservation(name=None, resinfo=None, extra_nodes=0, dry_run=False):
"""
Move jobs from regular queue into a reservation

Options:
name (str): name of the reservation
resinfo (dict): dictionary of reservation info from get_reservation_info
extra_nodes (int): over-fill the reservation by this many nodes
dry_run (bool): if True, print scontrol commands but don't move jobs

Must provide name or pre-cached resinfo=get_reservation_info(name).

This will move eligible jobs from the regular queue into the
requested reservation, up to the reservation size + extra_nodes.
It auto-detects CPU vs. GPU reservations, and prioritizes what type
of jobs are most important to move first (e.g. psfnight jobs because
those block other jobs more than a single arc job does).

It does not move jobs that are still waiting on dependencies so that
they don't fill up a spot in the reservation without being able to run.
"""

## NOTE:
## "scontrol show res kibo26_cpu --json" returns a "partition" that
## seems to match the "PARTITION" of "squeue -u desi -o '%i,%P,%v,%j,%u,%t,%M,%D,%R'"
## for jobs that are in the regular queue, but jobs that are in the reservation have
## an squeue-reported "PARTITION" of "resv", not the partition of the reservation...

log = get_logger()

if resinfo is None:
resinfo = get_reservation_info(name)

if name is None:
name = resinfo['name']

ressize = resinfo['node_count']

#- job types for CPU and GPU, in the order they should be considered for the reservation
cpujobs = ['linkcal', 'ccdcalib', 'psfnight', 'arc']
gpujobs = ['nightlyflat', 'flat', 'ztile', 'tilenight', 'zpix']
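    #- (list order sets the move priority within each type: e.g. psfnight comes
    #-  before arc because, per the docstring above, a psfnight job blocks more
    #-  downstream work than a single arc job does)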

#- which regular queue partition is eligible for this reservation?
regular_partition = resinfo['partition']

#- Determine CPU vs. GPU reservation
[Review comment from a Member, on the CPU/GPU detection below: As discussed in person, this should be improved in the future to "future proof" it for later systems. For now it works fine on Perlmutter and we don't yet know how things may change for NERSC 10, so this is fine to leave as-is for now.]

#- NOTE: some NERSC Perlmutter-specific hardcoding here
if resinfo['partition'].startswith('gpu'):
restype = 'GPU'
jobtypes = gpujobs
elif resinfo['partition'].startswith('regular'):
restype = 'CPU'
jobtypes = cpujobs
else:
log.critical(f'Unrecognized reservation type partition={resinfo["partition"]}')
sys.exit(3)

#- Get currently running jobs and filter to those that are eligible for this reservation
jobs = get_jobs_in_queue()

#- Sort by name so that earlier nights are prioritized over later nights
jobs.sort('NAME')

#- Filter which jobs are in reservation vs. eligible to move into reservation
jobs_in_reservation = jobs[jobs['RESERVATION'] == name]

eligible_for_reservation = jobs['RESERVATION'] == 'null'
eligible_for_reservation &= jobs['PARTITION'] == regular_partition

#- Only move jobs that are currently eligible to run, not those waiting for dependencies
#- so that we don't fill reservation with jobs that can't run
eligible_for_reservation &= jobs['ST'] == 'PD'
eligible_for_reservation &= jobs['NODELISTREASON'] != 'Dependency'
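    #- (squeue reports state 'PD' for pending jobs; 'Dependency' in the reason
    #-  column marks jobs still waiting on another job to finish)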
jobs_eligible = jobs[eligible_for_reservation]

#- Counting jobs and nodes in and out of the reservation
njobs_in_reservation = len(jobs_in_reservation)
njobnodes_in_reservation = np.sum(jobs_in_reservation['NODES'])

njobs_eligible = len(jobs_eligible)
njobnodes_eligible = np.sum(jobs_eligible['NODES'])

if njobs_eligible == 0:
log.info('No available jobs to add')
return

njobnodes_to_add = max(0, ressize + extra_nodes - njobnodes_in_reservation)
njobnodes_to_add = min(njobnodes_to_add, njobnodes_eligible)
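    #- e.g. a 20-node reservation already holding 15 nodes of jobs with
    #- extra_nodes=2 gives min(20 + 2 - 15, njobnodes_eligible) = up to 7 more nodes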

log.info(f'At {time.asctime()}, {name} ({ressize} nodes) has {njobs_in_reservation} jobs using {njobnodes_in_reservation} nodes')
log.info(f'{njobs_eligible} {restype} jobs using {njobnodes_eligible} nodes are eligible to be moved into the reservation')

if njobnodes_to_add == 0:
log.info('Reservation full, no need to add more jobs at this time')
return

log.info(f'Adding jobs to use {njobnodes_to_add} additional nodes')
jobnodes_added = 0
jobids = list()
for jobtype in jobtypes:
ii = np.char.startswith(jobs_eligible['NAME'], jobtype)
for row in jobs_eligible[ii]:
jobname = row['NAME']
log.info(f'Move {jobname} to {name}')
jobnodes_added += row['NODES']
jobids.append(row['JOBID'])

if jobnodes_added >= njobnodes_to_add:
break

if jobnodes_added >= njobnodes_to_add:
break
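            #- (the duplicated check breaks out of both the inner per-job loop
            #-  and the outer per-jobtype loop once enough nodes are selected)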

if len(jobids) > 0:
if dry_run:
log.info('Dry run mode; will print what to do but not actually run the commands')
else:
log.info('Running scontrol commands')

#- Update queue in batches of batch_size jobs
batch_size = 10
for i in range(0, len(jobids), batch_size):
jobids_csv = ','.join([str(x) for x in jobids[i:i+batch_size]])
cmd = f'scontrol update ReservationName={name} JobID={jobids_csv}'
if dry_run:
#- Purposefully print, not log, to make it easier to cut-and-paste
print(cmd)
else:
log.info(cmd)
subprocess.run(cmd.split())

#--------------------------------------------------------------------

def main():
import argparse
import datetime
p = argparse.ArgumentParser()
p.add_argument('-r', '--reservation', required=True, help="batch reservation name")
p.add_argument('-n', '--extra-nodes', type=int, default=0,
help="Add jobs for this number of additional nodes beyond reservation size")
p.add_argument('--sleep', type=int, help="Sleep this number of minutes between checks")
p.add_argument('--until', type=str, help="Keep running until this YEAR-MM-DDThh:mm(:ss) time")
p.add_argument('--dry-run', action="store_true", help="Print what to do, but don't actually move jobs to reservation")
args = p.parse_args()

log = get_logger()

if args.until is not None:
datetime_until = datetime.datetime.fromisoformat(args.until)
log.info(f'Will keep checking until {args.until}')

#- args.until implies sleeping in a loop, so make sure that is set
if args.sleep is None:
args.sleep = 5
else:
datetime_until = None


#- Cache reservation information so that we don't have to look it up every loop
resinfo = get_reservation_info(args.reservation)

#- Move jobs into the reservation; optionally sleep, repeat
while (datetime_until is None) or datetime.datetime.now() < datetime_until:
use_reservation(resinfo=resinfo, extra_nodes=args.extra_nodes, dry_run=args.dry_run)
if args.sleep is None:
break
else:
log.info(f'Sleeping {args.sleep} minutes before next check')
time.sleep(60*args.sleep)

log.info(f'Done checking at {time.asctime()}')

if __name__ == '__main__':
main()
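
A usage sketch based on the argparse options above (the reservation name is reused from the NOTE in the code; the time is illustrative):

    # one-shot dry run: print the scontrol commands without moving any jobs
    desi_use_reservation -r kibo26_cpu --dry-run

    # top up the reservation every 10 minutes until the given time,
    # over-filling by up to 2 extra nodes
    desi_use_reservation -r kibo26_cpu -n 2 --sleep 10 --until 2024-08-28T08:00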
py/desispec/workflow/queue.py (14 additions, 14 deletions)
@@ -498,7 +498,7 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
     Returns
     -------
     Table
-        Table with the columns JOBID, PARTITION, NAME, USER, ST, TIME, NODES,
+        Table with the columns JOBID, PARTITION, RESERVATION, NAME, USER, ST, TIME, NODES,
         NODELIST(REASON) for the specified user.
     """
     log = get_logger()
@@ -508,29 +508,29 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
     else:
         user = 'desi'

-    cmd = f'squeue -u {user} -o "%i,%P,%j,%u,%t,%M,%D,%R"'
+    cmd = f'squeue -u {user} -o "%i,%P,%v,%j,%u,%t,%M,%D,%R"'
     cmd_as_list = cmd.split()

     if dry_run_level > 0:
         log.info("Dry run, would have otherwise queried Slurm with the"
                  +f" following: {' '.join(cmd_as_list)}")
-        string = 'JOBID,PARTITION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)'
-        string += f"\n27650097,cron,scron_ar,{user},PD,0:00,1,(BeginTime)"
-        string += f"\n27650100,cron,scron_nh,{user},PD,0:00,1,(BeginTime)"
-        string += f"\n27650098,cron,scron_up,{user},PD,0:00,1,(BeginTime)"
-        string += f"\n29078887,gpu_ss11,tilenight-20230413-24315,{user},PD,0:00,1,(Priority)"
-        string += f"\n29078892,gpu_ss11,tilenight-20230413-21158,{user},PD,0:00,1,(Priority)"
-        string += f"\n29079325,gpu_ss11,tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)"
-        string += f"\n29079322,gpu_ss11,ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)"
-        string += f"\n29078883,gpu_ss11,tilenight-20230413-21187,{user},R,10:18,1,nid003960"
-        string += f"\n29079242,regular_milan_ss11,arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)"
-        string += f"\n29079246,regular_milan_ss11,arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)"
+        string = 'JOBID,PARTITION,RESERVATION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)'
+        string += f"\n27650097,cron,(null),scron_ar,{user},PD,0:00,1,(BeginTime)"
+        string += f"\n27650100,cron,(null),scron_nh,{user},PD,0:00,1,(BeginTime)"
+        string += f"\n27650098,cron,(null),scron_up,{user},PD,0:00,1,(BeginTime)"
+        string += f"\n29078887,gpu_ss11,(null),tilenight-20230413-24315,{user},PD,0:00,1,(Priority)"
+        string += f"\n29078892,gpu_ss11,(null),tilenight-20230413-21158,{user},PD,0:00,1,(Priority)"
+        string += f"\n29079325,gpu_ss11,(null),tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)"
+        string += f"\n29079322,gpu_ss11,(null),ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)"
+        string += f"\n29078883,gpu_ss11,(null),tilenight-20230413-21187,{user},R,10:18,1,nid003960"
+        string += f"\n29079242,regular_milan_ss11,(null),arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)"
+        string += f"\n29079246,regular_milan_ss11,(null),arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)"

         # create command to run to exercise subprocess -> stdout parsing
         cmd = 'echo ' + string
         cmd_as_list = ['echo', string]
     else:
-        log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}")
+        log.info(f"Querying jobs in queue with: {' '.join(cmd_as_list)}")

     #- sacct sometimes fails; try several times before giving up
     max_attempts = 3
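
The diff is truncated above at the retry comment. The pattern it describes is generic; a minimal sketch of such a retry loop (an assumption for illustration, not the actual desispec implementation) could look like:

    import subprocess
    import time

    def check_output_with_retries(cmd_as_list, max_attempts=3, wait_seconds=10):
        """Run a command, retrying a few times since Slurm queries sometimes fail."""
        for attempt in range(1, max_attempts + 1):
            try:
                return subprocess.check_output(cmd_as_list, text=True)
            except subprocess.CalledProcessError:
                if attempt == max_attempts:
                    raise
                time.sleep(wait_seconds)  #- brief pause before retrying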