From 66d431cc736811b22cc59ec505098305ebba224e Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 7 Jun 2023 14:55:43 -0700 Subject: [PATCH 1/7] todos --- src/main.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2e9f833..78ea598 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,6 +65,7 @@ pub mod built_info { } struct FastqWriter { + // TODO: use Sender> tx: Sender>, lock: Mutex<()>, } @@ -76,6 +77,7 @@ impl FastqWriter { bounded(WRITER_CHANNEL_SIZE); std::thread::spawn(move || { + // TODO: one or two writers based on separate or interleaved output let mut maybe_writer: Option> = { if count { None @@ -124,7 +126,11 @@ impl FastqWriter { } } +// TODO: needs to know what read # it is, or if it is interleaved +// - enum: Single, Paired, Interleaved +// - enum: FirstOfPair, SecondOfPair fn spawn_reader(file: PathBuf, decompress: bool) -> Receiver> { + // TODO: Change these from Sender>... let (tx, rx) = bounded(READER_CHANNEL_SIZE); std::thread::spawn(move || { // Open the file or standad input @@ -266,8 +272,8 @@ struct Opts { /// Hidden option to capture stdout for testing /// - #[structopt(long, hidden = true)] - output: Option, + #[structopt(long, parse(from_os_str))] + output: Vec, } fn read_patterns(file: &PathBuf) -> Result> { From 6ec7df3505c9a0721b8c0ac69a3ebee298c2b5e2 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Wed, 7 Jun 2023 15:59:50 -0700 Subject: [PATCH 2/7] wip --- src/main.rs | 319 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 240 insertions(+), 79 deletions(-) diff --git a/src/main.rs b/src/main.rs index 78ea598..2f33432 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ use parking_lot::Mutex; use proglog::{CountFormatterKind, ProgLog, ProgLogBuilder}; use rayon::prelude::*; use seq_io::fastq::{self, OwnedRecord, Record}; +use serde::{Deserialize, Serialize}; use std::process::ExitCode; use std::{ fs::File, @@ -64,46 +65,121 @@ pub mod built_info { } 
} +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum RecordType { + #[default] + FirstOfPair, + SecondOfPair, +} + +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum OutputType { + #[default] + Interleaved, + Fragment, + Paired, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct FastqOutputRecord { + pub record: OwnedRecord, + pub tpe: RecordType, +} + struct FastqWriter { - // TODO: use Sender> - tx: Sender>, + tx: Sender>, lock: Mutex<()>, } impl FastqWriter { - fn new(count_tx: Sender, count: bool, paired: bool, output: Option) -> Self { + fn new( + count_tx: Sender, + count: bool, + paired: bool, + output_type: OutputType, + output: Vec, + ) -> Self { // TODO: try making this unbounded - let (tx, rx): (Sender>, Receiver>) = - bounded(WRITER_CHANNEL_SIZE); + let (tx, rx): ( + Sender>, + Receiver>, + ) = bounded(WRITER_CHANNEL_SIZE); std::thread::spawn(move || { - // TODO: one or two writers based on separate or interleaved output - let mut maybe_writer: Option> = { - if count { - None - } else if let Some(file_path) = output { - Some(Box::new(BufWriter::with_capacity( - BUFSIZE, - File::create(file_path).unwrap(), - ))) - } else { - Some(Box::new(BufWriter::with_capacity( - BUFSIZE, - std::io::stdout(), - ))) - } - }; + let (mut r1_writer, mut r2_writer): (Option>, Option>) = + { + if count { + (None, None) + } else if output.is_empty() { + assert!(output_type != OutputType::Paired); + let writer: Option> = Some(Box::new( + BufWriter::with_capacity(BUFSIZE, std::io::stdout()), + )); + (writer, None) + } else if output.len() == 1 { + assert!(output_type != OutputType::Paired); + let writer: Option> = Some(Box::new( + BufWriter::with_capacity(BUFSIZE, File::create(&output[0]).unwrap()), + )); + (writer, None) + } else if output.len() == 2 { + assert!(output_type == OutputType::Paired); + assert!(!paired); + let w1: Option> = Some(Box::new(BufWriter::with_capacity( + BUFSIZE, 
+ File::create(&output[0]).unwrap(), + ))); + let w2: Option> = Some(Box::new(BufWriter::with_capacity( + BUFSIZE, + File::create(&output[1]).unwrap(), + ))); + (w1, w2) + } else { + panic!("Expected 0-2 output files, got {}", output.len()); + } + }; let mut num_matches = 0; while let Ok(reads) = rx.recv() { num_matches += reads.len(); - if let Some(ref mut writer) = maybe_writer { + if !count { for read in reads { - fastq::write_to(&mut *writer, &read.head, &read.seq, &read.qual) - .expect("failed writing read"); + match read.tpe { + RecordType::FirstOfPair => { + if let Some(writer) = r1_writer.as_mut() { + fastq::write_to( + &mut *writer, + &read.record.head, + &read.record.seq, + &read.record.qual, + ) + .expect("failed writing read"); + } + } + RecordType::SecondOfPair => { + let r2_writer = if output_type == OutputType::Interleaved { + r1_writer.as_mut() + } else { + r2_writer.as_mut() + }; + if let Some(writer) = r2_writer { + fastq::write_to( + &mut *writer, + &read.record.head, + &read.record.seq, + &read.record.qual, + ) + .expect("failed writing read"); + } + } + } + + // fastq::write_to(&mut *writer, &read.head, &read.seq, &read.qual) + // .expect("failed writing read"); } - }; + } } + if paired { num_matches /= 2; } @@ -114,9 +190,13 @@ impl FastqWriter { .unwrap(); } - if let Some(mut writer) = maybe_writer { - writer.flush().expect("Error flushing writer"); + if let Some(mut w) = r1_writer { + w.flush().expect("Error flushing writer"); + }; + if let Some(mut w) = r2_writer { + w.flush().expect("Error flushing writer"); }; + count_tx .send(num_matches) .expect("failed sending final count"); @@ -126,10 +206,13 @@ impl FastqWriter { } } -// TODO: needs to know what read # it is, or if it is interleaved -// - enum: Single, Paired, Interleaved -// - enum: FirstOfPair, SecondOfPair -fn spawn_reader(file: PathBuf, decompress: bool) -> Receiver> { +// record_type is None if the input is assumed to be interleaved paired end, FirstOfPair for +// fragment reads 
or first end of pair reads, and SecondOfPair for the second end of paired reads. +fn spawn_reader( + file: PathBuf, + record_type: Option, + decompress: bool, +) -> Receiver> { // TODO: Change these from Sender>... let (tx, rx) = bounded(READER_CHANNEL_SIZE); std::thread::spawn(move || { @@ -154,14 +237,37 @@ fn spawn_reader(file: PathBuf, decompress: bool) -> Receiver> { } }; // Open a FASTQ reader, get an iterator over the records, and chunk them - let fastq_reader = fastq::Reader::with_capacity(maybe_decoder_handle, BUFSIZE) - .into_records() - .chunks(CHUNKSIZE * num_cpus::get()); - // Iterate over the chunks - for chunk in &fastq_reader { - tx.send(chunk.map(|r| r.expect("Error reading")).collect()) - .expect("Error sending"); - } + let fastq_reader = + fastq::Reader::with_capacity(maybe_decoder_handle, BUFSIZE).into_records(); + + match record_type { + None => { + let iter = fastq_reader.into_iter().enumerate().map(|(index, record)| { + let bit = index & 0x1; + let tpe = if bit == 0 { + RecordType::FirstOfPair + } else { + RecordType::SecondOfPair + }; + FastqOutputRecord { + record: record.expect("Error reading"), + tpe, + } + }); + for chunk in &iter.chunks(CHUNKSIZE * num_cpus::get()) { + tx.send(chunk.collect()).expect("Error sending"); + } + } + Some(tpe) => { + let iter = fastq_reader.into_iter().map(|record| FastqOutputRecord { + record: record.expect("Error reading"), + tpe: tpe.clone(), + }); + for chunk in &iter.chunks(CHUNKSIZE * num_cpus::get()) { + tx.send(chunk.collect()).expect("Error sending"); + } + } + }; }); rx } @@ -385,6 +491,31 @@ fn fqgrep_from_opts(opts: &Opts) -> Result { } } + // set output_type + let output_type = if opts.paired { + if opts.output.len() <= 1 { + OutputType::Interleaved + } else { + OutputType::Paired + } + } else { + OutputType::Fragment + }; + match output_type { + OutputType::Interleaved | OutputType::Fragment => assert!( + opts.output.len() <= 1, + "{} output files specified, but expected {:?} output", + 
opts.output.len(), + OutputType::Fragment + ), + OutputType::Paired => assert!( + opts.output.len() == 2, + "{} output files specified, but expected {:?} output", + opts.output.len(), + OutputType::Paired + ), + } + // Set up a progress logger if desired let progress_logger = if opts.progress { Some( @@ -421,7 +552,13 @@ fn fqgrep_from_opts(opts: &Opts) -> Result { let (count_tx, count_rx): (Sender, Receiver) = bounded(1); // The writer of final counts or matching records - let writer = FastqWriter::new(count_tx, opts.count, opts.paired, opts.output.clone()); + let writer = FastqWriter::new( + count_tx, + opts.count, + opts.paired, + output_type, + opts.output.clone(), + ); // The main loop pool.install(|| { @@ -435,11 +572,11 @@ fn fqgrep_from_opts(opts: &Opts) -> Result { if files.len() == 1 { // Interleaved paired end FASTQ // The channel FASTQ record chunks are received after being read in - let rx = spawn_reader(files[0].clone(), opts.decompress); + let rx = spawn_reader(files[0].clone(), None, opts.decompress); for reads in izip!(rx.iter()) { let paired_reads = reads .into_iter() - .tuples::<(OwnedRecord, OwnedRecord)>() + .tuples::<(FastqOutputRecord, FastqOutputRecord)>() .collect_vec(); process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); } @@ -447,8 +584,16 @@ fn fqgrep_from_opts(opts: &Opts) -> Result { // Pairs of FASTQ files for file_pairs in files.chunks_exact(2) { // The channels for R1 and R2 with FASTQ record chunks that are received after being read in - let rx1 = spawn_reader(file_pairs[0].clone(), opts.decompress); - let rx2 = spawn_reader(file_pairs[1].clone(), opts.decompress); + let rx1 = spawn_reader( + file_pairs[0].clone(), + Some(RecordType::FirstOfPair), + opts.decompress, + ); + let rx2 = spawn_reader( + file_pairs[1].clone(), + Some(RecordType::SecondOfPair), + opts.decompress, + ); for (reads1, reads2) in izip!(rx1.iter(), rx2.iter()) { let paired_reads = reads1.into_iter().zip(reads2.into_iter()).collect_vec(); 
process_paired_reads(paired_reads, &matcher, &writer, &progress_logger); @@ -459,17 +604,17 @@ fn fqgrep_from_opts(opts: &Opts) -> Result { // Process one FASTQ at a time for file in files { // The channel FASTQ record chunks are received after being read in - let rx = spawn_reader(file.clone(), opts.decompress); + let rx = spawn_reader(file.clone(), Some(RecordType::FirstOfPair), opts.decompress); for reads in rx.iter() { // Get the matched reads - let matched_reads: Vec = reads + let matched_reads: Vec = reads .into_par_iter() - .map(|mut read| -> Option { + .map(|mut record| -> Option { if let Some(progress) = &progress_logger { progress.record(); } - if matcher.read_match(&mut read) { - Some(read) + if matcher.read_match(&mut record.record) { + Some(record) } else { None } @@ -496,32 +641,34 @@ fn fqgrep_from_opts(opts: &Opts) -> Result { /// Process a chunk of paired end records in parallel. #[allow(clippy::borrowed_box)] // FIXME: remove me later and solve fn process_paired_reads( - reads: Vec<(OwnedRecord, OwnedRecord)>, + reads: Vec<(FastqOutputRecord, FastqOutputRecord)>, matcher: &Box, writer: &FastqWriter, progress_logger: &Option, ) { let matched_reads = reads .into_par_iter() - .map(|(mut read1, mut read2)| { + .map(|(mut record1, mut record2)| { + let r1 = &mut record1.record; + let r2 = &mut record2.record; if let Some(progress) = progress_logger { progress.record(); progress.record(); } assert_eq!( - read1.id_bytes(), - read2.id_bytes(), + r1.id_bytes(), + r2.id_bytes(), "Mismatching read pair! R1: {} R2: {}", - read1.id().unwrap(), - read2.id().unwrap() + r1.id().unwrap(), + r2.id().unwrap() ); // NB: if the output is to be colored, always call read_match on read2, regardless of // whether or not read1 had a match, so that read2 is always colored. 
If the output // isn't to be colored, only search for a match in read2 if read1 does not have a match - let match1 = matcher.read_match(&mut read1); - let match2 = (!matcher.opts().color && match1) || matcher.read_match(&mut read2); + let match1 = matcher.read_match(r1); + let match2 = (!matcher.opts().color && match1) || matcher.read_match(r2); if match1 || match2 { - Some((read1, read2)) + Some((record1, record2)) } else { None } @@ -647,7 +794,7 @@ pub mod tests { seqs: &Vec>, regexp: &Vec, pattern_from_file: bool, - output: Option, + output: Vec, compression: String, ) -> Opts { let fq_path = write_fastq(&dir, &seqs, compression); @@ -660,16 +807,10 @@ pub mod tests { } }; - let return_opts = Opts { + Opts { threads: 4, color: Color::Never, - count: { - if &output == &None { - true - } else { - false - } - }, + count: output.is_empty(), regexp: pattern_string, fixed_strings: false, file: pattern_file, @@ -678,10 +819,9 @@ pub mod tests { paired: false, reverse_complement: false, progress: true, - args: fq_path.to_vec(), - output: output, - }; - return_opts + args: fq_path, + output, + } } /// Returns sequences from fastq @@ -746,7 +886,7 @@ pub mod tests { vec!["GGTT", "GGCC"], ]; let pattern = pattern.iter().map(|&s| s.to_owned()).collect::>(); - let mut opts = build_opts(&dir, &seqs, &pattern, true, None, String::from(".fq")); + let mut opts = build_opts(&dir, &seqs, &pattern, true, Vec::new(), String::from(".fq")); opts.paired = paired; let result = fqgrep_from_opts(&opts); assert_eq!(result.unwrap(), expected); @@ -791,7 +931,7 @@ pub mod tests { &seqs, &pattern, true, - Some(out_path), + vec![out_path], String::from(".fq"), ); @@ -828,7 +968,14 @@ pub mod tests { let dir = TempDir::new().unwrap(); let seqs = vec![vec!["GGGG", "GGGG"], vec!["AAAA", "CCCC"]]; let pattern = vec![String::from("TTTT")]; - let mut opts = build_opts(&dir, &seqs, &pattern, false, None, String::from(".fq")); + let mut opts = build_opts( + &dir, + &seqs, + &pattern, + false, + 
Vec::new(), + String::from(".fq"), + ); opts.paired = paired; opts.invert_match = invert_match; @@ -857,7 +1004,7 @@ pub mod tests { let dir = TempDir::new().unwrap(); let seqs = vec![vec!["GGGG", "TTTT"], vec!["AAAA", "CCCC"]]; - let mut opts = build_opts(&dir, &seqs, &pattern, true, None, String::from(".fq")); + let mut opts = build_opts(&dir, &seqs, &pattern, true, Vec::new(), String::from(".fq")); opts.fixed_strings = fixed_strings; let _result = fqgrep_from_opts(&opts); @@ -879,7 +1026,14 @@ pub mod tests { ]; let test_pattern = vec![String::from("A")]; - let mut opts_test = build_opts(&dir, &seqs, &test_pattern, true, None, String::from(".fq")); + let mut opts_test = build_opts( + &dir, + &seqs, + &test_pattern, + true, + Vec::new(), + String::from(".fq"), + ); opts_test.paired = true; let _num_matches = fqgrep_from_opts(&opts_test); @@ -900,7 +1054,14 @@ pub mod tests { ]; let test_pattern = vec![String::from("^G")]; - let mut opts_test = build_opts(&dir, &seqs, &test_pattern, true, None, String::from(".fq")); + let mut opts_test = build_opts( + &dir, + &seqs, + &test_pattern, + true, + Vec::new(), + String::from(".fq"), + ); // Test pattern from file let result = fqgrep_from_opts(&opts_test); @@ -931,7 +1092,7 @@ pub mod tests { let test_pattern = vec![String::from("^G")]; - let opts = build_opts(&dir, &seqs, &test_pattern, true, None, extension); + let opts = build_opts(&dir, &seqs, &test_pattern, true, Vec::new(), extension); let result = fqgrep_from_opts(&opts); assert_eq!(result.unwrap(), expected); } @@ -952,7 +1113,7 @@ pub mod tests { ) { let dir = TempDir::new().unwrap(); let seqs = vec![vec!["GTCAGC"], vec!["AGTGCG"], vec!["GGGTCTG"]]; - let mut opts = build_opts(&dir, &seqs, &pattern, true, None, String::from(".fq")); + let mut opts = build_opts(&dir, &seqs, &pattern, true, Vec::new(), String::from(".fq")); opts.fixed_strings = fixed_strings; assert_eq!(fqgrep(&opts), expected); } From 35a6c8366dddc9713dadb1e7e7de000fb308a39e Mon Sep 17 00:00:00 
2001 From: Nils Homer Date: Thu, 8 Jun 2023 08:13:30 -0700 Subject: [PATCH 3/7] Update main.rs --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 2f33432..e0848da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -124,7 +124,7 @@ impl FastqWriter { (writer, None) } else if output.len() == 2 { assert!(output_type == OutputType::Paired); - assert!(!paired); + assert!(paired); let w1: Option> = Some(Box::new(BufWriter::with_capacity( BUFSIZE, File::create(&output[0]).unwrap(), From 0152b72bf9805ae5c2297d32d154a532f300dad6 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Thu, 8 Jun 2023 08:17:22 -0700 Subject: [PATCH 4/7] Update main.rs --- src/main.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index e0848da..933594f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -99,6 +99,8 @@ impl FastqWriter { output_type: OutputType, output: Vec, ) -> Self { + type OptionWriter = Option>; + // TODO: try making this unbounded let (tx, rx): ( Sender>, @@ -106,7 +108,7 @@ impl FastqWriter { ) = bounded(WRITER_CHANNEL_SIZE); std::thread::spawn(move || { - let (mut r1_writer, mut r2_writer): (Option>, Option>) = + let (mut r1_writer, mut r2_writer): (OptionWriter, OptionWriter) = { if count { (None, None) From da8a57bf1c28eb376fb936e140cb8e9c3304d79f Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Thu, 8 Jun 2023 09:05:50 -0700 Subject: [PATCH 5/7] Adding tests --- src/main.rs | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 933594f..1010a80 100644 --- a/src/main.rs +++ b/src/main.rs @@ -175,9 +175,6 @@ impl FastqWriter { } } } - - // fastq::write_to(&mut *writer, &read.head, &read.seq, &read.qual) - // .expect("failed writing read"); } } } @@ -970,6 +967,7 @@ pub mod tests { let dir = TempDir::new().unwrap(); let seqs = vec![vec!["GGGG", "GGGG"], vec!["AAAA", "CCCC"]]; let 
pattern = vec![String::from("TTTT")]; + let mut opts = build_opts( &dir, &seqs, @@ -987,6 +985,71 @@ pub mod tests { assert_eq!(result.unwrap(), expected); } + fn slurp_fastq(path: &PathBuf) -> Vec { + let handle = File::open(&path).unwrap(); + + // Wrap it in a buffer + let buf_handle = BufReader::with_capacity(BUFSIZE, handle); + // Maybe wrap it in a decompressor + let maybe_decoder_handle = { + if is_gzip_path(&path) { + Box::new(MultiGzDecoder::new(buf_handle)) as Box + } else { + Box::new(buf_handle) as Box + } + }; + // Open a FASTQ reader, get an iterator over the records, and chunk them + fastq::Reader::with_capacity(maybe_decoder_handle, BUFSIZE) + .into_records() + .map(|r| r.expect("Error reading")) + .collect::>() + } + + #[rstest] + fn test_paired_outputs() { + let dir: TempDir = TempDir::new().unwrap(); + let seqs = vec![ + // both r2 r1 neither + vec!["GGGG", "AAAA", "AGGG", "TGCA"], + vec!["GGGG", "GGGA", "CCCC", "ACGT"], + ]; + let pattern = vec![String::from("GGG")]; + let mut opts = build_opts( + &dir, + &seqs, + &pattern, + false, + Vec::new(), + String::from(".fq"), + ); + + let r1_output = dir.path().join("out.r1.fq"); + let r2_output = dir.path().join("out.r2.fq"); + + opts.paired = true; + opts.invert_match = false; + opts.reverse_complement = false; + opts.output = vec![r1_output, r2_output]; + opts.count = false; + + let result = fqgrep_from_opts(&opts); + assert_eq!(result.unwrap(), 3); + + // Check the output FASTQs + let r1_records = slurp_fastq(&opts.output[0]); + let r2_records = slurp_fastq(&opts.output[1]); + assert_eq!(r1_records.len(), 3); + assert_eq!(r2_records.len(), 3); + for (i, rec) in r1_records.iter().enumerate() { + let seq: String = String::from_utf8_lossy(rec.seq()).to_string(); + assert_eq!(seq, seqs[0][i]); + } + for (i, rec) in r2_records.iter().enumerate() { + let seq: String = String::from_utf8_lossy(rec.seq()).to_string(); + assert_eq!(seq, seqs[1][i]); + } + } + // 
############################################################################################ // Tests that an error is returned when fixed_strings is true and regex is present // ############################################################################################ From e81bdea6f0cc27cf05ba68d3fb3d0ad63623fa06 Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Thu, 8 Jun 2023 09:15:59 -0700 Subject: [PATCH 6/7] add some docs --- src/main.rs | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/main.rs b/src/main.rs index 1010a80..24d1683 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,33 +65,66 @@ pub mod built_info { } } +/// Whether the `OwnedRecord` is the first of pair or fragment (`FirstOfPair`), or second of pair +/// (`SecondOfPair`). Used to determine to which FASTQ writer the record should be written +/// when the output is paired (i.e. R1 and R2 output FASTQs). #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum RecordType { + /// First of pair or fragment. #[default] FirstOfPair, + /// Second of pair SecondOfPair, } +/// Specifies how to output the matched records. #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum OutputType { + /// The output is an interleaved FASTQ. For paired end output, this will interleave R1 and R2. #[default] Interleaved, + /// The output is all fragment records. There are no paired end records. Fragment, + /// The output is written to two separate FASTQs, one for the first of pair reads and one for + /// the second of pair reads. Paired, } +/// The matching FASTQ records to write to the output. The `RecordType` determines to which FASTQ +/// the record should be written when the output type is `OutputType::Paired`. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] struct FastqOutputRecord { pub record: OwnedRecord, pub tpe: RecordType, } +/// The FASTQ writer that wraps an underlying writer in a thread. 
Uses a bounded channel to +/// send records to the underlying writer. struct FastqWriter { tx: Sender>, lock: Mutex<()>, } impl FastqWriter { + /// Creates a new `FastqWriter` that spawns an underlying thread for writing. + /// + /// If `output` is empty, then `output_type` must not be `OutputType::Paired`. The output will + /// be written to standard output, with paired end records interleaved. + /// + /// If `output` has length one, then the above must also be true, except the output will be + /// written to the provided path. + /// + /// If `output` has length two, then the `output_type` must be `OutputType::Paired`. The + /// output will be written to the two separate FASTQs, one for the first of pair reads and one + /// for the second of pair reads. `paired` must be `true` in this case. + /// + /// Arguments: + /// - `count_tx`: the channel to send the total number of matches found upon termination + /// - `count`: true to only send the count to the count channel, false to write to output FASTQs + /// - `paired`: true if the input is paired end, false if the input is all fragment records + /// - `output_type`: the type of output to write to. For paired end input, this determines if + /// the output is interleaved (a single FASTQ output) or split between R1 and R2 FASTQ + /// output. fn new( count_tx: Sender, count: bool, From 0b69857e3546d67f1f7338a463a60516d8126cca Mon Sep 17 00:00:00 2001 From: Nils Homer Date: Thu, 8 Jun 2023 09:17:41 -0700 Subject: [PATCH 7/7] remove TODO --- src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 24d1683..606a1f3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -245,7 +245,6 @@ fn spawn_reader( record_type: Option, decompress: bool, ) -> Receiver> { - // TODO: Change these from Sender>... let (tx, rx) = bounded(READER_CHANNEL_SIZE); std::thread::spawn(move || { // Open the file or standad input