MRG: refactor fastgather/fastmultigather CSV output to use mpsc channels #567

Merged 29 commits on Jan 7, 2025
34 changes: 14 additions & 20 deletions doc/README.md
@@ -312,9 +312,9 @@ sourmash scripts fastgather query.sig.gz database.zip -o results.csv --cores 4

### Running `fastmultigather`

`fastmultigather` takes a collection of query metagenomes and a collection of sketches as a database, and outputs many CSVs:
`fastmultigather` takes a collection of query metagenomes and a collection of sketches as a database, and outputs a CSV file containing all the matches.
```
sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4 --save-matches
sourmash scripts fastmultigather queries.manifest.csv database.zip --cores 4 --save-matches -o results.csv
```

We suggest using standalone manifest CSVs wherever possible, especially if
@@ -327,32 +327,26 @@ this can be a significant time savings for large databases.

#### Output files for `fastmultigather`

On a database of sketches (but not on RocksDB indexes)
`fastmultigather` will output two CSV files for each query, a
`prefetch` file containing all overlapping matches between that query
and the database, and a `gather` file containing the minimum
metagenome cover for that query in the database.
`fastmultigather` will output a gather file containing all results in
one file, specified with `-o/--output`. `fastmultigather` gather CSVs
provide the same columns as `fastgather`, above.

The prefetch CSV will be named `{signame}.prefetch.csv`, and the
gather CSV will be named `{signame}.gather.csv`. Here, `{signame}` is
the name of your sourmash signature.
In addition, on a database of sketches (but not on RocksDB indexes)
`fastmultigather` will output a `prefetch` file containing all
overlapping matches between that query and the database. The prefetch
CSV will be named `{signame}.prefetch.csv`, where `{signame}` is the
name of your sourmash signature.

`--save-matches` is an optional flag that will save the matched hashes
for each query in a separate sourmash signature
`{signame}.matches.sig`. This can be useful for debugging or for
further analysis.

When searching against a RocksDB index, `fastmultigather` will output
a single file containing all gather results, specified with
`-o/--output`. No prefetch results will be output.

`fastmultigather` gather CSVs provide the same columns as `fastgather`, above.

**Warning:** At the moment, if two different queries have the same
`{signame}`, the CSVs for one of the queries will be overwritten by
the other query. The behavior here is undefined in practice, because
of multithreading: we don't know what queries will be executed when
or files will be written first.
`{signame}`, the output files for one query will be overwritten by
the results from the other query. The behavior here is undefined in
practice, because of multithreading: we don't know what queries will
be executed when or files will be written first.

### Running `manysearch`

16 changes: 13 additions & 3 deletions src/fastgather.rs
@@ -1,12 +1,13 @@
/// fastgather: Run gather with a query against a list of files.
use anyhow::Result;

use sourmash::prelude::Select;
use sourmash::selection::Selection;
use sourmash::sketch::minhash::KmerMinHash;

use crate::utils::{
consume_query_by_gather, load_collection, load_sketches_above_threshold, write_prefetch,
ReportType,
consume_query_by_gather, csvwriter_thread, load_collection, load_sketches_above_threshold,
write_prefetch, BranchwaterGatherResult, ReportType,
};

#[allow(clippy::too_many_arguments)]
@@ -109,6 +110,10 @@ pub fn fastgather(
.ok();
}

let (send, recv) =
std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());
let gather_out_thrd = csvwriter_thread(recv, gather_output);

// run the gather!
consume_query_by_gather(
query_name,
Expand All @@ -117,8 +122,13 @@ pub fn fastgather(
scaled as u32,
matchlist,
threshold_hashes,
gather_output,
Some(send),
)
.ok();

if let Err(e) = gather_out_thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
}

Ok(())
}
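The change above routes each result through a bounded channel drained by a dedicated CSV-writer thread instead of passing the output path down the call stack. The pattern can be sketched in a self-contained form; `GatherRow`, `writer_thread`, and `run_pipeline` are illustrative stand-ins, not the crate's actual API (the real code sends `BranchwaterGatherResult` records to `csvwriter_thread`):

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread::{self, JoinHandle};

// Stand-in for BranchwaterGatherResult; fields are illustrative only.
pub struct GatherRow {
    pub query: String,
    pub overlap: u64,
}

// Dedicated consumer thread, analogous to csvwriter_thread: it alone owns
// the output, so producers never contend on a shared file handle.
pub fn writer_thread(recv: Receiver<GatherRow>) -> JoinHandle<Vec<String>> {
    thread::spawn(move || {
        // Iterating the receiver ends once every sender has been dropped.
        recv.into_iter()
            .map(|row| format!("{},{}", row.query, row.overlap))
            .collect()
    })
}

pub fn run_pipeline() -> Vec<String> {
    // Bounded channel: senders block when the buffer fills (backpressure),
    // mirroring sync_channel(rayon::current_num_threads()) in the diff.
    let (send, recv) = sync_channel::<GatherRow>(4);
    let writer = writer_thread(recv);

    // Each worker clones the sender, as in `Some(send.clone())` above.
    let workers: Vec<_> = (0..3u64)
        .map(|i| {
            let tx = send.clone();
            thread::spawn(move || {
                tx.send(GatherRow {
                    query: format!("q{i}"),
                    overlap: i * 100,
                })
                .unwrap();
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }

    // Drop the last sender so the writer's loop terminates, then join it.
    drop(send);
    writer.join().unwrap()
}
```

Dropping the final sender before the join matters: if any `Sender` is still alive, the receiver loop never ends and the join blocks forever, which is why the diff calls `drop(send)` before `gather_out_thrd.join()`.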
38 changes: 18 additions & 20 deletions src/fastmultigather.rs
@@ -1,6 +1,6 @@
/// fastmultigather: Run gather for multiple queries against a list of files.
use anyhow::Result;
use rayon::prelude::*;
use rayon::iter::ParallelIterator;

use sourmash::prelude::{Storage, ToWriter};
use sourmash::{selection::Selection, signature::SigsTrait};
@@ -22,7 +22,8 @@ use sourmash::sketch::minhash::KmerMinHash;
use sourmash::sketch::Sketch;

use crate::utils::{
consume_query_by_gather, load_collection, write_prefetch, PrefetchResult, ReportType,
consume_query_by_gather, csvwriter_thread, load_collection, write_prefetch,
BranchwaterGatherResult, PrefetchResult, ReportType,
};

#[allow(clippy::too_many_arguments)]
@@ -34,6 +35,7 @@ pub fn fastmultigather(
selection: Selection,
allow_failed_sigpaths: bool,
save_matches: bool,
output_path: Option<String>,
create_empty_results: bool,
) -> Result<()> {
let _ = env_logger::try_init();
@@ -82,6 +84,13 @@
// load against sketches into memory
let against = against_collection.load_sketches()?;

// set up a multi-producer, single-consumer channel.
let (send, recv) =
std::sync::mpsc::sync_channel::<BranchwaterGatherResult>(rayon::current_num_threads());

// spawn a thread that is dedicated to printing to a buffered output
let gather_out_thrd = csvwriter_thread(recv, output_path);

// Iterate over all queries => do prefetch and gather!
let processed_queries = AtomicUsize::new(0);
let skipped_paths = AtomicUsize::new(0);
@@ -144,9 +153,8 @@
})
.collect();

if !matchlist.is_empty() {
if !matchlist.is_empty() || create_empty_results {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);

// Save initial list of matches to prefetch output
write_prefetch(
@@ -166,7 +174,7 @@
common_scaled,
matchlist,
threshold_hashes,
Some(gather_output),
Some(send.clone()),
)
.ok();

@@ -200,21 +208,6 @@
}
} else {
println!("No matches to '{}'", location);
if create_empty_results {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);
// touch output files
match std::fs::File::create(&prefetch_output) {
Ok(_) => {}
Err(e) => {
eprintln!("Failed to create empty prefetch output: {}", e)
}
}
match std::fs::File::create(&gather_output) {
Ok(_) => {}
Err(e) => eprintln!("Failed to create empty gather output: {}", e),
}
}
}
}
Err(_) => {
@@ -228,6 +221,11 @@
}
});

drop(send);
if let Err(e) = gather_out_thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
}

println!(
"DONE. Processed {} queries total.",
processed_queries.into_inner()
7 changes: 2 additions & 5 deletions src/fastmultigather_rocksdb.rs
@@ -31,12 +31,9 @@ pub fn fastmultigather_rocksdb(
println!("Loaded DB");

// grab scaled from the database.
let max_db_scaled = db
let (_, max_db_scaled) = db
.collection()
.manifest()
.iter()
.map(|r| r.scaled())
.max()
.min_max_scaled()
.expect("no records in db?!");

let selection_scaled: u32 = match selection.scaled() {
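The hunk above replaces a hand-rolled `iter().map(|r| r.scaled()).max()` with a `min_max_scaled()` helper that returns both extremes of the scaled values in one pass. A sketch of what such a one-pass fold looks like over plain `u32` values — the real helper is a method on sourmash's manifest type, which this stand-in does not reproduce:

```rust
// One-pass min/max over scaled values; a sketch of a helper like
// `min_max_scaled`, here over a plain slice rather than manifest records.
fn min_max_scaled(scaleds: &[u32]) -> Option<(u32, u32)> {
    scaleds.iter().copied().fold(None, |acc, s| match acc {
        // First value seen: it is both the minimum and the maximum so far.
        None => Some((s, s)),
        // Fold each later value into the running extremes.
        Some((lo, hi)) => Some((lo.min(s), hi.max(s))),
    })
}
```

Returning `Option` keeps the "no records in db?!" case explicit, matching the `.expect(...)` at the call site in the diff.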
4 changes: 1 addition & 3 deletions src/lib.rs
@@ -155,9 +155,6 @@ fn do_fastmultigather(
}
}
} else {
if output_path.is_some() {
bail!("output path specified, but not running fastmultigather against a rocksdb. See issue #239");
}
match fastmultigather::fastmultigather(
query_filenames,
siglist_path,
@@ -166,6 +163,7 @@
selection,
allow_failed_sigpaths,
save_matches,
output_path,
create_empty_results,
) {
Ok(_) => Ok(0),
19 changes: 9 additions & 10 deletions src/manysearch.rs
@@ -19,6 +19,14 @@ use sourmash::signature::SigsTrait;
use sourmash::sketch::minhash::KmerMinHash;
use sourmash::storage::SigStore;

type AbundanceStats = (
Option<u64>,
Option<u64>,
Option<f64>,
Option<f64>,
Option<f64>,
);

pub fn manysearch(
query_filepath: String,
against_filepath: String,
@@ -174,16 +182,7 @@
fn inflate_abundances(
query: &KmerMinHash,
against: &KmerMinHash,
) -> Result<
(
Option<u64>,
Option<u64>,
Option<f64>,
Option<f64>,
Option<f64>,
),
SourmashError,
> {
) -> Result<AbundanceStats, SourmashError> {
let abunds: Vec<u64>;
let sum_weighted: u64;
let sum_all_abunds: u64 = against.sum_abunds();
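The `AbundanceStats` alias above is a pure signature cleanup: the five-`Option` tuple moves out of the `Result<...>` return type and gets a name. The same idiom, shortened to a hypothetical three-element alias and stub function (neither exists in the crate):

```rust
// Naming a wide tuple once keeps function signatures readable and gives
// call sites a single place to look up the element order.
type AbundanceStats = (Option<u64>, Option<u64>, Option<f64>);

// Hypothetical stub: (sum of abundances, count, mean abundance).
fn inflate_abundances_stub(total: u64, n: u64) -> AbundanceStats {
    if n == 0 {
        // No hashes: every statistic is undefined.
        return (None, None, None);
    }
    (Some(total), Some(n), Some(total as f64 / n as f64))
}
```

A named struct would document each field even better, but a `type` alias is the minimal change: it alters no call sites, since the alias and the spelled-out tuple are the same type.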
7 changes: 2 additions & 5 deletions src/manysearch_rocksdb.rs
@@ -36,12 +36,9 @@ pub fn manysearch_rocksdb(
println!("Loaded DB");

// grab scaled from the database.
let max_db_scaled = db
let (_, max_db_scaled) = db
.collection()
.manifest()
.iter()
.map(|r| r.scaled())
.max()
.min_max_scaled()
.expect("no records in db?!");

let selection_scaled: u32 = match selection.scaled() {
16 changes: 9 additions & 7 deletions src/multisearch.rs
@@ -17,6 +17,14 @@ use crate::utils::multicollection::SmallSignature;
use crate::utils::{csvwriter_thread, load_collection, MultiSearchResult, ReportType};
use sourmash::ani_utils::ani_from_containment;

type OverlapStatsReturn = (
f64,
HashMap<u64, f64>,
HashMap<u64, f64>,
HashMap<String, HashMap<u64, f64>>,
HashMap<u64, f64>,
);

#[derive(Default, Clone, Debug)]
struct ProbOverlapStats {
prob_overlap: f64,
@@ -71,13 +79,7 @@ fn compute_single_prob_overlap(
fn compute_prob_overlap_stats(
queries: &Vec<SmallSignature>,
againsts: &Vec<SmallSignature>,
) -> (
f64,
HashMap<u64, f64>,
HashMap<u64, f64>,
HashMap<String, HashMap<u64, f64>>,
HashMap<u64, f64>,
) {
) -> OverlapStatsReturn {
let n_comparisons = againsts.len() as f64 * queries.len() as f64;

// Combine all the queries and against into a single signature each
5 changes: 3 additions & 2 deletions src/python/sourmash_plugin_branchwater/__init__.py
@@ -277,13 +277,14 @@ def __init__(self, p):
p.add_argument(
"-o",
"--output",
help="CSV output file for matches. Used for non-rocksdb searches only.",
help="CSV output file containing gather matches",
)
p.add_argument(
"--create-empty-results",
"--create-empty-prefetch-results",
action="store_true",
default=False,
help="create empty results file(s) even if no matches",
help="create empty prefetch results file for each query, even if no matches (non-RocksDB only)",
)
p.add_argument(
"--save-matches",