-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimulate.py
159 lines (143 loc) · 4.11 KB
/
simulate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import logging
import os
import subprocess
import sys
from datetime import timedelta
from timeit import default_timer as timer

import click

from pipeline import Pipeline

logger = logging.getLogger(__name__)
def _convert_to_unix_line_endings(path: str) -> None:
    """Best-effort in-place conversion of *path* with the external ``dos2unix`` tool.

    Failures (tool missing, non-zero exit status) are logged as warnings and
    never raised, so the pipeline still runs when ``dos2unix`` is unavailable.
    """
    try:
        # An argument list (no shell) passes the path verbatim, so spaces or
        # shell metacharacters in it can neither break nor hijack the command.
        completed = subprocess.run(["dos2unix", path], check=False)
    except OSError as error:
        logger.warning("could not run dos2unix on %s: %s", path, error)
        return
    if completed.returncode != 0:
        logger.warning("dos2unix exited with code %d for %s", completed.returncode, path)


def _remove_stale_parsed_outputs(model_selection_dir: str) -> None:
    """Delete ``parsed_output.json`` from every entry directly under *model_selection_dir*.

    Does nothing when the directory does not exist.
    NOTE(review): presumably this forces the pipeline to re-parse model results
    instead of reusing earlier ones - confirm against ``Pipeline``.
    """
    if not os.path.isdir(model_selection_dir):
        return
    for model_name in os.listdir(model_selection_dir):
        parsed_output_path = os.path.join(model_selection_dir, model_name, "parsed_output.json")
        if os.path.exists(parsed_output_path):
            os.remove(parsed_output_path)


@click.command()
@click.option(
    "--counts_path",
    help="chromosome counts file path",
    type=click.Path(exists=True, file_okay=True, readable=True),
    required=True,
)
@click.option(
    "--tree_path",
    help="path to the tree file",
    type=click.Path(exists=True, file_okay=True, readable=True),
    required=True,
)
@click.option(
    "--work_dir",
    help="directory to create the chromevol input in",
    type=click.Path(exists=False),
    required=True,
)
@click.option(
    "--simulations_dir",
    help="directory to create the simulations in",
    type=click.Path(exists=False),
    required=True,
)
@click.option(
    "--log_path",
    help="path to log file of the script",
    type=click.Path(exists=False),
    required=True,
)
@click.option(
    "--parallel",
    help="indicator weather to run the pipeline in parallel (1) with one idle parent job or sequentially",
    type=bool,
    required=False,
    default=False,
)
@click.option(
    "--ram_per_job",
    help="memory size per job to parallelize on",
    type=int,
    required=False,
    default=1,
)
@click.option(
    "--queue",
    help="queue to submit jobs to",
    type=str,
    required=False,
    default="itaym",
)
@click.option(
    "--max_parallel_jobs",
    help="maximal jobs to submit at the same time from the parent process",
    type=int,
    required=False,
    default=1000,
)
@click.option(
    "--simulations_num",
    help="number of datasets to simulate",
    type=int,
    required=False,
    default=100,
)
@click.option(
    "--trials_num",
    help="number of datasets to attempt to simulate",
    type=int,
    required=False,
    default=1000,
)
@click.option(
    "--use_model_selection",
    help="indicator of weather model selection should be used",
    type=bool,
    required=False,
    default=True,
)
def simulate(
    counts_path: str,
    tree_path: str,
    work_dir: str,
    simulations_dir: str,
    log_path: str,
    parallel: bool,
    ram_per_job: int,
    queue: str,
    max_parallel_jobs: int,
    simulations_num: int,
    trials_num: int,
    use_model_selection: bool,
) -> None:
    """Select a chromevol model for the input data and simulate datasets under it.

    \f
    Everything after the form feed above is hidden from ``--help`` by click.

    Steps, in order: configure logging to stdout and ``log_path``; normalise
    both input files with ``dos2unix``; create ``work_dir``; obtain model
    weights from ``Pipeline.get_model_weights``; create ``simulations_dir``;
    run ``Pipeline.get_simulations``; print the output location and log the
    elapsed time (measured from just after the ``dos2unix`` step).

    Args:
        counts_path: Existing chromosome counts file (rewritten in place by dos2unix).
        tree_path: Existing tree file (rewritten in place by dos2unix).
        work_dir: Working directory handed to ``Pipeline``; created if missing.
        simulations_dir: Output directory for the simulations; created if missing.
        log_path: File that receives a copy of the log output.
        parallel: Forwarded to ``Pipeline``.
        ram_per_job: Forwarded to ``Pipeline``.
        queue: Forwarded to ``Pipeline``.
        max_parallel_jobs: Forwarded to ``Pipeline``.
        simulations_num: Forwarded to ``Pipeline.get_simulations``.
        trials_num: Forwarded to ``Pipeline.get_simulations``.
        use_model_selection: Forwarded to ``Pipeline.get_model_weights``.

    Raises:
        click.ClickException: If ``Pipeline.get_model_weights`` returns no models.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s module: %(module)s function: %(funcName)s line %(lineno)d: %(message)s",
        handlers=[logging.StreamHandler(sys.stdout), logging.FileHandler(log_path)],
        force=True,  # run over root logger settings to enable simultaneous writing to both stdout and file handler
    )

    for input_path in (counts_path, tree_path):
        _convert_to_unix_line_endings(input_path)

    start_time = timer()
    os.makedirs(work_dir, exist_ok=True)
    pipeline = Pipeline(
        work_dir=work_dir,
        parallel=parallel,
        ram_per_job=ram_per_job,
        queue=queue,
        max_parallel_jobs=max_parallel_jobs,
    )

    logger.info("selecting the best chromevol model")
    _remove_stale_parsed_outputs(f"{work_dir}/model_selection/")
    model_to_weight = pipeline.get_model_weights(
        counts_path=counts_path,
        tree_path=tree_path,
        use_model_selection=use_model_selection,
    )
    # The first key is taken as the selected model's results path.
    # NOTE(review): assumes the mapping is ordered best-first - confirm in Pipeline.
    best_model_results_path = next(iter(model_to_weight.keys()), None)
    if best_model_results_path is None:
        raise click.ClickException("model selection returned no models; nothing to simulate from")

    os.makedirs(simulations_dir, exist_ok=True)
    logger.info("simulating data based on selected model = %s", best_model_results_path)
    pipeline.get_simulations(
        simulations_dir=simulations_dir,
        orig_counts_path=counts_path,
        tree_path=tree_path,
        model_parameters_path=best_model_results_path,
        simulations_num=simulations_num,
        trials_num=trials_num,
    )
    print(f"simulations were generated in {simulations_dir}")

    end_time = timer()
    logger.info("duration = %s", timedelta(seconds=end_time - start_time))
if __name__ == "__main__":
    # Invoke the click command only when this file is executed as a script,
    # not when it is imported; click reads the options from the command line.
    simulate()