Skip to content

Commit

Permalink
MRG: refactor fastgather/fastmultigather CSV output to use mpsc chann…
Browse files Browse the repository at this point in the history
…els (#567)

* refactor CSV output for fastgather/fastmultigather to use mpsc

* cargo fmt

* tests mostly pass

* fix skipmer test

* upd comment

* black

* switch to min_max_scaled for rocksdb

* black

* ensure overlap is > 0

* rm print

* cleanup

* fix clippy messages about too-complex returns

* cargo fmt

* upd overlap

* fix

* fix tests

* update docs

* upd

* break test again

* do heinous dev stuff

* fix fix comment

* upd

* upd

* do not require -o after all
  • Loading branch information
ctb authored Jan 7, 2025
1 parent 48bb1c9 commit 40880f0
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 131 deletions.
34 changes: 14 additions & 20 deletions doc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ sourmash scripts fastgather query.sig.gz database.zip -o results.csv --cores 4

### Running `fastmultigather`

`fastmultigather` takes a collection of query metagenomes and a collection of sketches as a database, and outputs many CSVs:
`fastmultigather` takes a collection of query metagenomes and a collection of sketches as a database, and outputs a CSV file containing all the matches.
```
sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4 --save-matches
sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4 --save-matches -o results.csv
```

We suggest using standalone manifest CSVs wherever possible, especially if
Expand All @@ -327,32 +327,26 @@ this can be a significant time savings for large databases.

#### Output files for `fastmultigather`

On a database of sketches (but not on RocksDB indexes)
`fastmultigather` will output two CSV files for each query, a
`prefetch` file containing all overlapping matches between that query
and the database, and a `gather` file containing the minimum
metagenome cover for that query in the database.
`fastmultigather` will output a single gather CSV file containing all
results, specified with `-o/--output`. `fastmultigather` gather CSVs
provide the same columns as `fastgather`, above.

The prefetch CSV will be named `{signame}.prefetch.csv`, and the
gather CSV will be named `{signame}.gather.csv`. Here, `{signame}` is
the name of your sourmash signature.
In addition, on a database of sketches (but not on RocksDB indexes)
`fastmultigather` will output a `prefetch` file containing all
overlapping matches between that query and the database. The prefetch
CSV will be named `{signame}.prefetch.csv`, where `{signame}` is the
name of your sourmash signature.

`--save-matches` is an optional flag that will save the matched hashes
for each query in a separate sourmash signature
`{signame}.matches.sig`. This can be useful for debugging or for
further analysis.

When searching against a RocksDB index, `fastmultigather` will output
a single file containing all gather results, specified with
`-o/--output`. No prefetch results will be output.

`fastmultigather` gather CSVs provide the same columns as `fastgather`, above.

**Warning:** At the moment, if two different queries have the same
`{signame}`, the CSVs for one of the queries will be overwritten by
the other query. The behavior here is undefined in practice, because
of multithreading: we don't know what queries will be executed when
or files will be written first.
`{signame}`, the output files for one query will be overwritten by
the results from the other query. The behavior here is undefined in
practice, because of multithreading: we don't know what queries will
be executed when or files will be written first.

### Running `manysearch`

Expand Down
16 changes: 13 additions & 3 deletions src/fastgather.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
/// fastgather: Run gather with a query against a list of files.
use anyhow::Result;

use sourmash::prelude::Select;
use sourmash::selection::Selection;
use sourmash::sketch::minhash::KmerMinHash;

use crate::utils::{
consume_query_by_gather, load_collection, load_sketches_above_threshold, write_prefetch,
ReportType,
consume_query_by_gather, csvwriter_thread, load_collection, load_sketches_above_threshold,
write_prefetch, BranchwaterGatherResult, ReportType,
};

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -109,6 +110,10 @@ pub fn fastgather(
.ok();
}

let (send, recv) =
std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());
let gather_out_thrd = csvwriter_thread(recv, gather_output);

// run the gather!
consume_query_by_gather(
query_name,
Expand All @@ -117,8 +122,13 @@ pub fn fastgather(
scaled as u32,
matchlist,
threshold_hashes,
gather_output,
Some(send),
)
.ok();

if let Err(e) = gather_out_thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
}

Ok(())
}
38 changes: 18 additions & 20 deletions src/fastmultigather.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/// fastmultigather: Run gather for multiple queries against a list of files.
use anyhow::Result;
use rayon::prelude::*;
use rayon::iter::ParallelIterator;

use sourmash::prelude::{Storage, ToWriter};
use sourmash::{selection::Selection, signature::SigsTrait};
Expand All @@ -22,7 +22,8 @@ use sourmash::sketch::minhash::KmerMinHash;
use sourmash::sketch::Sketch;

use crate::utils::{
consume_query_by_gather, load_collection, write_prefetch, PrefetchResult, ReportType,
consume_query_by_gather, csvwriter_thread, load_collection, write_prefetch,
BranchwaterGatherResult, PrefetchResult, ReportType,
};

#[allow(clippy::too_many_arguments)]
Expand All @@ -34,6 +35,7 @@ pub fn fastmultigather(
selection: Selection,
allow_failed_sigpaths: bool,
save_matches: bool,
output_path: Option<String>,
create_empty_results: bool,
) -> Result<()> {
let _ = env_logger::try_init();
Expand Down Expand Up @@ -82,6 +84,13 @@ pub fn fastmultigather(
// load against sketches into memory
let against = against_collection.load_sketches()?;

// set up a multi-producer, single-consumer channel.
let (send, recv) =
std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());

// spawn a thread that is dedicated to printing to a buffered output
let gather_out_thrd = csvwriter_thread(recv, output_path);

// Iterate over all queries => do prefetch and gather!
let processed_queries = AtomicUsize::new(0);
let skipped_paths = AtomicUsize::new(0);
Expand Down Expand Up @@ -144,9 +153,8 @@ pub fn fastmultigather(
})
.collect();

if !matchlist.is_empty() {
if !matchlist.is_empty() || create_empty_results {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);

// Save initial list of matches to prefetch output
write_prefetch(
Expand All @@ -166,7 +174,7 @@ pub fn fastmultigather(
common_scaled,
matchlist,
threshold_hashes,
Some(gather_output),
Some(send.clone()),
)
.ok();

Expand Down Expand Up @@ -200,21 +208,6 @@ pub fn fastmultigather(
}
} else {
println!("No matches to '{}'", location);
if create_empty_results {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);
// touch output files
match std::fs::File::create(&prefetch_output) {
Ok(_) => {}
Err(e) => {
eprintln!("Failed to create empty prefetch output: {}", e)
}
}
match std::fs::File::create(&gather_output) {
Ok(_) => {}
Err(e) => eprintln!("Failed to create empty gather output: {}", e),
}
}
}
}
Err(_) => {
Expand All @@ -228,6 +221,11 @@ pub fn fastmultigather(
}
});

drop(send);
if let Err(e) = gather_out_thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
}

println!(
"DONE. Processed {} queries total.",
processed_queries.into_inner()
Expand Down
7 changes: 2 additions & 5 deletions src/fastmultigather_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,9 @@ pub fn fastmultigather_rocksdb(
println!("Loaded DB");

// grab scaled from the database.
let max_db_scaled = db
let (_, max_db_scaled) = db
.collection()
.manifest()
.iter()
.map(|r| r.scaled())
.max()
.min_max_scaled()
.expect("no records in db?!");

let selection_scaled: u32 = match selection.scaled() {
Expand Down
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,6 @@ fn do_fastmultigather(
}
}
} else {
if output_path.is_some() {
bail!("output path specified, but not running fastmultigather against a rocksdb. See issue #239");
}
match fastmultigather::fastmultigather(
query_filenames,
siglist_path,
Expand All @@ -166,6 +163,7 @@ fn do_fastmultigather(
selection,
allow_failed_sigpaths,
save_matches,
output_path,
create_empty_results,
) {
Ok(_) => Ok(0),
Expand Down
19 changes: 9 additions & 10 deletions src/manysearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ use sourmash::signature::SigsTrait;
use sourmash::sketch::minhash::KmerMinHash;
use sourmash::storage::SigStore;

type AbundanceStats = (
Option<u64>,
Option<u64>,
Option<f64>,
Option<f64>,
Option<f64>,
);

pub fn manysearch(
query_filepath: String,
against_filepath: String,
Expand Down Expand Up @@ -174,16 +182,7 @@ pub fn manysearch(
fn inflate_abundances(
query: &KmerMinHash,
against: &KmerMinHash,
) -> Result<
(
Option<u64>,
Option<u64>,
Option<f64>,
Option<f64>,
Option<f64>,
),
SourmashError,
> {
) -> Result<AbundanceStats, SourmashError> {
let abunds: Vec<u64>;
let sum_weighted: u64;
let sum_all_abunds: u64 = against.sum_abunds();
Expand Down
7 changes: 2 additions & 5 deletions src/manysearch_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@ pub fn manysearch_rocksdb(
println!("Loaded DB");

// grab scaled from the database.
let max_db_scaled = db
let (_, max_db_scaled) = db
.collection()
.manifest()
.iter()
.map(|r| r.scaled())
.max()
.min_max_scaled()
.expect("no records in db?!");

let selection_scaled: u32 = match selection.scaled() {
Expand Down
16 changes: 9 additions & 7 deletions src/multisearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ use crate::utils::multicollection::SmallSignature;
use crate::utils::{csvwriter_thread, load_collection, MultiSearchResult, ReportType};
use sourmash::ani_utils::ani_from_containment;

type OverlapStatsReturn = (
f64,
HashMap<u64, f64>,
HashMap<u64, f64>,
HashMap<String, HashMap<u64, f64>>,
HashMap<u64, f64>,
);

#[derive(Default, Clone, Debug)]
struct ProbOverlapStats {
prob_overlap: f64,
Expand Down Expand Up @@ -71,13 +79,7 @@ fn compute_single_prob_overlap(
fn compute_prob_overlap_stats(
queries: &Vec<SmallSignature>,
againsts: &Vec<SmallSignature>,
) -> (
f64,
HashMap<u64, f64>,
HashMap<u64, f64>,
HashMap<String, HashMap<u64, f64>>,
HashMap<u64, f64>,
) {
) -> OverlapStatsReturn {
let n_comparisons = againsts.len() as f64 * queries.len() as f64;

// Combine all the queries and against into a single signature each
Expand Down
5 changes: 3 additions & 2 deletions src/python/sourmash_plugin_branchwater/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,14 @@ def __init__(self, p):
p.add_argument(
"-o",
"--output",
help="CSV output file for matches. Used for non-rocksdb searches only.",
help="CSV output file containing gather matches",
)
p.add_argument(
"--create-empty-results",
"--create-empty-prefetch-results",
action="store_true",
default=False,
help="create empty results file(s) even if no matches",
        help="create empty prefetch results file for each query, even if no matches (non-RocksDB only)",
)
p.add_argument(
"--save-matches",
Expand Down
Loading

0 comments on commit 40880f0

Please sign in to comment.