From 95909e58dc7a06e3ad8781e37d7abe61355987da Mon Sep 17 00:00:00 2001
From: Noam Bernstein
Date: Tue, 2 Apr 2024 11:26:47 -0400
Subject: [PATCH] Add cli commands to manually create a job record and to manually fail a job

---
NOTE(review): line breaks and indentation in this patch were reconstructed from
a whitespace-collapsed copy. Verify the context lines against the target tree
before applying (or use `git apply --ignore-whitespace`) -- TODO confirm.

 expyre/cli/cli.py    | 73 ++++++++++++++++++++++++++++++++++++++++++--
 expyre/func.py       | 10 ++++--
 expyre/subprocess.py | 10 +++++-
 expyre/system.py     | 10 ++++--
 4 files changed, 95 insertions(+), 8 deletions(-)

diff --git a/expyre/cli/cli.py b/expyre/cli/cli.py
index 1c7dd2c..79890b1 100644
--- a/expyre/cli/cli.py
+++ b/expyre/cli/cli.py
@@ -3,6 +3,7 @@
 import subprocess
 import warnings
 from pathlib import Path
+import pickle
 
 import numpy as np
 
@@ -116,7 +117,7 @@ def cli_rm(ctx, id, name, status, system, yes, clean):
             # ask user
             answer = None
             while answer not in ['y', 'n']:
-                answer = input(f'Deleting "{xpr}"\nEnter n to reject or y to accept: ')
+                answer = input(f'Delete "{xpr}"\nEnter n to reject or y to accept: ')
             if answer != 'y':
                 sys.stderr.write(f'Not deleting "{xpr.id}"\n')
                 sys.stderr.write('\n')
@@ -135,8 +136,11 @@ def cli_rm(ctx, id, name, status, system, yes, clean):
 @click.option("--name", "-n", help="comma separated list of regexps for entire job name")
 @click.option("--status", "-s", help="comma separated list of status values to include, or '*' for all", default='ongoing')
 @click.option("--system", "-S", help="comma separated list of regexps for entire system name")
+@click.option("--delete", "-d", is_flag=True, help="delete local files that aren't in remote dir, e.g. if remote job got an "
+              "error, was synced, but was then manually restarted and finished without an error")
+@click.option("--verbose", "-v", is_flag=True, help="verbose output")
 @click.pass_context
-def cli_sync(ctx, id, name, status, system):
+def cli_sync(ctx, id, name, status, system, delete, verbose):
     """Sync remote status and results for jobs fitting criteria (at least one criterion required)
     """
     if status == '*':
@@ -147,7 +151,7 @@ def cli_sync(ctx, id, name, status, system):
     if len(jobs) == 0:
         warnings.warn(f"sync found no jobs with status {status} to sync")
 
-    ExPyRe._sync_remote_results_status_ll(jobs, cli=True)
+    ExPyRe._sync_remote_results_status_ll(jobs, cli=True, delete=delete, verbose=verbose)
 
 
 @cli.command("db_unlock")
@@ -185,3 +189,66 @@ def cli_reset_status(ctx, id, name, status, system, new_status):
 
     for job in jobs:
         config.db.update(job['id'], status=new_status)
+
+
+@cli.command("create_job")
+@click.option("--from_dir", "-d", type=click.Path(exists=True, file_okay=False, dir_okay=True,
+                                                  path_type=Path),
+              help="local stage directory of job to create", required=True)
+@click.option("--system", "-S", help="system of job to create", required=True)
+@click.option("--id", "-i", help="id of job to create, deduced from from_dir if not specified")
+@click.option("--name", "-n", help="name of job to create, deduced from id if not specified")
+@click.pass_context
+def cli_create_job(ctx, from_dir, system, id, name):
+    """Create a job database entry manually
+    """
+
+    if id is None:
+        id = from_dir.name[4:]  # drop 4-char stage-dir prefix (presumably "run_") -- confirm
+    if name is None:
+        name = id[:-54]  # drop 54-char id suffix (presumably "_<hash>_<random>") -- confirm
+
+    config.db.add(id=id, name=name, from_dir=str(from_dir.absolute()), status="created", system=system, remote_id="NA", remote_status="unknown")
+
+
+@cli.command("fail_job")
+@click.option("--id", "-i", help="comma separated list of regexps for entire job id")
+@click.option("--name", "-n", help="comma separated list of regexps for entire job name")
+@click.option("--status", "-s", help="comma separated list of status values to include, or '*' for all")
+@click.option("--system", "-S", help="comma separated list of regexps for entire system name")
+@click.option("--yes", "-y", is_flag=True, help="assume 'yes' for all confirmations, i.e. mark jobs as failed without asking user")
+@click.pass_context
+def cli_fail_job(ctx, id, name, status, system, yes):
+    """Mark jobs as failed by writing error/exception marker files into their local stage dirs
+    """
+    if id is None and name is None and status is None and system is None:
+        sys.stderr.write('At least one selection criterion is required\n\n')
+        sys.stderr.write(ctx.get_help()+'\n')
+        sys.exit(1)
+
+    if status == '*':
+        status = None
+
+    jobs = _get_jobs(id=id, name=name, status=status, system=system)
+
+    if len(jobs) == 0:
+        warnings.warn(f"fail_job found no jobs with status {status} to fail")
+
+    for job in jobs:
+        if not yes:
+            # ask user
+            answer = None
+            while answer not in ['y', 'n']:
+                answer = input(f'Fail "{job["id"]}"\nEnter n to reject or y to accept: ')
+            if answer != 'y':
+                sys.stderr.write(f'Not failing "{job["id"]}"\n')
+                sys.stderr.write('\n')
+                continue
+
+        from_dir = Path(job['from_dir'])
+
+        with open(from_dir / "_expyre_job_error", "w") as fout:
+            fout.write("xpr fail_job\n")
+        with open(from_dir / "_expyre_job_exception", "wb") as fout:
+            pickle.dump(RuntimeError("xpr fail_job"), fout)
+        (from_dir / "_expyre_job_succeeded").unlink(missing_ok=True)
diff --git a/expyre/func.py b/expyre/func.py
index 82fb0dc..54a40ed 100644
--- a/expyre/func.py
+++ b/expyre/func.py
@@ -500,7 +500,7 @@ def sync_remote_results_status(self, sync_all=True, force_sync=False, verbose=Fa
 
 
     @classmethod
-    def _sync_remote_results_status_ll(cls, jobs_to_sync, n_group=250, cli=False, verbose=False):
+    def _sync_remote_results_status_ll(cls, jobs_to_sync, n_group=250, cli=False, delete=False, verbose=False):
         """Low level part of syncing jobs. Gets remote files _and_ updates 'remote_status' field in jobsdb.
         Note that both have to happen because other functions assume that if remote status has been
         updated files have been staged back as well.
@@ -513,6 +513,12 @@ class for classmethod (unused)
             list of job dicts returned by jobsdb.jobs()
         n_group: int, default 250
             number of jobs to do in a group with each rsync call
+        cli: bool, default False
+            command is being run from cli 'xpr sync'
+        delete: bool, default False
+            delete local files that are not in remote dir
+        verbose: bool, default False
+            verbose output
         """
         if len(jobs_to_sync) == 0:
             return
@@ -547,7 +553,7 @@ def _grouper(n, iterable):
         # update status, showing job as done (despite missing files)
         for job_group in _grouper(n_group, jobs_to_sync):
             system.get_remotes(stage_root, subdir_glob=[Path(j['from_dir']).name for j in job_group],
-                               verbose=verbose)
+                               delete=delete, verbose=verbose)
 
 
     def clean(self, wipe=False, dry_run=False, remote_only=False, verbose=False):
diff --git a/expyre/subprocess.py b/expyre/subprocess.py
index 06aa7f6..36c8be9 100644
--- a/expyre/subprocess.py
+++ b/expyre/subprocess.py
@@ -172,7 +172,8 @@ def subprocess_run(host, args, script=None, shell='bash -c', remsh_cmd=None, ret
 
 
 def subprocess_copy(from_files, to_file, from_host='_LOCAL_', to_host='_LOCAL_',
-                    rcp_args='-a', rcp_cmd='rsync', remsh_cmd=None, retry=None, remsh_flags='-e', verbose=False, dry_run=False):
+                    rcp_args='-a', rcp_cmd='rsync', remsh_cmd=None, retry=None, remsh_flags='-e',
+                    delete=False, verbose=False, dry_run=False):
     """Run a remote copy (e.g. rsync) in a subprocess, optionally to/from remote machine. Exactly one
     machine has to be specified, and relative paths on that machine are relative to its home dir, like
     rsync. If the specified machine is None the copy is local, but relative paths are still relative to
@@ -202,8 +203,12 @@ def subprocess_copy(from_files, to_file, from_host='_LOCAL_', to_host='_LOCAL_',
         passed as retry argument to subprocess_run
     remsh_flags: str, default '-e'
         flag to prefix to remsh_cmd when calling rcp_cmd
+    delete: bool, default False
+        delete target files that aren't in source with --delete option
     verbose: bool, default False
         verbose output
+    dry_run: bool, default False
+        dry run, don't actually copy
     """
     # exactly one of from_host, to_host must be provided
     if from_host != '_LOCAL_' and to_host != '_LOCAL_':
@@ -213,6 +218,9 @@ def subprocess_copy(from_files, to_file, from_host='_LOCAL_', to_host='_LOCAL_',
         remsh_cmd = os.environ.get('EXPYRE_RSH', 'ssh')
     rcp_args = remsh_flags + ' ' + remsh_cmd + ' ' + rcp_args
 
+    if delete:
+        rcp_args += ' --delete'
+
     # make from_files plain str or Path into list
     if isinstance(from_files, str) or isinstance(from_files, Path):
         from_files = [from_files]
diff --git a/expyre/system.py b/expyre/system.py
index 2370cae..d09d7e8 100644
--- a/expyre/system.py
+++ b/expyre/system.py
@@ -182,13 +182,19 @@ def submit(self, id, stage_dir, resources, commands, header_extra=[], exact_fit=
         return r
 
 
-    def get_remotes(self, local_dir, subdir_glob=None, verbose=False):
+    def get_remotes(self, local_dir, subdir_glob=None, delete=False, verbose=False):
         """get data from directories of remotely running jobs
 
         Parameters
         ----------
+        local_dir: str
+            local directory to stage to
         subdir_glob: str, list(str), default None
             only get subdirectories that much one or more globs
+        delete: bool, default False
+            delete local files that aren't in remote dir
+        verbose: bool, default False
+            verbose output
         """
         if self.remote_rundir is None:
             # nothing to "get" since this ran in stage dir
@@ -204,7 +210,7 @@ def get_remotes(self, local_dir, subdir_glob=None, verbose=False):
             subdir_glob = '/{' + ','.join(subdir_glob) + '}'
 
         subprocess_copy(self.remote_rundir + subdir_glob, local_dir, from_host=self.host,
-                        remsh_cmd=self.remsh_cmd, verbose=verbose)
+                        remsh_cmd=self.remsh_cmd, delete=delete, verbose=verbose)