MRG: avoid clones by using new Signature::try_into() -> KmerMinHash (#471)

* refactor & rename & consolidate

* remove 'lower'

* add cargo doc output for private fn

* add a few comments/docs

* switch to dev version of sourmash

* tracking

* cleaner

* cleanup

* load rocksdb natively

* foo

* cargo fmt

* upd

* upd

* fix fmt

* MRG: create `MultiCollection` for collections that span multiple files (#434)

* preliminary victory

* compiles and mostly runs

* cleanup, split to new module

* cleanup and comment

* more cleanup of diff

* cargo fmt

* fix fmt

* restore n_failed

* comment failing test

* cleanup and de-vec

* create module/submodule structure

* comment for later

* get rid of vec

* beg for help

* cleanup and doc

* clippy fixes

* compiling again

* cleanup

* bump sourmash to v0.15.1

* check if is rocksdb

* weird error

* use remove_unwrap branch of sourmash

* get index to work with MultiCollection

* old bug now fixed

* clippy, format, and fix

* make names clearer

* ditch MultiCollection for index, at least for now

* testy testy

* getting closer

* update sourmash

* mark failing tests

* upd

* cargo fmt

* MRG: test exit from `pairwise` and `multisearch` if no loaded sketches (#437)

* upd

* check for appropriate multisearch error exits

* add more tests for pairwise, too

* cargo fmt

* MRG: switch to more efficient use of `Collection` by removing cloning (#438)

* remove unnecessary clones by switch to references in SmallSignature

* switch away from references for collections => avoid clones

* remove MultiCollection::iter

* MRG: add tests for RocksDB/RevIndex, standalone manifests, and flexible pathlists (#436)

* test using rocksdb as source of sketches

* test file lists of zips

* cargo fmt

* hackity hack hack a picklist

* ok that makes more sense

* it works

* comments around future par_iter

* support loading from a .sig.gz for index

* test pairwise loading from rocksdb

* add test for queries from Rocksdb

* decide not to implement lists of manifests :)

* reenable and fix test_fastgather.py::test_indexed_against

* impl Deref for MultiCollection

* clippy

* switch to using load_sketches method

* deref doesn't actually make sense for MultiCollection

* update to latest sourmash code

* update to latest sourmash code

* simplify

* update to latest sourmash code

* remove unnecessary flag

* MRG: support & test loading of standalone manifests within pathlists (#450)

* use recursion to load paths into a MultiCollection => mf support

* MRG: clean up index to use `MultiCollection` (#451)

* try making index work with standard code

* kinda working

* fmt

* refactor

* clear up the tests

* refactor/clean up

* cargo fmt

* add tests for index warning & error

* comment

* MRG: documentation updates based on new collection loading (#444)

* update docs for #430

* upd documentation

* upd

* Update src/lib.rs

Co-authored-by: Tessa Pierce Ward <[email protected]>

* switch unwrap to expect

* move unwrap to expect

* minor cleanup

* cargo fmt

* provide legacy method to avoid xfail on index loading

* switch to using reference

* update docs to reflect pathlist behavior

* test recursive nature of MultiCollection

* re-enable test that is now passing

* update to latest sourmash

* upd sourmash

* update sourmash

* mut MultiCollection

* cleanup

* update after merge of sourmash-bio/sourmash#3305

* fix contains_revindex

* add trace commands for tracing loading

* use released version of sourmash

* add support for ignoring abundance

* cargo fmt

* avoid downsampling until we know there is overlap

* change downsample to true; add panic assertion

* move downsampling side guard

* eliminate redundant overlap check

* move calc_abund_stats

* extract abundance code into own function; avoid downsampling if poss

* cleanup

* fmt

* update to next sourmash release

* cargo fmt

* upd sourmash

* correct numbers

* upd sourmash

* upd sourmash

* upd sourmash

* upd sourmash

* use new try_into() and eliminate several clone()s

* refactor a bit more

* use new try_into() in manysearch; flag clones

* avoid a few more clones

* eliminate more clone

* fix mismatched clauses

* note minhash

* fix mastiff_manygather

* avoid more clone

* resolve comments

* microchange

* microchange 2

* eliminate more clone: fastgather

* avoid more clone: fastmultigather

* refactor to avoid more clones

* rm one more clone

* cleanup

* cargo fmt

* cargo fmt

* deallocate collection?

* deallocate collection?

* upd sourmash

* cargo fmt

* fix merge foo

* try out new sourmash PR

* upd latest sourmash branch

* upd sourmash

---------

Co-authored-by: Tessa Pierce Ward <[email protected]>
ctb and bluegenes authored Oct 15, 2024
1 parent 02fcbc5 commit a78e145
Showing 6 changed files with 94 additions and 66 deletions.
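The core idea of this commit — a consuming `TryFrom`/`try_into()` conversion that moves the sketch out of a signature instead of cloning it — can be sketched with toy types. These stand-ins (`Signature`, `KmerMinHash` with a plain `Vec<u64>`) are illustrative only, not the actual sourmash API:

```rust
// Toy stand-ins for the sourmash types; the real KmerMinHash and
// Signature are much richer. The point is the ownership pattern.
#[derive(Debug, PartialEq)]
struct KmerMinHash {
    hashes: Vec<u64>,
}

struct Signature {
    minhash: Option<KmerMinHash>,
}

impl TryFrom<Signature> for KmerMinHash {
    type Error = &'static str;

    // Consumes the Signature, *moving* the sketch out. The old
    // pattern (`sig.minhash()` returning a reference) forced a
    // clone() whenever an owned sketch was needed downstream.
    fn try_from(sig: Signature) -> Result<Self, Self::Error> {
        sig.minhash.ok_or("no compatible sketch in signature")
    }
}

fn main() {
    let sig = Signature {
        minhash: Some(KmerMinHash { hashes: vec![1, 2, 3] }),
    };
    // `try_into()` moves the sketch out; no clone() needed.
    let mh: KmerMinHash = sig.try_into().expect("cannot get sketch");
    println!("{} hashes, no clone needed", mh.hashes.len());

    // Selection mismatches surface as Err, matching the diff's
    // `Err(_) => bail!(...)` / `expect("cannot get sketch")` arms.
    let empty = Signature { minhash: None };
    let res: Result<KmerMinHash, _> = empty.try_into();
    assert!(res.is_err());
}
```

The trade-off: once converted, the original `Signature` is gone, which is why the diff hoists `name()`/`filename()`/`md5sum()` calls above the conversion.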
27 changes: 15 additions & 12 deletions src/fastgather.rs
@@ -35,11 +35,15 @@ pub fn fastgather(
// get single query sig and minhash
let query_sig = query_collection.get_first_sig().expect("no queries!?");

// @CTB avoid clone?
let query_sig_ds = query_sig.clone().select(selection)?; // downsample
let query_mh = match query_sig_ds.minhash() {
Some(query_mh) => query_mh,
None => {
let query_filename = query_sig.filename();
let query_name = query_sig.name();
let query_md5 = query_sig.md5sum();

// clone here is necessary b/c we use full query_sig in consume_query_by_gather
let query_sig_ds = query_sig.select(selection)?; // downsample
let query_mh = match query_sig_ds.try_into() {
Ok(query_mh) => query_mh,
Err(_) => {
bail!("No query sketch matching selection parameters.");
}
};
@@ -68,7 +72,7 @@ pub fn fastgather(
);

// load a set of sketches, filtering for those with overlaps > threshold
let result = load_sketches_above_threshold(against_collection, query_mh, threshold_hashes)?;
let result = load_sketches_above_threshold(against_collection, &query_mh, threshold_hashes)?;
let matchlist = result.0;
let skipped_paths = result.1;
let failed_paths = result.2;
@@ -91,12 +95,9 @@ pub fn fastgather(
}

if prefetch_output.is_some() {
let query_filename = query_sig.filename();
let query_name = query_sig.name();
let query_md5 = query_sig.md5sum();
write_prefetch(
query_filename,
query_name,
query_filename.clone(),
query_name.clone(),
query_md5,
prefetch_output,
&matchlist,
@@ -106,7 +107,9 @@ pub fn fastgather(

// run the gather!
consume_query_by_gather(
query_sig,
query_name,
query_filename,
query_mh,
scaled as u64,
matchlist,
threshold_hashes,
33 changes: 22 additions & 11 deletions src/fastmultigather.rs
@@ -92,16 +92,25 @@ pub fn fastmultigather(
let query_name = query_sig.name();
let query_md5 = query_sig.md5sum();

let query_mh = query_sig.minhash().expect("cannot get sketch");
let query_mh: KmerMinHash = query_sig.try_into().expect("cannot get sketch");

// CTB refactor
let query_scaled = query_mh.scaled();
let query_ksize = query_mh.ksize().try_into().unwrap();
let query_hash_function = query_mh.hash_function().clone();
let query_seed = query_mh.seed();
let query_num = query_mh.num();

let mut matching_hashes = if save_matches { Some(Vec::new()) } else { None };
let matchlist: BinaryHeap<PrefetchResult> = against
.iter()
.filter_map(|against| {
let mut mm: Option<PrefetchResult> = None;
if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
if let Ok(overlap) = against.minhash.count_common(&query_mh, false) {
if overlap >= threshold_hashes {
if save_matches {
if let Ok(intersection) = against.minhash.intersection(query_mh)
if let Ok(intersection) =
against.minhash.intersection(&query_mh)
{
matching_hashes.as_mut().unwrap().extend(intersection.0);
}
@@ -126,8 +135,8 @@ pub fn fastmultigather(

// Save initial list of matches to prefetch output
write_prefetch(
query_filename,
query_name,
query_filename.clone(),
query_name.clone(),
query_md5,
Some(prefetch_output),
&matchlist,
@@ -136,7 +145,9 @@ pub fn fastmultigather(

// Now, do the gather!
consume_query_by_gather(
query_sig.clone(),
query_name,
query_filename,
query_mh,
scaled as u64,
matchlist,
threshold_hashes,
@@ -151,12 +162,12 @@ pub fn fastmultigather(
if let Ok(mut file) = File::create(&sig_filename) {
let unique_hashes: HashSet<u64> = hashes.into_iter().collect();
let mut new_mh = KmerMinHash::new(
query_mh.scaled(),
query_mh.ksize().try_into().unwrap(),
query_mh.hash_function().clone(),
query_mh.seed(),
query_scaled,
query_ksize,
query_hash_function,
query_seed,
false,
query_mh.num(),
query_num,
);
new_mh
.add_many(&unique_hashes.into_iter().collect::<Vec<_>>())
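The fastmultigather hunk above copies `scaled`, `ksize`, `hash_function`, `seed`, and `num` out of `query_mh` before the sketch is moved into `consume_query_by_gather`. A minimal sketch of why, using hypothetical types rather than the real sourmash ones:

```rust
// Toy example: capture cheap Copy fields before moving a value
// into a consuming call, so they remain usable afterwards.
struct Sketch {
    scaled: u32,
    ksize: u32,
    hashes: Vec<u64>,
}

// Stands in for consume_query_by_gather taking query_mh by value.
fn consume(sketch: Sketch) -> usize {
    sketch.hashes.len()
}

fn main() {
    let query = Sketch { scaled: 1000, ksize: 31, hashes: vec![1, 2, 3] };

    // Copy the fields first; after consume(), `query` is moved.
    let query_scaled = query.scaled;
    let query_ksize = query.ksize;

    let n = consume(query);
    // `query.scaled` would not compile here -- the copies still work.
    println!("n={} scaled={} ksize={}", n, query_scaled, query_ksize);
}
```

This is cheaper than cloning the whole sketch just to keep its metadata alive past the move.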
13 changes: 8 additions & 5 deletions src/manysearch.rs
@@ -73,12 +73,15 @@ pub fn manysearch(
// against downsampling happens here
match coll.sig_from_record(record) {
Ok(against_sig) => {
if let Some(against_mh) = against_sig.minhash() {
let against_name = against_sig.name();
let against_md5 = against_sig.md5sum();

if let Ok(against_mh) = against_sig.try_into() {
for query in query_sketchlist.iter() {
// avoid calculating details unless there is overlap
let overlap = query
.minhash
.count_common(against_mh, true)
.count_common(&against_mh, true)
.expect("incompatible sketches")
as f64;

@@ -115,7 +118,7 @@ pub fn manysearch(
median_abund,
std_abund,
) = if calc_abund_stats {
downsample_and_inflate_abundances(&query.minhash, against_mh)
downsample_and_inflate_abundances(&query.minhash, &against_mh)
.ok()?
} else {
(None, None, None, None, None)
@@ -124,10 +127,10 @@ pub fn manysearch(
results.push(SearchResult {
query_name: query.name.clone(),
query_md5: query.md5sum.clone(),
match_name: against_sig.name(),
match_name: against_name.clone(),
containment: containment_query_in_target,
intersect_hashes: overlap as usize,
match_md5: Some(against_sig.md5sum()),
match_md5: Some(against_md5.clone()),
jaccard: Some(jaccard),
max_containment: Some(max_containment),
average_abund,
18 changes: 11 additions & 7 deletions src/mastiff_manygather.rs
@@ -61,19 +61,23 @@ pub fn mastiff_manygather(
// query downsampling happens here
match coll.sig_from_record(record) {
Ok(query_sig) => {
let query_filename = query_sig.filename();
let query_name = query_sig.name();
let query_md5 = query_sig.md5sum();

let mut results = vec![];
if let Some(query_mh) = query_sig.minhash() {
if let Ok(query_mh) = query_sig.try_into() {
let _ = processed_sigs.fetch_add(1, atomic::Ordering::SeqCst);
// Gather!
let (counter, query_colors, hash_to_color) =
db.prepare_gather_counters(query_mh);
db.prepare_gather_counters(&query_mh);

let matches = db.gather(
counter,
query_colors,
hash_to_color,
threshold,
query_mh,
&query_mh,
Some(selection.clone()),
);
if let Ok(matches) = matches {
@@ -94,9 +98,9 @@ pub fn mastiff_manygather(
unique_intersect_bp: match_.unique_intersect_bp(),
gather_result_rank: match_.gather_result_rank(),
remaining_bp: match_.remaining_bp(),
query_filename: query_sig.filename(),
query_name: query_sig.name().clone(),
query_md5: query_sig.md5sum().clone(),
query_filename: query_filename.clone(),
query_name: query_name.clone(),
query_md5: query_md5.clone(),
query_bp: query_mh.n_unique_kmers() as usize,
ksize: ksize as usize,
moltype: query_mh.hash_function().to_string(),
@@ -128,7 +132,7 @@ pub fn mastiff_manygather(
} else {
eprintln!(
"WARNING: no compatible sketches in path '{}'",
query_sig.filename()
query_filename
);
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
56 changes: 30 additions & 26 deletions src/utils/mod.rs
@@ -25,7 +25,6 @@ use sourmash::manifest::{Manifest, Record};
use sourmash::selection::Selection;
use sourmash::signature::{Signature, SigsTrait};
use sourmash::sketch::minhash::KmerMinHash;
use sourmash::storage::SigStore;
use stats::{median, stddev};
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
@@ -662,9 +661,9 @@ pub fn report_on_collection_loading(
#[allow(clippy::too_many_arguments)]
pub fn branchwater_calculate_gather_stats(
orig_query: &KmerMinHash,
query: KmerMinHash,
query: &KmerMinHash,
// these are separate in PrefetchResult, so just pass them separately in here
match_mh: KmerMinHash,
match_mh: &KmerMinHash,
match_name: String,
match_md5: String,
match_size: usize,
@@ -749,7 +748,7 @@ pub fn branchwater_calculate_gather_stats(
average_abund = n_unique_weighted_found as f64 / abunds.len() as f64;

// todo: try to avoid clone for these?
median_abund = median(abunds.iter().cloned()).unwrap();
median_abund = median(abunds.iter().cloned()).expect("cannot calculate median");
std_abund = stddev(abunds.iter().cloned());
}

@@ -788,7 +787,9 @@ pub fn branchwater_calculate_gather_stats(
/// removing matches in 'matchlist' from 'query'.

pub fn consume_query_by_gather(
query: SigStore,
query_name: String,
query_filename: String,
orig_query_mh: KmerMinHash,
scaled: u64,
matchlist: BinaryHeap<PrefetchResult>,
threshold_hashes: u64,
@@ -817,56 +818,59 @@ pub fn consume_query_by_gather(

let mut last_matches = matching_sketches.len();

let location = query.filename();

let orig_query_mh = query.minhash().unwrap();
let query_bp = orig_query_mh.n_unique_kmers() as usize;
let query_n_hashes = orig_query_mh.size();
let mut query_moltype = orig_query_mh.hash_function().to_string();
if query_moltype.to_lowercase() == "dna" {
query_moltype = query_moltype.to_uppercase();
}
let query_md5sum: String = orig_query_mh.md5sum().clone();
let query_name = query.name().clone();
let query_scaled = orig_query_mh.scaled() as usize;

let mut query_mh = orig_query_mh.clone();
let mut orig_query_ds = orig_query_mh.clone().downsample_scaled(scaled)?;
// to do == use this to subtract hashes instead
// let mut query_mht = KmerMinHashBTree::from(orig_query_mh.clone());
let total_weighted_hashes = orig_query_mh.sum_abunds();
let ksize = orig_query_mh.ksize();
let calc_abund_stats = orig_query_mh.track_abundance();
let orig_query_size = orig_query_mh.size();
let mut last_hashes = orig_query_size;

let mut last_hashes = orig_query_mh.size();
// this clone is necessary because we iteratively change things!
// to do == use this to subtract hashes instead
// let mut query_mh = KmerMinHashBTree::from(orig_query_mh.clone());
let mut query_mh = orig_query_mh.clone();

// some items for full gather results
let mut orig_query_ds = orig_query_mh.downsample_scaled(scaled)?;

// track for full gather results
let mut sum_weighted_found = 0;
let total_weighted_hashes = orig_query_mh.sum_abunds();
let ksize = orig_query_mh.ksize();

// set some bools
let calc_abund_stats = orig_query_mh.track_abundance();
let calc_ani_ci = false;
let ani_confidence_interval_fraction = None;

eprintln!(
"{} iter {}: start: query hashes={} matches={}",
location,
query_filename,
rank,
orig_query_mh.size(),
orig_query_size,
matching_sketches.len()
);

while !matching_sketches.is_empty() {
let best_element = matching_sketches.peek().unwrap();

query_mh = query_mh.downsample_scaled(best_element.minhash.scaled())?;
orig_query_ds = orig_query_ds.downsample_scaled(best_element.minhash.scaled())?;

// CTB: won't need this if we do not allow multiple scaleds;
// see sourmash-bio/sourmash#2951
orig_query_ds = orig_query_ds
.downsample_scaled(best_element.minhash.scaled())
.expect("cannot downsample");

//calculate full gather stats
let match_ = branchwater_calculate_gather_stats(
&orig_query_ds,
query_mh.clone(),
// KmerMinHash::from(query.clone()),
best_element.minhash.clone(),
&query_mh,
&best_element.minhash,
best_element.name.clone(),
best_element.md5sum.clone(),
best_element.overlap as usize,
@@ -896,7 +900,7 @@ pub fn consume_query_by_gather(
unique_intersect_bp: match_.unique_intersect_bp,
gather_result_rank: match_.gather_result_rank,
remaining_bp: match_.remaining_bp,
query_filename: query.filename(),
query_filename: query_filename.clone(),
query_name: query_name.clone(),
query_md5: query_md5sum.clone(),
query_bp,
@@ -937,7 +941,7 @@ pub fn consume_query_by_gather(

eprintln!(
"{} iter {}: remaining: query hashes={}(-{}) matches={}(-{})",
location,
query_filename,
rank,
query_mh.size(),
sub_hashes,
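The `consume_query_by_gather` changes keep exactly one up-front clone ("this clone is necessary because we iteratively change things!"): the working query shrinks each round as matched hashes are subtracted, while the original stays intact for stats. A toy greedy loop over plain `HashSet`s — not the real minhash machinery — illustrates the shape:

```rust
use std::collections::HashSet;

// Toy gather: clone the original query once, then repeatedly pick
// the match with the largest remaining overlap and subtract its
// hashes from the working copy. Returns match indices in pick order.
fn gather(orig_query: &HashSet<u64>, matches: &[HashSet<u64>]) -> Vec<usize> {
    // the one necessary clone; orig_query stays untouched for stats
    let mut query = orig_query.clone();
    let mut order = Vec::new();
    loop {
        // best remaining overlap (ties break toward the higher index)
        let best = matches
            .iter()
            .enumerate()
            .map(|(i, m)| (m.intersection(&query).count(), i))
            .max();
        match best {
            Some((overlap, i)) if overlap > 0 => {
                order.push(i);
                // subtract the matched hashes from the working query
                query = query.difference(&matches[i]).copied().collect();
            }
            _ => break, // no match above zero overlap left
        }
    }
    order
}

fn main() {
    let query: HashSet<u64> = (1..=6).collect();
    let matches = vec![
        [1u64, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
        [4, 5].into_iter().collect(),
        [6].into_iter().collect(),
    ];
    let order = gather(&query, &matches);
    println!("match order: {:?}", order); // prints "match order: [0, 2, 1]"
}
```

The diff's `KmerMinHashBTree` to-do comments point at a future version that subtracts hashes in place instead of rebuilding the working set each round.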
13 changes: 8 additions & 5 deletions src/utils/multicollection.rs
@@ -322,13 +322,16 @@ impl MultiCollection {
_idx,
record.internal_location()
);
let selected_sig = sig.clone().select(selection).ok()?;
let minhash = selected_sig.minhash()?.clone();

let sig_name = sig.name();
let sig_md5 = sig.md5sum();
let selected_sig = sig.select(selection).ok()?;
let minhash = selected_sig.try_into().expect("cannot extract sketch");

Some(SmallSignature {
location: record.internal_location().to_string(),
name: sig.name(),
md5sum: sig.md5sum(),
name: sig_name,
md5sum: sig_md5,
minhash,
})
}
@@ -357,7 +360,7 @@ impl MultiCollection {
.par_iter()
.filter_map(|(coll, _idx, record)| match coll.sig_from_record(record) {
Ok(sig) => {
let sig = sig.clone().select(selection).ok()?;
let sig = sig.select(selection).ok()?;
Some(Signature::from(sig))
}
Err(_) => {
