forked from kandrosov/RunKit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcrabLaw.py
96 lines (83 loc) · 2.97 KB
/
crabLaw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import json
import law
import luigi
import os
import shutil
import tempfile
import threading
from .law_customizations import HTCondorWorkflow
from .crabTask import Task as CrabTask
from .crabTaskStatus import Status
from .sh_tools import sh_call
cond = threading.Condition()
def update_kinit(verbose=0):
if shutil.which('kinit'):
sh_call(['kinit', '-R'], expected_return_codes=None, verbose=verbose)
def update_kinit_thread():
timeout = 60.0 * 60 # 1 hour
cond.acquire()
while not cond.wait(timeout):
update_kinit(verbose=1)
cond.release()
class ProdTask(HTCondorWorkflow, law.LocalWorkflow):
work_area = luigi.Parameter()
def local_path(self, *path):
return os.path.join(self.htcondor_output_directory().path, *path)
def workflow_requires(self):
return {}
def requires(self):
return {}
def law_job_home(self):
if 'LAW_JOB_HOME' in os.environ:
return os.environ['LAW_JOB_HOME'], False
os.makedirs(self.local_path(), exist_ok=True)
return tempfile.mkdtemp(dir=self.local_path()), True
def create_branch_map(self):
task_list_path = os.path.join(self.work_area, 'tasks.json')
with open(task_list_path, 'r') as f:
task_names = json.load(f)
branches = {}
job_id = 0
for task_name in task_names:
task = CrabTask.Load(mainWorkArea=self.work_area, taskName=task_name)
if task.taskStatus.status in [ Status.CrabFinished, Status.PostProcessingFinished ]:
branches[job_id] = (task.workArea, -1, task.getPostProcessingDoneFlagFile())
job_id += 1
else:
for grid_job_id in task.getGridJobs():
branches[job_id] = (task.workArea, grid_job_id, task.getGridJobDoneFlagFile(grid_job_id))
job_id += 1
return branches
def output(self):
work_area, grid_job_id, done_flag = self.branch_data
return law.LocalFileTarget(done_flag)
def run(self):
thread = threading.Thread(target=update_kinit_thread)
thread.start()
try:
work_area, grid_job_id, done_flag = self.branch_data
task = CrabTask.Load(workArea=work_area)
if grid_job_id == -1:
if task.taskStatus.status in [ Status.CrabFinished, Status.PostProcessingFinished ]:
if task.taskStatus.status == Status.CrabFinished:
print(f'Post-processing {task.name}')
task.postProcessOutputs()
self.output().touch()
else:
raise RuntimeError(f"task {task.name} is not ready for post-processing")
else:
print(f'Running {task.name} job_id = {grid_job_id}')
job_home, remove_job_home = self.law_job_home()
result = task.runJobLocally(grid_job_id, job_home)
state_str = 'finished' if result else 'failed'
if remove_job_home:
shutil.rmtree(job_home)
with self.output().open('w') as output:
output.write(state_str)
finally:
cond.acquire()
cond.notify_all()
cond.release()
thread.join()
def poll_callback(self, poll_data):
update_kinit(verbose=0)