diff --git a/doc/README.md b/doc/README.md index 0bb346ba..63a26538 100644 --- a/doc/README.md +++ b/doc/README.md @@ -312,9 +312,9 @@ sourmash scripts fastgather query.sig.gz database.zip -o results.csv --cores 4 ### Running `fastmultigather` -`fastmultigather` takes a collection of query metagenomes and a collection of sketches as a database, and outputs many CSVs: +`fastmultigather` takes a collection of query metagenomes and a collection of sketches as a database, and outputs a CSV file containing all the matches. ``` -sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4 --save-matches +sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4 --save-matches -o results.csv ``` We suggest using standalone manifest CSVs wherever possible, especially if @@ -327,32 +327,26 @@ this can be a significant time savings for large databases. #### Output files for `fastmultigather` -On a database of sketches (but not on RocksDB indexes) -`fastmultigather` will output two CSV files for each query, a -`prefetch` file containing all overlapping matches between that query -and the database, and a `gather` file containing the minimum -metagenome cover for that query in the database. +`fastmultigather` will output a gather file containing all results in +one file, specified with `-o/--output`. `fastmultigather` gather CSVs +provide the same columns as `fastgather`, above. -The prefetch CSV will be named `{signame}.prefetch.csv`, and the -gather CSV will be named `{signame}.gather.csv`. Here, `{signame}` is -the name of your sourmash signature. +In addition, on a database of sketches (but not on RocksDB indexes) +`fastmultigather` will output a `prefetch` file containing all +overlapping matches between that query and the database. The prefetch +CSV will be named `{signame}.prefetch.csv`, where `{signame}` is the +name of your sourmash signature. `--save-matches` is an optional flag that will save the matched hashes for each query in a separate sourmash signature `{signame}.matches.sig`. This can be useful for debugging or for further analysis. -When searching against a RocksDB index, `fastmultigather` will output -a single file containing all gather results, specified with -`-o/--output`. No prefetch results will be output. - -`fastmultigather` gather CSVs provide the same columns as `fastgather`, above. - **Warning:** At the moment, if two different queries have the same - `{signame}`, the CSVs for one of the queries will be overwritten by - the other query. The behavior here is undefined in practice, because - of multithreading: we don't know what queries will be executed when - or files will be written first. + `{signame}`, the output files for one query will be overwritten by + the results from the other query. The behavior here is undefined in + practice, because of multithreading: we don't know what queries will + be executed when or files will be written first. ### Running `manysearch` diff --git a/src/fastgather.rs b/src/fastgather.rs index 80da3f7a..31ab7217 100644 --- a/src/fastgather.rs +++ b/src/fastgather.rs @@ -1,12 +1,13 @@ /// fastgather: Run gather with a query against a list of files. use anyhow::Result; + use sourmash::prelude::Select; use sourmash::selection::Selection; use sourmash::sketch::minhash::KmerMinHash; use crate::utils::{ - consume_query_by_gather, load_collection, load_sketches_above_threshold, write_prefetch, - ReportType, + consume_query_by_gather, csvwriter_thread, load_collection, load_sketches_above_threshold, + write_prefetch, BranchwaterGatherResult, ReportType, }; #[allow(clippy::too_many_arguments)] @@ -109,6 +110,10 @@ pub fn fastgather( .ok(); } + let (send, recv) = + std::sync::mpsc::sync_channel::(rayon::current_num_threads()); + let gather_out_thrd = csvwriter_thread(recv, gather_output); + // run the gather! consume_query_by_gather( query_name, @@ -117,8 +122,13 @@ pub fn fastgather( scaled as u32, matchlist, threshold_hashes, - gather_output, + Some(send), ) .ok(); + + if let Err(e) = gather_out_thrd.join() { + eprintln!("Unable to join internal thread: {:?}", e); + } + Ok(()) } diff --git a/src/fastmultigather.rs b/src/fastmultigather.rs index 5380704b..cbfdbc7b 100644 --- a/src/fastmultigather.rs +++ b/src/fastmultigather.rs @@ -1,6 +1,6 @@ /// fastmultigather: Run gather for multiple queries against a list of files. use anyhow::Result; -use rayon::prelude::*; +use rayon::iter::ParallelIterator; use sourmash::prelude::{Storage, ToWriter}; use sourmash::{selection::Selection, signature::SigsTrait}; @@ -22,7 +22,8 @@ use sourmash::sketch::minhash::KmerMinHash; use sourmash::sketch::Sketch; use crate::utils::{ - consume_query_by_gather, load_collection, write_prefetch, PrefetchResult, ReportType, + consume_query_by_gather, csvwriter_thread, load_collection, write_prefetch, + BranchwaterGatherResult, PrefetchResult, ReportType, }; #[allow(clippy::too_many_arguments)] @@ -34,6 +35,7 @@ pub fn fastmultigather( selection: Selection, allow_failed_sigpaths: bool, save_matches: bool, + output_path: Option, create_empty_results: bool, ) -> Result<()> { let _ = env_logger::try_init(); @@ -82,6 +84,13 @@ pub fn fastmultigather( // load against sketches into memory let against = against_collection.load_sketches()?; + // set up a multi-producer, single-consumer channel. + let (send, recv) = + std::sync::mpsc::sync_channel::(rayon::current_num_threads()); + + // spawn a thread that is dedicated to printing to a buffered output + let gather_out_thrd = csvwriter_thread(recv, output_path); + // Iterate over all queries => do prefetch and gather! let processed_queries = AtomicUsize::new(0); let skipped_paths = AtomicUsize::new(0); @@ -144,9 +153,8 @@ pub fn fastmultigather( }) .collect(); - if !matchlist.is_empty() { + if !matchlist.is_empty() || create_empty_results { let prefetch_output = format!("{}.prefetch.csv", location); - let gather_output = format!("{}.gather.csv", location); // Save initial list of matches to prefetch output write_prefetch( @@ -166,7 +174,7 @@ pub fn fastmultigather( common_scaled, matchlist, threshold_hashes, - Some(gather_output), + Some(send.clone()), ) .ok(); @@ -200,21 +208,6 @@ pub fn fastmultigather( } } else { println!("No matches to '{}'", location); - if create_empty_results { - let prefetch_output = format!("{}.prefetch.csv", location); - let gather_output = format!("{}.gather.csv", location); - // touch output files - match std::fs::File::create(&prefetch_output) { - Ok(_) => {} - Err(e) => { - eprintln!("Failed to create empty prefetch output: {}", e) - } - } - match std::fs::File::create(&gather_output) { - Ok(_) => {} - Err(e) => eprintln!("Failed to create empty gather output: {}", e), - } - } } } Err(_) => { @@ -228,6 +221,11 @@ pub fn fastmultigather( } }); + drop(send); + if let Err(e) = gather_out_thrd.join() { + eprintln!("Unable to join internal thread: {:?}", e); + } + println!( "DONE. Processed {} queries total.", processed_queries.into_inner() diff --git a/src/fastmultigather_rocksdb.rs b/src/fastmultigather_rocksdb.rs index 5adbdc9b..17c0b20f 100644 --- a/src/fastmultigather_rocksdb.rs +++ b/src/fastmultigather_rocksdb.rs @@ -31,12 +31,9 @@ pub fn fastmultigather_rocksdb( println!("Loaded DB"); // grab scaled from the database. - let max_db_scaled = db + let (_, max_db_scaled) = db .collection() - .manifest() - .iter() - .map(|r| r.scaled()) - .max() + .min_max_scaled() .expect("no records in db?!"); let selection_scaled: u32 = match selection.scaled() { diff --git a/src/lib.rs b/src/lib.rs index 8226a633..9df4958f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -155,9 +155,6 @@ fn do_fastmultigather( } } } else { - if output_path.is_some() { - bail!("output path specified, but not running fastmultigather against a rocksdb. See issue #239"); - } match fastmultigather::fastmultigather( query_filenames, siglist_path, @@ -166,6 +163,7 @@ fn do_fastmultigather( selection, allow_failed_sigpaths, save_matches, + output_path, create_empty_results, ) { Ok(_) => Ok(0), diff --git a/src/manysearch.rs b/src/manysearch.rs index d5d629ca..2ce8bcab 100644 --- a/src/manysearch.rs +++ b/src/manysearch.rs @@ -19,6 +19,14 @@ use sourmash::signature::SigsTrait; use sourmash::sketch::minhash::KmerMinHash; use sourmash::storage::SigStore; +type AbundanceStats = ( + Option, + Option, + Option, + Option, + Option, +); + pub fn manysearch( query_filepath: String, against_filepath: String, @@ -174,16 +182,7 @@ pub fn manysearch( fn inflate_abundances( query: &KmerMinHash, against: &KmerMinHash, -) -> Result< - ( - Option, - Option, - Option, - Option, - Option, - ), - SourmashError, -> { +) -> Result { let abunds: Vec; let sum_weighted: u64; let sum_all_abunds: u64 = against.sum_abunds(); diff --git a/src/manysearch_rocksdb.rs b/src/manysearch_rocksdb.rs index 85fd8542..2ca36fe4 100644 --- a/src/manysearch_rocksdb.rs +++ b/src/manysearch_rocksdb.rs @@ -36,12 +36,9 @@ pub fn manysearch_rocksdb( println!("Loaded DB"); // grab scaled from the database. - let max_db_scaled = db + let (_, max_db_scaled) = db .collection() - .manifest() - .iter() - .map(|r| r.scaled()) - .max() + .min_max_scaled() .expect("no records in db?!"); let selection_scaled: u32 = match selection.scaled() { diff --git a/src/multisearch.rs b/src/multisearch.rs index fb030a2b..3ebd24f8 100644 --- a/src/multisearch.rs +++ b/src/multisearch.rs @@ -17,6 +17,14 @@ use crate::utils::multicollection::SmallSignature; use crate::utils::{csvwriter_thread, load_collection, MultiSearchResult, ReportType}; use sourmash::ani_utils::ani_from_containment; +type OverlapStatsReturn = ( + f64, + HashMap, + HashMap, + HashMap>, + HashMap, +); + #[derive(Default, Clone, Debug)] struct ProbOverlapStats { prob_overlap: f64, @@ -71,13 +79,7 @@ fn compute_single_prob_overlap( fn compute_prob_overlap_stats( queries: &Vec, againsts: &Vec, -) -> ( - f64, - HashMap, - HashMap, - HashMap>, - HashMap, -) { +) -> OverlapStatsReturn { let n_comparisons = againsts.len() as f64 * queries.len() as f64; // Combine all the queries and against into a single signature each diff --git a/src/python/sourmash_plugin_branchwater/__init__.py b/src/python/sourmash_plugin_branchwater/__init__.py index 7e3c2d46..aa4ca34c 100755 --- a/src/python/sourmash_plugin_branchwater/__init__.py +++ b/src/python/sourmash_plugin_branchwater/__init__.py @@ -277,13 +277,14 @@ def __init__(self, p): p.add_argument( "-o", "--output", - help="CSV output file for matches. Used for non-rocksdb searches only.", + help="CSV output file containing gather matches", ) p.add_argument( "--create-empty-results", + "--create-empty-prefetch-results", action="store_true", default=False, - help="create empty results file(s) even if no matches", + help="create empty prefetch results file for each query, even if no matches (non-RockSDB only)", ) p.add_argument( "--save-matches", diff --git a/src/python/tests/test_fastmultigather.py b/src/python/tests/test_fastmultigather.py index d47f62ad..69eab9cf 100644 --- a/src/python/tests/test_fastmultigather.py +++ b/src/python/tests/test_fastmultigather.py @@ -49,6 +49,8 @@ def test_simple(runtmp, zip_against): "100000", "-t", "0", + "-o", + runtmp.output("SRR606249.gather.csv"), in_directory=runtmp.output(""), ) @@ -109,6 +111,8 @@ def test_simple_list_of_zips(runtmp): "100000", "-t", "0", + "-o", + runtmp.output("SRR606249.gather.csv"), in_dir=runtmp.output(""), ) @@ -172,6 +176,8 @@ def test_simple_space_in_signame(runtmp): "100000", "-t", "0", + "-o", + runtmp.output("my-favorite-signame.gather.csv"), in_directory=runtmp.output(""), ) @@ -207,6 +213,8 @@ def test_simple_zip_query(runtmp): "100000", "-t", "0", + "-o", + runtmp.output("SRR606249.gather.csv"), in_directory=runtmp.output(""), ) @@ -269,6 +277,8 @@ def test_simple_read_manifests(runtmp): "100000", "-t", "0", + "-o", + runtmp.output("SRR606249.gather.csv"), in_directory=runtmp.output(""), ) @@ -499,14 +509,13 @@ def test_sig_query(runtmp, capfd, indexed): make_file_list(against_list, [sig2, sig47, sig63]) + g_output = runtmp.output("out.csv") + output_params = ["-o", g_output] + if indexed: against_list = index_siglist(runtmp, against_list, runtmp.output("db")) - g_output = runtmp.output("out.csv") - output_params = ["-o", g_output] else: - g_output = runtmp.output("SRR606249.gather.csv") p_output = runtmp.output("SRR606249.prefetch.csv") - output_params = [] runtmp.sourmash( "scripts", @@ -648,7 +657,9 @@ def test_sig_against(runtmp, capfd): g_output = runtmp.output("SRR606249.gather.csv") p_output = runtmp.output("SRR606249.prefetch.csv") - runtmp.sourmash("scripts", "fastmultigather", query, sig2, "-s", "100000") + runtmp.sourmash( + "scripts", "fastmultigather", query, sig2, "-s", "100000", "-o", g_output + ) captured = capfd.readouterr() print(captured.err) @@ -779,6 +790,8 @@ def test_md5(runtmp, zip_query): "100000", "-t", "0", + "-o", + runtmp.output("SRR606249.gather.csv"), in_directory=runtmp.output(""), ) @@ -930,6 +943,8 @@ def test_csv_columns_vs_sourmash_prefetch(runtmp, zip_query, zip_against): "100000", "-t", "0", + "-o", + runtmp.output("SRR606249.gather.csv"), in_directory=runtmp.output(""), ) @@ -1003,7 +1018,9 @@ def test_csv_columns_vs_sourmash_gather_fullresults(runtmp): "100000", "-t", "0", - ) # '-o', g_output, + "-o", + runtmp.output("SRR606249.gather.csv"), + ) assert os.path.exists(g_output) # now run sourmash gather @@ -1149,6 +1166,7 @@ def test_simple_protein(runtmp): sig_names = ["GCA_001593935", "GCA_001593925"] + gather_out = runtmp.output("xxx.csv") runtmp.sourmash( "scripts", "fastmultigather", @@ -1160,16 +1178,17 @@ def test_simple_protein(runtmp): "protein", "-k", "19", + "-o", + gather_out, ) + all_df = pandas.read_csv(gather_out) for qsig in sig_names: - g_output = runtmp.output(os.path.join(qsig + ".gather.csv")) p_output = runtmp.output(os.path.join(qsig + ".prefetch.csv")) - print(g_output) - assert os.path.exists(g_output) assert os.path.exists(p_output) - df = pandas.read_csv(g_output) + df = all_df[all_df["match_name"] == qsig] + assert len(df) == 1 keys = set(df.keys()) assert { @@ -1183,7 +1202,7 @@ def test_simple_protein(runtmp): }.issubset(keys) print(df) # since we're just matching to identical sigs, the md5s should be the same - assert df["query_md5"][0] == df["match_md5"][0] + assert set(df["query_md5"]) == set(df["match_md5"]) def test_simple_dayhoff(runtmp): @@ -1192,6 +1211,7 @@ def test_simple_dayhoff(runtmp): sig_names = ["GCA_001593935", "GCA_001593925"] + gather_out = runtmp.output("xxx.csv") runtmp.sourmash( "scripts", "fastmultigather", @@ -1203,16 +1223,17 @@ def test_simple_dayhoff(runtmp): "dayhoff", "-k", "19", + "-o", + gather_out, ) + all_df = pandas.read_csv(gather_out) for qsig in sig_names: - g_output = runtmp.output(os.path.join(qsig + ".gather.csv")) p_output = runtmp.output(os.path.join(qsig + ".prefetch.csv")) - print(g_output) - assert os.path.exists(g_output) assert os.path.exists(p_output) - df = pandas.read_csv(g_output) + df = all_df[all_df["match_name"] == qsig] + assert len(df) == 1 keys = set(df.keys()) assert { @@ -1226,7 +1247,7 @@ def test_simple_dayhoff(runtmp): }.issubset(keys) print(df) # since we're just matching to identical sigs, the md5s should be the same - assert df["query_md5"][0] == df["match_md5"][0] + assert set(df["query_md5"]) == set(df["match_md5"]) def test_simple_hp(runtmp): @@ -1235,6 +1256,7 @@ def test_simple_hp(runtmp): sig_names = ["GCA_001593935", "GCA_001593925"] + gather_out = runtmp.output("xxx.csv") runtmp.sourmash( "scripts", "fastmultigather", @@ -1246,16 +1268,16 @@ def test_simple_hp(runtmp): "hp", "-k", "19", + "-o", + gather_out, ) + all_df = pandas.read_csv(gather_out) for qsig in sig_names: - g_output = runtmp.output(os.path.join(qsig + ".gather.csv")) p_output = runtmp.output(os.path.join(qsig + ".prefetch.csv")) - print(g_output) - assert os.path.exists(g_output) assert os.path.exists(p_output) - df = pandas.read_csv(g_output) + df = all_df[all_df["match_name"] == qsig] assert len(df) == 1 keys = set(df.keys()) assert { @@ -1269,7 +1291,7 @@ def test_simple_hp(runtmp): }.issubset(keys) print(df) # since we're just matching to identical sigs, the md5s should be the same - assert df["query_md5"][0] == df["match_md5"][0] + assert set(df["query_md5"]) == set(df["match_md5"]) def test_simple_protein_indexed(runtmp): @@ -1551,7 +1573,7 @@ def test_indexed_full_output(runtmp): # check a few columns average_ani = set(df["average_containment_ani"]) avg_ani = set([round(x, 4) for x in average_ani]) - assert avg_ani == {0.9221, 0.9306, 0.9316} # @CTB check against py gather + assert avg_ani == {0.9221, 0.9306, 0.9316} f_unique_weighted = set(df["f_unique_weighted"]) f_unique_weighted = set([round(x, 4) for x in f_unique_weighted]) @@ -1584,6 +1606,8 @@ def test_nonindexed_full_vs_sourmash_gather(runtmp): "100000", "-t", "0", + "-o", + g_output, ) print(runtmp.last_result.out) @@ -1842,6 +1866,8 @@ def test_save_matches(runtmp): "-t", "0", "--save-matches", + "-o", + runtmp.output("SRR606249.gather.csv"), in_directory=runtmp.output(""), ) @@ -1898,7 +1924,7 @@ def test_save_matches(runtmp): assert mg_ss.minhash.contained_by(match_mh) < 1 -def test_create_empty_results(runtmp): +def test_create_empty_prefetch_results(runtmp): # sig2 has 0 hashes in common with 47 and 63 sig2 = get_test_data("2.fa.sig.gz") sig47 = get_test_data("47.fa.sig.gz") @@ -1910,6 +1936,7 @@ def test_create_empty_results(runtmp): make_file_list(query_list, [sig2]) make_file_list(against_list, [sig47, sig63]) + gather_out = runtmp.output("SRR606249.gather.csv") runtmp.sourmash( "scripts", "fastmultigather", @@ -1920,12 +1947,13 @@ def test_create_empty_results(runtmp): "-t", "0", "--create-empty-results", + "-o", + gather_out, in_directory=runtmp.output(""), ) print(os.listdir(runtmp.output(""))) - g_output = runtmp.output("CP001071.1.gather.csv") p_output = runtmp.output("CP001071.1.prefetch.csv") assert os.path.exists(p_output) @@ -1985,6 +2013,8 @@ def test_simple_query_scaled(runtmp): against_list, "-t", "0", + "-o", + runtmp.output("SRR606249.gather.csv"), in_directory=runtmp.output(""), ) @@ -2089,9 +2119,6 @@ def test_equal_matches(runtmp, indexed): against_list, runtmp.output("db"), ) - out_args = ("-o", outfile) - else: - out_args = () runtmp.sourmash( "scripts", @@ -2099,7 +2126,8 @@ def test_equal_matches(runtmp, indexed): "mg.sig", against_list, "--threshold-bp=0", - *out_args, + "-o", + outfile, ) df = pandas.read_csv(runtmp.output(outfile)) @@ -2122,14 +2150,12 @@ def test_explicit_scaled(runtmp, indexed): against_list = zip_siglist(runtmp, against_list, runtmp.output("against.zip")) outfile = runtmp.output("SRR606249.gather.csv") - out_args = () if indexed: against_list = index_siglist( runtmp, against_list, runtmp.output("db"), ) - out_args = ("-o", outfile) runtmp.sourmash( "scripts", @@ -2140,7 +2166,8 @@ def test_explicit_scaled(runtmp, indexed): "150000", "-t", "0", - *out_args, + "-o", + outfile, in_directory=runtmp.output(""), ) diff --git a/src/python/tests/test_multisearch.py b/src/python/tests/test_multisearch.py index 5b78223d..caeeea7f 100644 --- a/src/python/tests/test_multisearch.py +++ b/src/python/tests/test_multisearch.py @@ -802,7 +802,6 @@ def test_empty_query(runtmp, capfd): captured = capfd.readouterr() print(captured.err) assert "No query signatures loaded, exiting." in captured.err - # @CTB def test_nomatch_query_warn(runtmp, capfd, zip_query): diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 3c3e82f8..2ee44c84 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -19,7 +19,7 @@ use std::io::{BufWriter, Write}; use std::panic; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::Receiver; +use std::sync::mpsc::{Receiver, SyncSender}; use std::thread::JoinHandle; use zip::write::{FileOptions, ZipWriter}; use zip::CompressionMethod; @@ -82,7 +82,7 @@ pub fn prefetch( let searchsig = &result.minhash; let overlap = searchsig.count_common(query_mh, false); if let Ok(overlap) = overlap { - if overlap >= threshold_hashes { + if overlap > 0 && overlap >= threshold_hashes { let result = PrefetchResult { overlap, ..result }; mm = Some(result); } @@ -461,7 +461,7 @@ pub fn load_sketches_above_threshold( // good? ok, store as candidate from prefetch. if let Ok(overlap) = against_mh_ds.count_common(query, false) { - if overlap >= threshold_hashes { + if overlap > 0 && overlap >= threshold_hashes { let result = PrefetchResult { name: against_record.name().to_string(), md5sum: against_md5, @@ -807,26 +807,8 @@ pub fn consume_query_by_gather( scaled: u32, matchlist: BinaryHeap, threshold_hashes: u64, - gather_output: Option, + gather_output: Option>, ) -> Result<()> { - // Define the writer to stdout by default - let mut writer: Box = Box::new(std::io::stdout()); - - if let Some(output_path) = &gather_output { - // Account for potential missing dir in output path - let directory_path = Path::new(output_path).parent(); - - // If a directory path exists in the filename, create it if it doesn't already exist - if let Some(dir) = directory_path { - create_dir_all(dir)?; - } - - let file = File::create(output_path)?; - writer = Box::new(BufWriter::new(file)); - } - // create csv writer - let mut csv_writer = Writer::from_writer(writer); - let mut matching_sketches = matchlist; let mut rank = 0; @@ -937,8 +919,10 @@ pub fn consume_query_by_gather( match_containment_ani_ci_high: match_.match_containment_ani_ci_high, }; sum_weighted_found = gather_result.sum_weighted_found; - // serialize result to file. - csv_writer.serialize(gather_result)?; + // send result to channel => CSV file. + if let Some(ref s) = gather_output { + s.send(gather_result)?; + } // remove! query_mh.remove_from(&best_element.minhash)?; @@ -966,6 +950,7 @@ pub fn consume_query_by_gather( last_hashes = query_mh.size(); last_matches = matching_sketches.len(); } + Ok(()) }