From cc532e46a54c7b012c1d2c248ef69bba73a6661c Mon Sep 17 00:00:00 2001 From: Konstantin Date: Tue, 6 Jun 2023 14:59:57 +0200 Subject: [PATCH] crabLaw.py: call kinit -R every hour useful for jobs that last more than 24h --- crabLaw.py | 61 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/crabLaw.py b/crabLaw.py index c35c122..0da8974 100644 --- a/crabLaw.py +++ b/crabLaw.py @@ -4,10 +4,25 @@ import os import shutil import tempfile +import threading from .law_customizations import HTCondorWorkflow from .crabTask import Task as CrabTask from .crabTaskStatus import Status +from .sh_tools import sh_call + +cond = threading.Condition() + +def update_kinit(verbose=0): + if shutil.which('kinit'): + sh_call(['kinit', '-R'], expected_return_codes=None, verbose=verbose) + +def update_kinit_thread(): + timeout = 60.0 * 60 # 1 hour + cond.acquire() + while not cond.wait(timeout): + update_kinit(verbose=1) + cond.release() class ProdTask(HTCondorWorkflow, law.LocalWorkflow): work_area = luigi.Parameter() @@ -49,23 +64,33 @@ def output(self): return law.LocalFileTarget(done_flag) def run(self): - work_area, grid_job_id, done_flag = self.branch_data - task = CrabTask.Load(workArea=work_area) - if grid_job_id == -1: - if task.taskStatus.status in [ Status.CrabFinished, Status.PostProcessingFinished ]: - if task.taskStatus.status == Status.CrabFinished: - print(f'Post-processing {task.name}') - task.postProcessOutputs() - self.output().touch() + thread = threading.Thread(target=update_kinit_thread) + thread.start() + try: + work_area, grid_job_id, done_flag = self.branch_data + task = CrabTask.Load(workArea=work_area) + if grid_job_id == -1: + if task.taskStatus.status in [ Status.CrabFinished, Status.PostProcessingFinished ]: + if task.taskStatus.status == Status.CrabFinished: + print(f'Post-processing {task.name}') + task.postProcessOutputs() + self.output().touch() + else: + raise RuntimeError(f"task {task.name} is not ready for post-processing") else: - raise RuntimeError(f"task {task.name} is not ready for post-processing") - else: - print(f'Running {task.name} job_id = {grid_job_id}') - job_home, remove_job_home = self.law_job_home() - result = task.runJobLocally(grid_job_id, job_home) - state_str = 'finished' if result else 'failed' - if remove_job_home: - shutil.rmtree(job_home) - with self.output().open('w') as output: - output.write(state_str) + print(f'Running {task.name} job_id = {grid_job_id}') + job_home, remove_job_home = self.law_job_home() + result = task.runJobLocally(grid_job_id, job_home) + state_str = 'finished' if result else 'failed' + if remove_job_home: + shutil.rmtree(job_home) + with self.output().open('w') as output: + output.write(state_str) + finally: + cond.acquire() + cond.notify_all() + cond.release() + thread.join() + def poll_callback(self, poll_data): + update_kinit(verbose=0)