Nextflow Development - Outputs, Scatter, and Gather
+
- Gain an understanding of how to structure Nextflow published outputs
- Gain an understanding of how to run scatter and gather processes
Environment Setup
+Set up an interactive shell to run our Nextflow workflow:
+ srun --pty -p prod_short --mem 8GB --mincpus 2 -t 0-2:00 bash
Load the required modules to run Nextflow:
+
+ module load nextflow/23.04.1
+ module load singularity/3.7.3
Set the singularity cache environment variable:
+ export NXF_SINGULARITY_CACHEDIR=/config/binaries/singularity/containers_devel/nextflow
Singularity images downloaded by workflow executions will now be stored in this directory.
+You may want to include these, or other environment variables, in your .bashrc file (or equivalent) that is loaded when you log in, so you don’t need to export them every session. A complete list of environment variables can be found in the Nextflow documentation.
The training data can be cloned from:
+ git clone https://github.com/nextflow-io/training.git
RNA-seq Workflow and Module Files
+Previously, we created three Nextflow files and one config file:
+
+ ├── nextflow.config
+ ├── rnaseq.nf
+ ├── modules.nf
+ └── modules
+     └── trimgalore.nf
-
+
rnaseq.nf
: main workflow script where parameters are defined and processes are called.
+
+ #!/usr/bin/env nextflow
+
+ params.reads = "/scratch/users/.../training/nf-training/data/ggal/*_{1,2}.fq"
+ params.transcriptome_file = "/scratch/users/.../training/nf-training/data/ggal/transcriptome.fa"
+
+ reads_ch = Channel.fromFilePairs("$params.reads")
+
+ include { INDEX } from './modules.nf'
+ include { QUANTIFICATION as QT } from './modules.nf'
+ include { FASTQC as FASTQC_one } from './modules.nf'
+ include { FASTQC as FASTQC_two } from './modules.nf'
+ include { MULTIQC } from './modules.nf'
+ include { TRIMGALORE } from './modules/trimgalore.nf'
+
+ workflow {
+ index_ch = INDEX(params.transcriptome_file)
+ quant_ch = QT(index_ch, reads_ch)
+ fastqc_ch = FASTQC_one(reads_ch)
+ trimgalore_out_ch = TRIMGALORE(reads_ch).reads
+ fastqc_cleaned_ch = FASTQC_two(trimgalore_out_ch)
+ multiqc_ch = MULTIQC(quant_ch, fastqc_ch)
+ }
-
+
modules.nf
: script containing the majority of modules, including INDEX, QUANTIFICATION, FASTQC, and MULTIQC
+
+ process INDEX {
+ container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
+
+ input:
+ path transcriptome
+
+ output:
+ path "salmon_idx"
+
+ script:
+ """
+ salmon index --threads $task.cpus -t $transcriptome -i salmon_idx
+ """
+ }
+
+ process QUANTIFICATION {
+ container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-salmon-1.10.1--h7e5ed60_0.img"
+
+ input:
+ path salmon_index
+ tuple val(sample_id), path(reads)
+
+ output:
+ path "$sample_id"
+
+ script:
+ """
+ salmon quant --threads $task.cpus --libType=U \
+ -i $salmon_index -1 ${reads[0]} -2 ${reads[1]} -o $sample_id
+ """
+ }
+
+ process FASTQC {
+ container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"
+
+ input:
+ tuple val(sample_id), path(reads)
+
+ output:
+ path "fastqc_${sample_id}_logs"
+
+ script:
+ """
+ mkdir fastqc_${sample_id}_logs
+ fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
+ """
+ }
+
+ process MULTIQC {
+ publishDir params.outdir, mode:'copy'
+ container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
+
+ input:
+ path quantification
+ path fastqc
+
+ output:
+ path "*.html"
+
+ script:
+ """
+ multiqc . --filename $quantification
+ """
+ }
-
+
modules/trimgalore.nf
: script inside a modules folder, containing only the TRIMGALORE process
+
+ process TRIMGALORE {
+ container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
+
+ input:
+ tuple val(sample_id), path(reads)
+
+ output:
+ tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
+ tuple val(sample_id), path("*report.txt") , emit: log , optional: true
+ tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
+ tuple val(sample_id), path("*.html") , emit: html , optional: true
+ tuple val(sample_id), path("*.zip") , emit: zip , optional: true
+
+ script:
+ """
+ trim_galore \\
+ --paired \\
+ --gzip \\
+ ${reads[0]} \\
+ ${reads[1]}
+ """
+ }
-
+
nextflow.config
: config file that enables singularity
+
+ singularity {
+ enabled = true
+ autoMounts = true
+ cacheDir = "/config/binaries/singularity/containers_devel/nextflow"
+ }
Run the pipeline, specifying --outdir:
+ >>> nextflow run rnaseq.nf --outdir output
+ N E X T F L O W ~ version 23.04.1
+ Launching `rnaseq.nf` [soggy_jennings] DSL2 - revision: 87afc1d98d
+ executor > local (16)
+ [93/d37ef0] process > INDEX [100%] 1 of 1 ✔
+ [b3/4c4d9c] process > QT (1) [100%] 3 of 3 ✔
+ [d0/173a6e] process > FASTQC_one (3) [100%] 3 of 3 ✔
+ [58/0b8af2] process > TRIMGALORE (3) [100%] 3 of 3 ✔
+ [c6/def175] process > FASTQC_two (3) [100%] 3 of 3 ✔
+ [e0/bcf904] process > MULTIQC (3) [100%] 3 of 3 ✔
8.1. Organise outputs
+The output declaration block defines the channels used by the process to send out the results produced. However, these outputs stay only in the work/ directory if no publishDir directive is specified.
Given that each task is executed in a separate temporary work/ folder (e.g., work/f1/850698…), you may want to save important, non-intermediary, and/or final files in a results folder.
+To store our workflow result files, you need to explicitly mark them using the publishDir directive in the process that creates the files. For example:
+ process MULTIQC {
+ publishDir params.outdir, mode:'copy'
+ container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
+
+ input:
+ path quantification
+ path fastqc
+
+ output:
+ path "*.html"
+
+ script:
+ """
+ multiqc . --filename $quantification
+ """
+ }
The above example copies all html files created by the MULTIQC process into the directory specified by params.outdir.
8.1.1. Store outputs matching a glob pattern
+You can use more than one publishDir directive to keep different outputs in separate directories. For each directive, specify a different glob pattern using the pattern option, so that only the files matching that pattern are stored in each directory.
For example:
+
+ reads_ch = Channel.fromFilePairs('data/ggal/*_{1,2}.fq')
+
+ process FOO {
+ publishDir "results/bam", pattern: "*.bam"
+ publishDir "results/bai", pattern: "*.bai"
+
+ input:
+ tuple val(sample_id), path(sample_id_paths)
+
+ output:
+ tuple val(sample_id), path("*.bam")
+ tuple val(sample_id), path("*.bai")
+
+ script:
+ """
+ echo your_command_here --sample $sample_id_paths > ${sample_id}.bam
+ echo your_command_here --sample $sample_id_paths > ${sample_id}.bai
+ """
+ }
Exercise
+Use publishDir and pattern to keep the outputs from trimgalore.nf in separate directories.
+ process TRIMGALORE {
+ container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
+ publishDir "$params.outdir/report", mode: "copy", pattern:"*report.txt"
+ publishDir "$params.outdir/trimmed_fastq", mode: "copy", pattern:"*fq.gz"
+
+ input:
+ tuple val(sample_id), path(reads)
+
+ output:
+ tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
+ tuple val(sample_id), path("*report.txt") , emit: log , optional: true
+ tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
+ tuple val(sample_id), path("*.html") , emit: html , optional: true
+ tuple val(sample_id), path("*.zip") , emit: zip , optional: true
+
+ script:
+ """
+ trim_galore \\
+ --paired \\
+ --gzip \\
+ ${reads[0]} \\
+ ${reads[1]}
+ """
+ }
The output should now look like this:
+
+ >>> tree ./output
+ ./output
+ ├── gut.html
+ ├── liver.html
+ ├── lung.html
+ ├── report
+ │ ├── gut_1.fq_trimming_report.txt
+ │ ├── gut_2.fq_trimming_report.txt
+ │ ├── liver_1.fq_trimming_report.txt
+ │ ├── liver_2.fq_trimming_report.txt
+ │ ├── lung_1.fq_trimming_report.txt
+ │ └── lung_2.fq_trimming_report.txt
+ └── trimmed_fastq
+ ├── gut_1_val_1.fq.gz
+ ├── gut_2_val_2.fq.gz
+ ├── liver_1_val_1.fq.gz
+ ├── liver_2_val_2.fq.gz
+ ├── lung_1_val_1.fq.gz
+ └── lung_2_val_2.fq.gz
+ 2 directories, 15 files
8.1.2. Store outputs renaming files or in a sub-directory
+The publishDir directive also allows the use of the saveAs option to give each file a name of your choice, providing a custom rule as a closure.
+ process foo {
+ publishDir 'results', saveAs: { filename -> "foo_$filename" }
+
+ output:
+ path '*.txt'
+
+ '''
+ touch this.txt
+ touch that.txt
+ '''
+ }
The same pattern can be used to store specific files in separate directories depending on the actual name.
+
+ process foo {
+ publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }
+
+ output:
+ path '*'
+
+ '''
+ touch this.txt
+ touch that.zip
+ '''
+ }
Exercise
+Modify the MULTIQC output with saveAs such that the resulting folder is as follows:
+ ./output
+ ├── MultiQC
+ │ ├── multiqc_gut.html
+ │ ├── multiqc_liver.html
+ │ └── multiqc_lung.html
+ ├── report
+ │ ├── gut_1.fq_trimming_report.txt
+ │ ├── gut_2.fq_trimming_report.txt
+ │ ├── liver_1.fq_trimming_report.txt
+ │ ├── liver_2.fq_trimming_report.txt
+ │ ├── lung_1.fq_trimming_report.txt
+ │ └── lung_2.fq_trimming_report.txt
+ └── trimmed_fastq
+ ├── gut_1_val_1.fq.gz
+ ├── gut_2_val_2.fq.gz
+ ├── liver_1_val_1.fq.gz
+ ├── liver_2_val_2.fq.gz
+ ├── lung_1_val_1.fq.gz
+ └── lung_2_val_2.fq.gz
+ 3 directories, 15 files
To get a clean output, remove the existing output folder and files before re-running. By default, Nextflow overwrites existing files with the same name and leaves any other files in the output directory untouched.
+
+ process MULTIQC {
+ publishDir params.outdir, mode:'copy', saveAs: { filename -> filename.endsWith(".html") ? "MultiQC/multiqc_$filename" : filename }
+ container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
+
+ input:
+ path quantification
+ path fastqc
+
+ output:
+ path "*.html"
+
+ script:
+ """
+ multiqc . --filename $quantification
+ """
+ }
Challenge
+Modify all the processes in rnaseq.nf such that we will have the following output structure:
+ ./output
+ ├── gut
+ │ ├── QC
+ │ │ ├── fastqc_gut_logs
+ │ │ │ ├── gut_1_fastqc.html
+ │ │ │ ├── gut_1_fastqc.zip
+ │ │ │ ├── gut_2_fastqc.html
+ │ │ │ └── gut_2_fastqc.zip
+ │ │ └── gut.html
+ │ ├── report
+ │ │ ├── gut_1.fq_trimming_report.txt
+ │ │ └── gut_2.fq_trimming_report.txt
+ │ └── trimmed_fastq
+ │ ├── gut_1_val_1.fq.gz
+ │ └── gut_2_val_2.fq.gz
+ ├── liver
+ │ ├── QC
+ │ │ ├── fastqc_liver_logs
+ │ │ │ ├── liver_1_fastqc.html
+ │ │ │ ├── liver_1_fastqc.zip
+ │ │ │ ├── liver_2_fastqc.html
+ │ │ │ └── liver_2_fastqc.zip
+ │ │ └── liver.html
+ │ ├── report
+ │ │ ├── liver_1.fq_trimming_report.txt
+ │ │ └── liver_2.fq_trimming_report.txt
+ │ └── trimmed_fastq
+ │ ├── liver_1_val_1.fq.gz
+ │ └── liver_2_val_2.fq.gz
+ └── lung
+ ├── QC
+ │ ├── fastqc_lung_logs
+ │ │ ├── lung_1_fastqc.html
+ │ │ ├── lung_1_fastqc.zip
+ │ │ ├── lung_2_fastqc.html
+ │ │ └── lung_2_fastqc.zip
+ │ └── lung.html
+ ├── report
+ │ ├── lung_1.fq_trimming_report.txt
+ │ └── lung_2.fq_trimming_report.txt
+ └── trimmed_fastq
+ ├── lung_1_val_1.fq.gz
+ └── lung_2_val_2.fq.gz
+ 15 directories, 27 files
+ process FASTQC {
+ publishDir "$params.outdir/$sample_id/QC", mode:'copy'
+ container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-fastqc-0.12.1--hdfd78af_0.img"
+
+ input:
+ tuple val(sample_id), path(reads)
+
+ output:
+ path "fastqc_${sample_id}_logs"
+
+ script:
+ """
+ mkdir fastqc_${sample_id}_logs
+ fastqc -o fastqc_${sample_id}_logs -f fastq -q ${reads}
+ """
+ }
+
+ process MULTIQC {
+ //publishDir params.outdir, mode:'copy', saveAs: { filename -> filename.endsWith(".html") ? "MultiQC/multiqc_$filename" : filename }
+ publishDir "$params.outdir/$quantification/QC", mode:'copy'
+ container "/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-multiqc-1.21--pyhdfd78af_0.img"
+
+ input:
+ path quantification
+ path fastqc
+
+ output:
+ path "*.html"
+
+ script:
+ """
+ multiqc . --filename $quantification
+ """
+ }
+
+ process TRIMGALORE {
+ container '/config/binaries/singularity/containers_devel/nextflow/depot.galaxyproject.org-singularity-trim-galore-0.6.6--0.img'
+ publishDir "${params.outdir}/${sample_id}/report", mode: "copy", pattern:"*report.txt"
+ publishDir "${params.outdir}/${sample_id}/trimmed_fastq", mode: "copy", pattern:"*fq.gz"
+
+ input:
+ tuple val(sample_id), path(reads)
+
+ output:
+ tuple val(sample_id), path("*{3prime,5prime,trimmed,val}*.fq.gz"), emit: reads
+ tuple val(sample_id), path("*report.txt") , emit: log , optional: true
+ tuple val(sample_id), path("*unpaired*.fq.gz") , emit: unpaired, optional: true
+ tuple val(sample_id), path("*.html") , emit: html , optional: true
+ tuple val(sample_id), path("*.zip") , emit: zip , optional: true
+
+ script:
+ """
+ trim_galore \\
+ --paired \\
+ --gzip \\
+ ${reads[0]} \\
+ ${reads[1]}
+ """
+ }
8.2. Scatter
+The scatter operation involves distributing large input data into smaller chunks that can be analysed across multiple processes in parallel.
One very simple example of native scatter is how Nextflow handles channel factories such as Channel.fromPath or Channel.fromFilePairs, where each input item is processed in parallel.
+ params.reads = "/scratch/users/.../training/nf-training/data/ggal/*_{1,2}.fq"
+ reads_ch = Channel.fromFilePairs("$params.reads")
+
+ include { FASTQC as FASTQC_one } from './modules.nf'
+
+ workflow {
+ fastqc_ch = FASTQC_one(reads_ch)
+ }
The above snippet from our rnaseq.nf gives three executions of FASTQC_one, one for each pair of input files.
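The same one-task-per-item behaviour can be seen without any input files. The sketch below is illustrative only: the process name SHOW_SAMPLE and the three sample names are assumptions and are not part of the workshop pipeline.

```nextflow
#!/usr/bin/env nextflow

// Hypothetical process, used only to illustrate native scatter
process SHOW_SAMPLE {
    debug true

    input:
    val sample_id

    script:
    """
    echo "Processing ${sample_id}"
    """
}

workflow {
    // Three items in the channel lead to three independent SHOW_SAMPLE tasks
    Channel.of('gut', 'liver', 'lung') | SHOW_SAMPLE
}
```

Each item emitted by the channel triggers its own task, so the number of parallel tasks follows the number of items rather than anything declared in the process.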
Other than natively splitting execution by input data, Nextflow also provides operators to scatter existing input data for various benefits, such as faster processing. For example:
+
- splitText
- splitFasta
- splitFastq
- map with from or fromList
- flatten
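As a rough illustration of two of the operators listed above, the sketch below applies splitText and flatten to small in-memory values instead of real files. The chromosome names and numbers are made-up inputs chosen for this example.

```nextflow
workflow {
    // splitText: one multi-line string is split into chunks of two lines each
    Channel.of('chr1\nchr2\nchr3\nchr4\n')
        .splitText(by: 2)
        .view()

    // flatten: nested lists are emitted as individual items
    Channel.of([1, 2, 3], ['X', 'Y'])
        .flatten()
        .view()
}
```

Each emitted chunk or item can then be passed to a process, which Nextflow runs once per item.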
8.2.1. Process per file chunk
+Exercise
+
+ params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
+ params.size = 100000
+
+ process count_line {
+ debug true
+ input:
+ path x
+
+ script:
+ """
+ wc -l $x
+ """
+ }
+
+ workflow {
+ Channel.fromPath(params.infile) \
+ | splitText(by: params.size, file: true) \
+ | count_line
+ }
Exercise
+
+ params.infile = "/scratch/users/rlupat/nfWorkshop/dev1/training/nf-training/data/ggal/*_{1,2}.fq"
+ params.size = 1000
+
+ workflow {
+ Channel.fromFilePairs(params.infile, flat: true) \
+ | splitFastq(by: params.size, pe: true, file: true) \
+ | view()
+ }
8.2.2. Process per file range
+Exercise
+
+ Channel.from(1..22) \
+ | map { chr -> ["sample${chr}", file("${chr}.indels.vcf"), file("${chr}.vcf")] } \
+ | view()
+ >>> nextflow run test_scatter.nf
+
+ [sample1, /scratch/users/${users}/1.indels.vcf, /scratch/users/${users}/1.vcf]
+ [sample2, /scratch/users/${users}/2.indels.vcf, /scratch/users/${users}/2.vcf]
+ [sample3, /scratch/users/${users}/3.indels.vcf, /scratch/users/${users}/3.vcf]
+ [sample4, /scratch/users/${users}/4.indels.vcf, /scratch/users/${users}/4.vcf]
+ [sample5, /scratch/users/${users}/5.indels.vcf, /scratch/users/${users}/5.vcf]
+ [sample6, /scratch/users/${users}/6.indels.vcf, /scratch/users/${users}/6.vcf]
+ [sample7, /scratch/users/${users}/7.indels.vcf, /scratch/users/${users}/7.vcf]
+ [sample8, /scratch/users/${users}/8.indels.vcf, /scratch/users/${users}/8.vcf]
+ [sample9, /scratch/users/${users}/9.indels.vcf, /scratch/users/${users}/9.vcf]
+ [sample10, /scratch/users/${users}/10.indels.vcf, /scratch/users/${users}/10.vcf]
+ [sample11, /scratch/users/${users}/11.indels.vcf, /scratch/users/${users}/11.vcf]
+ [sample12, /scratch/users/${users}/12.indels.vcf, /scratch/users/${users}/12.vcf]
+ [sample13, /scratch/users/${users}/13.indels.vcf, /scratch/users/${users}/13.vcf]
+ [sample14, /scratch/users/${users}/14.indels.vcf, /scratch/users/${users}/14.vcf]
+ [sample15, /scratch/users/${users}/15.indels.vcf, /scratch/users/${users}/15.vcf]
+ [sample16, /scratch/users/${users}/16.indels.vcf, /scratch/users/${users}/16.vcf]
+ [sample17, /scratch/users/${users}/17.indels.vcf, /scratch/users/${users}/17.vcf]
+ [sample18, /scratch/users/${users}/18.indels.vcf, /scratch/users/${users}/18.vcf]
+ [sample19, /scratch/users/${users}/19.indels.vcf, /scratch/users/${users}/19.vcf]
+ [sample20, /scratch/users/${users}/20.indels.vcf, /scratch/users/${users}/20.vcf]
+ [sample21, /scratch/users/${users}/21.indels.vcf, /scratch/users/${users}/21.vcf]
+ [sample22, /scratch/users/${users}/22.indels.vcf, /scratch/users/${users}/22.vcf]
Exercise
+params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
+params.size = 100000
+
+process split_bed_by_chr {
+ debug true
+
+ input:
+ path bed
+ val chr
+
+ output:
+ path "*.bed"
+
+ script:
+ """
+ grep "^${chr}\t" ${bed} > ${chr}.bed
+ """
+}
+
+workflow {
+ split_bed_by_chr(params.infile, Channel.from(1..22)) | view()
+}
+Challenge
+How do we include chr X and Y in the above split by chromosome?
+ workflow {
+ split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | view()
+ }
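As an alternative sketch, not part of the original exercise: newer Nextflow releases deprecate Channel.from in favour of Channel.of, which expands ranges itself, so no flatten step is needed. Only the channel is shown here; it could be passed to split_bed_by_chr in the same way as above.

```nextflow
workflow {
    // Channel.of expands the range 1..22, then emits 'X' and 'Y' as separate items
    Channel.of(1..22, 'X', 'Y')
        .view()
}
```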
8.3. Gather
+The gather operation consolidates results from parallel computations (which can come from a scatter) into a centralized process for aggregation and further processing.
Some of the operators that Nextflow provides to facilitate this gather operation include:
+
- collect
- collectFile
- map + groupTuple
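A minimal sketch of collect and groupTuple on in-memory values may help before the exercises. The sample IDs and chunk file names below are hypothetical placeholders (plain strings, not real files), standing in for what a scatter step might emit.

```nextflow
workflow {
    // Hypothetical (sample, chunk) pairs, as a scatter step might emit them
    chunks_ch = Channel.of(
        ['gut',   'gut.part1.txt'],
        ['liver', 'liver.part1.txt'],
        ['gut',   'gut.part2.txt']
    )

    // collect: gather every item into a single list that is emitted once
    chunks_ch
        .map { sample_id, chunk -> chunk }
        .collect()
        .view { all_chunks -> "collect: ${all_chunks}" }

    // groupTuple: gather items that share the same first element (the sample ID)
    chunks_ch
        .groupTuple()
        .view { grouped -> "groupTuple: ${grouped}" }
}
```

collect suits a single downstream task that needs everything at once, while groupTuple keeps one gathered item per key, which is useful when results should be merged per sample.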
8.3.1. Process all outputs altogether
+Exercise
+
+ params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
+ params.size = 100000
+
+ process split_bed_by_chr {
+ debug true
+
+ input:
+ path bed
+ val chr
+
+ output:
+ path "*.bed"
+
+ script:
+ """
+ grep "^${chr}\t" ${bed} > ${chr}.bed
+ """
+ }
+
+ workflow {
+ split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | collect | view()
+ }
8.3.2. Collect outputs into a file
+Exercise
+
+ params.infile = "/data/reference/bed_files/Agilent_CRE_v2/S30409818_Covered_MERGED.bed"
+ params.size = 100000
+
+ process split_bed_by_chr {
+ debug true
+
+ input:
+ path bed
+ val chr
+
+ output:
+ path "*.bed"
+
+ script:
+ """
+ grep "^${chr}\t" ${bed} > ${chr}.bed
+ """
+ }
+
+ workflow {
+ split_bed_by_chr(params.infile, Channel.from(1..22,'X','Y').flatten()) | collectFile(name: 'merged.bed', newLine:true) | view()
+ }
Exercise
+
+ workflow {
+ Channel.fromPath("/scratch/users/rlupat/nfWorkshop/dev1/training/nf-training/data/ggal/*_1.fq", checkIfExists: true) \
+ | collectFile(name: 'combined_1.fq', newLine:true) \
+ | view
+ }
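To try collectFile without access to the training data, the sketch below gathers three in-memory strings into one file. The values and the file name sample.txt are arbitrary choices for illustration.

```nextflow
workflow {
    // Each string becomes one line of the gathered file (newLine: true)
    Channel.of('alpha', 'beta', 'gamma')
        .collectFile(name: 'sample.txt', newLine: true)
        .view { merged -> "Saved to ${merged}:\n${merged.text}" }
}
```

collectFile emits the path of the gathered file, so the result can be viewed as here or passed on to a downstream process.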