diff --git a/src/fastgather.rs b/src/fastgather.rs index 82362c85..ff4a07ea 100644 --- a/src/fastgather.rs +++ b/src/fastgather.rs @@ -5,7 +5,6 @@ use sourmash::selection::Selection; // use camino; use sourmash::prelude::Select; -use sourmash::signature::SigsTrait; use crate::utils::{ consume_query_by_gather, load_collection, load_sketches_above_threshold, write_prefetch, diff --git a/src/fastmultigather.rs b/src/fastmultigather.rs index d7537c8a..f28dcb85 100644 --- a/src/fastmultigather.rs +++ b/src/fastmultigather.rs @@ -2,19 +2,16 @@ use anyhow::Result; use rayon::prelude::*; -use serde::Serialize; -use sourmash::prelude::Select; use sourmash::selection::Selection; use sourmash::sketch::Sketch; use sourmash::storage::SigStore; -use sourmash::{selection, signature::Signature}; use std::sync::atomic; use std::sync::atomic::AtomicUsize; use std::collections::BinaryHeap; -use camino::{Utf8Path, Utf8PathBuf}; +use camino::Utf8Path; use crate::utils::{ consume_query_by_gather, load_collection, write_prefetch, PrefetchResult, ReportType, diff --git a/src/mastiff_manygather.rs b/src/mastiff_manygather.rs index 2175e759..6a80a647 100644 --- a/src/mastiff_manygather.rs +++ b/src/mastiff_manygather.rs @@ -2,7 +2,6 @@ use anyhow::Result; use rayon::prelude::*; -use sourmash::signature::Signature; use sourmash::sketch::Sketch; use std::path::Path; diff --git a/src/multisearch.rs b/src/multisearch.rs index be9989f6..0d772276 100644 --- a/src/multisearch.rs +++ b/src/multisearch.rs @@ -4,18 +4,14 @@ use rayon::prelude::*; use std::fs::File; use std::io::{BufWriter, Write}; -use std::path::Path; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -use sourmash::prelude::Select; use sourmash::selection::Selection; use sourmash::signature::SigsTrait; -use sourmash::sketch::Sketch; -use sourmash::storage::SigStore; -use crate::utils::{load_collection, ReportType}; +use crate::utils::{load_collection, load_mh_with_name_and_md5, ReportType}; /// Search many queries against a list of signatures. /// @@ -31,37 +27,14 @@ pub fn multisearch( ) -> Result<(), Box> { // Load all queries into memory at once. - // let queries = load_sketches_from_zip_or_pathlist(&querylist, &template, ReportType::Query)?; let query_collection = load_collection(query_filepath, selection, ReportType::Query)?; - let mut queries: Vec = vec![]; - for (idx, record) in query_collection.iter() { - if let Ok(sig) = query_collection.sig_from_record(record) - // .unwrap() - // .select(&selection) // if we select here, we downsample and the md5sum changes! - // ...which means we would lose the original md5sum that is used in the standard gather results. - { - queries.push(sig); - } else { - eprintln!("Failed to load 'against' record: {}", record.name()); - } - } + let queries = + load_mh_with_name_and_md5(query_collection, &selection, ReportType::Query).unwrap(); // Load all against sketches into memory at once. - // let against = load_sketches_from_zip_or_pathlist(&againstlist, &template, ReportType::Against)?; let against_collection = load_collection(against_filepath, selection, ReportType::Against)?; - let mut against: Vec = vec![]; - - for (idx, record) in against_collection.iter() { - if let Ok(sig) = against_collection.sig_from_record(record) - // .unwrap() - // .select(&selection) // if we select here, we downsample and the md5sum changes! - // ...which means we would lose the original md5sum that is used in the standard gather results. - { - against.push(sig); - } else { - eprintln!("Failed to load 'against' record: {}", record.name()); - } - } + let against = + load_mh_with_name_and_md5(against_collection, &selection, ReportType::Against).unwrap(); // set up a multi-producer, single-consumer channel. let (send, recv) = std::sync::mpsc::sync_channel(rayon::current_num_threads()); @@ -94,49 +67,42 @@ pub fn multisearch( let send = against .par_iter() - .filter_map(|target| { + .filter_map(|(against_mh, against_name, against_md5)| { let mut results = vec![]; - - let ds_against_sig = target.clone().select(&selection).unwrap(); - if let Some(against_mh) = ds_against_sig.minhash() { - // search for matches & save containment. - for query_sig in queries.iter() { - let i = processed_cmp.fetch_add(1, atomic::Ordering::SeqCst); - if i % 100000 == 0 { - eprintln!("Processed {} comparisons", i); - } - let ds_q = query_sig.clone().select(&selection).unwrap(); - let query_mh = ds_q.minhash()?; - let overlap = query_mh.count_common(&against_mh, false).unwrap() as f64; - // use downsampled sizes - let query_size = query_mh.size() as f64; - let target_size = against_mh.size() as f64; - - let containment_query_in_target = overlap / query_size; - let containment_in_target = overlap / target_size; - let max_containment = containment_query_in_target.max(containment_in_target); - let jaccard = overlap / (target_size + query_size - overlap); - - if containment_query_in_target > threshold { - results.push(( - query_sig.name(), - query_sig.md5sum(), - target.name(), - target.md5sum(), - containment_query_in_target, - max_containment, - jaccard, - overlap, - )) - } + // search for matches & save containment. + for (query_mh, query_name, query_md5) in queries.iter() { + let i = processed_cmp.fetch_add(1, atomic::Ordering::SeqCst); + if i % 100000 == 0 { + eprintln!("Processed {} comparisons", i); } - if results.is_empty() { - None - } else { - Some(results) + + let overlap = query_mh.count_common(&against_mh, false).unwrap() as f64; + // use downsampled sizes + let query_size = query_mh.size() as f64; + let target_size = against_mh.size() as f64; + + let containment_query_in_target = overlap / query_size; + let containment_in_target = overlap / target_size; + let max_containment = containment_query_in_target.max(containment_in_target); + let jaccard = overlap / (target_size + query_size - overlap); + + if containment_query_in_target > threshold { + results.push(( + query_name.clone(), + query_md5.clone(), + against_name.clone(), + against_md5.clone(), + containment_query_in_target, + max_containment, + jaccard, + overlap, + )) } - } else { + } + if results.is_empty() { None + } else { + Some(results) } }) .flatten() diff --git a/src/pairwise.rs b/src/pairwise.rs index c4c0a886..b6713d41 100644 --- a/src/pairwise.rs +++ b/src/pairwise.rs @@ -1,7 +1,6 @@ use anyhow::Result; /// pairwise: massively parallel in-memory pairwise comparisons. use rayon::prelude::*; -use sourmash::sketch::minhash::KmerMinHash; use std::fs::File; use std::io::{BufWriter, Write}; @@ -11,12 +10,9 @@ use std::sync::atomic; use std::sync::atomic::AtomicUsize; use sourmash::signature::SigsTrait; -use sourmash::sketch::Sketch; -use crate::utils::{load_collection, ReportType}; -use sourmash::prelude::Select; +use crate::utils::{load_collection, load_mh_with_name_and_md5, ReportType}; use sourmash::selection::Selection; -use sourmash::storage::SigStore; /// Perform pairwise comparisons of all signatures in a list. /// @@ -29,7 +25,7 @@ pub fn pairwise>( output: Option

, ) -> Result<(), Box> { // Load all sigs into memory at once. - let collection = load_collection(sigpath, selection, ReportType::Query)?; + let collection = load_collection(sigpath, selection, ReportType::Pairwise)?; if collection.len() <= 1 { bail!( @@ -37,17 +33,7 @@ pub fn pairwise>( &sigpath ) } - - let mut sketches: Vec<(KmerMinHash, String, String)> = Vec::new(); - for (_idx, record) in collection.iter() { - if let Ok(sig) = collection.sig_from_record(record) { - if let Some(ds_mh) = sig.clone().select(&selection)?.minhash().cloned() { - sketches.push((ds_mh, record.name().to_string(), record.md5().to_string())); - } - } else { - eprintln!("Failed to load record: {}", record.name()); - } - } + let sketches = load_mh_with_name_and_md5(collection, &selection, ReportType::Pairwise).unwrap(); // set up a multi-producer, single-consumer channel. let (send, recv) = std::sync::mpsc::sync_channel(rayon::current_num_threads()); diff --git a/src/python/tests/test_pairwise.py b/src/python/tests/test_pairwise.py index 55259e85..0dd67c05 100644 --- a/src/python/tests/test_pairwise.py +++ b/src/python/tests/test_pairwise.py @@ -147,7 +147,7 @@ def test_bad_query(runtmp, capfd): print(captured.err) assert "WARNING: could not load sketches from path 'no-exist'" in captured.err - assert "WARNING: 1 query paths failed to load. See error messages above." in captured.err + assert "WARNING: 1 signature paths failed to load. See error messages above." in captured.err def test_bad_query_2(runtmp, capfd): @@ -241,7 +241,7 @@ def test_nomatch_query(runtmp, capfd, zip_query): captured = capfd.readouterr() print(captured.err) - assert 'WARNING: skipped 1 query paths - no compatible signatures' in captured.err + assert 'WARNING: skipped 1 signature paths - no compatible signatures' in captured.err @pytest.mark.parametrize("zip_db", [False, True]) diff --git a/src/utils.rs b/src/utils.rs index eeff3ff9..5cd49de1 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -5,40 +5,26 @@ use sourmash::manifest::Manifest; use sourmash::selection::Select; use std::fs::{create_dir_all, File}; -use std::io::Read; use std::io::{BufRead, BufReader, BufWriter, Write}; use std::panic; use std::path::{Path, PathBuf}; -use tempfile::tempdir; -use zip::read::ZipArchive; - use std::sync::atomic; use std::sync::atomic::AtomicUsize; use std::collections::BinaryHeap; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Result}; use std::cmp::{Ordering, PartialOrd}; -use sourmash::collection::{self, Collection}; -use sourmash::errors::SourmashError; +use sourmash::collection::Collection; use sourmash::manifest::Record; use sourmash::selection::Selection; use sourmash::signature::{Signature, SigsTrait}; -use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash}; +use sourmash::sketch::minhash::KmerMinHash; use sourmash::sketch::Sketch; use sourmash::storage::{FSStorage, InnerStorage, SigStore}; -/// Track a name/minhash. - -pub struct SmallSignature { - pub location: String, - pub name: String, - pub md5sum: String, - pub minhash: KmerMinHash, -} - /// Structure to hold overlap information from comparisons. pub struct PrefetchResult { @@ -68,86 +54,6 @@ impl PartialEq for PrefetchResult { impl Eq for PrefetchResult {} -/// check to see if two KmerMinHash are compatible. -/// -/// CTB note: despite the name, downsampling is not performed? -/// Although it checks if they are compatible in one direction... - -pub fn check_compatible_downsample( - me: &KmerMinHash, - other: &KmerMinHash, -) -> Result<(), sourmash::Error> { - /* // ignore num minhashes. - if self.num != other.num { - return Err(Error::MismatchNum { - n1: self.num, - n2: other.num, - } - .into()); - } - */ - use sourmash::Error; - - if me.ksize() != other.ksize() { - return Err(Error::MismatchKSizes); - } - if me.hash_function() != other.hash_function() { - // TODO: fix this error - return Err(Error::MismatchDNAProt); - } - if me.max_hash() < other.max_hash() { - return Err(Error::MismatchScaled); - } - if me.seed() != other.seed() { - return Err(Error::MismatchSeed); - } - Ok(()) -} - -/// Given a vec of search Signatures, each containing one or more sketches, -/// and a template Sketch, return a compatible (& now downsampled) -/// Sketch from the search Signatures.. -/// -/// CTB note: this will return the first acceptable match, I think, ignoring -/// all others. - -pub fn prepare_query( - search_sigs: &[Signature], - template: &Sketch, - location: &str, -) -> Option { - for search_sig in search_sigs.iter() { - // find exact match for template? - if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) { - return Some(SmallSignature { - location: location.to_string().clone(), - name: search_sig.name(), - md5sum: mh.md5sum(), - minhash: mh.clone(), - }); - } else { - // no - try to find one that can be downsampled - if let Sketch::MinHash(template_mh) = template { - for sketch in search_sig.sketches() { - if let Sketch::MinHash(ref_mh) = sketch { - if check_compatible_downsample(&ref_mh, template_mh).is_ok() { - let max_hash = max_hash_for_scaled(template_mh.scaled()); - let mh = ref_mh.downsample_max_hash(max_hash).unwrap(); - return Some(SmallSignature { - location: location.to_string().clone(), - name: search_sig.name(), - md5sum: ref_mh.md5sum(), // original - minhash: mh, // downsampled - }); - } - } - } - } - } - } - None -} - /// Find sketches in 'sketchlist' that overlap with 'query' above /// specified threshold. @@ -319,42 +225,26 @@ pub fn load_fasta_fromfile>( Ok(results) } -/// Load a collection of sketches from a file in parallel. -pub fn load_sketches( - sketchlist_paths: Vec, - template: &Sketch, -) -> Result<(Vec, usize, usize)> { - let skipped_paths = AtomicUsize::new(0); - let failed_paths = AtomicUsize::new(0); - - let sketchlist: Vec = sketchlist_paths - .par_iter() - .filter_map(|m| { - let filename = m.display().to_string(); - - match Signature::from_path(m) { - Ok(sigs) => { - let sm = prepare_query(&sigs, template, &filename); - if sm.is_none() { - // track number of paths that have no matching sigs - let _i = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); - } - sm - } - Err(err) => { - // failed to load from this path - print error & track. - eprintln!("Sketch loading error: {}", err); - eprintln!("WARNING: could not load sketches from path '{}'", filename); - let _i = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); - None - } +pub fn load_mh_with_name_and_md5( + collection: Collection, + selection: &Selection, + report_type: ReportType, +) -> Result> { + let mut sketchinfo: Vec<(KmerMinHash, String, String)> = Vec::new(); + for (_idx, record) in collection.iter() { + if let Ok(sig) = collection.sig_from_record(record) { + if let Some(ds_mh) = sig.clone().select(&selection)?.minhash().cloned() { + sketchinfo.push((ds_mh, record.name().to_string(), record.md5().to_string())); } - }) - .collect(); - - let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst); - let failed_paths = failed_paths.load(atomic::Ordering::SeqCst); - Ok((sketchlist, skipped_paths, failed_paths)) + } else { + bail!( + "Error: Failed to load {} record: {}", + report_type, + record.name() + ); + } + } + Ok(sketchinfo) } /// Load a collection of sketches from a file, filtering to keep only @@ -423,7 +313,7 @@ pub fn load_sketches_above_threshold( pub enum ReportType { Query, Against, - Index, + Pairwise, } impl std::fmt::Display for ReportType { @@ -431,7 +321,7 @@ impl std::fmt::Display for ReportType { let description = match self { ReportType::Query => "query", ReportType::Against => "search", - ReportType::Index => "index", + ReportType::Pairwise => "signature", }; write!(f, "{}", description) } @@ -447,7 +337,7 @@ pub fn load_collection( } let mut n_failed = 0; - let mut collection = if sigpath.extension().map_or(false, |ext| ext == "zip") { + let collection = if sigpath.extension().map_or(false, |ext| ext == "zip") { match Collection::from_zipfile(&sigpath) { Ok(collection) => collection, Err(_) => bail!("failed to load {} zipfile: '{}'", report_type, sigpath), @@ -514,34 +404,7 @@ pub fn load_collection( Ok(selected) } -pub fn report_on_collection_loading( - collection: &Collection, - skipped_paths: usize, - failed_paths: usize, - report_type: ReportType, -) -> Result<()> { - if failed_paths > 0 { - eprintln!( - "WARNING: {} {} paths failed to load. See error messages above.", - failed_paths, report_type - ); - } - if skipped_paths > 0 { - eprintln!( - "WARNING: skipped {} {} paths - no compatible signatures.", - skipped_paths, report_type - ); - } - - // Validate sketches - if collection.is_empty() { - bail!("No {} signatures loaded, exiting.", report_type); - } - eprintln!("Loaded {} {} signature(s)", collection.len(), report_type); - Ok(()) -} - -/// Uses the output of sketch loading functions to report the +/// Uses the output of collection loading function to report the /// total number of sketches loaded, as well as the number of files, /// if any, that failed to load or contained no compatible sketches. /// If no sketches were loaded, bail. @@ -563,8 +426,8 @@ pub fn report_on_collection_loading( /// /// Returns an error if: /// * No signatures were successfully loaded. -pub fn report_on_sketch_loading( - sketchlist: &[SmallSignature], +pub fn report_on_collection_loading( + collection: &Collection, skipped_paths: usize, failed_paths: usize, report_type: ReportType, @@ -583,10 +446,10 @@ pub fn report_on_sketch_loading( } // Validate sketches - eprintln!("Loaded {} {} signature(s)", sketchlist.len(), report_type); - if sketchlist.is_empty() { + if collection.is_empty() { bail!("No {} signatures loaded, exiting.", report_type); } + eprintln!("Loaded {} {} signature(s)", collection.len(), report_type); Ok(()) } @@ -687,26 +550,6 @@ pub fn consume_query_by_gather( Ok(()) } -pub fn build_template(ksize: u8, scaled: usize, moltype: &str) -> Sketch { - let hash_function = match moltype { - "dna" => HashFunctions::Murmur64Dna, - "protein" => HashFunctions::Murmur64Protein, - "dayhoff" => HashFunctions::Murmur64Dayhoff, - "hp" => HashFunctions::Murmur64Hp, - _ => panic!("Unknown molecule type: {}", moltype), - }; - //adjust ksize if not dna - let adjusted_ksize = if moltype == "dna" { ksize } else { ksize * 3 }; - let max_hash = max_hash_for_scaled(scaled as u64); - let template_mh = KmerMinHash::builder() - .num(0u32) - .ksize(adjusted_ksize as u32) - .max_hash(max_hash) - .hash_function(hash_function) - .build(); - Sketch::MinHash(template_mh) -} - pub fn build_selection(ksize: u8, scaled: usize, moltype: &str) -> Selection { let hash_function = match moltype { "dna" => HashFunctions::Murmur64Dna,