diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 8e4c35e30..afe980f8c 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -83,12 +83,12 @@ fn main() { //info!("{:?},{:?}", txid, blockid); let prevouts = chain.lookup_txos( - &tx.input + tx.input .iter() .filter(|txin| has_prevout(txin)) .map(|txin| txin.previous_output) .collect(), - ); + ).unwrap(); let total_out: u64 = tx.output.iter().map(|out| out.value.to_sat()).sum(); let small_out = tx diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 8d895050d..f68da233c 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -196,6 +196,14 @@ impl DB { self.db.get(key).unwrap().map(|v| v.to_vec()) } + pub fn multi_get(&self, keys: I) -> Vec>, rocksdb::Error>> + where + K: AsRef<[u8]>, + I: IntoIterator, + { + self.db.multi_get(keys) + } + fn verify_compatibility(&self, config: &Config) { let mut compatibility_bytes = bincode::serialize_little(&DB_VERSION).unwrap(); diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 179829fd2..94ba7a41d 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -1,5 +1,5 @@ use arraydeque::{ArrayDeque, Wrapping}; -use itertools::Itertools; +use itertools::{Either, Itertools}; #[cfg(not(feature = "liquid"))] use bitcoin::consensus::encode::serialize; @@ -310,7 +310,7 @@ impl Mempool { self.txstore.insert(txid, tx); } // Phase 2: index history and spend edges (can fail if some txos cannot be found) - let txos = match self.lookup_txos(&self.get_prevouts(&txids)) { + let txos = match self.lookup_txos(self.get_prevouts(&txids)) { Ok(txos) => txos, Err(err) => { warn!("lookup txouts failed: {}", err); @@ -397,34 +397,29 @@ impl Mempool { } } - pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result { - let mut outpoints = BTreeSet::new(); - outpoints.insert(*outpoint); - Ok(self.lookup_txos(&outpoints)?.remove(outpoint).unwrap()) + fn lookup_txo(&self, outpoint: &OutPoint) -> Option { + self.txstore + .get(&outpoint.txid) + .and_then(|tx| tx.output.get(outpoint.vout as usize).cloned()) } - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> Result> { + pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { let _timer = self .latency .with_label_values(&["lookup_txos"]) .start_timer(); - let confirmed_txos = self.chain.lookup_avail_txos(outpoints); + // Get the txos available in the mempool, skipping over (and collecting) missing ones + let (mut txos, remain_outpoints): (HashMap<_, _>, _) = outpoints + .into_iter() + .partition_map(|outpoint| match self.lookup_txo(&outpoint) { + Some(txout) => Either::Left((outpoint, txout)), + None => Either::Right(outpoint), + }); - let mempool_txos = outpoints - .iter() - .filter(|outpoint| !confirmed_txos.contains_key(outpoint)) - .map(|outpoint| { - self.txstore - .get(&outpoint.txid) - .and_then(|tx| tx.output.get(outpoint.vout as usize).cloned()) - .map(|txout| (*outpoint, txout)) - .chain_err(|| format!("missing outpoint {:?}", outpoint)) - }) - .collect::>>()?; + // Get the remaining txos from the chain (fails if any are missing) + txos.extend(self.chain.lookup_txos(remain_outpoints)?); - let mut txos = confirmed_txos; - txos.extend(mempool_txos); Ok(txos) } diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 1e621ac0d..60d7510ab 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -118,7 +118,7 @@ impl Query { .or_else(|| self.mempool().lookup_raw_txn(txid)) } - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> HashMap { + pub fn lookup_txos(&self, outpoints: BTreeSet) -> HashMap { // the mempool lookup_txos() internally looks up confirmed txos as well self.mempool() .lookup_txos(outpoints) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index d5eba9a51..bbb2e2946 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -327,7 +327,7 @@ impl Indexer { fn index(&self, blocks: &[BlockEntry]) { let previous_txos_map = { let _timer = self.start_timer("index_lookup"); - lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false) + lookup_txos(&self.store.txstore_db, get_previous_txos(blocks)).unwrap() }; let rows = { let _timer = self.start_timer("index_process"); @@ -859,14 +859,9 @@ impl ChainQuery { lookup_txo(&self.store.txstore_db, outpoint) } - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> HashMap { + pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { let _timer = self.start_timer("lookup_txos"); - lookup_txos(&self.store.txstore_db, outpoints, false) - } - - pub fn lookup_avail_txos(&self, outpoints: &BTreeSet) -> HashMap { - let _timer = self.start_timer("lookup_available_txos"); - lookup_txos(&self.store.txstore_db, outpoints, true) + lookup_txos(&self.store.txstore_db, outpoints) } pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { @@ -1033,31 +1028,19 @@ fn get_previous_txos(block_entries: &[BlockEntry]) -> BTreeSet { .collect() } -fn lookup_txos( - txstore_db: &DB, - outpoints: &BTreeSet, - allow_missing: bool, -) -> HashMap { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(16) // we need to saturate SSD IOPS - .thread_name(|i| format!("lookup-txo-{}", i)) - .build() - .unwrap(); - pool.install(|| { - outpoints - .par_iter() - .filter_map(|outpoint| { - lookup_txo(&txstore_db, &outpoint) - .or_else(|| { - if !allow_missing { - panic!("missing txo {} in {:?}", outpoint, txstore_db); - } - None - }) - .map(|txo| (*outpoint, txo)) - }) - .collect() - }) +fn lookup_txos(txstore_db: &DB, outpoints: BTreeSet) -> Result> { + let keys = outpoints.iter().map(TxOutRow::key).collect::>(); + txstore_db + .multi_get(keys) + .into_iter() + .zip(outpoints) + .map(|(res, outpoint)| { + let txo = res + .unwrap() + .ok_or_else(|| format!("missing txo {}", outpoint))?; + Ok((outpoint, deserialize(&txo).expect("failed to parse TxOut"))) + }) + .collect() } fn lookup_txo(txstore_db: &DB, outpoint: &OutPoint) -> Option { diff --git a/src/rest.rs b/src/rest.rs index 336c43f4a..43eab5c60 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -468,7 +468,7 @@ fn prepare_txs( }) .collect(); - let prevouts = query.lookup_txos(&outpoints); + let prevouts = query.lookup_txos(outpoints); txs.into_iter() .map(|(tx, blockid)| TransactionValue::new(tx, blockid, &prevouts, config))