From 35676e4abb2ef1d76db6a7301b69b01b668bb3c1 Mon Sep 17 00:00:00 2001
From: Ryan Routsong
Date: Mon, 12 Aug 2024 12:12:19 -0500
Subject: [PATCH] feat: add flag to set snakemake rerun triggers (#27)

* feat: add flag to set snakemake rerun triggers

* chore: remove references to old coassembly methods

* fix: switch to space separated trigger setting

* fix: fix delimiter for csv files
---
 metamorph              | 102 +++++++++++++++++++++++------------------
 src/run.py             |  65 +++++---------------------
 src/run.sh             |  24 ++++++++--
 src/utils.py           |  57 +++++++++++++++++++++++
 workflow/Snakefile     |   6 +--
 workflow/rules/DNA.smk |  49 +------------------
 workflow/rules/RNA.smk |  45 ------------------
 7 files changed, 148 insertions(+), 200 deletions(-)

diff --git a/metamorph b/metamorph
index e0dfe86..230b299 100755
--- a/metamorph
+++ b/metamorph
@@ -29,20 +29,26 @@ USAGE:
   $ metamorph <run> [OPTIONS]
 
 EXAMPLES:
-  co-assembly dna-only:
-    $ metamorph run --coa --input *.R?.fastq.gz --output output
-    $ metamorph run -C --input *.R?.fastq.gz --output output
-
-  per-sample assembly dna-only:
-    $ metamorph run --input *.R?.fastq.gz --output output
-
-  co-assembly rna & dna:
-    $ metamorph run --coa --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
-    $ metamorph run -C --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
-
-  per-sample assembly rna & dna:
-    $ metamorph run --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
-"""
+  dna-only:
+    $ metamorph run --input <sample_sheet> --output <output_dir>
+    sample sheet:
+             ________
+            |  DNA   |
+            |--------|
+      pair1 | path   |
+      pair2 | path   |
+             --------
+
+  rna & dna:
+    $ metamorph run --input <sample_sheet> --output <output_dir>
+    sample sheet:
+             _______________
+            |  DNA  |  RNA  |
+            |-------|-------|
+      pair1 | path  | path  |
+      pair2 | path  | path  |
+             ---------------
+"""
 from __future__ import print_function
 from datetime import timezone, datetime
 import argparse, sys, os, subprocess, json, textwrap
@@ -50,16 +56,9 @@ import argparse, sys, os, subprocess, json, textwrap
 
 # Local imports
 from src import version
-from src.run import init, setup, bind, dryrun, runner, valid_input
-from src.utils import (
-    Colors,
-    err,
-    exists,
-    fatal,
-    check_cache,
-    require,
-    permissions
-)
+from src.run import init, setup, bind, dryrun, runner
+from src.utils import Colors, err, exists, fatal, check_cache, require, \
+    permissions, valid_trigger, valid_input
 
 
 # Pipeline Metadata
@@ -156,7 +155,6 @@ def run(sub_args):
         config = config
     )
     config['bindpaths'] = bindpaths
-    config['coassembly'] = False
 
     # Step 4b. Setup assembly mode
     # modes: 0 - megahit + metaspades assembly
@@ -190,6 +188,7 @@ def run(sub_args):
     if 'databases' in config:
         bindpaths.extend([mount['from']+':'+mount['to']+':'+mount['mode'] for mount in config['databases']])
 
+    triggers = sub_args.triggers if sub_args.triggers else None
    mjob = runner(mode = sub_args.mode,
        outdir = sub_args.output,
        alt_cache = sub_args.singularity_cache,
@@ -199,6 +198,7 @@ def run(sub_args):
        logger = logfh,
        additional_bind_paths = ",".join(bindpaths),
        tmp_dir = sub_args.tmp_dir,
+       triggers = triggers
    )
 
    # Step 6. Wait for subprocess to complete,
@@ -391,7 +391,7 @@ def parsed_arguments(name, description):
            module load singularity snakemake
 
            # Step 2A.) Dry-run the pipeline
-           ./{0} run --input .tests/*.R?.fastq.gz \\
+           ./{0} run --input <sample_sheet> \\
                --output /data/$USER/output \\
                --mode slurm \\
                --dry-run
@@ -400,30 +400,36 @@ def parsed_arguments(name, description):
            # Step 3A.) Run the {0} pipeline
            # The slurm mode will submit jobs to
            # the cluster. It is recommended running
            # the pipeline in this mode.
-           ./{0} run --input .tests/*.R?.fastq.gz \\
+           ./{0} run --input <sample_sheet> \\
                --output /data/$USER/output \\
                --mode slurm
 
            # Step 3B.) Run the {0} pipeline in co-assembly fashion
            # with slurm
-           ./{0} run --coa --input .tests/*.R?.fastq.gz \\
+           ./{0} run --input .tests/*.R?.fastq.gz \\
                --output /data/$USER/output \\
                --mode slurm
 
        {2}{3}EXAMPLES:{4}
-          co-assembly dna-only:
-            $ metamorph run --coa --input *.R?.fastq.gz --output output
-            $ metamorph run -C --input *.R?.fastq.gz --output output
-
-          per-sample assembly dna-only:
-            $ metamorph run --input *.R?.fastq.gz --output output
-
-          co-assembly rna & dna:
-            $ metamorph run --coa --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
-            $ metamorph run -C --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
-
-          per-sample assembly rna & dna:
-            $ metamorph run --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
+          dna-only:
+            $ metamorph run --input <sample_sheet> --output <output_dir>
+            sample sheet:
+                     ________
+                    |  DNA   |
+                    |--------|
+              pair1 | path   |
+              pair2 | path   |
+                     --------
+
+          rna & dna:
+            $ metamorph run --input <sample_sheet> --output <output_dir>
+            sample sheet:
+                     _______________
+                    |  DNA  |  RNA  |
+                    |-------|-------|
+              pair1 | path  | path  |
+              pair2 | path  | path  |
+                     ---------------
 
        {2}{3}VERSION:{4}
@@ -466,9 +472,6 @@ def parsed_arguments(name, description):
        action='help',
        help=argparse.SUPPRESS
    )
-
-    # Analysis options
-    # ... add here
 
    # Orchestration Options
    # Execution Method, run locally
@@ -492,6 +495,16 @@ def parsed_arguments(name, description):
        help = argparse.SUPPRESS
    )
 
+    # Snakemake rerun triggers
+    subparser_run.add_argument(
+        '-t', '--triggers',
+        type = valid_trigger,
+        required = False,
+        default = None,
+        nargs = "*",
+        help = argparse.SUPPRESS
+    )
+
    # Dry-run
    # Do not execute the workflow,
    # prints what steps remain
@@ -744,7 +757,6 @@ def parsed_arguments(name, description):
 
 
 def main():
-
    # Sanity check for usage
    if len(sys.argv) == 1:
        # Nothing was provided
diff --git a/src/run.py b/src/run.py
index 36c54df..b93ed58 100644
--- a/src/run.py
+++ b/src/run.py
@@ -5,7 +5,7 @@
 from __future__ import print_function
 from shutil import copytree
 from pathlib import Path
-from csv import DictReader, Sniffer
+
 import os, re, json, sys, subprocess, argparse
 
 
@@ -664,53 +664,6 @@ def dryrun(outdir, config='config.json', snakefile=os.path.join('workflow', 'Snakefile')):
     return dryrun_output
 
 
-def valid_input(sheet):
-    """
-    Valid sample sheets should contain two columns: "DNA" and "RNA"
-
-           _________________
-          |  DNA  |  RNA   |
-          |-------|--------|
-    pair1 | path  | path   |
-    pair2 | path  | path   |
-    """
-    # check file permissions
-    sheet = os.path.abspath(sheet)
-    if not os.path.exists(sheet):
-        raise argparse.ArgumentTypeError(f'Sample sheet path {sheet} does not exist!')
-    if not os.access(sheet, os.R_OK):
-        raise argparse.ArgumentTypeError(f"Path `{sheet}` exists, but cannot read path due to permissions!")
-
-    # check format to make sure it's correct
-    if sheet.endswith('.tsv') or sheet.endswith('.txt'):
-        delim = '\t'
-    elif sheet.endswith('.csv'):
-        delim = '\t'
-
-    rdr = DictReader(open(sheet, 'r'), delimiter=delim)
-
-    if 'DNA' not in rdr.fieldnames:
-        raise argparse.ArgumentTypeError("Sample sheet does not contain `DNA` column")
-    if 'RNA' not in rdr.fieldnames:
-        print("-- Running in DNA only mode --")
-    else:
-        print("-- Running in paired DNA & RNA mode --")
-
-    data = [row for row in rdr]
-    RNA_included = False
-    for row in data:
-        row['DNA'] = os.path.abspath(row['DNA'])
-        if not os.path.exists(row['DNA']):
-            raise argparse.ArgumentTypeError(f"Sample sheet path `{row['DNA']}` does not exist")
not exist") - if 'RNA' in row and not row['RNA'] in ('', None, 'None'): - RNA_included = True - row['RNA'] = os.path.abspath(row['RNA']) - if not os.path.exists(row['RNA']): - raise argparse.ArgumentTypeError(f"Sample sheet path `{row['RNA']}` does not exist") - - return data, RNA_included - - try: __job_name__ = 'metamorph_' + os.getlogin() + ':master' except OSError: @@ -726,6 +679,7 @@ def runner( threads=2, jobname=__job_name__, submission_script='run.sh', + triggers=None, tmp_dir = '/lscratch/$SLURM_JOB_ID/' ): """Runs the pipeline via selected executor: local or slurm. @@ -833,11 +787,14 @@ def runner( # --cluster "${CLUSTER_OPTS}" --keep-going --restart-times 3 -j 500 \ # --rerun-incomplete --stats "$3"/logfiles/runtime_statistics.json \ # --keep-remote --local-cores 30 2>&1 | tee -a "$3"/logfiles/master.log - masterjob = subprocess.Popen([ - str(submission_script), mode, - '-j', jobname, '-b', str(bindpaths), - '-o', str(outdir), '-c', str(cache), - '-t', "'{}'".format(tmp_dir) - ], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env) + cmd = [ + str(submission_script), mode, + '-j', jobname, '-b', str(bindpaths), + '-o', str(outdir), '-c', str(cache), + '-t', "'{}'".format(tmp_dir), + ] + if triggers: + cmd.extend(['-r', ','.join(triggers)]) + masterjob = subprocess.Popen(cmd, cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env) return masterjob diff --git a/src/run.sh b/src/run.sh index 079e4be..b531542 100755 --- a/src/run.sh +++ b/src/run.sh @@ -8,7 +8,8 @@ USAGE: -o OUTDIR \\ -j MASTER_JOB_NAME \\ -b SINGULARITY_BIND_PATHS \\ - -t TMP_DIR + -t TMP_DIR \\ + -r RERUN_TRIGGERS SYNOPSIS: This script creates/submits the pipeline's master job to the cluster. The master job acts as the pipeline's main controller or @@ -61,7 +62,12 @@ Required Arguments: this location. On Biowulf, it should be set to '/lscratch/\$SLURM_JOBID/'. On FRCE, this value should be set to the following: - '/scratch/cluster_scratch/\$USER/'. + '/scratch/cluster_scratch/\$USER/'. + -r, --triggers [Type: Str] Snakemake rerun triggers. See + description of flag '--rerun-triggers', at + https://snakemake.readthedocs.io/en/stable/executing/cli.html#all-options + for more details. + Default: code params software_env input mtime OPTIONS: -c, --cache [Type: Path] Path to singularity cache. If not provided, the path will default to the current working @@ -97,6 +103,7 @@ function parser() { -t | --tmp-dir) provided "$key" "${2:-}"; Arguments["t"]="$2"; shift; shift;; -o | --outdir) provided "$key" "${2:-}"; Arguments["o"]="$2"; shift; shift;; -c | --cache) provided "$key" "${2:-}"; Arguments["c"]="$2"; shift; shift;; + -r | --triggers) provided "$key" "${2:-}"; Arguments["r"]="${2/,/' '}"; shift; shift;; -* | --*) err "Error: Failed to parse unsupported argument: '${key}'."; usage && exit 1;; *) err "Error: Failed to parse unrecognized argument: '${key}'. 
    esac
@@ -159,6 +166,7 @@ function submit(){
    # INPUT $4 = Singularity Bind paths
    # INPUT $5 = Singularity cache directory
    # INPUT $6 = Temporary directory for output files
+   # INPUT $7 = rerun trigger values
 
    # Check if singularity and snakemake are in $PATH
    # If not, try to module load singularity as a last resort
@@ -191,6 +199,9 @@ function submit(){
    # --printshellcmds --keep-going --rerun-incomplete
    # --keep-remote --restart-times 3 -j 500 --use-singularity
    # --singularity-args -B {}.format({bindpaths}) --local-cores 24
+   triggers="${7:-code params software-env input mtime}"
+   rerun="--rerun-triggers $triggers"
+
    SLURM_DIR="$3/logfiles/slurmfiles"
    CLUSTER_OPTS="sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname} -e $SLURM_DIR/slurm-%j_{params.rname}.out -o $SLURM_DIR/slurm-%j_{params.rname}.out"
    # Check if NOT running on Biowulf
@@ -228,6 +239,7 @@ snakemake \\
    -s "$3/workflow/Snakefile" \\
    -d "$3" \\
    --use-singularity \\
+   $rerun \\
    --singularity-args "\\-c \\-B '$4'" \\
    --use-envmodules \\
    --verbose \\
@@ -279,9 +291,9 @@ function main(){
    # Parses remaining user provided command-line arguments
    parser "${@:2}" # Remove first item of list
+   outdir="$(abspath "$(dirname "${Arguments[o]}")")"
    Arguments[o]="${Arguments[o]%/}" # clean outdir path (remove trailing '/')
-
    # Setting defaults for non-required arguments
    # If singularity cache not provided, default to ${outdir}/.singularity
    cache="${Arguments[o]}/.singularity"
@@ -294,7 +306,11 @@ function main(){
 
    # Run pipeline and submit jobs to cluster using the defined executor
    mkdir -p "${Arguments[o]}/logfiles/"
-   job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}")
+   if [[ ! -v Arguments[r] ]]; then
+       job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}")
+   else
+       job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}" "${Arguments[r]}")
+   fi
 
    echo -e "[$(date)] Pipeline submitted to cluster.\nMaster Job ID: $job_id"
    echo "${job_id}" > "${Arguments[o]}/logfiles/mjobid.log"
diff --git a/src/utils.py b/src/utils.py
index 37af5e1..e24aeef 100644
--- a/src/utils.py
+++ b/src/utils.py
@@ -6,6 +6,7 @@
 from shutil import copytree
 import os, sys, hashlib
 import subprocess, json
+from csv import DictReader
 
 
 def md5sum(filename, first_block_only = False, blocksize = 65536):
@@ -370,6 +371,62 @@ def hashed(l):
     return h
 
 
+def valid_input(sheet):
+    """
+    Valid sample sheets should contain two columns: "DNA" and "RNA"
+
+           _________________
+          |  DNA  |  RNA   |
+          |-------|--------|
+    pair1 | path  | path   |
+    pair2 | path  | path   |
+    """
+    from argparse import ArgumentTypeError
+    # check file permissions
+    sheet = os.path.abspath(sheet)
+    if not os.path.exists(sheet):
+        raise ArgumentTypeError(f'Sample sheet path {sheet} does not exist!')
+    if not os.access(sheet, os.R_OK):
+        raise ArgumentTypeError(f"Path `{sheet}` exists, but cannot read path due to permissions!")
+
+    # check the extension to infer the delimiter
+    if sheet.endswith('.tsv') or sheet.endswith('.txt'):
+        delim = '\t'
+    elif sheet.endswith('.csv'):
+        delim = ','
+    else:
+        raise ArgumentTypeError("Sample sheet must be a .tsv, .txt, or .csv file")
+
+    rdr = DictReader(open(sheet, 'r'), delimiter=delim)
+
+    if 'DNA' not in rdr.fieldnames:
+        raise ArgumentTypeError("Sample sheet does not contain `DNA` column")
+    if 'RNA' not in rdr.fieldnames:
+        print("-- Running in DNA only mode --")
+    else:
+        print("-- Running in paired DNA & RNA mode --")
+
+    data = [row for row in rdr]
+    RNA_included = False
+    for row in data:
+        row['DNA'] = os.path.abspath(row['DNA'])
+        if not os.path.exists(row['DNA']):
+            raise ArgumentTypeError(f"Sample sheet path `{row['DNA']}` does not exist")
+        if 'RNA' in row and row['RNA'] not in ('', None, 'None'):
+            RNA_included = True
+            row['RNA'] = os.path.abspath(row['RNA'])
+            if not os.path.exists(row['RNA']):
+                raise ArgumentTypeError(f"Sample sheet path `{row['RNA']}` does not exist")
+
+    return data, RNA_included
+
+
+def valid_trigger(trigger_given):
+    from argparse import ArgumentTypeError
+    snk_triggers = ('mtime', 'code', 'input', 'params', 'software-env')
+    if trigger_given not in snk_triggers:
+        raise ArgumentTypeError('Invalid trigger selected. Please use one of: ' + ', '.join(snk_triggers))
+    return trigger_given
+
+
 if __name__ == '__main__':
     # Calculate MD5 checksum of entire file
     print('{} {}'.format(md5sum(sys.argv[0]), sys.argv[0]))
diff --git a/workflow/Snakefile b/workflow/Snakefile
index 46a556d..1cc8f40 100644
--- a/workflow/Snakefile
+++ b/workflow/Snakefile
@@ -13,11 +13,9 @@ rna_datapath = config["project"].get("rna_datapath", [])
 workpath = config["project"]["workpath"]
 tmpdir = config["options"]["tmp_dir"]
 megahit_only = bool(int(config["options"]["assembler_mode"]))
-coassemble = config['coassembly'] is True
 rna_included = list_bool(config.get("rna", 'false'))
-rna_coasm = str_bool(config["options"].get("rnacoa", 'False'))
-rna_sample_stems = config.get("rna", []) if not rna_coasm else ['concatenated']
-samples = config["samples"] if not coassemble else ['concatenated']
+rna_sample_stems = config.get("rna", [])
+samples = config["samples"]
 
 # DNA directories
 top_assembly_dir = join(workpath, config['project']['id'], 
"metawrap_assembly") diff --git a/workflow/rules/DNA.smk b/workflow/rules/DNA.smk index 5fb561f..c699923 100644 --- a/workflow/rules/DNA.smk +++ b/workflow/rules/DNA.smk @@ -17,7 +17,7 @@ default_memory = cluster["__default__"]['mem'] # directories workpath = config["project"]["workpath"] datapath = config["project"]["datapath"] -samples = config["samples"] if not coassemble else ['concatenated'] +samples = config["samples"] top_log_dir = join(workpath, "logfiles") top_readqc_dir = join(workpath, config['project']['id'], "metawrap_read_qc") top_trim_dir = join(workpath, config['project']['id'], "trimmed_reads") @@ -36,7 +36,6 @@ megahit_only = bool(int(config["options"]["assembler_mode"])) """ Step-wise pipeline outline: - 0. concat or no concat 1. read qc 2. assembly 3. binning @@ -47,52 +46,6 @@ megahit_only = bool(int(config["options"]["assembler_mode"])) 8. align DNA to assembly """ -rule concat_reads: - input: - all_r1_reads = expand(join(workpath, "dna", "{sid}_R1.fastq.gz"), sid=config['samples'] if config['coassembly'] else []), - all_r2_reads = expand(join(workpath, "dna", "{sid}_R2.fastq.gz"), sid=config['samples'] if config['coassembly'] else []), - output: - big_compressed_read_r1 = join(workpath, "dna", "concatenated_R1.fastq.gz"), - big_compressed_read_r2 = join(workpath, "dna", "concatenated_R2.fastq.gz"), - big_read1_hash = join(workpath, "dna", "concatenated_R1.md5"), - big_read2_hash = join(workpath, "dna", "concatenated_R2.md5"), - params: - big_read_r1 = join(workpath, "dna", "concatenated_R1.fastq"), - big_read_r2 = join(workpath, "dna", "concatenated_R2.fastq"), - rname = "concat_reads", - input_dir = join(workpath, "dna"), - threads: int(cluster["concat_reads"].get('threads', default_threads)), - shell: - """ - shopt -s extglob - # concat r1 - for fastq in {params.input_dir}/*R1*f?(ast)q*; do - ext=$(echo "${{fastq: -2}}" | tr '[:upper:]' '[:lower:]') - if [[ "$ext" == "gz" ]]; then - zcat $fastq >> {params.big_read_r1} - else - cat $fastq >> {params.big_read_r1} - fi; - done - - # concat r2 - for fastq in {params.input_dir}/*R2*f?(ast)q*; do - ext=$(echo "${{fastq: -2}}" | tr '[:upper:]' '[:lower:]') - if [[ "$ext" == "gz" ]]; then - zcat $fastq > {params.big_read_r2} - else - cat $fastq > {params.big_read_r2} - fi; - done - shopt -u extglob - - pigz -9 -p 28 -c {params.big_read_r1} > {output.big_compressed_read_r1} - pigz -9 -p 28 -c {params.big_read_r2} > {output.big_compressed_read_r2} - md5sum {output.big_compressed_read_r1} > {output.big_read1_hash} - md5sum {output.big_compressed_read_r2} > {output.big_read2_hash} - rm {params.big_read_r1} {params.big_read_r2} - """ - rule metawrap_read_qc: """ diff --git a/workflow/rules/RNA.smk b/workflow/rules/RNA.smk index dcdfbb0..b05ca04 100644 --- a/workflow/rules/RNA.smk +++ b/workflow/rules/RNA.smk @@ -12,7 +12,6 @@ from scripts.common import str_bool, list_bool, get_paired_dna workpath = config["project"]["workpath"] rna_datapath = config["project"].get("rna_datapath", "/dev/null") rna_included = list_bool(config.get("rna", 'false')) -rna_coasm = False rna_sample_stems = config.get("rna", []) rna_compressed = True # if accepting uncompressed fastq input get_dna = partial(get_paired_dna, config) @@ -23,50 +22,6 @@ top_trim_dir_rna = join(workpath, config['project']['id'], "t top_map_dir_rna = join(workpath, config['project']['id'], "mapping_RNA") -rule concat_rna_reads: - input: - all_r1_reads = expand(join(workpath, "rna", "{rname}_R1.fastq.gz"), rname=rna_sample_stems if rna_coasm else []) if rna_included else [], - 
all_r2_reads = expand(join(workpath, "rna", "{rname}_R2.fastq.gz"), rname=rna_sample_stems if rna_coasm else []) if rna_included else [], - output: - big_compressed_read_r1 = join(workpath, "rna", "concatenated_R1.fastq.gz"), - big_compressed_read_r2 = join(workpath, "rna", "concatenated_R2.fastq.gz"), - big_read1_hash = join(workpath, "rna", "concatenated_R1.md5"), - big_read2_hash = join(workpath, "rna", "concatenated_R2.md5"), - params: - rname = "concat_rna_reads", - big_read_r1 = join(workpath, "rna", "concatenated_R1.fastq"), - big_read_r2 = join(workpath, "rna", "concatenated_R2.fastq"), - input_dir = workpath, - threads: int(cluster["concat_rna_reads"].get('threads', default_threads)), - shell: - """ - # concat r1 - for fastq in {params.input_dir}/*R1*fastq; do - ext=$(echo "${{fastq: -2}}" | tr '[:upper:]' '[:lower:]') - if [[ "$ext" == "gz" ]]; then - zcat $fastq >> {params.big_read_r1} - else - cat $fastq >> {params.big_read_r1} - fi; - done - - # concat r2 - for fastq in {params.input_dir}/*R2*fastq; do - ext=$(echo "${{fastq: -2}}" | tr '[:upper:]' '[:lower:]') - if [[ "$ext" == "gz" ]]; then - zcat $fastq > {params.big_read_r2} - else - cat $fastq >> {params.big_read_r2} - fi; - done - pigz -9 -p {threads} -c {output.big_read_r1} > {output.big_compressed_read_r1} - pigz -9 -p {threads} -c {output.big_read_r2} > {output.big_compressed_read_r2} - md5sum {output.big_compressed_read_r1} > {output.big_read1_hash} - md5sum {output.big_compressed_read_r2} > {output.big_read2_hash} - rm {output.big_read_r1} {output.big_read_r2} - """ - - rule rna_read_qc: input: R1 = join(workpath, "rna", "{rname}_R1.fastq.gz") if rna_included else [],
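
A quick usage sketch of the new --triggers flag (illustrative only; the stand-alone parser below is a hypothetical stand-in for the pipeline's 'run' subparser, not its actual CLI). Values accepted by valid_trigger are joined with commas in src/run.py, passed to run.sh as -r, converted back to spaces by "${2//,/ }", and handed to snakemake as --rerun-triggers. The snippet traces that round trip:

    import argparse
    from argparse import ArgumentTypeError

    def valid_trigger(trigger_given):
        # Same check as src/utils.py: only snakemake's documented rerun triggers.
        snk_triggers = ('mtime', 'code', 'input', 'params', 'software-env')
        if trigger_given not in snk_triggers:
            raise ArgumentTypeError('Invalid trigger selected. Please use one of: '
                                    + ', '.join(snk_triggers))
        return trigger_given

    parser = argparse.ArgumentParser()  # stand-in for the 'run' subparser
    parser.add_argument('-t', '--triggers', type=valid_trigger, nargs='*', default=None)
    args = parser.parse_args(['--triggers', 'mtime', 'code'])

    csv_value = ','.join(args.triggers)        # what src/run.py hands to run.sh: 'mtime,code'
    shell_value = csv_value.replace(',', ' ')  # what run.sh's "${2//,/ }" produces
    print('--rerun-triggers ' + shell_value)   # what snakemake ultimately receives

Because validation happens at argument-parsing time, an invalid value such as '--triggers everything' fails immediately with the list of accepted triggers instead of failing deep inside the snakemake run.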
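A similar sketch for the valid_input helper that replaces the old glob-style --input (hedged: the sheet name and read paths below are invented, the empty placeholder files exist only so the existence checks pass, and importing src.utils assumes this repo's package layout):

    import os, tempfile
    from src.utils import valid_input  # assumes running from the repo root

    tmp = tempfile.mkdtemp()
    dna = os.path.join(tmp, 'pair1_dna.fastq.gz')
    rna = os.path.join(tmp, 'pair1_rna.fastq.gz')
    for path in (dna, rna):
        open(path, 'w').close()  # empty stand-ins for real reads

    sheet = os.path.join(tmp, 'samples.tsv')  # .tsv/.txt -> tab, .csv -> comma
    with open(sheet, 'w') as fh:
        fh.write('DNA\tRNA\n')
        fh.write(dna + '\t' + rna + '\n')

    # Prints "-- Running in paired DNA & RNA mode --" and returns the parsed rows.
    data, rna_included = valid_input(sheet)
    assert rna_included and data[0]['DNA'] == dna

Dropping the RNA column (or leaving its cells empty) exercises the DNA-only branch instead, which is how the pipeline now distinguishes the two modes without the removed --coa/-C flags.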