Skip to content

Commit

Permalink
Improve error handling in tasks & slurm
Browse files Browse the repository at this point in the history
touilleMan committed Oct 2, 2024
1 parent 91ce3c9 commit dfac664
Showing 5 changed files with 41 additions and 10 deletions.
13 changes: 8 additions & 5 deletions bin/queuer.py
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ def main():
if argv[1] == 'pendings' and len(argv) == 2:
count = pending_jobs_count()
print(count)
raise SystemExit(1 if count == 0 else 0)
raise SystemExit(0)
elif argv[1] == 'info':
if len(argv) == 2:
data = pending_jobs_info()
@@ -80,15 +80,18 @@ def main():
if argv[1] == 'submit':
job_id = submit_job(task, participation_id)
print('Submitted job %s' % job_id)
return
raise SystemExit(0)
else:
return context(task)(participation_id)
context(task)(participation_id)
raise SystemExit(0)
elif argv[1] == 'consume' and len(argv) == 3:
if argv[2] == 'next_job':
return context(queuer.execute_next_job)()
context(queuer.execute_next_job)()
raise SystemExit(0)
else:
job_id = ObjectId(argv[2])
return context(queuer.execute_job)(job_id)
context(queuer.execute_job)(job_id)
raise SystemExit(0)
raise SystemExit(USAGE)


16 changes: 16 additions & 0 deletions in2p3/slurm/check_queue.sh
Original file line number Diff line number Diff line change
@@ -47,6 +47,12 @@ print(len([l for l in out.splitlines()[1:] if l.strip()]))
for i in `seq 120`
do
PENDINGS=`python $VIGIECHIRO_DIR/vigiechiro-api/bin/queuer.py pendings`
if ( [ $? -ne 0 ] )
then
printf "[$(date)] command `python $VIGIECHIRO_DIR/vigiechiro-api/bin/queuer.py pendings` has failed, exiting\n"
exit 1
fi

SCHEDULED_WORKERS=`get_scheduled_workers`
NEEDED_TO_START=$(($PENDINGS - $SCHEDULED_WORKERS))
if ( [ "$NEEDED_TO_START" -gt 0 ] )
@@ -59,6 +65,11 @@ do
pushd $LOG_FOLDER
# --job-name=$WORKER_JOB_NAME seems broken, hence we must set this by using envvar
SBATCH_JOB_NAME=$WORKER_JOB_NAME sbatch $WORKER_JOB_OPTIONS $WORKER_SCRIPT_PATH
if ( [ $? -ne 0 ] )
then
printf "[$(date)] command `sbatch $WORKER_JOB_OPTIONS $WORKER_SCRIPT_PATH` has failed, exiting\n"
exit 1
fi
popd
else
printf "[$(date)] no pending jobs\n"
@@ -86,3 +97,8 @@ SBATCH_JOB_NAME=$JOB_NAME sbatch \
--constraint el9 \
--cpus-per-task=1 \
$VIGIECHIRO_DIR/slurm/check_queue.sh
if ( [ $? -ne 0 ] )
then
printf "[$(date)] Auto-restart has failed :(\n"
exit 1
fi
5 changes: 5 additions & 0 deletions in2p3/slurm/worker.sh
Original file line number Diff line number Diff line change
@@ -24,3 +24,8 @@ COND_INIT_PATH=$MINICONDA3_DIR/etc/profile.d/conda.sh
. $VIGIECHIRO_DIR/init.env

python $VIGIECHIRO_DIR/vigiechiro-api/bin/queuer.py consume next_job
if ( [ $? -ne 0 ] )
then
printf "[$(date)] command `python $VIGIECHIRO_DIR/vigiechiro-api/bin/queuer.py consume next_job` has failed\n"
exit 1
fi
5 changes: 5 additions & 0 deletions in2p3/start_slurm_check_queue.sh
Original file line number Diff line number Diff line change
@@ -21,5 +21,10 @@ SBATCH_JOB_NAME=$JOB_NAME sbatch \
--constraint el9 \
--cpus-per-task=1 \
$VIGIECHIRO_DIR/slurm/check_queue.sh
if ( [ $? -ne 0 ] )
then
printf "[$(date)] Command `sbatch [...] $VIGIECHIRO_DIR/slurm/check_queue.sh` has failed :(\n"
exit 1
fi

popd
12 changes: 7 additions & 5 deletions vigiechiro/scripts/task_participation.py
Original file line number Diff line number Diff line change
@@ -259,10 +259,10 @@ def participation_generate_bilan(participation_id):
json={'bilan': bilan.generate_payload()}, auth=AUTH,
timeout=REQUESTS_TIMEOUT)
if r.status_code != 200:
logger.error('Cannot update bilan for participation {}, error {} : {}'.format(
participation_id, r.status_code, r.text))
return 1
return 0
raise RuntimeError(
'Cannot update bilan for participation {}, error {} : {}'.format(
participation_id, r.status_code, r.text)
)


def extract_zipped_files_in_participation(participation):
@@ -402,11 +402,13 @@ def process_participation(participation_id, extra_pjs_ids=[], publique=True,
traitement['retry'] = retry_count + 1
traitement['date_fin'] = datetime.utcnow()
traitement['message'] = msg
p_resource.update(participation_id, {'traitement': traitement}, auto_abort=False)
else:
traitement['etat'] = 'ERREUR'
traitement['date_fin'] = datetime.utcnow()
traitement['message'] = msg
p_resource.update(participation_id, {'traitement': traitement}, auto_abort=False)
p_resource.update(participation_id, {'traitement': traitement}, auto_abort=False)
raise
else:
traitement['etat'] = 'FINI'
traitement['date_fin'] = datetime.utcnow()

0 comments on commit dfac664

Please sign in to comment.