diff --git a/.github/workflows/end-to-end-se.yml b/.github/workflows/end-to-end-se.yml new file mode 100644 index 0000000..d723dd6 --- /dev/null +++ b/.github/workflows/end-to-end-se.yml @@ -0,0 +1,32 @@ +name: End-to-end MGS workflow test for single-end run + +on: [pull_request] + +jobs: + test-run-dev-se: + runs-on: ubuntu-latest + timeout-minutes: 10 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + + - name: Set up JDK 11 + uses: actions/setup-java@v4 + with: + java-version: '11' + distribution: 'adopt' + + - name: Setup Nextflow latest (stable) + uses: nf-core/setup-nextflow@v1 + with: + version: "latest" + + - name: Install nf-test + run: | + wget -qO- https://get.nf-test.com | bash + sudo mv nf-test /usr/local/bin/ + + - name: Run run_dev_se workflow + run: nf-test test --tag run_dev_se --verbose \ No newline at end of file diff --git a/.github/workflows/end-to-end.yml b/.github/workflows/end-to-end.yml index a044fea..b9bf7ec 100644 --- a/.github/workflows/end-to-end.yml +++ b/.github/workflows/end-to-end.yml @@ -37,6 +37,7 @@ jobs: - name: Checkout uses: actions/checkout@v4 + - name: Set up JDK 11 uses: actions/setup-java@v4 with: diff --git a/.gitignore b/.gitignore index 9ad9742..c49d0fb 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,4 @@ test/.nextflow* pipeline_report.txt .nf-test/ -.nf-test.log \ No newline at end of file +.nf-test.log diff --git a/bin/generate_samplesheet.sh b/bin/generate_samplesheet.sh index 9d531df..2ea712f 100755 --- a/bin/generate_samplesheet.sh +++ b/bin/generate_samplesheet.sh @@ -1,5 +1,6 @@ #!/bin/bash + set -u set -e @@ -10,10 +11,28 @@ dir_path="" forward_suffix="" reverse_suffix="" s3=0 +single_end=0 output_path="samplesheet.csv" # Default output path group_file="" # Optional parameter for the group file group_across_illumina_lanes=false +# Function to print usage +print_usage() { + echo "Usage:" + echo "For paired-end reads:" + echo " $0 --dir_path --forward_suffix --reverse_suffix [--s3] [--output_path ]" + echo "For single-end reads:" + echo " $0 --dir_path --single_end [--s3] [--output_path ]" + echo + echo "Options:" + echo " --dir_path Directory containing FASTQ files" + echo " --forward_suffix Suffix for forward reads (required for paired-end only)" + echo " --reverse_suffix Suffix for reverse reads (required for paired-end only)" + echo " --single_end Flag for single-end data" + echo " --s3 Flag for S3 bucket access" + echo " --output_path Output path for samplesheet (default: samplesheet.csv)" +} + # Parse command-line arguments while [[ $# -gt 0 ]]; do case $1 in @@ -33,10 +52,18 @@ while [[ $# -gt 0 ]]; do s3=1 shift ;; + --single_end) + single_end=1 + shift + ;; --output_path) output_path="$2" shift 2 ;; + --help) + print_usage + exit 0 + ;; --group_file) # Optional group file group_file="$2" shift 2 @@ -47,20 +74,22 @@ while [[ $# -gt 0 ]]; do ;; *) echo "Unknown option: $1" + print_usage exit 1 ;; esac done # Check if all required parameters are provided -if [[ -z "$dir_path" || -z "$forward_suffix" || -z "$reverse_suffix" ]]; then - echo "Error: dir_path, forward_suffix, and reverse_suffix are required." +if [[ -z "$dir_path" || -z "$single_end" ]]; then + echo "Error: dir_path and single_end are required." 
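    # Example invocations (a sketch; the directory below is a placeholder, flags as documented in print_usage above):
    #   paired-end: bin/generate_samplesheet.sh --dir_path s3://my-bucket/raw/ --forward_suffix _R1_001 --reverse_suffix _R2_001 --s3
    #   single-end: bin/generate_samplesheet.sh --dir_path s3://my-bucket/raw/ --single_end --s3 --output_path samplesheet.csv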
echo -e "\nUsage: $0 [options]" echo -e "\nRequired arguments:" echo -e " --dir_path Directory containing FASTQ files" - echo -e " --forward_suffix Suffix identifying forward reads, supports regex (e.g., '_R1_001' or '_1')" - echo -e " --reverse_suffix Suffix identifying reverse reads, supports regex (e.g., '_R2_001' or '_2')" + echo -e " --single_end Flag for single-end data" echo -e "\nOptional arguments:" + echo -e " --forward_suffix When single_end is 0, suffix identifying forward reads, supports regex (e.g., '_R1_001' or '_1')" + echo -e " --reverse_suffix When single_end is 0, suffix identifying reverse reads, supports regex (e.g., '_R2_001' or '_2')" echo -e " --s3 Use if files are stored in S3 bucket" echo -e " --output_path Output path for samplesheet [default: samplesheet.csv]" echo -e " --group_file Path to group file for sample grouping [header column must have the names 'sample,group' in that order; additional columns may be included, however they will be ignored by the script]" @@ -74,15 +103,28 @@ if $group_across_illumina_lanes && [[ -n "$group_file" ]]; then exit 1 fi +if [ $single_end -eq 0 ]; then + # Paired-end validation + if [[ -z "$forward_suffix" || -z "$reverse_suffix" ]]; then + echo "Error: forward_suffix and reverse_suffix are required for paired-end reads." + print_usage + exit 1 + fi +fi + # Display the parameters echo "Parameters:" echo "dir_path: $dir_path" -echo "forward_suffix: $forward_suffix" -echo "reverse_suffix: $reverse_suffix" +echo "single_end: $single_end" echo "s3: $s3" echo "output_path: $output_path" echo "group_file: $group_file" echo "group_across_illumina_lanes: $group_across_illumina_lanes" +if [ $single_end -eq 0 ]; then + echo "forward_suffix: $forward_suffix" + echo "reverse_suffix: $reverse_suffix" +fi + #### EXAMPLES #### @@ -109,7 +151,13 @@ echo "group_across_illumina_lanes: $group_across_illumina_lanes" # Create a temporary file for the initial samplesheet temp_samplesheet=$(mktemp) -echo "sample,fastq_1,fastq_2" > "$temp_samplesheet" +# Create header based on single_end flag +if [ $single_end -eq 0 ]; then + echo "sample,fastq_1,fastq_2" > "$temp_samplesheet" +else + echo "sample,fastq" > "$temp_samplesheet" +fi +echo "group_file: $group_file" # Ensure dir_path ends with a '/' @@ -117,22 +165,31 @@ if [[ "$dir_path" != */ ]]; then dir_path="${dir_path}/" fi -listing=0 - +# Get file listing based on s3 flag if [ $s3 -eq 1 ]; then listing=$(aws s3 ls ${dir_path} | awk '{print $4}') else listing=$(ls ${dir_path} | awk '{print $1}') fi -echo "$listing" | grep "${forward_suffix}\.fastq\.gz$" | while read -r forward_read; do - sample=$(echo "$forward_read" | sed -E "s/${forward_suffix}\.fastq\.gz$//") - reverse_read=$(echo "$listing" | grep "${sample}${reverse_suffix}\.fastq\.gz$") - # If sample + reverse_suffix exists in s3_listing, then add to samplesheet - if [ -n "$reverse_read" ]; then - echo "$sample,${dir_path}${forward_read},${dir_path}${reverse_read}" >> "$temp_samplesheet" - fi -done +# Process files based on single_end flag +if [ $single_end -eq 0 ]; then + # Paired-end processing + echo "$listing" | grep "${forward_suffix}\.fastq\.gz$" | while read -r forward_read; do + sample=$(echo "$forward_read" | sed -E "s/${forward_suffix}\.fastq\.gz$//") + reverse_read=$(echo "$listing" | grep "${sample}${reverse_suffix}\.fastq\.gz$") + # If sample + reverse_suffix exists in s3_listing, then add to samplesheet + if [ -n "$reverse_read" ]; then + echo "$sample,${dir_path}${forward_read},${dir_path}${reverse_read}" >> "$temp_samplesheet" 
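            # e.g. with forward_suffix "_R1_001" and reverse_suffix "_R2_001", the pair sampleA_R1_001.fastq.gz /
            # sampleA_R2_001.fastq.gz appends the row: sampleA,${dir_path}sampleA_R1_001.fastq.gz,${dir_path}sampleA_R2_001.fastq.gz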
+ fi + done +else + # Single-end processing - just process all fastq.gz files + echo "$listing" | grep "\.fastq\.gz$" | while read -r read_file; do + sample=$(echo "$read_file" | sed -E "s/\.fastq\.gz$//") + echo "$sample,${dir_path}${read_file}" >> "$temp_samplesheet" + done +fi # Check if group file is provided if [[ -n "$group_file" ]]; then diff --git a/configs/read_type.config b/configs/read_type.config new file mode 100644 index 0000000..cd55388 --- /dev/null +++ b/configs/read_type.config @@ -0,0 +1,6 @@ +// Universal flags for read type (single-end vs paired-end) + +params { + // Whether the underlying data is paired-end or single-end + single_end = new File(params.sample_sheet).text.readLines()[0].contains('fastq_2') ? false : true +} \ No newline at end of file diff --git a/configs/run.config b/configs/run.config index 9819909..3669ba3 100644 --- a/configs/run.config +++ b/configs/run.config @@ -31,4 +31,5 @@ includeConfig "${projectDir}/configs/containers.config" includeConfig "${projectDir}/configs/resources.config" includeConfig "${projectDir}/configs/profiles.config" includeConfig "${projectDir}/configs/output.config" +includeConfig "${projectDir}/configs/read_type.config" process.queue = "will-batch-queue" // AWS Batch job queue diff --git a/configs/run_dev_se.config b/configs/run_dev_se.config new file mode 100644 index 0000000..39080c0 --- /dev/null +++ b/configs/run_dev_se.config @@ -0,0 +1,34 @@ +/************************************************ +| CONFIGURATION FILE FOR NAO VIRAL MGS WORKFLOW | +************************************************/ + +params { + mode = "run_dev_se" + + // Directories + base_dir = "s3://nao-mgs-simon/test_single_read" // Parent for working and output directories (can be S3) + ref_dir = "s3://nao-mgs-wb/index-20241113/output" // Reference/index directory (generated by index workflow) + + // Files + sample_sheet = "${launchDir}/samplesheet.csv" // Path to library TSV + adapters = "${projectDir}/ref/adapters.fasta" // Path to adapter file for adapter trimming + + // Numerical + grouping = false // Whether to group samples by 'group' column in samplesheet + n_reads_trunc = 0 // Number of reads per sample to run through pipeline (0 = all reads) + n_reads_profile = 1000000 // Number of reads per sample to run through taxonomic profiling + bt2_score_threshold = 20 // Normalized score threshold for HV calling (typically 15 or 20) + blast_hv_fraction = 0 // Fraction of putative HV reads to BLAST vs nt (0 = don't run BLAST) + kraken_memory = "128 GB" // Memory needed to safely load Kraken DB + quality_encoding = "phred33" // FASTQ quality encoding (probably phred33, maybe phred64) + fuzzy_match_alignment_duplicates = 0 // Fuzzy matching the start coordinate of reads for identification of duplicates through alignment (0 = exact matching; options are 0, 1, or 2) + host_taxon = "vertebrate" +} + +includeConfig "${projectDir}/configs/logging.config" +includeConfig "${projectDir}/configs/containers.config" +includeConfig "${projectDir}/configs/resources.config" +includeConfig "${projectDir}/configs/profiles.config" +includeConfig "${projectDir}/configs/output.config" +includeConfig "${projectDir}/configs/read_type.config" +process.queue = "simon-batch-queue" // AWS Batch job queue diff --git a/main.nf b/main.nf index 6ce890c..82420a1 100644 --- a/main.nf +++ b/main.nf @@ -1,6 +1,7 @@ include { RUN } from "./workflows/run" include { RUN_VALIDATION } from "./workflows/run_validation" include { INDEX } from "./workflows/index" +include { RUN_DEV_SE } from 
"./workflows/run_dev_se" workflow { if (params.mode == "index") { @@ -9,6 +10,8 @@ workflow { RUN() } else if (params.mode == "run_validation") { RUN_VALIDATION() + } else if (params.mode == "run_dev_se") { + RUN_DEV_SE() } } diff --git a/modules/local/fastp/main.nf b/modules/local/fastp/main.nf index beb3de2..bef0525 100644 --- a/modules/local/fastp/main.nf +++ b/modules/local/fastp/main.nf @@ -1,4 +1,4 @@ -process FASTP { +process FASTP_PAIRED { label "max" label "fastp" input: @@ -32,6 +32,41 @@ process FASTP { ''' } +process FASTP_SINGLE { + label "max" + label "fastp" + input: + // reads is a list of two files: forward/reverse reads + tuple val(sample), path(reads) + path(adapters) + output: + tuple val(sample), path("${sample}_fastp.fastq.gz"), emit: reads + tuple val(sample), path("${sample}_fastp_failed.fastq.gz"), emit: failed + tuple val(sample), path("${sample}_fastp.{json,html}"), emit: log + shell: + /* Cleaning not done in CUTADAPT or TRIMMOMATIC: + * Higher quality threshold for sliding window trimming; + * Removing poly-X tails; + * Automatic adapter detection; + * Base correction in overlapping paired-end reads; + * Filter low complexity reads. + */ + ''' + # Define paths and subcommands + of=!{sample}_fastp_failed.fastq.gz + oj=!{sample}_fastp.json + oh=!{sample}_fastp.html + ad=!{adapters} + o=!{sample}_fastp.fastq.gz + io="--in1 !{reads[0]} --out1 ${o} --failed_out ${of} --html ${oh} --json ${oj} --adapter_fasta ${ad}" + par="--cut_front --cut_tail --correction --detect_adapter_for_pe --trim_poly_x --cut_mean_quality 20 --average_qual 20 --qualified_quality_phred 20 --verbose --dont_eval_duplication --thread !{task.cpus} --low_complexity_filter" + # Execute + fastp ${io} ${par} + ''' +} + + + // Run FASTP for adapter trimming but don't trim for quality process FASTP_NOTRIM { label "max" @@ -66,3 +101,4 @@ process FASTP_NOTRIM { fastp ${io} ${par} ''' } + diff --git a/modules/local/summarizeMultiqcPair/main.nf b/modules/local/summarizeMultiqcPair/main.nf index 191d860..18f3fd8 100644 --- a/modules/local/summarizeMultiqcPair/main.nf +++ b/modules/local/summarizeMultiqcPair/main.nf @@ -4,10 +4,11 @@ process SUMMARIZE_MULTIQC_PAIR { label "single" input: tuple val(stage), val(sample), path(multiqc_data) + val(single_end) output: tuple path("${stage}_${sample}_qc_basic_stats.tsv.gz"), path("${stage}_${sample}_qc_adapter_stats.tsv.gz"), path("${stage}_${sample}_qc_quality_base_stats.tsv.gz"), path("${stage}_${sample}_qc_quality_sequence_stats.tsv.gz") shell: ''' - summarize-multiqc-pair.R -i !{multiqc_data} -s !{stage} -S !{sample} -o ${PWD} + summarize-multiqc-pair.R -i !{multiqc_data} -s !{stage} -S !{sample} -r !{single_end} -o ${PWD} ''' -} +} \ No newline at end of file diff --git a/modules/local/summarizeMultiqcPair/resources/usr/bin/summarize-multiqc-pair.R b/modules/local/summarizeMultiqcPair/resources/usr/bin/summarize-multiqc-pair.R index 047ace3..6c1193c 100755 --- a/modules/local/summarizeMultiqcPair/resources/usr/bin/summarize-multiqc-pair.R +++ b/modules/local/summarizeMultiqcPair/resources/usr/bin/summarize-multiqc-pair.R @@ -13,12 +13,24 @@ option_list = list( help="Stage descriptor."), make_option(c("-S", "--sample"), type="character", default=NULL, help="Sample ID."), + make_option(c("-r", "--single_end"), type="character", default=FALSE, + help="Single-end flag."), make_option(c("-o", "--output_dir"), type="character", default=NULL, help="Path to output directory.") ) opt_parser = OptionParser(option_list=option_list); opt = parse_args(opt_parser); +# 
Convert single_end from string to logical +if (opt$single_end == "true") { + single_end <- TRUE +} else if (opt$single_end == "false") { + single_end <- FALSE +} else { + stop("single_end must be 'true' or 'false'") +} + + # Set input paths multiqc_json_path <- file.path(opt$input_dir, "multiqc_data.json") fastqc_tsv_path <- file.path(opt$input_dir, "multiqc_fastqc.txt") @@ -57,8 +69,19 @@ basic_info_fastqc <- function(fastqc_tsv, multiqc_json){ tab_tsv <- fastqc_tsv %>% mutate(n_bases_approx = process_n_bases(`Total Bases`)) %>% select(n_bases_approx, per_base_sequence_quality:adapter_content) %>% - summarize_all(function(x) paste(x, collapse="/")) %>% - mutate(n_bases_approx = n_bases_approx %>% str_split("/") %>% sapply(as.numeric) %>% colSums()) + summarize_all(function(x) paste(x, collapse="/")) + + if (single_end) { + tab_tsv <- tab_tsv %>% + mutate(n_bases_approx = n_bases_approx %>% as.numeric) + } else { + tab_tsv <- tab_tsv %>% + mutate(n_bases_approx = n_bases_approx %>% + str_split("/") %>% + sapply(as.numeric) %>% + colSums()) + } + # Combine return(bind_cols(tab_json, tab_tsv)) } @@ -86,7 +109,7 @@ extract_adapter_data <- function(multiqc_json){ extract_per_base_quality_single <- function(per_base_quality_dataset){ # Convert a single JSON per-base-quality dataset into a tibble data <- lapply(1:length(per_base_quality_dataset$name), function(n) - per_base_quality_dataset$data[[n]] %>% as.data.frame %>% + per_base_quality_dataset$data[[n]] %>% as.data.frame %>% mutate(file=per_base_quality_dataset$name[n])) %>% bind_rows() %>% as_tibble %>% rename(position=V1, mean_phred_score=V2) @@ -103,7 +126,7 @@ extract_per_base_quality <- function(multiqc_json){ extract_per_sequence_quality_single <- function(per_sequence_quality_dataset){ # Convert a single JSON per-sequence-quality dataset into a tibble data <- lapply(1:length(per_sequence_quality_dataset$name), function(n) - per_sequence_quality_dataset$data[[n]] %>% as.data.frame %>% + per_sequence_quality_dataset$data[[n]] %>% as.data.frame %>% mutate(file=per_sequence_quality_dataset$name[n])) %>% bind_rows() %>% as_tibble %>% rename(mean_phred_score=V1, n_sequences=V2) diff --git a/modules/local/truncateConcat/main.nf b/modules/local/truncateConcat/main.nf index 9c19ae5..75c0172 100644 --- a/modules/local/truncateConcat/main.nf +++ b/modules/local/truncateConcat/main.nf @@ -1,5 +1,5 @@ // Truncate concatenated read files for trial run -process TRUNCATE_CONCAT { +process TRUNCATE_CONCAT_PAIRED { label "single" label "BBTools" input: @@ -18,3 +18,21 @@ process TRUNCATE_CONCAT { zcat !{reads[1]} | head -n ${n_lines} | gzip -c > ${o2} ''' } + +process TRUNCATE_CONCAT_SINGLE { + label "single" + label "BBTools" + input: + tuple val(sample), path(reads) + val n_reads + output: + tuple val(sample), path("${sample}_trunc.fastq.gz"), emit: reads + shell: + ''' + echo "Number of output reads: !{n_reads}" + n_lines=$(expr !{n_reads} \\* 4) + echo "Number of output lines: ${n_lines}" + o=!{sample}_trunc.fastq.gz + zcat !{reads[0]} | head -n ${n_lines} | gzip -c > ${o} + ''' +} \ No newline at end of file diff --git a/subworkflows/local/clean/main.nf b/subworkflows/local/clean/main.nf index 7c5929d..7d62370 100644 --- a/subworkflows/local/clean/main.nf +++ b/subworkflows/local/clean/main.nf @@ -7,7 +7,11 @@ ***************************/ include { QC } from "../../../subworkflows/local/qc" -include { FASTP } from "../../../modules/local/fastp" +if (params.single_end) { + include { FASTP_SINGLE as FASTP } from "../../../modules/local/fastp" +} 
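// params.single_end is resolved at config-parse time by configs/read_type.config,
// which inspects the samplesheet header for a "fastq_2" column.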
else { + include { FASTP_PAIRED as FASTP } from "../../../modules/local/fastp" +} /*********** | WORKFLOW | @@ -20,10 +24,11 @@ workflow CLEAN { fastqc_cpus fastqc_mem stage_label + single_end main: fastp_ch = FASTP(reads_ch, adapter_path) - qc_ch = QC(fastp_ch.reads, fastqc_cpus, fastqc_mem, stage_label) + qc_ch = QC(fastp_ch.reads, fastqc_cpus, fastqc_mem, stage_label, single_end) emit: reads = fastp_ch.reads qc = qc_ch.qc -} +} \ No newline at end of file diff --git a/subworkflows/local/loadSampleSheet/main.nf b/subworkflows/local/loadSampleSheet/main.nf new file mode 100644 index 0000000..11b0a45 --- /dev/null +++ b/subworkflows/local/loadSampleSheet/main.nf @@ -0,0 +1,45 @@ +/*********** +| WORKFLOW | +***********/ + +workflow LOAD_SAMPLESHET { + take: + sample_sheet + main: + if (params.single_end) { + if (params.grouping) { + samplesheet = Channel + .fromPath(params.sample_sheet) + .splitCsv(header: true) + .map { row -> tuple(row.sample, file(row.fastq), row.group) } + samplesheet_ch = samplesheet.map { sample, read, group -> tuple(sample, [read]) } + group_ch = samplesheet.map { sample, read, group -> tuple(sample, group) } + } else { + samplesheet = Channel + .fromPath(params.sample_sheet) + .splitCsv(header: true) + .map { row -> tuple(row.sample, file(row.fastq)) } + samplesheet_ch = samplesheet.map { sample, read -> tuple(sample, [read]) } + group_ch = Channel.empty() + } + } else { + if (params.grouping) { + samplesheet = Channel + .fromPath(params.sample_sheet) + .splitCsv(header: true) + .map { row -> tuple(row.sample, file(row.fastq_1), file(row.fastq_2), row.group) } + samplesheet_ch = samplesheet.map { sample, read1, read2, group -> tuple(sample, [read1, read2]) } + group_ch = samplesheet.map { sample, read1, read2, group -> tuple(sample, group) } + } else { + samplesheet = Channel + .fromPath(params.sample_sheet) + .splitCsv(header: true) + .map { row -> tuple(row.sample, file(row.fastq_1), file(row.fastq_2)) } + samplesheet_ch = samplesheet.map { sample, read1, read2 -> tuple(sample, [read1, read2]) } + group_ch = Channel.empty() + } + } + emit: + samplesheet = samplesheet_ch + group = group_ch +} diff --git a/subworkflows/local/qc/main.nf b/subworkflows/local/qc/main.nf index 419a260..8823136 100644 --- a/subworkflows/local/qc/main.nf +++ b/subworkflows/local/qc/main.nf @@ -20,13 +20,14 @@ workflow QC { fastqc_cpus fastqc_mem stage_label + single_end main: - // 1. Run FASTQC on each pair of read files + // 1. Run FASTQC on each read file / pair of read files fastqc_ch = FASTQC_LABELED(reads, fastqc_cpus, fastqc_mem) - // 2. Extract data with MultiQC for each pair of read files + // 2. Extract data with MultiQC for each read file / pair of read files multiqc_ch = MULTIQC_LABELED(stage_label, fastqc_ch.zip) - // 3. Summarize MultiQC information for each pair of read files - process_ch = SUMMARIZE_MULTIQC_PAIR(multiqc_ch.data) + // 3. Summarize MultiQC information for each read file / pair of read files + process_ch = SUMMARIZE_MULTIQC_PAIR(multiqc_ch.data, single_end) // 4. 
Collate MultiQC outputs multiqc_basic_ch = process_ch.map{ it[0] }.collect().ifEmpty([]) multiqc_adapt_ch = process_ch.map{ it[1] }.collect().ifEmpty([]) diff --git a/subworkflows/local/raw/main.nf b/subworkflows/local/raw/main.nf index 91b02fd..3163b9f 100644 --- a/subworkflows/local/raw/main.nf +++ b/subworkflows/local/raw/main.nf @@ -7,7 +7,11 @@ ***************************/ include { QC } from "../../../subworkflows/local/qc" -include { TRUNCATE_CONCAT } from "../../../modules/local/truncateConcat" +if (params.single_end) { + include { TRUNCATE_CONCAT_SINGLE as TRUNCATE_CONCAT } from "../../../modules/local/truncateConcat" +} else { + include { TRUNCATE_CONCAT_PAIRED as TRUNCATE_CONCAT } from "../../../modules/local/truncateConcat" +} /*********** | WORKFLOW | @@ -20,13 +24,14 @@ workflow RAW { fastqc_cpus fastqc_mem stage_label + single_end main: if ( n_reads_trunc > 0 ) { out_ch = TRUNCATE_CONCAT(samplesheet_ch, n_reads_trunc) } else { out_ch = samplesheet_ch } - qc_ch = QC(out_ch, fastqc_cpus, fastqc_mem, stage_label) + qc_ch = QC(out_ch, fastqc_cpus, fastqc_mem, stage_label, single_end) emit: reads = out_ch qc = qc_ch.qc diff --git a/test-data/nextflow.config b/test-data/nextflow.config index 9819909..3669ba3 100644 --- a/test-data/nextflow.config +++ b/test-data/nextflow.config @@ -31,4 +31,5 @@ includeConfig "${projectDir}/configs/containers.config" includeConfig "${projectDir}/configs/resources.config" includeConfig "${projectDir}/configs/profiles.config" includeConfig "${projectDir}/configs/output.config" +includeConfig "${projectDir}/configs/read_type.config" process.queue = "will-batch-queue" // AWS Batch job queue diff --git a/test-data/single-end-samplesheet.csv b/test-data/single-end-samplesheet.csv new file mode 100644 index 0000000..5b6a53f --- /dev/null +++ b/test-data/single-end-samplesheet.csv @@ -0,0 +1,2 @@ +sample,fastq +230926Esv_D23-14904-1,s3://nao-testing/gold-standard-test/raw/gold_standard_R1.fastq.gz \ No newline at end of file diff --git a/tests/main.nf.test b/tests/main.nf.test index 5156cf9..925af3c 100644 --- a/tests/main.nf.test +++ b/tests/main.nf.test @@ -19,6 +19,14 @@ nextflow_pipeline { assert workflow.success } } + test("Test single-end run workflow") { + config "tests/run_dev_se.config" + tag "run_dev_se" + + then { + assert workflow.success + } + } test("Test validation workflow") { config "tests/run_validation.config" tag "validation" diff --git a/tests/run.config b/tests/run.config index fc15787..a80203b 100644 --- a/tests/run.config +++ b/tests/run.config @@ -30,4 +30,5 @@ params { includeConfig "${projectDir}/configs/containers.config" includeConfig "${projectDir}/configs/profiles.config" +includeConfig "${projectDir}/configs/read_type.config" includeConfig "${projectDir}/configs/output.config" \ No newline at end of file diff --git a/tests/run_dev_se.config b/tests/run_dev_se.config new file mode 100644 index 0000000..4090dab --- /dev/null +++ b/tests/run_dev_se.config @@ -0,0 +1,31 @@ +/************************************************ +| CONFIGURATION FILE FOR NAO VIRAL MGS WORKFLOW | +************************************************/ + +params { + mode = "run_dev_se" + + // Directories + base_dir = "./" // Parent for working and output directories (can be S3) + ref_dir = "s3://nao-testing/index-test/output" // Reference/index directory (generated by index workflow) + + // Files + sample_sheet = "${projectDir}/test-data/single-end-samplesheet.csv" // Path to library TSV + adapters = "${projectDir}/ref/adapters.fasta" // Path to adapter 
file for adapter trimming + + // Numerical + grouping = false // Whether to group samples by 'group' column in samplesheet + n_reads_trunc = 0 // Number of reads per sample to run through pipeline (0 = all reads) + n_reads_profile = 1000000 // Number of reads per sample to run through taxonomic profiling + bt2_score_threshold = 20 // Normalized score threshold for HV calling (typically 15 or 20) + blast_hv_fraction = 0 // Fraction of putative HV reads to BLAST vs nt (0 = don't run BLAST) + kraken_memory = "128 GB" // Memory needed to safely load Kraken DB + quality_encoding = "phred33" // FASTQ quality encoding (probably phred33, maybe phred64) + fuzzy_match_alignment_duplicates = 0 // Fuzzy matching the start coordinate of reads for identification of duplicates through alignment (0 = exact matching; options are 0, 1, or 2) + host_taxon = "vertebrate" +} + +includeConfig "${projectDir}/configs/containers.config" +includeConfig "${projectDir}/configs/profiles.config" +includeConfig "${projectDir}/configs/read_type.config" +includeConfig "${projectDir}/configs/output.config" diff --git a/workflows/run.nf b/workflows/run.nf index 899bef2..df48b17 100644 --- a/workflows/run.nf +++ b/workflows/run.nf @@ -16,6 +16,7 @@ include { BLAST_VIRAL } from "../subworkflows/local/blastViral" include { PROFILE } from "../subworkflows/local/profile" include { PROCESS_OUTPUT } from "../subworkflows/local/processOutput" include { EXTRACT_RAW_READS_FROM_PROCESSED } from "../modules/local/extractRawReadsFromProcessed" +include { LOAD_SAMPLESHET } from "../subworkflows/local/loadSampleSheet" nextflow.preview.output = true /***************** @@ -40,25 +41,14 @@ workflow RUN { } } - // Prepare samplesheet - if ( params.grouping ) { - samplesheet = Channel - .fromPath(params.sample_sheet) - .splitCsv(header: true) - .map { row -> tuple(row.sample, file(row.fastq_1), file(row.fastq_2), row.group) } - samplesheet_ch = samplesheet.map { sample, read1, read2, group -> tuple(sample, [read1, read2]) } - group_ch = samplesheet.map { sample, read1, read2, group -> tuple(sample, group) } - } else { - samplesheet = Channel - .fromPath(params.sample_sheet) - .splitCsv(header: true) - .map { row -> tuple(row.sample, file(row.fastq_1), file(row.fastq_2)) } - samplesheet_ch = samplesheet.map { sample, read1, read2 -> tuple(sample, [read1, read2]) } - group_ch = Channel.empty() - } + // Load samplesheet + LOAD_SAMPLESHET(params.sample_sheet) + samplesheet_ch = LOAD_SAMPLESHET.out.samplesheet + group_ch = LOAD_SAMPLESHET.out.group + // Preprocessing - RAW(samplesheet_ch, params.n_reads_trunc, "2", "4 GB", "raw_concat") - CLEAN(RAW.out.reads, params.adapters, "2", "4 GB", "cleaned") + RAW(samplesheet_ch, params.n_reads_trunc, "2", "4 GB", "raw_concat", params.single_end) + CLEAN(RAW.out.reads, params.adapters, "2", "4 GB", "cleaned", params.single_end) // Extract and count human-viral reads EXTRACT_VIRAL_READS(CLEAN.out.reads, group_ch, params.ref_dir, kraken_db_path, params.bt2_score_threshold, params.adapters, params.host_taxon, "1", "24", "viral", "${params.quality_encoding}", "${params.fuzzy_match_alignment_duplicates}", params.grouping) // Process intermediate output for chimera detection diff --git a/workflows/run_dev_se.nf b/workflows/run_dev_se.nf new file mode 100644 index 0000000..e8e7fa9 --- /dev/null +++ b/workflows/run_dev_se.nf @@ -0,0 +1,76 @@ +/*********************************************************************************************** +| WORKFLOW: PREPROCESSING ON SHORT-READ MGS DATA (EITHER SINGLE-END OR 
PAIRED-END) | +***********************************************************************************************/ + +import groovy.json.JsonOutput +import java.time.LocalDateTime + +/*************************** +| MODULES AND SUBWORKFLOWS | +***************************/ + +include { RAW } from "../subworkflows/local/raw" +include { CLEAN } from "../subworkflows/local/clean" +include { PROCESS_OUTPUT } from "../subworkflows/local/processOutput" +include { LOAD_SAMPLESHET } from "../subworkflows/local/loadSampleSheet" +nextflow.preview.output = true + +/***************** +| MAIN WORKFLOWS | +*****************/ + +// Complete primary workflow +workflow RUN_DEV_SE { + // Start time + start_time = new Date() + start_time_str = start_time.format("YYYY-MM-dd HH:mm:ss z (Z)") + + // Check if grouping column exists in samplesheet + check_grouping = new File(params.sample_sheet).text.readLines()[0].contains('group') ? true : false + if (params.grouping != check_grouping) { + if (params.grouping && !check_grouping) { + throw new Exception("Grouping enabled in config file, but group column absent from samplesheet.") + } else if (!params.grouping && check_grouping) { + throw new Exception("Grouping is not enabled in config file, but group column is present in the samplesheet.") + } + } + + // Load samplesheet + LOAD_SAMPLESHET(params.sample_sheet) + samplesheet_ch = LOAD_SAMPLESHET.out.samplesheet + group_ch = LOAD_SAMPLESHET.out.group + + // Preprocessing + RAW(samplesheet_ch, params.n_reads_trunc, "2", "4 GB", "raw_concat", params.single_end) + CLEAN(RAW.out.reads, params.adapters, "2", "4 GB", "cleaned", params.single_end) + + // Process output + qc_ch = RAW.out.qc.concat(CLEAN.out.qc) + PROCESS_OUTPUT(qc_ch) + + // Publish results + params_str = JsonOutput.prettyPrint(JsonOutput.toJson(params)) + params_ch = Channel.of(params_str).collectFile(name: "run-params.json") + time_ch = Channel.of(start_time_str + "\n").collectFile(name: "time.txt") + version_ch = Channel.fromPath("${projectDir}/pipeline-version.txt") + index_params_ch = Channel.fromPath("${params.ref_dir}/input/index-params.json") + .map { file -> file.copyTo("${params.base_dir}/work/params-index.json") } + index_pipeline_version_ch = Channel.fromPath("${params.ref_dir}/logging/pipeline-version.txt") + .map { file -> file.copyTo("${params.base_dir}/work/pipeline-version-index.txt") } + publish: + // Saved inputs + index_params_ch >> "input" + index_pipeline_version_ch >> "logging" + Channel.fromPath(params.sample_sheet) >> "input" + Channel.fromPath(params.adapters) >> "input" + params_ch >> "input" + time_ch >> "logging" + version_ch >> "logging" + // Intermediate files + CLEAN.out.reads >> "intermediates/reads/cleaned" + // QC + PROCESS_OUTPUT.out.basic >> "results" + PROCESS_OUTPUT.out.adapt >> "results" + PROCESS_OUTPUT.out.qbase >> "results" + PROCESS_OUTPUT.out.qseqs >> "results" +}
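Taken together, the single-end path can be exercised locally along these lines. This is a sketch rather than documented usage: the S3 directory and pipeline path are placeholders, and the final command assumes the run is launched from a directory holding a copy of run_dev_se.config, since that config expects the samplesheet at ${launchDir}/samplesheet.csv.

# Build a single-end samplesheet (header "sample,fastq") from a directory of gzipped FASTQ files
bin/generate_samplesheet.sh --dir_path s3://my-bucket/raw/ --single_end --s3 --output_path samplesheet.csv

# Run the new single-end end-to-end test, mirroring .github/workflows/end-to-end-se.yml
nf-test test --tag run_dev_se --verbose

# Or launch the development single-end workflow directly (mode = "run_dev_se" dispatches to RUN_DEV_SE in main.nf)
nextflow run /path/to/pipeline -c run_dev_se.config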