Skip to content

Commit

Permalink
Merge pull request #37 from libAtoms/cli_create_job
Browse files Browse the repository at this point in the history
xpr cli commands to manually manipulate jobs
  • Loading branch information
bernstei authored Apr 2, 2024
2 parents b9a8c04 + 95909e5 commit 4e5953a
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 8 deletions.
73 changes: 70 additions & 3 deletions expyre/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import subprocess
import warnings
from pathlib import Path
import pickle

import numpy as np

Expand Down Expand Up @@ -116,7 +117,7 @@ def cli_rm(ctx, id, name, status, system, yes, clean):
# ask user
answer = None
while answer not in ['y', 'n']:
answer = input(f'Deleting "{xpr}"\nEnter n to reject or y to accept: ')
answer = input(f'Delete "{xpr}"\nEnter n to reject or y to accept: ')
if answer != 'y':
sys.stderr.write(f'Not deleting "{xpr.id}"\n')
sys.stderr.write('\n')
Expand All @@ -135,8 +136,11 @@ def cli_rm(ctx, id, name, status, system, yes, clean):
@click.option("--name", "-n", help="comma separated list of regexps for entire job name")
@click.option("--status", "-s", help="comma separated list of status values to include, or '*' for all", default='ongoing')
@click.option("--system", "-S", help="comma separated list of regexps for entire system name")
@click.option("--delete", "-d", is_flag=True, help="delete local files that aren't in remote dir, e.g. if remote job got an "
"error, was synced, but was then manually restarted and finished without an error")
@click.option("--verbose", "-v", is_flag=True, help="verbose output")
@click.pass_context
def cli_sync(ctx, id, name, status, system):
def cli_sync(ctx, id, name, status, system, delete, verbose):
"""Sync remote status and results for jobs fitting criteria (at least one criterion required)
"""
if status == '*':
Expand All @@ -147,7 +151,7 @@ def cli_sync(ctx, id, name, status, system):
if len(jobs) == 0:
warnings.warn(f"sync found no jobs with status {status} to sync")

ExPyRe._sync_remote_results_status_ll(jobs, cli=True)
ExPyRe._sync_remote_results_status_ll(jobs, cli=True, delete=delete, verbose=verbose)


@cli.command("db_unlock")
Expand Down Expand Up @@ -185,3 +189,66 @@ def cli_reset_status(ctx, id, name, status, system, new_status):

for job in jobs:
config.db.update(job['id'], status=new_status)


@cli.command("create_job")
@click.option("--from_dir", "-d", type=click.Path(exists=True, file_okay=False, dir_okay=True,
                                                  path_type=Path),
              help="local stage directory of job to create", required=True)
@click.option("--system", "-S", help="system of job to create", required=True)
@click.option("--id", "-i", help="id of job to create, deduced from from_dir if not specified")
@click.option("--name", "-n", help="name of job to create, deduced from id if not specified")
@click.pass_context
def cli_create_job(ctx, from_dir, system, id, name):
    """Create a job database entry manually
    """
    # NOTE: the docstring above is shown by click as the command's --help text,
    # so implementation notes are kept in comments instead.
    #
    # This function was previously also called `cli_reset_status`, which
    # redefined (shadowed) the function implementing the "reset_status" command.
    # The CLI command name comes from the decorator, so it is unaffected by the
    # rename.
    #
    # ctx is unused, but is always passed in because of @click.pass_context.

    if id is None:
        # deduce the id by dropping the first 4 characters of the stage dir name
        # NOTE(review): presumably a fixed "run_" prefix - confirm against the
        # code that creates stage directories
        id = from_dir.name[4:]
    if name is None:
        # deduce the name by dropping the last 54 characters of the id
        # NOTE(review): presumably a fixed-length unique suffix appended to the
        # job name when the id is generated - confirm
        name = id[:-54]

    # register the job as freshly created, with placeholder remote information
    config.db.add(id=id, name=name, from_dir=str(from_dir.absolute()), status="created",
                  system=system, remote_id="NA", remote_status="unknown")


@cli.command("fail_job")
@click.option("--id", "-i", help="comma separated list of regexps for entire job id")
@click.option("--name", "-n", help="comma separated list of regexps for entire job name")
@click.option("--status", "-s", help="comma separated list of status values to include, or '*' for all")
@click.option("--system", "-S", help="comma separated list of regexps for entire system name")
@click.option("--yes", "-y", is_flag=True, help="assume 'yes' for all confirmations, i.e. mark jobs as failed without asking user")
@click.pass_context
def cli_fail_job(ctx, id, name, status, system, yes):
    """Mark a job as failed
    """
    # NOTE: the docstring above is shown by click as the command's --help text,
    # so implementation notes are kept in comments instead.

    # refuse to act on every job in the database by accident
    if id is None and name is None and status is None and system is None:
        sys.stderr.write('At least one selection criterion is required\n\n')
        sys.stderr.write(ctx.get_help()+'\n')
        sys.exit(1)

    # '*' means "any status", which _get_jobs is told via status=None
    if status == '*':
        status = None

    jobs = _get_jobs(id=id, name=name, status=status, system=system)

    if len(jobs) == 0:
        warnings.warn(f"fail_job found no jobs with status {status} to fail")

    for job in jobs:
        if not yes:
            # ask user, repeating the prompt until a definite y/n is given
            answer = None
            while answer not in ['y', 'n']:
                answer = input(f'Fail "{job["id"]}"\nEnter n to reject or y to accept: ')
            if answer != 'y':
                sys.stderr.write(f'Not failing "{job["id"]}"\n')
                sys.stderr.write('\n')
                continue

        from_dir = Path(job['from_dir'])

        # write the error marker files into the job's local stage directory: a
        # text message and a pickled exception
        with open(from_dir / "_expyre_job_error", "w") as fout:
            fout.write("xpr fail_job\n")
        with open(from_dir / "_expyre_job_exception", "wb") as fout:
            pickle.dump(RuntimeError("xpr fail_job"), fout)
        # remove any success marker so the job is not also seen as succeeded
        (from_dir / "_expyre_job_succeeded").unlink(missing_ok=True)
10 changes: 8 additions & 2 deletions expyre/func.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ def sync_remote_results_status(self, sync_all=True, force_sync=False, verbose=Fa


@classmethod
def _sync_remote_results_status_ll(cls, jobs_to_sync, n_group=250, cli=False, verbose=False):
def _sync_remote_results_status_ll(cls, jobs_to_sync, n_group=250, cli=False, delete=False, verbose=False):
"""Low level part of syncing jobs. Gets remote files _and_ updates 'remote_status'
field in jobsdb. Note that both have to happen because other functions assume that
if remote status has been updated files have been staged back as well.
Expand All @@ -513,6 +513,12 @@ class for classmethod (unused)
list of job dicts returned by jobsdb.jobs()
n_group: int, default 250
number of jobs to do in a group with each rsync call
cli: bool, default False
command is being run from cli 'xpr sync'
delete: bool, default False
delete local files that are not in remote dir
verbose: bool, default False
verbose output
"""
if len(jobs_to_sync) == 0:
return
Expand Down Expand Up @@ -547,7 +553,7 @@ def _grouper(n, iterable):
# update status, showing job as done (despite missing files)
for job_group in _grouper(n_group, jobs_to_sync):
system.get_remotes(stage_root, subdir_glob=[Path(j['from_dir']).name for j in job_group],
verbose=verbose)
delete=delete, verbose=verbose)


def clean(self, wipe=False, dry_run=False, remote_only=False, verbose=False):
Expand Down
10 changes: 9 additions & 1 deletion expyre/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ def subprocess_run(host, args, script=None, shell='bash -c', remsh_cmd=None, ret


def subprocess_copy(from_files, to_file, from_host='_LOCAL_', to_host='_LOCAL_',
rcp_args='-a', rcp_cmd='rsync', remsh_cmd=None, retry=None, remsh_flags='-e', verbose=False, dry_run=False):
rcp_args='-a', rcp_cmd='rsync', remsh_cmd=None, retry=None, remsh_flags='-e',
delete=False, verbose=False, dry_run=False):
"""Run a remote copy (e.g. rsync) in a subprocess, optionally to/from remote machine. Exactly one
machine has to be specified, and relative paths on that machine are relative to its home dir, like
rsync. If the specified machine is None the copy is local, but relative paths are still relative to
Expand Down Expand Up @@ -202,8 +203,12 @@ def subprocess_copy(from_files, to_file, from_host='_LOCAL_', to_host='_LOCAL_',
passed as retry argument to subprocess_run
remsh_flags: str, default '-e'
flag to prefix to remsh_cmd when calling rcp_cmd
delete: bool, default False
delete target files that aren't in source with --delete option
verbose: bool, default False
verbose output
dry_run: bool, default False
dry run, don't actually copy
"""
# exactly one of from_host, to_host must be provided
if from_host != '_LOCAL_' and to_host != '_LOCAL_':
Expand All @@ -213,6 +218,9 @@ def subprocess_copy(from_files, to_file, from_host='_LOCAL_', to_host='_LOCAL_',
remsh_cmd = os.environ.get('EXPYRE_RSH', 'ssh')
rcp_args = remsh_flags + ' ' + remsh_cmd + ' ' + rcp_args

if delete:
rcp_args += ' --delete'

# make from_files plain str or Path into list
if isinstance(from_files, str) or isinstance(from_files, Path):
from_files = [from_files]
Expand Down
10 changes: 8 additions & 2 deletions expyre/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,19 @@ def submit(self, id, stage_dir, resources, commands, header_extra=[], exact_fit=
return r


def get_remotes(self, local_dir, subdir_glob=None, verbose=False):
def get_remotes(self, local_dir, subdir_glob=None, delete=False, verbose=False):
"""get data from directories of remotely running jobs
Parameters
----------
local_dir: str
local directory to stage to
subdir_glob: str, list(str), default None
only get subdirectories that match one or more globs
delete: bool, default False
delete local files that aren't in remote dir
verbose: bool, default False
verbose output
"""
if self.remote_rundir is None:
# nothing to "get" since this ran in stage dir
Expand All @@ -204,7 +210,7 @@ def get_remotes(self, local_dir, subdir_glob=None, verbose=False):
subdir_glob = '/{' + ','.join(subdir_glob) + '}'

subprocess_copy(self.remote_rundir + subdir_glob, local_dir, from_host=self.host,
remsh_cmd=self.remsh_cmd, verbose=verbose)
remsh_cmd=self.remsh_cmd, delete=delete, verbose=verbose)



Expand Down

0 comments on commit 4e5953a

Please sign in to comment.