# conda activate mspipeline1
# snakemake --profile profiles/local
# snakemake --profile profiles/slurm-sigma2-saga
__author__ = "Arturo Vera Ponce De Leon, Ove Øyås & Carl Mathias Kobel"
__version__ = "v2.0.0"
# changelog:
# v2.0.0: Using Arturo's pipeline, resulting in the first version that actually works.
import glob
import pandas as pd
import re
import pathlib
print("/* ") # Helps with outputting to dot format.
print(" ______________ ")
print(" < MS-pipeline1 > ")
print(" -------------- ")
print(" \\ ")
print(" ___......__ _ \\ ")
print(" _.-' ~-_ _.=a~~-_ ")
print(" --=====-.-.-_----------~ .--. _ -.__.-~ ( ___===> ")
print(" '''--...__ ( \\ \\\\\\ { ) _.-~ ")
print(" =_ ~_ \\\\-~~~//~~~~-=-~ ")
print(" |-=-~_ \\\\ \\\\ ")
print(" |_/ =. ) ~} ")
print(" |} || ")
print(" // || ")
print(" _// {{ ")
print(" '='~' \\\\_ = ")
print(" ~~' ")
print(" ")
# Read configuration
try:
configfile: "config.yaml"
except WorkflowError as e:
print(e, "Please make sure that the config.yaml file is present and correctly formatted. You can use the config_template.yaml file as a starting point.")
raise e
config_batch = config["batch"]
config_d_base = config["batch_parameters"][config_batch]["d_base"]
config_database_glob = config["batch_parameters"][config_batch]["database_glob"]
config_database_glob_read = glob.glob(config_database_glob)
config_samples = config["batch_parameters"][config_batch]["samples"]
absolute_output_dir = str(pathlib.Path("output/").absolute())
# Present configuration
print(f" abs out dir is: '{absolute_output_dir}'")
print(f" config_batch: '{config_batch}'")
print(f" config_d_base: '{config_d_base}'")
print(f" config_database_glob: '{config_database_glob}:'")
if len(config_database_glob_read) < 1:
raise Exception("No glob targets in config_database_glob") # Not tested yet.
for i, j in enumerate(config_database_glob_read):
print(f" {i}) {j}")
if i==19: # Only show the first 20 entries, otherwise the screen becomes congested.
print(f"and {len(config_database_glob_read)-20} more.. ({len(config_database_glob_read)} in total)")
break # Do not print more.
print()
# Create a dataframe with all inputs
df = pd.DataFrame(data = {'sample': config_samples.keys(),
'barcode': config_samples.values()})
df["basename"] = [re.sub(".d$", "", barcode) for barcode in df["barcode"]]
print(df)
print("//")
print()
# Count input sizes for managing rule resources
n_genomes_database = len(config_database_glob_read)
n_samples = len(df["basename"])
print(f"n_genomes_database: {n_genomes_database}")
print(f"n_samples: {n_samples}")
#print("manifest:")
manifest = pd.DataFrame(data = {'path': config_samples.values(), 'experiment': config_samples.keys()})
manifest['path'] = absolute_output_dir + "/" + config_batch + "/samples/" + manifest['path'] # Use absolute paths instead of relying on bash realpath.
# Note: IonQuant with MBR requires that each LCMS run is assigned to an experiment; here each run simply uses its sample name.
manifest["bioreplicate"] = "" # Bioreplicate (can be empty or an integer)
manifest["data_type"] = "DDA" # Data type (DDA, DIA, GPF-DIA, DIA-Quant, DIA-Lib)
#print(manifest); print("//")
# Define workflow targets
rule all:
input:
metadata = f"output/{config_batch}/metadata.tsv",
copy_input = f"output/{config_batch}/samples/copy_samples.done",
make_database = f"output/{config_batch}/philosopher_database.faa",
fragpipe = f"output/{config_batch}/fragpipe_done.flag",
report = f"output/{config_batch}/report_MS-pipeline1_{config_batch}.html",
zip_ = f"output/{config_batch}/MS-pipeline1_{config_batch}.zip",
# Save some metadata about inputs for good measure.
rule metadata:
output: "output/{config_batch}/metadata.tsv"
params: dataframe = df.to_csv(None, index_label = "index", sep = "\t"),
shell: """
echo '''{params.dataframe}''' > {output}
# TODO: Also record input sizes in the benchmarks/ directory (see the sketch after this rule), so we can relate hardware allocation, input size, and running time.
"""
# copy_samples links or copies the input .d directories into the output tree. Long term, this should live on the fastest possible disk, i.e. userwork.
# TODO: Add the metadata as input to this rule.
rule copy_samples:
output:
flag = touch("output/{config_batch}/samples/copy_samples.done"), # Used to keep fragpipe waiting.
dir = directory("output/{config_batch}/samples"), # Why is this necessary?
d_files = "output/{config_batch}/samples/" + df["barcode"], # Bound for fragpipe. Update, but fragpipe uses the flag instead?
params:
d_files = (config_d_base + "/" + df["barcode"]).tolist(), # Snakemake does not handle directories well as inputs, so the source .d directories are passed as a param instead.
benchmark: "output/{config_batch}/benchmarks/benchmark.copy_samples.{config_batch}.tsv"
shell: """
cp -vr {params.d_files} {output.dir}
# Enable editing of these files
chmod -R 775 output/
"""
# make_database concatenates all amino acid fastas and runs "philosopher database" on the result.
rule make_database:
input:
#glob = [glob.glob(config_database_glob)], # Not sure why this one was inside bracket/list?
glob = config_database_glob_read
output:
database = "output/{config_batch}/philosopher_database.faa",
params:
philosopher = config["philosopher_executable"],
retries: 4 # Retry with progressively more memory (see mem_mib below).
resources:
mem_mib = lambda wildcards, attempt : [16000, 32000, 64000, 128000, 175000, 0, 0][attempt-1], # attempt starts at 1, so the first run gets 16000 MiB and each retry steps up the list; the trailing zeros are never reached with retries = 4.
#mem_mb = 64000,
runtime = "24h",
benchmark: "output/{config_batch}/benchmarks/benchmark.make_database.{config_batch}.tsv"
shell: """
mkdir -p output/{config_batch}/
>&2 echo "Concatenating database ..."
cat {input.glob} > output/{config_batch}/cat_database_sources.faa
cd output/{config_batch}/
{params.philosopher} workspace --init
# https://github.com/Nesvilab/philosopher/wiki/Database
{params.philosopher} database \
--custom cat_database_sources.faa \
--contam
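# philosopher appends decoy and common contaminant sequences, producing a file named
# <date>-decoys-contam-cat_database_sources.faa.fas (pattern assumed from the mv below).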
echo "Existing database pattern-matched files:"
ls *-decoys-contam-cat_database_sources.faa.fas
mv *-decoys-contam-cat_database_sources.faa.fas philosopher_database.faa # rename database file.
# rm cat_database_sources.faa # Deliberately kept: the concatenated .faa is useful for annotation purposes.
{params.philosopher} workspace --clean
"""
rule db_stats:
input:
database = "output/{config_batch}/philosopher_database.faa",
output:
db_stats_seqkit = "output/{config_batch}/db_stats_seqkit.tsv",
params:
config_database_glob_read = config_database_glob_read,
resources:
runtime = "1h",
mem_mib = 256,
conda: "envs/seqkit.yaml"
shell: """
# TODO: Also report total database length (residues)?
seqkit stats \
--tabular \
{params.config_database_glob_read} {input.database} \
> {output.db_stats_seqkit}
"""
# def memory_msfragger(wildcards, attempt):
# return attempt * 100000
# Run fragpipe in headless mode. The manifest and workflow are generated on the fly.
# Retries could be added to this rule as well, but fragpipe can fail in so many ways that blind retries would mostly waste CPU hours, so they are deliberately left out.
rule fragpipe:
input:
copy_samples = "output/{config_batch}/samples/copy_samples.done",
database = "output/{config_batch}/philosopher_database.faa",
output:
flag = touch("output/{config_batch}/fragpipe_done.flag"),
manifest = "output/{config_batch}/fragpipe/{config_batch}.manifest",
fragpipe_workflow = "output/{config_batch}/fragpipe/fragpipe_modified.workflow",
#stats = "output/{config_batch}/fragpipe/fragpipe_stats.tsv", # Moved to rule fragpipe_stats
# final results:
#psm = "output/{config_batch}/fragpipe/experiment/psm.tsv", # A file will be created for each experiment, and really R should read each of those directly.
final_ion = "output/{config_batch}/fragpipe/combined_ion.tsv",
final_peptide = "output/{config_batch}/fragpipe/combined_peptide.tsv",
final_protein = "output/{config_batch}/fragpipe/combined_protein.tsv",
fragpipe_stdout = "output/{config_batch}/fragpipe/fragpipe.out.log"
params:
manifest = manifest.to_csv(path_or_buf=None, sep="\t", index=False, header=False), # A tab-separated string (no header), echoed into the manifest file in the shell block below.
#original_fragpipe_workflow = "assets/fragpipe_workflows/LFQ-MBR.workflow", # The path to the workflow that specifies the type of analysis
original_fragpipe_workflow = "assets/fragpipe_workflows/LFQ-MBR_carl_no_overwrite.workflow", # The path to the workflow that specifies the type of analysis. Honestly, I don't think it matters that you just overwrite the settings in the original.
slice_db = 16, # The number of database splits that fragpipe (msfragger) should perform.
fragpipe_executable = config["fragpipe_executable"],
msfragger_jar = config["msfragger_jar"],
ionquant_jar = config["ionquant_jar"],
philosopher_executable = config["philosopher_executable"],
fragpipe_workdir = "output/{config_batch}/fragpipe", # Bound for fragpipe --workdir
threads: 16 # 8 for testing
resources: # For memory, budget roughly (6.3e-5) * n_proteins + 41 GiB of RAM.
#partition = "bigmem", # When using more than 178.5 GB at sigma2/saga
#mem_mb = 32000, # for testing
#mem_mb = 190000, # Some people like to use 150GB in bigmem with 12 threads.
mem_mib = 645120, # 630 GiB; a generous allocation.
runtime = "36h",
#conda: "envs/openjdk_python.yaml"
conda: "envs/openjdk_python_extra.yaml" # TODO: Use this file, I checked it already, and you just have to install pyopenms manually. Don't want to use a previous version of python (e.g. 3.9) just to have easypqp installed, as it seems like some people do not have it too.
benchmark: "output/{config_batch}/benchmarks/benchmark.fragpipe.{config_batch}.tsv"
shell: """
echo "Create manifest ..."
echo '''{params.manifest}''' > {output.manifest}
tail {output.manifest}
echo "Modifying workflow with runtime parameters ..." # TODO: Check if it matters to overwrite or not.
# Copy and modify parameter file with dynamic content.
cp {params.original_fragpipe_workflow} {output.fragpipe_workflow}
echo -e "\n# Added by mspipeline1 in rule fragpipe in snakefile below ..." >> {output.fragpipe_workflow}
echo "num_threads={threads}" >> {output.fragpipe_workflow}
echo "database_name={input.database}" >> {output.fragpipe_workflow}
echo "database.db-path={input.database}" >> {output.fragpipe_workflow}
echo "output_location={params.fragpipe_workdir}" >> {output.fragpipe_workflow}
# These settings minimize memory usage.
echo "msfragger.misc.slice-db={params.slice_db}" >> {output.fragpipe_workflow} # Default 1
echo "msfragger.calibrate_mass=0" >> {output.fragpipe_workflow} # Default 2
echo "msfragger.digest_max_length=35" >> {output.fragpipe_workflow} # Default 50
# echo "msfragger.allowed_missed_cleavage_1=1" >> {output.fragpipe_workflow} # Default 2
# echo "msfragger.allowed_missed_cleavage_2=1" >> {output.fragpipe_workflow} # Default 2
# Debug, presentation of the bottom of the modified workflow
echo "" >> {output.fragpipe_workflow}
tail {output.fragpipe_workflow}
# Convert mem_mib into GiB for fragpipe's --ram flag (it is unclear whether fragpipe interprets the value as GiB or GB).
mem_gib=$(({resources.mem_mib}/1024-2)) # E.g. 645120/1024 - 2 = 628. A couple of GiB are subtracted as overhead margin, tuned empirically by subtracting one more GiB each time fragpipe ran out of memory.
echo "Fragpipe will be told not to use more than $mem_gib GiB. In practice it usually uses a bit more." # Possibly JVM/conda overhead.
echo "Fragpipe ..."
# https://fragpipe.nesvilab.org/docs/tutorial_headless.html
{params.fragpipe_executable} \
--headless \
--workflow {output.fragpipe_workflow} \
--manifest {output.manifest} \
--workdir {params.fragpipe_workdir} \
--ram $mem_gib \
--threads {threads} \
--config-msfragger {params.msfragger_jar} \
--config-ionquant {params.ionquant_jar} \
--config-philosopher {params.philosopher_executable} \
| tee {output.fragpipe_stdout} # Write the log, so we can later extract the number of "scans"
"""
# These stats are extracted in a separate rule purely to ease debugging.
# They could have been computed at the end of rule fragpipe, but a separate rule is quicker to develop and rerun.
rule fragpipe_stats:
input:
fragpipe_stdout = "output/{config_batch}/fragpipe/fragpipe.out.log",
output:
scans = "output/{config_batch}/fragpipe/stats_fragpipe_scans.tsv",
#psms = "output/{config_batch}/psms.tsv", # Better to let R read these files from the zip directory.
params:
fragpipe_workdir = "output/{config_batch}/fragpipe", # Bound for fragpipe --workdir
shell: """
# Extract scans from the fragpipe stdout log. Will later be compared to the individual psm files.
grep -E ": Scans = [0-9]+" {input.fragpipe_stdout} \
> {output.scans}
"""
# Render the QC report first; rule zip then bundles it together with its inputs.
rule report:
input:
"output/{config_batch}/metadata.tsv",
"output/{config_batch}/db_stats_seqkit.tsv",
"output/{config_batch}/fragpipe/{config_batch}.manifest",
"output/{config_batch}/fragpipe/fragpipe_modified.workflow",
"output/{config_batch}/fragpipe/stats_fragpipe_scans.tsv",
"output/{config_batch}/fragpipe/combined_ion.tsv",
"output/{config_batch}/fragpipe/combined_peptide.tsv",
"output/{config_batch}/fragpipe/combined_protein.tsv",
"output/{config_batch}/fragpipe/fragpipe.out.log",
output:
report = "output/{config_batch}/report_MS-pipeline1_{config_batch}.html",
idrate = "output/{config_batch}/identification_rate.tsv",
benchmark: "output/{config_batch}/benchmarks/benchmark.report.{config_batch}.tsv"
conda: "envs/r-markdown.yaml"
resources:
runtime = "4h",
mem_mib = 4096,
shell: """
cp scripts/QC.Rmd rmarkdown_template.rmd
Rscript -e 'rmarkdown::render("rmarkdown_template.rmd", "html_document", output_file = "{output.report}", knit_root_dir = "output/{config_batch}/", quiet = F)'
rm rmarkdown_template.rmd
"""
rule zip:
input:
"output/{config_batch}/metadata.tsv",
"output/{config_batch}/db_stats_seqkit.tsv",
"output/{config_batch}/fragpipe/{config_batch}.manifest",
"output/{config_batch}/fragpipe/fragpipe_modified.workflow",
"output/{config_batch}/fragpipe/stats_fragpipe_scans.tsv",
"output/{config_batch}/fragpipe/combined_ion.tsv",
"output/{config_batch}/fragpipe/combined_peptide.tsv",
"output/{config_batch}/fragpipe/combined_protein.tsv",
"output/{config_batch}/report_MS-pipeline1_{config_batch}.html",
"output/{config_batch}/identification_rate.tsv",
"scripts/QC.Rmd"
output:
"output/{config_batch}/MS-pipeline1_{config_batch}.zip",
resources:
runtime = "1h",
shell: """
zip {output} {input}
"""
onstart:
shell("mkdir -p logs/old/; (mv logs/*.log logs/old/ 2> /dev/null || exit 0) &") # Put old logs aside
shell("find output/ > .onstart.txt 2> /dev/null || exit 0")
onsuccess:
print("onsuccess: Newly created files (first 10 diff lines shown):")
shell("find output/ > .onsuccess.txt 2> /dev/null || exit 0")
shell(""" diff .onstart.txt .onsuccess.txt > .diff.txt || true; head -n 10 .diff.txt; echo "$(wc -l < .diff.txt) changed lines in total" """)
print("*/") # This is a dot-language specific comment close tag that helps when you export the workflow as a graph
# TODO: The benchmark directory should also record the number of proteins processed and the number of scans in the TIMS input, so that resource requirements for bigger future projects can be estimated more accurately.