Skip to content

Commit

Permalink
Merge pull request #30 from jtracker-io/exec-restart
Browse files Browse the repository at this point in the history
tested locally
  • Loading branch information
junjun-zhang authored Jul 18, 2018
2 parents 17e8834 + dd59111 commit 0fc2d83
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 6 deletions.
8 changes: 6 additions & 2 deletions jtracker/cli/exec/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
@click.option('-j', '--job-file', type=click.Path(exists=True), help='Execute local job file')
@click.option('-w', '--workflow-name', help='Specify registered workflow name in format: [{owner}/]{workflow}:{ver}')
@click.option('-c', '--continuous-run', is_flag=True, help='Keep executor running even job queue is empty')
@click.option('-f', '--force-restart', is_flag=True, help='Force executor restart, set previous running jobs to cancelled')
@click.option('-r', '--resume-job', is_flag=True, help='Force executor restart, set previous running jobs to resume')
@click.pass_context
def run(ctx, job_file, job_id, queue_id,
def run(ctx, job_file, job_id, queue_id, force_restart, resume_job,
workflow_name, parallel_jobs, max_jobs, min_disk, parallel_workers, continuous_run):
"""
Launch JTracker executor
Expand All @@ -33,7 +35,9 @@ def run(ctx, job_file, job_id, queue_id,
max_jobs=max_jobs,
min_disk=min_disk * 1000000000,
parallel_workers=parallel_workers,
continuous_run=continuous_run
continuous_run=continuous_run,
force_restart=force_restart,
resume_job=resume_job
)
except Exception as e:
click.echo(str(e))
Expand Down
39 changes: 35 additions & 4 deletions jtracker/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def __init__(self, jt_home=None, jt_account=None,
job_file=None, # when job_file is provided, it's local mode, no tracking from the server side
job_id=None, # can optionally specify which job to run, not applicable when job_file specified
min_disk=None, # minimally require disk space (in bytes) for launching task execution
parallel_jobs=1, parallel_workers=1, sleep_interval=5, max_jobs=0, continuous_run=False):
parallel_jobs=1, parallel_workers=1, sleep_interval=5, max_jobs=0, continuous_run=False,
force_restart=False, resume_job=False):

self._killer = GracefulKiller()

Expand All @@ -66,6 +67,8 @@ def __init__(self, jt_home=None, jt_account=None,
self._sleep_interval = sleep_interval
self._ran_jobs = 0
self._continuous_run = continuous_run
self._force_restart = force_restart
self._resume_job = resume_job

self._running_jobs = []
self._worker_processes = {}
Expand Down Expand Up @@ -109,7 +112,8 @@ def __init__(self, jt_home=None, jt_account=None,
# init executor dir
self._init_executor_dir()

# TODO: check whether previous executor session exists, restore it unless user chose not to (via options)
# clean up any jobs left in `running` state on the server
self._clean_up_running_jobs()

click.echo("Executor: %s started." % self.id)

Expand Down Expand Up @@ -195,6 +199,14 @@ def ran_jobs(self):
def continuous_run(self):
return self._continuous_run

@property
def force_restart(self):
    """Return the ``force_restart`` flag this executor was created with."""
    return self._force_restart

@property
def resume_job(self):
    """Return the ``resume_job`` flag this executor was created with."""
    return self._resume_job

@property
def worker_processes(self):
    """Return the executor's worker-process registry (``_worker_processes``)."""
    return self._worker_processes
Expand Down Expand Up @@ -450,10 +462,29 @@ def _init_executor_dir(self):
os.open(os.path.join(self.executor_dir, '_state.running'), flags)
except OSError as e:
if e.errno == errno.EEXIST: # Exit as the executor is running.
click.echo('The executor: %s for queue: %s is running on this node: %s already, not start another one!'
% (self.id, self.queue_id, self.node_id))
if not (self.force_restart or self.resume_job):
click.echo('The executor: %s for queue: %s is running on this node: %s already, not start executor without -f or -r option.'
% (self.id, self.queue_id, self.node_id))
sys.exit(1)
else:
click.echo('Unable to start executor, write permission is needed in JTHome')
sys.exit(1)

def _clean_up_running_jobs(self):
    """Deal with jobs the scheduler still reports as running.

    If the scheduler reports no running jobs, nothing is done. Otherwise
    the executor exits with status 1 unless ``resume_job`` or
    ``force_restart`` is set. With ``resume_job`` each reported job is
    passed to ``scheduler.resume_job``; otherwise (``force_restart``) each
    one is passed to ``scheduler.cancel_job``. ``resume_job`` takes
    precedence when both flags are set.
    """
    leftover_jobs = self.scheduler.running_jobs()
    if not leftover_jobs:
        return

    resuming = self.resume_job
    cancelling = self.force_restart
    if not (resuming or cancelling):
        click.echo('Server reports running jobs by the executor on this compute node, not start executor without -f or -r option.')
        sys.exit(1)

    for job in leftover_jobs:
        job_id = job.get('id')
        if resuming:
            click.echo('Set previous running job: %s to resume' % job_id)
            self.scheduler.resume_job(job_id)
        elif cancelling:
            click.echo('Cancel previous running job: %s' % job_id)
            self.scheduler.cancel_job(job_id)

def _enough_disk(self):
statvfs = os.statvfs(self.executor_dir)

Expand Down
18 changes: 18 additions & 0 deletions jtracker/execution/scheduler/jess.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,21 @@ def suspend_job(self, job_id=None):
raise JessNotAvailable('JESS service temporarily unavailable')

print('Job: %s suspended' % job_id)


def resume_job(self, job_id=None):
    """Ask JESS to set a job to ``resume`` on behalf of this executor.

    Sends a PUT request with action ``resume`` to the JESS endpoint
    ``/jobs/owner/{owner_name}/queue/{queue_id}/job/{job_id}/action``.

    :param job_id: ID of the job to be resumed.
    :raises JessNotAvailable: if the request to JESS fails with an error.
    """
    request_body = {
        'action': 'resume',
        'executor_id': self.executor_id
    }

    request_url = "%s/jobs/owner/%s/queue/%s/job/%s/action" % (self.jess_server.strip('/'),
                                                              self.jt_account, self.queue_id, job_id)

    try:
        # NOTE(review): the HTTP status of the response is not inspected, so
        # a 4xx/5xx reply is still reported below as success - confirm
        # whether JESS error responses should be surfaced here.
        requests.put(request_url, json=request_body)
    except Exception:
        # Catch `Exception` rather than using a bare `except:` so that
        # KeyboardInterrupt / SystemExit are not turned into JessNotAvailable.
        raise JessNotAvailable('JESS service temporarily unavailable')

    print('Job: %s set to resume' % job_id)

0 comments on commit 0fc2d83

Please sign in to comment.