Skip to content

Commit

Permalink
feat: add flag to set snakemake rerun triggers (#27)
Browse files Browse the repository at this point in the history
* feat: add flag to set snakemake rerun triggers

* chore: remove references to old coassembly methods

* fix: switch to space separated trigger setting

* fix: fix delimiter for csv files
  • Loading branch information
rroutsong authored Aug 12, 2024
1 parent 17a982b commit 35676e4
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 200 deletions.
102 changes: 57 additions & 45 deletions metamorph
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,36 @@ USAGE:
$ metamorph <command> [OPTIONS]
EXAMPLES:
co-assembly dna-only:
$ metamorph run --coa --input *.R?.fastq.gz --output output
$ metamorph run -C --input *.R?.fastq.gz --output output
per-sample assembly dna-only:
$ metamorph run --input *.R?.fastq.gz --output output
co-assembly rna & dna:
$ metamorph run --coa --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
$ metamorph run -C --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
per-sample assembly rna & dna:
$ metamorph run --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
"""
dna-only:
$ metamorph run --input <sample_sheet> --output <output_dir>
sample sheet:
________
| DNA |
| -------
pair1 | path |
pair2 | path |
--------
rna & dna:
$ metamorph run --input <sample_sheet> --output <output_dir>
sample sheet:
________________
| DNA | RNA |
|---------------|
pair1 | path | path |
pair2 | path | path |
---------------
"""
from __future__ import print_function
from datetime import timezone, datetime
import argparse, sys, os, subprocess, json, textwrap


# Local imports
from src import version
from src.run import init, setup, bind, dryrun, runner, valid_input
from src.utils import (
Colors,
err,
exists,
fatal,
check_cache,
require,
permissions
)
from src.run import init, setup, bind, dryrun, runner
from src.utils import Colors, err, exists, fatal, check_cache, require, \
permissions, valid_trigger, valid_input


# Pipeline Metadata
Expand Down Expand Up @@ -156,7 +155,6 @@ def run(sub_args):
config = config
)
config['bindpaths'] = bindpaths
config['coassembly'] = False

# Step 4b. Setup assembly mode
# modes: 0 - megahit + metaspades assembly
Expand Down Expand Up @@ -190,6 +188,7 @@ def run(sub_args):
if 'databases' in config:
bindpaths.extend([mount['from']+':'+mount['to']+':'+mount['mode'] for mount in config['databases']])

triggers = sub_args.triggers if sub_args.triggers else None
mjob = runner(mode = sub_args.mode,
outdir = sub_args.output,
alt_cache = sub_args.singularity_cache,
Expand All @@ -199,6 +198,7 @@ def run(sub_args):
logger = logfh,
additional_bind_paths = ",".join(bindpaths),
tmp_dir = sub_args.tmp_dir,
triggers = triggers
)

# Step 6. Wait for subprocess to complete,
Expand Down Expand Up @@ -391,7 +391,7 @@ def parsed_arguments(name, description):
module load singularity snakemake
# Step 2A.) Dry-run the pipeline
./{0} run --input .tests/*.R?.fastq.gz \\
./{0} run --input <sample_sheet> \\
--output /data/$USER/output \\
--mode slurm \\
--dry-run
Expand All @@ -400,30 +400,36 @@ def parsed_arguments(name, description):
# The slurm mode will submit jobs to
# the cluster. It is recommended running
# the pipeline in this mode.
./{0} run --input .tests/*.R?.fastq.gz \\
./{0} run --input <sample_sheet> \\
--output /data/$USER/output \\
--mode slurm
# Step 3B.) Run the {0} pipeline in co-assembly fashion
# with slurm
./{0} run --coa --input .tests/*.R?.fastq.gz \\
./{0} run --input .tests/*.R?.fastq.gz \\
--output /data/$USER/output \\
--mode slurm
{2}{3}EXAMPLES:{4}
co-assembly dna-only:
$ metamorph run --coa --input *.R?.fastq.gz --output output
$ metamorph run -C --input *.R?.fastq.gz --output output
per-sample assembly dna-only:
$ metamorph run --input *.R?.fastq.gz --output output
co-assembly rna & dna:
$ metamorph run --coa --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
$ metamorph run -C --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
per-sample assembly rna & dna:
$ metamorph run --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
dna-only:
$ metamorph run --input <sample_sheet> --output <output_dir>
sample sheet:
________
| DNA |
| -------
pair1 | path |
pair2 | path |
--------
rna & dna:
$ metamorph run --input <sample_sheet> --output <output_dir>
sample sheet:
________________
| DNA | RNA |
|---------------|
pair1 | path | path |
pair2 | path | path |
---------------
{2}{3}VERSION:{4}
Expand Down Expand Up @@ -466,9 +472,6 @@ def parsed_arguments(name, description):
action='help',
help=argparse.SUPPRESS
)

# Analysis options
# ... add here

# Orchestration Options
# Execution Method, run locally
Expand All @@ -492,6 +495,16 @@ def parsed_arguments(name, description):
help = argparse.SUPPRESS
)

# Snakemake rerun triggers
subparser_run.add_argument(
'-t', '--triggers',
type = valid_trigger,
required = False,
default = None,
nargs="*",
help = argparse.SUPPRESS
)

# Dry-run
# Do not execute the workflow,
# prints what steps remain
Expand Down Expand Up @@ -744,7 +757,6 @@ def parsed_arguments(name, description):


def main():

# Sanity check for usage
if len(sys.argv) == 1:
# Nothing was provided
Expand Down
65 changes: 11 additions & 54 deletions src/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from __future__ import print_function
from shutil import copytree
from pathlib import Path
from csv import DictReader, Sniffer

import os, re, json, sys, subprocess, argparse


Expand Down Expand Up @@ -664,53 +664,6 @@ def dryrun(outdir, config='config.json', snakefile=os.path.join('workflow', 'Sna
return dryrun_output


def valid_input(sheet):
    """
    Validate an argparse sample-sheet argument and parse its contents.

    Valid sample sheets contain a "DNA" column and, optionally, an "RNA"
    column:
            _________________
            |  DNA  |  RNA  |
            |---------------|
    pair1   | path  | path  |
    pair2   | path  | path  |

    Args:
        sheet (str): Path to a tab-delimited (.tsv/.txt) or
            comma-delimited (.csv) sample sheet.

    Returns:
        tuple: ``(data, RNA_included)`` where ``data`` is a list of row
        dicts keyed by column name (paths converted to absolute paths),
        and ``RNA_included`` is True if any row has a non-empty RNA value.

    Raises:
        argparse.ArgumentTypeError: If the sheet does not exist, is not
            readable, has an unsupported extension, lacks a `DNA` column,
            or references sample paths that do not exist.
    """
    # Check file existence and read permissions up front
    sheet = os.path.abspath(sheet)
    if not os.path.exists(sheet):
        raise argparse.ArgumentTypeError(f'Sample sheet path {sheet} does not exist!')
    if not os.access(sheet, os.R_OK):
        raise argparse.ArgumentTypeError(f"Path `{sheet}` exists, but cannot read path due to permissions!")

    # Infer the delimiter from the file extension.
    # BUG FIX: .csv files were previously parsed with a tab delimiter,
    # and unrecognized extensions left `delim` unbound (NameError).
    if sheet.endswith('.tsv') or sheet.endswith('.txt'):
        delim = '\t'
    elif sheet.endswith('.csv'):
        delim = ','
    else:
        raise argparse.ArgumentTypeError(
            f"Unsupported sample sheet extension for `{sheet}`; expected .tsv, .txt, or .csv"
        )

    # Parse inside a context manager so the handle is always closed
    with open(sheet, 'r') as fh:
        rdr = DictReader(fh, delimiter=delim)
        if 'DNA' not in rdr.fieldnames:
            raise argparse.ArgumentTypeError("Sample sheet does not contain `DNA` column")
        if 'RNA' not in rdr.fieldnames:
            print("-- Running in DNA only mode --")
        else:
            print("-- Running in paired DNA & RNA mode --")
        data = list(rdr)

    RNA_included = False
    for row in data:
        row['DNA'] = os.path.abspath(row['DNA'])
        if not os.path.exists(row['DNA']):
            raise argparse.ArgumentTypeError(f"Sample sheet path `{row['DNA']}` does not exist")
        # Treat blank/placeholder RNA cells as "no RNA for this sample"
        if 'RNA' in row and not row['RNA'] in ('', None, 'None'):
            RNA_included = True
            row['RNA'] = os.path.abspath(row['RNA'])
            if not os.path.exists(row['RNA']):
                raise argparse.ArgumentTypeError(f"Sample sheet path `{row['RNA']}` does not exist")

    return data, RNA_included


try:
__job_name__ = 'metamorph_' + os.getlogin() + ':master'
except OSError:
Expand All @@ -726,6 +679,7 @@ def runner(
threads=2,
jobname=__job_name__,
submission_script='run.sh',
triggers=None,
tmp_dir = '/lscratch/$SLURM_JOB_ID/'
):
"""Runs the pipeline via selected executor: local or slurm.
Expand Down Expand Up @@ -833,11 +787,14 @@ def runner(
# --cluster "${CLUSTER_OPTS}" --keep-going --restart-times 3 -j 500 \
# --rerun-incomplete --stats "$3"/logfiles/runtime_statistics.json \
# --keep-remote --local-cores 30 2>&1 | tee -a "$3"/logfiles/master.log
masterjob = subprocess.Popen([
str(submission_script), mode,
'-j', jobname, '-b', str(bindpaths),
'-o', str(outdir), '-c', str(cache),
'-t', "'{}'".format(tmp_dir)
], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env)
cmd = [
str(submission_script), mode,
'-j', jobname, '-b', str(bindpaths),
'-o', str(outdir), '-c', str(cache),
'-t', "'{}'".format(tmp_dir),
]
if triggers:
cmd.extend(['-r', ','.join(triggers)])
masterjob = subprocess.Popen(cmd, cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env)

return masterjob
24 changes: 20 additions & 4 deletions src/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ USAGE:
-o OUTDIR \\
-j MASTER_JOB_NAME \\
-b SINGULARITY_BIND_PATHS \\
-t TMP_DIR
-t TMP_DIR \\
-r RERUN_TRIGGERS
SYNOPSIS:
This script creates/submits the pipeline's master job to the
cluster. The master job acts as the pipeline's main controller or
Expand Down Expand Up @@ -61,7 +62,12 @@ Required Arguments:
this location. On Biowulf, it should be
set to '/lscratch/\$SLURM_JOBID/'. On FRCE,
this value should be set to the following:
'/scratch/cluster_scratch/\$USER/'.
'/scratch/cluster_scratch/\$USER/'.
-r, --triggers [Type: Str] Snakemake rerun triggers. See
description of flag '--rerun-triggers', at
https://snakemake.readthedocs.io/en/stable/executing/cli.html#all-options
for more details.
Default: code params software_env input mtime
OPTIONS:
-c, --cache [Type: Path] Path to singularity cache. If not provided,
the path will default to the current working
Expand Down Expand Up @@ -97,6 +103,7 @@ function parser() {
-t | --tmp-dir) provided "$key" "${2:-}"; Arguments["t"]="$2"; shift; shift;;
-o | --outdir) provided "$key" "${2:-}"; Arguments["o"]="$2"; shift; shift;;
-c | --cache) provided "$key" "${2:-}"; Arguments["c"]="$2"; shift; shift;;
-r | --triggers) provided "$key" "${2:-}"; Arguments["r"]="${2/,/' '}"; shift; shift;;
-* | --*) err "Error: Failed to parse unsupported argument: '${key}'."; usage && exit 1;;
*) err "Error: Failed to parse unrecognized argument: '${key}'. Do any of your inputs have spaces?"; usage && exit 1;;
esac
Expand Down Expand Up @@ -159,6 +166,7 @@ function submit(){
# INPUT $4 = Singularity Bind paths
# INPUT $5 = Singularity cache directory
# INPUT $6 = Temporary directory for output files
# INPUT $7 = rerun trigger values

# Check if singularity and snakemake are in $PATH
# If not, try to module load singularity as a last resort
Expand Down Expand Up @@ -191,6 +199,9 @@ function submit(){
# --printshellcmds --keep-going --rerun-incomplete
# --keep-remote --restart-times 3 -j 500 --use-singularity
# --singularity-args -B {}.format({bindpaths}) --local-cores 24
triggers="${7:-'code params software_env input mtime'}"
rerun="--rerun-triggers $triggers"

SLURM_DIR="$3/logfiles/slurmfiles"
CLUSTER_OPTS="sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname} -e $SLURM_DIR/slurm-%j_{params.rname}.out -o $SLURM_DIR/slurm-%j_{params.rname}.out"
# Check if NOT running on Biowulf
Expand Down Expand Up @@ -228,6 +239,7 @@ snakemake \\
-s "$3/workflow/Snakefile" \\
-d "$3" \\
--use-singularity \\
$rerun \\
--singularity-args "\\-c \\-B '$4'" \\
--use-envmodules \\
--verbose \\
Expand Down Expand Up @@ -279,9 +291,9 @@ function main(){

# Parses remaining user provided command-line arguments
parser "${@:2}" # Remove first item of list

outdir="$(abspath "$(dirname "${Arguments[o]}")")"
Arguments[o]="${Arguments[o]%/}" # clean outdir path (remove trailing '/')

# Setting defaults for non-required arguments
# If singularity cache not provided, default to ${outdir}/.singularity
cache="${Arguments[o]}/.singularity"
Expand All @@ -294,7 +306,11 @@ function main(){

# Run pipeline and submit jobs to cluster using the defined executor
mkdir -p "${Arguments[o]}/logfiles/"
job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}")
if [[ ! -v Arguments[r] ]] ; then
job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}")
else
job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}" "${Arguments[r]}")
fi
echo -e "[$(date)] Pipeline submitted to cluster.\nMaster Job ID: $job_id"
echo "${job_id}" > "${Arguments[o]}/logfiles/mjobid.log"

Expand Down
Loading

0 comments on commit 35676e4

Please sign in to comment.