Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Replace mutect2_par wrapper with snakemake scattergather #538

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
106 changes: 101 additions & 5 deletions snappy_pipeline/workflows/somatic_variant_calling/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -127,23 +127,119 @@ if wf.w_config.step_config["somatic_variant_calling"].mutect2.common_variants:
wf.wrapper_path("mutect2/contamination")


# Number of scatter items used to parallelise Mutect2 over genome regions.
scattergather:
    mutect2=8,

# Split the reference genome into equally-sized region lists (one file per
# scatter item) based on the .fai index; regions are consumed by the
# per-scatteritem Mutect2 jobs below.
rule somatic_variant_calling_mutect2_split:
    input:
        # FIX: was get_input_files("mutect", "run") — every other directive in
        # these rules addresses the "mutect2" substep.
        unpack(wf.get_input_files("mutect2", "run")),
    output:
        regions=temp(
            scatter.mutect2(
                "work/{{mapper}}.mutect2.{{tumor_library}}/out/{{mapper}}.mutect2.{{tumor_library}}/mutect2par/in/{scatteritem}.txt"
            )
        ),
    params:
        # Patterns of contigs to exclude from calling (step config wins over
        # global config).
        ignore_chroms=wf.w_config.get("ignore_chroms", wf.config.get("ignore_chroms", [])),
        fai=config["static_data_config"]["reference"]["path"] + ".fai",
    log:
        log="work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/in/split.log",
    script:
        "scripts/split_region.py"


# Run Mutect2 on one scatter item: the raw VCF, stats and F1R2 orientation
# counts for the regions listed in the scatter item's region file.
rule somatic_variant_calling_mutect2_process:
    input:
        unpack(wf.get_input_files("mutect2", "run")),
        region="work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/in/{scatteritem}.txt",
    output:
        raw=temp(
            "work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/out/{scatteritem}.raw.vcf.gz"
        ),
        stats=temp(
            "work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/out/{scatteritem}.raw.vcf.stats"
        ),
        f1r2=temp(
            "work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/out/{scatteritem}.raw.f1r2_tar.tar.gz"
        ),
    resources:
        time=wf.get_resource("mutect2", "run", "time"),
        memory=wf.get_resource("mutect2", "run", "memory"),
        partition=wf.get_resource("mutect2", "run", "partition"),
        tmpdir=wf.get_resource("mutect2", "run", "tmpdir"),
    log:
        log="work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/out/{scatteritem}.log",
        log_md5="work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/out/{scatteritem}.log.md5",
        conda_list="work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/out/{scatteritem}.conda_list",
        conda_info="work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/out/{scatteritem}.conda_info",
        conda_list_md5="work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/out/{scatteritem}.conda_list.md5",
        conda_info_md5="work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/mutect2par/out/{scatteritem}.conda_info.md5",
    params:
        # NOTE(review): ``args["intervals"]`` and ``intervals`` carry the same
        # region list — confirm which one the mutect2/run wrapper consumes and
        # drop the other.
        args=lambda wildcards, input: {
            "intervals": list(map(str.strip, open(input.region, "rt").readlines()))
        },
        normal_lib_name=wf.substep_getattr("mutect2", "get_normal_lib_name"),
        intervals=lambda wildcards, input: list(
            map(str.strip, open(input.region, "rt").readlines())
        ),
    wrapper:
        wf.wrapper_path("mutect2/run")


# Gather step: merge the per-scatteritem raw VCFs, Mutect2 stats and F1R2
# orientation tar files into the single output set expected downstream,
# then write md5 checksums for everything in the output directory.
rule somatic_variant_calling_mutect2_gather:
    input:
        raw=gather.mutect2(
            "work/{{mapper}}.mutect2.{{tumor_library}}/out/{{mapper}}.mutect2.{{tumor_library}}/mutect2par/out/{scatteritem}.raw.vcf.gz"
        ),
        stats=gather.mutect2(
            "work/{{mapper}}.mutect2.{{tumor_library}}/out/{{mapper}}.mutect2.{{tumor_library}}/mutect2par/out/{scatteritem}.raw.vcf.stats"
        ),
        f1r2=gather.mutect2(
            "work/{{mapper}}.mutect2.{{tumor_library}}/out/{{mapper}}.mutect2.{{tumor_library}}/mutect2par/out/{scatteritem}.raw.f1r2_tar.tar.gz"
        ),
    output:
        # NOTE(review): assumes get_output_files("mutect2", "run") exposes the
        # ``raw``, ``stats`` and ``f1r2`` keys used in the shell below — confirm
        # against the step class.
        **wf.get_output_files("mutect2", "run"),
    conda:
        "envs/mutect2_gather.yaml"
    log:
        log="work/{mapper}.mutect2.{tumor_library}/out/{mapper}.mutect2.{tumor_library}/gather.raw.vcf.gz.log",
    shell:
        """
        # Concatenate raw calls vcfs & index result ----------------------
        bcftools concat \
            --allow-overlaps \
            -d none \
            -o {output.raw} \
            -O z \
            {input.raw}
        tabix -f {output.raw}

        # Concatenate stats with GATK tool -------------------------------
        stats=$(echo "{input.stats}" | sed -e "s/ / -stats /g")
        gatk MergeMutectStats -stats $stats -O {output.stats}

        # Concatenate orientation tar files ------------------------------
        tmpdir=$(mktemp -d)
        for tar_file in {input.f1r2}
        do
            abs_path=$(realpath $tar_file)
            pushd $tmpdir
            tar -zxvf $abs_path
            popd
        done
        tar -zcvf {output.f1r2} -C $tmpdir .
        rm -rf $tmpdir

        # Compute md5 sums -----------------------------------------------
        pushd $(dirname {output.raw})
        for f in *; do
            if [[ -f "$f" ]] && [[ $f != *.md5 ]]; then
                md5sum $f > $f.md5;
            fi;
        done
        popd
        """


rule somatic_variant_calling_mutect2_filter:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import csv
from bisect import bisect
from contextlib import redirect_stderr
from itertools import accumulate

from snappy_wrappers.tools.genome_windows import matches_any


def split_regions(
    contig_lengths: dict[str, int], n_chunks: int, offset: int = 1
) -> list[list[list]]:
    """Split contigs into ``n_chunks`` region lists of equal total length.

    Contigs are laid end-to-end on a genome-wide coordinate axis and cut
    into ``n_chunks`` pieces of ``total // n_chunks`` bases each; a chunk may
    therefore span several contigs, and a contig may be split across chunks.

    :param contig_lengths: mapping of contig name to contig length in bases.
    :param n_chunks: number of chunks to produce.
    :param offset: coordinate offset of the emitted intervals; the default 1
        yields 1-based, inclusive ``chrom:start-end`` intervals as used by
        samtools/GATK region strings.
    :return: list of ``n_chunks`` lists of ``[contig, start, end]`` entries,
        with ``start``/``end`` offset-based and inclusive.
    :raises ValueError: if ``contig_lengths`` is empty or contains fewer
        bases than ``n_chunks`` (chunks would be empty).
    """
    if not contig_lengths:
        raise ValueError("contig_lengths must not be empty")
    contigs, lengths = zip(*contig_lengths.items())
    # cumulative_lengths[i] is the genome-wide coordinate where contig i starts.
    cumulative_lengths = list(accumulate(lengths, initial=0))
    chunk_size = cumulative_lengths[-1] // n_chunks
    if chunk_size < 1:
        raise ValueError("more chunks requested than bases available")

    all_entries = []
    cumulative_start = cumulative_end = 0
    for _ in range(n_chunks):
        entries = []
        # Index of the contig containing the genome-wide position cumulative_start.
        idx = bisect(cumulative_lengths, cumulative_start) - 1
        accumulated_length = 0
        while accumulated_length < chunk_size and idx < len(contigs):
            contig = contigs[idx]
            # Bases of the current contig not yet assigned to any chunk.
            available_length = cumulative_lengths[idx + 1] - cumulative_end
            remaining = chunk_size - accumulated_length
            cumulative_end = cumulative_start + min(available_length, remaining)

            # Convert genome-wide coordinates back to contig-local ones.
            start = cumulative_start - cumulative_lengths[idx]
            end = cumulative_end - cumulative_lengths[idx]
            # ``end`` is exclusive 0-based; emitted intervals are inclusive and
            # offset-based, hence the ``- 1`` (previously ``end + offset``,
            # which overshot contig ends and overlapped adjacent chunks).
            entries.append([contig, start + offset, end + offset - 1])

            diff = cumulative_end - cumulative_start
            accumulated_length += diff
            cumulative_start += diff
            idx = bisect(cumulative_lengths, cumulative_start) - 1

        # Invariant: each chunk covers exactly chunk_size bases (inclusive ends).
        assert sum(x[2] - x[1] + 1 for x in entries) == chunk_size

        all_entries.append(entries)

    # Extend the last interval to the end of the last contig so that bases
    # lost to the integer division above are not dropped.
    all_entries[-1][-1][2] = lengths[-1] + offset - 1
    return all_entries


# Script entry point: executed by Snakemake via the ``script:`` directive,
# which injects the ``snakemake`` object at runtime. All stderr output is
# redirected into the rule's log file.
with open(snakemake.log.log, "wt") as log:
    with redirect_stderr(log):
        # Read contig lengths from the reference .fai index (TSV: name,
        # length, ...), skipping contigs matched by any ``ignore_chroms``
        # pattern.
        with open(snakemake.params.fai, "rt") as fai:
            csv_reader = csv.reader(fai, delimiter="\t")
            lengths = {
                contig: int(length)
                for contig, length, *_ in csv_reader
                if not matches_any(contig, snakemake.params.ignore_chroms)
            }
        # One region file per scatter item; each line is a samtools/GATK-style
        # ``chrom:start-end`` interval produced by split_regions above.
        regions = split_regions(lengths, len(snakemake.output.regions))
        for region, path in zip(regions, snakemake.output.regions):
            with open(path, "wt") as f:
                for chrom, start, end in region:
                    print(f"{chrom}:{start}-{end}", file=f)
Loading