diff --git a/CHANGELOG.md b/CHANGELOG.md
index f78febaa..fa13fa78 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,14 +1,15 @@
 # v2.5.3 (in progress)
-- Added new LOAD_SAMPLESHEET subworkflow to centralize samplesheet processing 
+- Added new LOAD_SAMPLESHEET subworkflow to centralize samplesheet processing
 - Testing infrastructure
   - Split up the tests in `End-to-end MGS workflow test` so that they can be run in parallel on Github Actions.
   - Implemented an end-to-end test that checks if the RUN workflow produces the correct output. The correct output for the test has been saved in `test-data/gold-standard-results` so that the user can diff the output of their test with the correct output to check where their pipeline might be failing.
 - Began development of single-end read processing (still in progress)
-  - Restructured RAW, CLEAN, and QC workflows to handle both single-end and paired-end reads
-  - Added new FASTP_SINGLE and TRUNCATE_CONCAT_SINGLE processes to handle single-end reads
+  - Restructured RAW, CLEAN, QC, TAXONOMY, and PROFILE workflows to handle both single-end and paired-end reads
+  - Added new FASTP_SINGLE, TRUNCATE_CONCAT_SINGLE, BBDUK_SINGLE, CONCAT_GROUP_SINGLE, SUBSET_READS_SINGLE, and SUBSET_READS_SINGLE_TARGET processes to handle single-end reads
   - Created separate end-to-end test workflow for single-end processing (which will be removed once single-end processing is fully integrated)
   - Modified samplesheet handling to support both single-end and paired-end data
     - Updated generate_samplesheet.sh to handle single-end data with --single_end flag
+    - Added read_type.config to handle single-end vs paired-end settings (set automatically based on samplesheet format)
   - Created run_dev_se.config and run_dev_se.nf for single-end development testing (which will be removed once single-end processing is fully integrated)
   - Added single-end samplesheet to test-data
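The changelog notes that read_type.config chooses single-end vs paired-end settings automatically from the samplesheet format. A minimal sketch of that decision, assuming a single-end sheet has a lone `fastq` column (as in `test-data/single-end-samplesheet.csv` further down in this diff) while a paired-end sheet carries `fastq_1`/`fastq_2` columns (an assumption; the paired-end layout is not shown here):

```bash
# Hypothetical illustration only; the real logic lives in read_type.config and
# the LOAD_SAMPLESHEET subworkflow, neither of which is included in this diff.
header=$(head -n 1 samplesheet.csv)
if [[ "$header" == *"fastq_1"* ]]; then
    echo "paired-end samplesheet"   # assumed column names: fastq_1, fastq_2
else
    echo "single-end samplesheet"   # single fastq column, as in the test sheet
fi
```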
diff --git a/modules/local/bbduk/main.nf b/modules/local/bbduk/main.nf
index df19e871..a6c3b210 100644
--- a/modules/local/bbduk/main.nf
+++ b/modules/local/bbduk/main.nf
@@ -1,5 +1,5 @@
 // Detection and removal of contaminant reads
-process BBDUK {
+process BBDUK_PAIRED {
     label "large"
     label "BBTools"
     input:
@@ -31,6 +31,35 @@ process BBDUK {
     '''
 }
 
+process BBDUK_SINGLE {
+    label "large"
+    label "BBTools"
+    input:
+        tuple val(sample), path(reads)
+        path(contaminant_ref)
+        val(min_kmer_fraction)
+        val(k)
+        val(suffix)
+    output:
+        tuple val(sample), path("${sample}_${suffix}_bbduk_pass.fastq.gz"), emit: reads
+        tuple val(sample), path("${sample}_${suffix}_bbduk_fail.fastq.gz"), emit: fail
+        tuple val(sample), path("${sample}_${suffix}_bbduk.stats.txt"), emit: log
+    shell:
+        '''
+        # Define input/output
+        in=!{reads}
+        op=!{sample}_!{suffix}_bbduk_pass.fastq.gz
+        of=!{sample}_!{suffix}_bbduk_fail.fastq.gz
+        stats=!{sample}_!{suffix}_bbduk.stats.txt
+        ref=!{contaminant_ref}
+        io="in=${in} ref=${ref} out=${op} outm=${of} stats=${stats}"
+        # Define parameters
+        par="minkmerfraction=!{min_kmer_fraction} k=!{k} t=!{task.cpus} -Xmx!{task.memory.toGiga()}g"
+        # Execute
+        bbduk.sh ${io} ${par}
+        '''
+}
+
 // Detection and removal of contaminant reads (use minkmerhits instead of minkmerfraction)
 process BBDUK_HITS {
     label "large"
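For reference, the interpolated `io`/`par` strings in BBDUK_SINGLE above collapse into a single bbduk.sh call. Roughly, for a sample run through the PROFILE subworkflow (which, per workflows/run.nf, passes min_kmer_fraction=0.4, k=27, suffix "ribo", and the ribo-ref-concat.fasta.gz reference); the input filename, thread count, and memory below are illustrative, not taken from this diff:

```bash
# Approximate expansion of the BBDUK_SINGLE shell block (values illustrative)
bbduk.sh \
    in=gold_standard.fastq.gz \
    ref=ribo-ref-concat.fasta.gz \
    out=gold_standard_ribo_bbduk_pass.fastq.gz \
    outm=gold_standard_ribo_bbduk_fail.fastq.gz \
    stats=gold_standard_ribo_bbduk.stats.txt \
    minkmerfraction=0.4 k=27 t=4 -Xmx16g
```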
diff --git a/modules/local/concatGroup/main.nf b/modules/local/concatGroup/main.nf
index f81b45d0..5bb687f3 100644
--- a/modules/local/concatGroup/main.nf
+++ b/modules/local/concatGroup/main.nf
@@ -1,5 +1,5 @@
 // Copy a file to a new location with a custom path
-process CONCAT_GROUP {
+process CONCAT_GROUP_PAIRED {
     label "coreutils"
     label "single"
    input:
@@ -14,3 +14,19 @@ process CONCAT_GROUP {
     cat ${fastq_2_list.join(' ')} > ${group}_R2.fastq.gz
     """
 }
+
+
+process CONCAT_GROUP_SINGLE {
+    label "base"
+    label "single"
+    input:
+        tuple val(samples), path(fastq_list), val(group)
+
+    output:
+        tuple val(group), path("${group}.fastq.gz")
+
+    script:
+        """
+        cat ${fastq_list.join(' ')} > ${group}.fastq.gz
+        """
+}
diff --git a/modules/local/subsetReads/main.nf b/modules/local/subsetReads/main.nf
index 864d575f..91b8f53d 100644
--- a/modules/local/subsetReads/main.nf
+++ b/modules/local/subsetReads/main.nf
@@ -26,6 +26,31 @@ process SUBSET_READS_PAIRED {
     '''
 }
 
+// Subsample reads with seqtk (single-end)
+process SUBSET_READS_SINGLE {
+    label "seqtk"
+    label "single"
+    input:
+        tuple val(sample), path(reads)
+        val readFraction
+        val suffix
+    output:
+        tuple val(sample), path("${sample}_subset.${suffix}.gz")
+    shell:
+        '''
+        # Define input/output
+        in=!{reads}
+        out=!{sample}_subset.!{suffix}.gz
+        # Count reads for validation
+        echo "Input reads: $(zcat ${in} | wc -l | awk '{ print $1/4 }')"
+        # Carry out subsetting
+        seed=${RANDOM}
+        seqtk sample -s ${seed} ${in} !{readFraction} | gzip -c > ${out}
+        # Count reads for validation
+        echo "Output reads: $(zcat ${out} | wc -l | awk '{ print $1/4 }')"
+        '''
+}
+
 // Subsample reads with seqtk (no sample name)
 process SUBSET_READS_PAIRED_MERGED {
     label "seqtk"
@@ -54,7 +79,7 @@ process SUBSET_READS_PAIRED_MERGED {
     '''
 }
 
-// Subsample reads with seqtk with an autocomputed read fraction
+// Subsample reads with seqtk with an autocomputed read fraction (paired-end)
 process SUBSET_READS_PAIRED_TARGET {
     label "seqtk"
     label "single"
@@ -91,3 +116,37 @@ process SUBSET_READS_PAIRED_TARGET {
     echo "Output reads: $(zcat ${out1} | wc -l | awk '{ print $1/4 }')"
     '''
 }
+
+// Subsample reads with seqtk with an autocomputed read fraction (single-end)
+process SUBSET_READS_SINGLE_TARGET {
+    label "seqtk"
+    label "single"
+    input:
+        tuple val(sample), path(reads)
+        val readTarget
+        val suffix
+    output:
+        tuple val(sample), path("${sample}_subset.${suffix}.gz")
+    shell:
+        '''
+        # Define input/output
+        in=!{reads}
+        out=!{sample}_subset.!{suffix}.gz
+        # Count reads and compute target fraction
+        n_reads=$(zcat ${in} | wc -l | awk '{ print $1/4 }')
+        echo "Input reads: ${n_reads}"
+        echo "Target reads: !{readTarget}"
+        if (( ${n_reads} <= !{readTarget} )); then
+            echo "Target larger than input; returning all reads."
+            cp ${in} ${out}
+        else
+            frac=$(awk -v a=${n_reads} -v b=!{readTarget} 'BEGIN {result = b/a; print (result > 1) ? 1.0 : result}')
+            echo "Read fraction for subsetting: ${frac}"
+            # Carry out subsetting
+            seed=${RANDOM}
+            seqtk sample -s ${seed} ${in} ${frac} | gzip -c > ${out}
+        fi
+        # Count reads for validation
+        echo "Output reads: $(zcat ${out} | wc -l | awk '{ print $1/4 }')"
+        '''
+}
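The target-fraction logic in SUBSET_READS_SINGLE_TARGET above is easiest to see with concrete numbers: with 1,000,000 input reads and a target of 250,000, the awk expression yields 0.25, which seqtk uses as its sampling fraction; the ternary clamp only matters if the target exceeds the input, a case the surrounding if-branch already short-circuits by copying the file. A standalone version outside Nextflow (file names and seed illustrative):

```bash
# Worked example of the fraction computation used by the process above
n_reads=1000000
read_target=250000
frac=$(awk -v a=${n_reads} -v b=${read_target} \
    'BEGIN {result = b/a; print (result > 1) ? 1.0 : result}')
echo "${frac}"    # 0.25
# seqtk then subsamples with that fraction and a random seed
seqtk sample -s 42 input.fastq.gz ${frac} | gzip -c > input_subset.fastq.gz
```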
diff --git a/subworkflows/local/extractViralReads/main.nf b/subworkflows/local/extractViralReads/main.nf
index 44e45510..6520211e 100644
--- a/subworkflows/local/extractViralReads/main.nf
+++ b/subworkflows/local/extractViralReads/main.nf
@@ -27,7 +27,11 @@ include { COLLAPSE_VIRUS_READS } from "../../../modules/local/collapseVirusReads
 include { ADD_FRAG_DUP_TO_VIRUS_READS } from "../../../modules/local/addFragDupToVirusReads"
 include { MAKE_VIRUS_READS_FASTA } from "../../../modules/local/makeVirusReadsFasta"
 include { COUNT_VIRUS_CLADES } from "../../../modules/local/countVirusClades"
-include { CONCAT_GROUP } from "../../../modules/local/concatGroup"
+if (params.single_end) {
+    include { CONCAT_GROUP_SINGLE as CONCAT_GROUP } from "../../../modules/local/concatGroup"
+} else {
+    include { CONCAT_GROUP_PAIRED as CONCAT_GROUP } from "../../../modules/local/concatGroup"
+}
 
 /***********
 | WORKFLOW |
@@ -48,6 +52,7 @@ workflow EXTRACT_VIRAL_READS {
         encoding
         fuzzy_match
         grouping
+        single_end
     main:
         // Get reference paths
         viral_genome_path = "${ref_dir}/results/virus-genomes-filtered.fasta.gz"
@@ -90,7 +95,7 @@
         human_bbm_ch = BBMAP_HUMAN(other_bt2_ch.reads_unconc, bbm_human_index_path, "human")
         other_bbm_ch = BBMAP_OTHER(human_bbm_ch.reads_unmapped, bbm_other_index_path, "other")
         // Run Kraken on filtered viral candidates
-        tax_ch = TAXONOMY(other_bbm_ch.reads_unmapped, kraken_db_ch, true, "F")
+        tax_ch = TAXONOMY(other_bbm_ch.reads_unmapped, kraken_db_ch, true, "F", single_end)
         // Process Kraken output and merge with Bowtie2 output across samples
         kraken_output_ch = PROCESS_KRAKEN_VIRAL(tax_ch.kraken_output, virus_db_path, host_taxon)
         bowtie2_kraken_merged_ch = MERGE_SAM_KRAKEN(kraken_output_ch.combine(bowtie2_sam_ch, by: 0))
diff --git a/subworkflows/local/profile/main.nf b/subworkflows/local/profile/main.nf
index efb4cbb3..e0bc1db3 100644
--- a/subworkflows/local/profile/main.nf
+++ b/subworkflows/local/profile/main.nf
@@ -6,12 +6,23 @@
 | MODULES AND SUBWORKFLOWS |
 ***************************/
 
-include { SUBSET_READS_PAIRED_TARGET; SUBSET_READS_PAIRED_TARGET as SUBSET_READS_PAIRED_TARGET_GROUP } from "../../../modules/local/subsetReads"
-include { BBDUK } from "../../../modules/local/bbduk"
+
+if (params.single_end) {
+    include { SUBSET_READS_SINGLE_TARGET as SUBSET_READS_TARGET } from "../../../modules/local/subsetReads"
+    include { BBDUK_SINGLE as BBDUK } from "../../../modules/local/bbduk"
+    include { CONCAT_GROUP_SINGLE as CONCAT_GROUP } from "../../../modules/local/concatGroup"
+    include { SUBSET_READS_SINGLE_TARGET; SUBSET_READS_SINGLE_TARGET as SUBSET_READS_TARGET_GROUP } from "../../../modules/local/subsetReads"
+} else {
+    include { SUBSET_READS_PAIRED_TARGET as SUBSET_READS_TARGET } from "../../../modules/local/subsetReads"
+    include { SUBSET_READS_PAIRED_TARGET; SUBSET_READS_PAIRED_TARGET as SUBSET_READS_TARGET_GROUP } from "../../../modules/local/subsetReads"
+    include { BBDUK_PAIRED as BBDUK } from "../../../modules/local/bbduk"
+    include { CONCAT_GROUP_PAIRED as CONCAT_GROUP } from "../../../modules/local/concatGroup"
+}
+
+include { BBDUK_HITS } from "../../../modules/local/bbduk"
 include { TAXONOMY as TAXONOMY_RIBO } from "../../../subworkflows/local/taxonomy"
 include { TAXONOMY as TAXONOMY_NORIBO } from "../../../subworkflows/local/taxonomy"
 include { MERGE_TAXONOMY_RIBO } from "../../../modules/local/mergeTaxonomyRibo"
-include { CONCAT_GROUP } from "../../../modules/local/concatGroup"
 
 /****************
 | MAIN WORKFLOW |
@@ -28,23 +39,36 @@ workflow PROFILE {
         k
         bbduk_suffix
         grouping
+        single_end
     main:
+
+
         // Randomly subset reads to target number
-        subset_ch = SUBSET_READS_PAIRED_TARGET(reads_ch, n_reads, "fastq")
+        subset_ch = SUBSET_READS_TARGET(reads_ch, n_reads, "fastq")
+
         if (grouping){
             // Join samplesheet with trimmed_reads and update fastq files
-            subset_group_ch = group_ch.join(subset_ch, by: 0)
-                .map { sample, group, reads -> tuple(sample, reads[0], reads[1], group) }
-                .groupTuple(by: 3)
+            if (single_end) {
+                subset_group_ch = group_ch.join(subset_ch, by: 0)
+                    .map { sample, group, reads -> tuple(sample, reads, group) }
+                    .groupTuple(by: 2)
+                // Single-sample groups are already subsetted to target number
+                single_sample_groups = subset_group_ch.filter { it[0].size() == 1 }
+                    .map { samples, read_list, group -> tuple(group, [read_list[0]]) }
+
+            } else {
+                subset_group_ch = group_ch.join(subset_ch, by: 0)
+                    .map { sample, group, reads -> tuple(sample, reads[0], reads[1], group) }
+                    .groupTuple(by: 3)
+                single_sample_groups = subset_group_ch.filter { it[0].size() == 1 }
+                    .map { samples, fwd_list, rev_list, group -> tuple(group, [fwd_list[0], rev_list[0]]) }
+            }
             // Split into multi-sample groups, these need to be subsetted to target number
             multi_sample_groups = subset_group_ch.filter { it[0].size() > 1 }
-            // These are already subsetted to target number
-            single_sample_groups = subset_group_ch.filter { it[0].size() == 1 }
-                .map { samples, fwd_list, rev_list, group -> tuple(group, [fwd_list[0], rev_list[0]]) }
             // Concatenate multi-sample groups
             grouped_samples = CONCAT_GROUP(multi_sample_groups)
             // Randomly subset multi-sample groups to target number
-            subset_grouped_ch = SUBSET_READS_PAIRED_TARGET_GROUP(grouped_samples, n_reads, "fastq")
+            subset_grouped_ch = SUBSET_READS_TARGET_GROUP(grouped_samples, n_reads, "fastq")
             // Mix with subsetted multi-sample group with already subsetted single-sample groups
             grouped_ch = subset_grouped_ch.mix(single_sample_groups)
         } else {
@@ -54,8 +78,8 @@
         ribo_path = "${ref_dir}/results/ribo-ref-concat.fasta.gz"
         ribo_ch = BBDUK(grouped_ch, ribo_path, min_kmer_fraction, k, bbduk_suffix)
         // Run taxonomic profiling separately on ribo and non-ribo reads
-        tax_ribo_ch = TAXONOMY_RIBO(ribo_ch.fail, kraken_db_ch, false, "D")
-        tax_noribo_ch = TAXONOMY_NORIBO(ribo_ch.reads, kraken_db_ch, false, "D")
+        tax_ribo_ch = TAXONOMY_RIBO(ribo_ch.fail, kraken_db_ch, false, "D", single_end)
+        tax_noribo_ch = TAXONOMY_NORIBO(ribo_ch.reads, kraken_db_ch, false, "D", single_end)
         // Merge ribo and non-ribo outputs
         kr_ribo = tax_ribo_ch.kraken_reports.collectFile(name: "kraken_reports_ribo.tsv.gz")
         kr_noribo = tax_noribo_ch.kraken_reports.collectFile(name: "kraken_reports_noribo.tsv.gz")
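At the file level, the single-end grouping path added above amounts to: subset each sample to the target read count, concatenate samples that share a group, then subset the concatenated file back down to the target. A rough sketch with illustrative file names and counts (in the pipeline these steps are CONCAT_GROUP_SINGLE and SUBSET_READS_TARGET_GROUP, operating on per-sample subsets):

```bash
# Illustrative only; sample/group names and read counts are made up
cat sampleA_subset.fastq.gz sampleB_subset.fastq.gz > group1.fastq.gz
n_reads=$(zcat group1.fastq.gz | wc -l | awk '{ print $1/4 }')   # e.g. 2000000
frac=$(awk -v a=${n_reads} -v b=1000000 'BEGIN {print b/a}')     # 0.5
seqtk sample -s 42 group1.fastq.gz ${frac} | gzip -c > group1_subset.fastq.gz
```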
"../../../modules/local/summarizeDedup" include { CLUMPIFY_PAIRED } from "../../../modules/local/clumpify" -include { JOIN_FASTQ } from "../../../modules/local/joinFastq" include { CLUMPIFY_SINGLE } from "../../../modules/local/clumpify" include { KRAKEN } from "../../../modules/local/kraken" include { LABEL_KRAKEN_REPORTS } from "../../../modules/local/labelKrakenReports" @@ -29,24 +29,33 @@ workflow TAXONOMY { kraken_db_ch dedup_rc classification_level + single_end main: - // Deduplicate reads (if applicable) - if ( dedup_rc ){ - paired_dedup_ch = CLUMPIFY_PAIRED(reads_ch) + if (single_end) { + // No merging in single read version + summarize_bbmerge_ch = Channel.empty() + single_read_ch = reads_ch } else { - paired_dedup_ch = reads_ch + // Deduplicate reads (if applicable) + if ( dedup_rc ){ + paired_dedup_ch = CLUMPIFY_PAIRED(reads_ch) + } else { + paired_dedup_ch = reads_ch + } + // Prepare reads + merged_ch = BBMERGE(paired_dedup_ch) + // Only want to summarize the merged elements + summarize_bbmerge_ch = SUMMARIZE_BBMERGE(merged_ch.reads.map{sample, files -> [sample, files[0]]}) + single_read_ch = JOIN_FASTQ(merged_ch.reads) } - // Prepare reads - merged_ch = BBMERGE(paired_dedup_ch) - // Only want to summarize the merged elements - summarize_bbmerge_ch = SUMMARIZE_BBMERGE(merged_ch.reads.map{sample, files -> [sample, files[0]]}) - joined_ch = JOIN_FASTQ(merged_ch.reads) + // Deduplicate reads (if applicable) - if ( dedup_rc ){ - dedup_ch = CLUMPIFY_SINGLE(joined_ch) - } else { - dedup_ch = joined_ch + if (dedup_rc) { + dedup_ch = CLUMPIFY_SINGLE(single_read_ch) + } else { + dedup_ch = single_read_ch } + // Summarize last of the output summarize_dedup_ch = SUMMARIZE_DEDUP(dedup_ch) diff --git a/test-data/nextflow.config b/test-data/nextflow.config index 09cbd5d3..191f7a20 100644 --- a/test-data/nextflow.config +++ b/test-data/nextflow.config @@ -9,6 +9,8 @@ params { base_dir = "s3://nao-mgs-wb/test-batch" // Parent for working and output directories (can be S3) ref_dir = "s3://nao-mgs-wb/index/20241209/output" // Reference/index directory (generated by index workflow) + + // Files sample_sheet = "${launchDir}/samplesheet.csv" // Path to library TSV adapters = "${projectDir}/ref/adapters.fasta" // Path to adapter file for adapter trimming diff --git a/test-data/single-end-samplesheet.csv b/test-data/single-end-samplesheet.csv index 5b6a53f0..f59a9a86 100644 --- a/test-data/single-end-samplesheet.csv +++ b/test-data/single-end-samplesheet.csv @@ -1,2 +1,2 @@ sample,fastq -230926Esv_D23-14904-1,s3://nao-testing/gold-standard-test/raw/gold_standard_R1.fastq.gz \ No newline at end of file +gold_standard,s3://nao-testing/gold-standard-test/raw/gold_standard_R1.fastq.gz \ No newline at end of file diff --git a/tests/run.config b/tests/run.config index a80203b3..cd8c62a6 100644 --- a/tests/run.config +++ b/tests/run.config @@ -24,7 +24,6 @@ params { quality_encoding = "phred33" // FASTQ quality encoding (probably phred33, maybe phred64) fuzzy_match_alignment_duplicates = 0 // Fuzzy matching the start coordinate of reads for identification of duplicates through alignment (0 = exact matching; options are 0, 1, or 2) host_taxon = "vertebrate" - blast_db_prefix = "nt_others" } diff --git a/tests/run_dev_se.config b/tests/run_dev_se.config index 4090dabc..34b72954 100644 --- a/tests/run_dev_se.config +++ b/tests/run_dev_se.config @@ -23,6 +23,8 @@ params { quality_encoding = "phred33" // FASTQ quality encoding (probably phred33, maybe phred64) fuzzy_match_alignment_duplicates = 0 // Fuzzy 
diff --git a/test-data/nextflow.config b/test-data/nextflow.config
index 09cbd5d3..191f7a20 100644
--- a/test-data/nextflow.config
+++ b/test-data/nextflow.config
@@ -9,6 +9,8 @@ params {
     base_dir = "s3://nao-mgs-wb/test-batch" // Parent for working and output directories (can be S3)
     ref_dir = "s3://nao-mgs-wb/index/20241209/output" // Reference/index directory (generated by index workflow)
+
+
     // Files
     sample_sheet = "${launchDir}/samplesheet.csv" // Path to library TSV
     adapters = "${projectDir}/ref/adapters.fasta" // Path to adapter file for adapter trimming
diff --git a/test-data/single-end-samplesheet.csv b/test-data/single-end-samplesheet.csv
index 5b6a53f0..f59a9a86 100644
--- a/test-data/single-end-samplesheet.csv
+++ b/test-data/single-end-samplesheet.csv
@@ -1,2 +1,2 @@
 sample,fastq
-230926Esv_D23-14904-1,s3://nao-testing/gold-standard-test/raw/gold_standard_R1.fastq.gz
\ No newline at end of file
+gold_standard,s3://nao-testing/gold-standard-test/raw/gold_standard_R1.fastq.gz
\ No newline at end of file
diff --git a/tests/run.config b/tests/run.config
index a80203b3..cd8c62a6 100644
--- a/tests/run.config
+++ b/tests/run.config
@@ -24,7 +24,6 @@ params {
    quality_encoding = "phred33" // FASTQ quality encoding (probably phred33, maybe phred64)
    fuzzy_match_alignment_duplicates = 0 // Fuzzy matching the start coordinate of reads for identification of duplicates through alignment (0 = exact matching; options are 0, 1, or 2)
    host_taxon = "vertebrate"
-   blast_db_prefix = "nt_others"
 }
diff --git a/tests/run_dev_se.config b/tests/run_dev_se.config
index 4090dabc..34b72954 100644
--- a/tests/run_dev_se.config
+++ b/tests/run_dev_se.config
@@ -23,6 +23,8 @@ params {
    quality_encoding = "phred33" // FASTQ quality encoding (probably phred33, maybe phred64)
    fuzzy_match_alignment_duplicates = 0 // Fuzzy matching the start coordinate of reads for identification of duplicates through alignment (0 = exact matching; options are 0, 1, or 2)
    host_taxon = "vertebrate"
+
+   blast_db_prefix = "nt_others"
 }
 
 includeConfig "${projectDir}/configs/containers.config"
diff --git a/workflows/run.nf b/workflows/run.nf
index e5e543eb..94cf02d2 100644
--- a/workflows/run.nf
+++ b/workflows/run.nf
@@ -39,7 +39,7 @@ workflow RUN {
         RAW(samplesheet_ch, params.n_reads_trunc, "2", "4 GB", "raw_concat", params.single_end)
         CLEAN(RAW.out.reads, params.adapters, "2", "4 GB", "cleaned", params.single_end)
         // Extract and count human-viral reads
-        EXTRACT_VIRAL_READS(CLEAN.out.reads, group_ch, params.ref_dir, kraken_db_path, params.bt2_score_threshold, params.adapters, params.host_taxon, "1", "24", "viral", "${params.quality_encoding}", "${params.fuzzy_match_alignment_duplicates}", params.grouping)
+        EXTRACT_VIRAL_READS(CLEAN.out.reads, group_ch, params.ref_dir, kraken_db_path, params.bt2_score_threshold, params.adapters, params.host_taxon, "1", "24", "viral", "${params.quality_encoding}", "${params.fuzzy_match_alignment_duplicates}", params.grouping, params.single_end)
         // Process intermediate output for chimera detection
         raw_processed_ch = EXTRACT_VIRAL_READS.out.bbduk_match.join(RAW.out.reads, by: 0)
         EXTRACT_RAW_READS_FROM_PROCESSED(raw_processed_ch, "raw_viral_subset")
@@ -53,7 +53,7 @@ workflow RUN {
            blast_paired_ch = Channel.empty()
        }
        // Taxonomic profiling
-       PROFILE(CLEAN.out.reads, group_ch, kraken_db_path, params.n_reads_profile, params.ref_dir, "0.4", "27", "ribo", params.grouping)
+       PROFILE(CLEAN.out.reads, group_ch, kraken_db_path, params.n_reads_profile, params.ref_dir, "0.4", "27", "ribo", params.grouping, params.single_end)
        // Process output
        qc_ch = RAW.out.qc.concat(CLEAN.out.qc)
        PROCESS_OUTPUT(qc_ch)
diff --git a/workflows/run_dev_se.nf b/workflows/run_dev_se.nf
index ca301db8..3df5f0e4 100644
--- a/workflows/run_dev_se.nf
+++ b/workflows/run_dev_se.nf
@@ -12,6 +12,7 @@ import java.time.LocalDateTime
 include { RAW } from "../subworkflows/local/raw"
 include { CLEAN } from "../subworkflows/local/clean"
 include { PROCESS_OUTPUT } from "../subworkflows/local/processOutput"
+include { PROFILE } from "../subworkflows/local/profile"
 include { LOAD_SAMPLESHEET } from "../subworkflows/local/loadSampleSheet"
 
 nextflow.preview.output = true
@@ -27,10 +28,17 @@ workflow RUN_DEV_SE {
         group_ch = LOAD_SAMPLESHEET.out.group
         start_time_str = LOAD_SAMPLESHEET.out.start_time_str
 
+        // Load kraken db path
+        kraken_db_path = "${params.ref_dir}/results/kraken_db"
+
+
         // Preprocessing
         RAW(samplesheet_ch, params.n_reads_trunc, "2", "4 GB", "raw_concat", params.single_end)
         CLEAN(RAW.out.reads, params.adapters, "2", "4 GB", "cleaned", params.single_end)
 
+        // Taxonomic profiling
+        PROFILE(CLEAN.out.reads, group_ch, kraken_db_path, params.n_reads_profile, params.ref_dir, "0.4", "27", "ribo", params.grouping, params.single_end)
+
         // Process output
         qc_ch = RAW.out.qc.concat(CLEAN.out.qc)
         PROCESS_OUTPUT(qc_ch)
@@ -60,4 +68,7 @@
         PROCESS_OUTPUT.out.adapt >> "results"
         PROCESS_OUTPUT.out.qbase >> "results"
         PROCESS_OUTPUT.out.qseqs >> "results"
+        // Final results
+        PROFILE.out.bracken >> "results"
+        PROFILE.out.kraken >> "results"
 }
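Per the changelog, the expected output of the end-to-end test is saved under `test-data/gold-standard-results`, so a failing run can be localized by diffing a local run against it. A coarse first pass, assuming the local output lands in an `output/` directory (the actual output location depends on the run configuration and is not specified in this diff):

```bash
# List which result files differ from the saved gold standard (directory name assumed)
diff -qr output/ test-data/gold-standard-results/
```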