feat: add flag to set snakemake rerun triggers #27

Merged
4 commits, merged Aug 12, 2024
Changes from 2 commits
101 changes: 57 additions & 44 deletions metamorph
@@ -29,37 +29,36 @@ USAGE:
$ metamorph <command> [OPTIONS]

EXAMPLES:
co-assembly dna-only:
$ metamorph run --coa --input *.R?.fastq.gz --output output
$ metamorph run -C --input *.R?.fastq.gz --output output

per-sample assembly dna-only:
$ metamorph run --input *.R?.fastq.gz --output output

co-assembly rna & dna:
$ metamorph run --coa --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
$ metamorph run -C --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output

per-sample assembly rna & dna:
$ metamorph run --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
"""
dna-only:
$ metamorph run --input <sample_sheet> --output <output_dir>
sample sheet:
        _________
        |  DNA  |
        |-------|
pair1   | path  |
pair2   | path  |
        ---------

rna & dna:
$ metamorph run --input <sample_sheet> --output <output_dir>
sample sheet:
________________
| DNA | RNA |
|---------------|
pair1 | path | path |
pair2 | path | path |
---------------
"""
from __future__ import print_function
from datetime import timezone, datetime
import argparse, sys, os, subprocess, json, textwrap


# Local imports
from src import version
from src.run import init, setup, bind, dryrun, runner, valid_input
from src.utils import (
    Colors,
    err,
    exists,
    fatal,
    check_cache,
    require,
    permissions
)
from src.run import init, setup, bind, dryrun, runner
from src.utils import Colors, err, exists, fatal, check_cache, require, \
    permissions, valid_trigger, valid_input


# Pipeline Metadata
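The USAGE text above defines the sample sheet only as an ASCII diagram. For concreteness, a hypothetical tab-separated sheet accepted by the valid_input reader (shown in src/run.py below; .tsv/.txt files are read tab-delimited, .csv comma-delimited, and the RNA column may be omitted for DNA-only runs) could look like this -- all paths invented:

DNA	RNA
/data/sample1_dna.R1.fastq.gz	/data/sample1_rna.R1.fastq.gz
/data/sample2_dna.R1.fastq.gz	/data/sample2_rna.R1.fastq.gz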
@@ -156,7 +155,6 @@ def run(sub_args):
config = config
)
config['bindpaths'] = bindpaths
config['coassembly'] = False

# Step 4b. Setup assembly mode
# modes: 0 - megahit + metaspades assembly
@@ -190,6 +188,7 @@
if 'databases' in config:
    bindpaths.extend([mount['from']+':'+mount['to']+':'+mount['mode'] for mount in config['databases']])

triggers = sub_args.triggers if sub_args.triggers else None
mjob = runner(mode = sub_args.mode,
    outdir = sub_args.output,
    alt_cache = sub_args.singularity_cache,
@@ -199,6 +198,7 @@
    logger = logfh,
    additional_bind_paths = ",".join(bindpaths),
    tmp_dir = sub_args.tmp_dir,
    triggers = triggers
)

# Step 6. Wait for subprocess to complete,
@@ -391,7 +391,7 @@ def parsed_arguments(name, description):
module load singularity snakemake

# Step 2A.) Dry-run the pipeline
./{0} run --input .tests/*.R?.fastq.gz \\
./{0} run --input <sample_sheet> \\
    --output /data/$USER/output \\
    --mode slurm \\
    --dry-run
@@ -400,30 +400,36 @@
# The slurm mode will submit jobs to
# the cluster. It is recommended running
# the pipeline in this mode.
./{0} run --input .tests/*.R?.fastq.gz \\
./{0} run --input <sample_sheet> \\
    --output /data/$USER/output \\
    --mode slurm

# Step 3B.) Run the {0} pipeline in co-assembly fashion
# with slurm
./{0} run --coa --input .tests/*.R?.fastq.gz \\
./{0} run --input .tests/*.R?.fastq.gz \\
    --output /data/$USER/output \\
    --mode slurm

{2}{3}EXAMPLES:{4}
co-assembly dna-only:
$ metamorph run --coa --input *.R?.fastq.gz --output output
$ metamorph run -C --input *.R?.fastq.gz --output output

per-sample assembly dna-only:
$ metamorph run --input *.R?.fastq.gz --output output

co-assembly rna & dna:
$ metamorph run --coa --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
$ metamorph run -C --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output

per-sample assembly rna & dna:
$ metamorph run --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
dna-only:
$ metamorph run --input <sample_sheet> --output <output_dir>
sample sheet:
        _________
        |  DNA  |
        |-------|
pair1   | path  |
pair2   | path  |
        ---------

rna & dna:
$ metamorph run --input <sample_sheet> --output <output_dir>
sample sheet:
        _________________
        |  DNA  |  RNA  |
        |---------------|
pair1   | path  | path  |
pair2   | path  | path  |
        -----------------


{2}{3}VERSION:{4}
@@ -466,9 +472,6 @@ def parsed_arguments(name, description):
action='help',
help=argparse.SUPPRESS
)

# Analysis options
# ... add here

# Orchestration Options
# Execution Method, run locally
@@ -492,6 +495,16 @@
help = argparse.SUPPRESS
)

# Snakemake rerun triggers
subparser_run.add_argument(
    '-t', '--triggers',
    type = valid_trigger,
    required = False,
    default = None,
    nargs = "*",
    help = argparse.SUPPRESS
)

# Dry-run
# Do not execute the workflow,
# prints what steps remain
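The new -t/--triggers option (hidden from --help via argparse.SUPPRESS) validates each value with valid_trigger, which is imported from src.utils but not shown in this diff. A minimal sketch of what such a validator might look like -- a hypothetical implementation, with the accepted names taken from the defaults documented in src/run.sh below:

import argparse

# Rerun triggers accepted by snakemake's --rerun-triggers flag; this set
# mirrors the run.sh defaults (code params software_env input mtime).
ACCEPTED_TRIGGERS = {'code', 'params', 'software_env', 'input', 'mtime'}

def valid_trigger(value):
    """argparse type= callable: return the normalized trigger or raise."""
    value = value.strip().lower()
    if value not in ACCEPTED_TRIGGERS:
        raise argparse.ArgumentTypeError(
            "'{}' is not a valid rerun trigger (expected one of: {})".format(
                value, ', '.join(sorted(ACCEPTED_TRIGGERS))))
    return value

Because the flag is declared with nargs="*", values are passed space-separated, e.g. ./metamorph run --input samples.tsv --output out --mode slurm --triggers mtime params (paths hypothetical).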
65 changes: 11 additions & 54 deletions src/run.py
@@ -5,7 +5,7 @@
from __future__ import print_function
from shutil import copytree
from pathlib import Path
from csv import DictReader, Sniffer

import os, re, json, sys, subprocess, argparse


@@ -664,53 +664,6 @@ def dryrun(outdir, config='config.json', snakefile=os.path.join('workflow', 'Sna
return dryrun_output


def valid_input(sheet):
    """
    Valid sample sheets should contain two columns: "DNA" and "RNA"

            _________________
            |  DNA  |  RNA  |
            |---------------|
    pair1   | path  | path  |
    pair2   | path  | path  |
    """
    # check file permissions
    sheet = os.path.abspath(sheet)
    if not os.path.exists(sheet):
        raise argparse.ArgumentTypeError(f'Sample sheet path {sheet} does not exist!')
    if not os.access(sheet, os.R_OK):
        raise argparse.ArgumentTypeError(f"Path `{sheet}` exists, but cannot read path due to permissions!")

    # check format to make sure it's correct
    if sheet.endswith('.tsv') or sheet.endswith('.txt'):
        delim = '\t'
    elif sheet.endswith('.csv'):
        delim = ','
    else:
        raise argparse.ArgumentTypeError('Sample sheet must be a .tsv, .txt, or .csv file')

    rdr = DictReader(open(sheet, 'r'), delimiter=delim)

    if 'DNA' not in rdr.fieldnames:
        raise argparse.ArgumentTypeError("Sample sheet does not contain `DNA` column")
    if 'RNA' not in rdr.fieldnames:
        print("-- Running in DNA only mode --")
    else:
        print("-- Running in paired DNA & RNA mode --")

    data = [row for row in rdr]
    RNA_included = False
    for row in data:
        row['DNA'] = os.path.abspath(row['DNA'])
        if not os.path.exists(row['DNA']):
            raise argparse.ArgumentTypeError(f"Sample sheet path `{row['DNA']}` does not exist")
        if 'RNA' in row and not row['RNA'] in ('', None, 'None'):
            RNA_included = True
            row['RNA'] = os.path.abspath(row['RNA'])
            if not os.path.exists(row['RNA']):
                raise argparse.ArgumentTypeError(f"Sample sheet path `{row['RNA']}` does not exist")

    return data, RNA_included
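Since valid_input raises argparse.ArgumentTypeError on bad input, it doubles as an argparse type= callable; note that it returns a (rows, rna_included) tuple rather than a plain path string. A hypothetical wiring (the actual add_argument call for --input is outside the hunks shown in this diff):

# Hypothetical sketch; the real --input definition is not shown in this PR.
subparser_run.add_argument(
    '--input',
    type = valid_input,   # argparse stores the (rows, rna_included) tuple
    required = True
)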


try:
__job_name__ = 'metamorph_' + os.getlogin() + ':master'
except OSError:
@@ -726,6 +679,7 @@ def runner(
    threads=2,
    jobname=__job_name__,
    submission_script='run.sh',
    triggers=None,
    tmp_dir = '/lscratch/$SLURM_JOB_ID/'
):
"""Runs the pipeline via selected executor: local or slurm.
@@ -833,11 +787,14 @@
# --cluster "${CLUSTER_OPTS}" --keep-going --restart-times 3 -j 500 \
# --rerun-incomplete --stats "$3"/logfiles/runtime_statistics.json \
# --keep-remote --local-cores 30 2>&1 | tee -a "$3"/logfiles/master.log
masterjob = subprocess.Popen([
    str(submission_script), mode,
    '-j', jobname, '-b', str(bindpaths),
    '-o', str(outdir), '-c', str(cache),
    '-t', "'{}'".format(tmp_dir)
], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env)
cmd = [
    str(submission_script), mode,
    '-j', jobname, '-b', str(bindpaths),
    '-o', str(outdir), '-c', str(cache),
    '-t', "'{}'".format(tmp_dir),
]
if triggers:
    cmd.extend(['-r', ','.join(triggers)])
masterjob = subprocess.Popen(cmd, cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env)

return masterjob
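To make the hand-off concrete: runner() serializes the trigger list into a single comma-joined -r argument for the submission script. An illustration with made-up values:

# Illustrative only -- how a trigger list reaches run.sh.
triggers = ['mtime', 'params']
cmd = ['run.sh', 'slurm']          # abbreviated; the real cmd also carries -j/-b/-o/-c/-t
if triggers:
    cmd.extend(['-r', ','.join(triggers)])
# cmd now ends with ['-r', 'mtime,params']; run.sh (next file) re-expands
# that token into snakemake's --rerun-triggers flag.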
26 changes: 22 additions & 4 deletions src/run.sh
@@ -8,7 +8,8 @@ USAGE:
-o OUTDIR \\
-j MASTER_JOB_NAME \\
-b SINGULARITY_BIND_PATHS \\
-t TMP_DIR \\
-r RERUN_TRIGGERS
SYNOPSIS:
This script creates/submits the pipeline's master job to the
cluster. The master job acts as the pipeline's main controller or
@@ -61,7 +62,12 @@ Required Arguments:
this location. On Biowulf, it should be
set to '/lscratch/\$SLURM_JOBID/'. On FRCE,
this value should be set to the following:
'/scratch/cluster_scratch/\$USER/'.
-r, --triggers [Type: Str] Snakemake rerun triggers. See
description of flag '--rerun-triggers', at
https://snakemake.readthedocs.io/en/stable/executing/cli.html#all-options
for more details.
Default: code params software_env input mtime
OPTIONS:
-c, --cache [Type: Path] Path to singularity cache. If not provided,
the path will default to the current working
@@ -97,6 +103,7 @@ function parser() {
-t | --tmp-dir) provided "$key" "${2:-}"; Arguments["t"]="$2"; shift; shift;;
-o | --outdir) provided "$key" "${2:-}"; Arguments["o"]="$2"; shift; shift;;
-c | --cache) provided "$key" "${2:-}"; Arguments["c"]="$2"; shift; shift;;
-r | --triggers) provided "$key" "${2:-}"; Arguments["r"]="$2"; shift; shift;;
-* | --*) err "Error: Failed to parse unsupported argument: '${key}'."; usage && exit 1;;
*) err "Error: Failed to parse unrecognized argument: '${key}'. Do any of your inputs have spaces?"; usage && exit 1;;
esac
@@ -159,6 +166,7 @@ function submit(){
# INPUT $4 = Singularity Bind paths
# INPUT $5 = Singularity cache directory
# INPUT $6 = Temporary directory for output files
# INPUT $7 = rerun trigger values

# Check if singularity and snakemake are in $PATH
# If not, try to module load singularity as a last resort
@@ -191,6 +199,11 @@
# --printshellcmds --keep-going --rerun-incomplete
# --keep-remote --restart-times 3 -j 500 --use-singularity
# --singularity-args -B {}.format({bindpaths}) --local-cores 24
triggers="${7:-code,params,software_env,input,mtime}"
if [[ ! ${triggers:0:1} == "{" ]] ; then triggers="{$triggers"; fi
if [[ ! ${triggers:0-1} == "}" ]] ; then triggers+='}'; fi
rerun="--rerun-triggers $triggers"
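Why the brace-wrapping above: run.sh receives the triggers as one comma-joined token (e.g. mtime,params), while snakemake's --rerun-triggers flag expects space-separated values. Wrapping the token in braces defers the split to bash brace expansion when the generated submission script runs, since {mtime,params} expands to mtime params. A Python rendering of the same normalization, purely for clarity:

# Mirrors the bash logic above; the pipeline itself does this in run.sh.
def normalize_triggers(raw='code,params,software_env,input,mtime'):
    if not raw.startswith('{'):
        raw = '{' + raw
    if not raw.endswith('}'):
        raw += '}'
    return '--rerun-triggers ' + raw

# normalize_triggers('mtime,params') -> '--rerun-triggers {mtime,params}'
# Caveat: bash only brace-expands comma lists, so a single trigger such as
# {mtime} would reach snakemake with its braces intact.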

SLURM_DIR="$3/logfiles/slurmfiles"
CLUSTER_OPTS="sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname} -e $SLURM_DIR/slurm-%j_{params.rname}.out -o $SLURM_DIR/slurm-%j_{params.rname}.out"
# Check if NOT running on Biowulf
@@ -228,6 +241,7 @@ snakemake \\
-s "$3/workflow/Snakefile" \\
-d "$3" \\
--use-singularity \\
$rerun \\
--singularity-args "\\-c \\-B '$4'" \\
--use-envmodules \\
--verbose \\
@@ -279,9 +293,9 @@ function main(){

# Parses remaining user provided command-line arguments
parser "${@:2}" # Remove first item of list

outdir="$(abspath "$(dirname "${Arguments[o]}")")"
Arguments[o]="${Arguments[o]%/}" # clean outdir path (remove trailing '/')

# Setting defaults for non-required arguments
# If singularity cache not provided, default to ${outdir}/.singularity
cache="${Arguments[o]}/.singularity"
@@ -294,7 +308,11 @@ function main(){

# Run pipeline and submit jobs to cluster using the defined executor
mkdir -p "${Arguments[o]}/logfiles/"
job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}")
if [[ ! -v Arguments[r] ]] ; then
    job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}")
else
    job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}" "${Arguments[r]}")
fi
echo -e "[$(date)] Pipeline submitted to cluster.\nMaster Job ID: $job_id"
echo "${job_id}" > "${Arguments[o]}/logfiles/mjobid.log"
