|
4 | 4 | import os
|
5 | 5 | import shutil
|
6 | 6 | import tempfile
|
| 7 | +import threading |
7 | 8 |
|
8 | 9 | from .law_customizations import HTCondorWorkflow
|
9 | 10 | from .crabTask import Task as CrabTask
|
10 | 11 | from .crabTaskStatus import Status
|
| 12 | +from .sh_tools import sh_call |
| 13 | + |
| 14 | +cond = threading.Condition() |
| 15 | + |
| 16 | +def update_kinit(verbose=0): |
| 17 | + if shutil.which('kinit'): |
| 18 | + sh_call(['kinit', '-R'], expected_return_codes=None, verbose=verbose) |
| 19 | + |
| 20 | +def update_kinit_thread(): |
| 21 | + timeout = 60.0 * 60 # 1 hour |
| 22 | + cond.acquire() |
| 23 | + while not cond.wait(timeout): |
| 24 | + update_kinit(verbose=1) |
| 25 | + cond.release() |
11 | 26 |
|
12 | 27 | class ProdTask(HTCondorWorkflow, law.LocalWorkflow):
|
13 | 28 | work_area = luigi.Parameter()
|
@@ -49,23 +64,33 @@ def output(self):
|
49 | 64 | return law.LocalFileTarget(done_flag)
|
50 | 65 |
|
51 | 66 | def run(self):
|
52 |
| - work_area, grid_job_id, done_flag = self.branch_data |
53 |
| - task = CrabTask.Load(workArea=work_area) |
54 |
| - if grid_job_id == -1: |
55 |
| - if task.taskStatus.status in [ Status.CrabFinished, Status.PostProcessingFinished ]: |
56 |
| - if task.taskStatus.status == Status.CrabFinished: |
57 |
| - print(f'Post-processing {task.name}') |
58 |
| - task.postProcessOutputs() |
59 |
| - self.output().touch() |
| 67 | + thread = threading.Thread(target=update_kinit_thread) |
| 68 | + thread.start() |
| 69 | + try: |
| 70 | + work_area, grid_job_id, done_flag = self.branch_data |
| 71 | + task = CrabTask.Load(workArea=work_area) |
| 72 | + if grid_job_id == -1: |
| 73 | + if task.taskStatus.status in [ Status.CrabFinished, Status.PostProcessingFinished ]: |
| 74 | + if task.taskStatus.status == Status.CrabFinished: |
| 75 | + print(f'Post-processing {task.name}') |
| 76 | + task.postProcessOutputs() |
| 77 | + self.output().touch() |
| 78 | + else: |
| 79 | + raise RuntimeError(f"task {task.name} is not ready for post-processing") |
60 | 80 | else:
|
61 |
| - raise RuntimeError(f"task {task.name} is not ready for post-processing") |
62 |
| - else: |
63 |
| - print(f'Running {task.name} job_id = {grid_job_id}') |
64 |
| - job_home, remove_job_home = self.law_job_home() |
65 |
| - result = task.runJobLocally(grid_job_id, job_home) |
66 |
| - state_str = 'finished' if result else 'failed' |
67 |
| - if remove_job_home: |
68 |
| - shutil.rmtree(job_home) |
69 |
| - with self.output().open('w') as output: |
70 |
| - output.write(state_str) |
| 81 | + print(f'Running {task.name} job_id = {grid_job_id}') |
| 82 | + job_home, remove_job_home = self.law_job_home() |
| 83 | + result = task.runJobLocally(grid_job_id, job_home) |
| 84 | + state_str = 'finished' if result else 'failed' |
| 85 | + if remove_job_home: |
| 86 | + shutil.rmtree(job_home) |
| 87 | + with self.output().open('w') as output: |
| 88 | + output.write(state_str) |
| 89 | + finally: |
| 90 | + cond.acquire() |
| 91 | + cond.notify_all() |
| 92 | + cond.release() |
| 93 | + thread.join() |
71 | 94 |
|
| 95 | + def poll_callback(self, poll_data): |
| 96 | + update_kinit(verbose=0) |
0 commit comments