diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs
index fb25e68a8..4d4eb1c17 100644
--- a/src/bin/electrs.rs
+++ b/src/bin/electrs.rs
@@ -49,6 +49,7 @@ fn run_server(config: Arc<Config>) -> Result<()> {
         &config.daemon_dir,
         &config.blocks_dir,
         config.daemon_rpc_addr,
+        config.daemon_parallelism,
         config.cookie_getter(),
         config.network_type,
         signal.clone(),
@@ -81,14 +82,11 @@ fn run_server(config: Arc<Config>) -> Result<()> {
         &metrics,
         Arc::clone(&config),
     )));
-    loop {
-        match Mempool::update(&mempool, &daemon) {
-            Ok(_) => break,
-            Err(e) => {
-                warn!("Error performing initial mempool update, trying again in 5 seconds: {}", e.display_chain());
-                signal.wait(Duration::from_secs(5), false)?;
-            },
-        }
+
+    while !Mempool::update(&mempool, &daemon, &tip)? {
+        // Mempool syncing was aborted because the chain tip moved;
+        // index the new block(s) and try again.
+        tip = indexer.update(&daemon)?;
     }
 
     #[cfg(feature = "liquid")]
@@ -117,7 +115,6 @@ fn run_server(config: Arc<Config>) -> Result<()> {
     ));
 
     loop {
-
         main_loop_count.inc();
 
         if let Err(err) = signal.wait(Duration::from_secs(5), true) {
@@ -130,14 +127,12 @@ fn run_server(config: Arc<Config>) -> Result<()> {
         // Index new blocks
         let current_tip = daemon.getbestblockhash()?;
         if current_tip != tip {
-            indexer.update(&daemon)?;
-            tip = current_tip;
+            tip = indexer.update(&daemon)?;
         };
 
         // Update mempool
-        if let Err(e) = Mempool::update(&mempool, &daemon) {
-            // Log the error if the result is an Err
-            warn!("Error updating mempool, skipping mempool update: {}", e.display_chain());
+        if !Mempool::update(&mempool, &daemon, &tip)? {
+            warn!("skipped failed mempool update, trying again in 5 seconds");
         }
 
         // Update subscribed clients
diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs
index afe980f8c..94a3821ab 100644
--- a/src/bin/tx-fingerprint-stats.rs
+++ b/src/bin/tx-fingerprint-stats.rs
@@ -33,6 +33,7 @@ fn main() {
         &config.daemon_dir,
         &config.blocks_dir,
         config.daemon_rpc_addr,
+        config.daemon_parallelism,
         config.cookie_getter(),
         config.network_type,
         signal,
diff --git a/src/config.rs b/src/config.rs
index 8696ecf8f..9cf07d89b 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -25,6 +25,7 @@ pub struct Config {
     pub daemon_dir: PathBuf,
     pub blocks_dir: PathBuf,
     pub daemon_rpc_addr: SocketAddr,
+    pub daemon_parallelism: usize,
     pub cookie: Option<String>,
     pub electrum_rpc_addr: SocketAddr,
     pub http_addr: SocketAddr,
@@ -132,6 +133,12 @@ impl Config {
                 .help("Bitcoin daemon JSONRPC 'addr:port' to connect (default: 127.0.0.1:8332 for mainnet, 127.0.0.1:18332 for testnet and 127.0.0.1:18443 for regtest)")
                 .takes_value(true),
         )
+        .arg(
+            Arg::with_name("daemon_parallelism")
+                .long("daemon-parallelism")
+                .help("Number of JSONRPC requests to send in parallel")
+                .default_value("4")
+        )
         .arg(
             Arg::with_name("monitoring_addr")
                 .long("monitoring-addr")
@@ -386,6 +393,7 @@ impl Config {
            daemon_dir,
            blocks_dir,
            daemon_rpc_addr,
+           daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize),
            cookie,
            utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
            electrum_rpc_addr,
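The `--daemon-parallelism` flag added above defaults to "4" and is parsed with clap's `value_t_or_exit!`, so a non-numeric value terminates the process with a usage error instead of propagating a `Result`. A minimal, self-contained sketch of that parsing behavior (assuming clap 2.x, the version this codebase appears to use; the app name and argument value here are illustrative, not from the diff):

    use clap::{value_t_or_exit, App, Arg};

    fn main() {
        let m = App::new("example")
            .arg(
                Arg::with_name("daemon_parallelism")
                    .long("daemon-parallelism")
                    .help("Number of JSONRPC requests to send in parallel")
                    .default_value("4"),
            )
            .get_matches_from(vec!["example", "--daemon-parallelism", "8"]);

        // Exits the process with a descriptive usage error on non-numeric input.
        let parallelism = value_t_or_exit!(m, "daemon_parallelism", usize);
        assert_eq!(parallelism, 8);
    }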
diff --git a/src/daemon.rs b/src/daemon.rs
index 457bf4230..c9c0a835f 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -1,3 +1,4 @@
+use std::cell::OnceCell;
 use std::collections::{HashMap, HashSet};
 use std::env;
 use std::io::{BufRead, BufReader, Lines, Write};
@@ -8,8 +9,9 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use base64::prelude::{Engine, BASE64_STANDARD};
+use error_chain::ChainedError;
 use hex::FromHex;
-use itertools::Itertools;
+use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
 use serde_json::{from_str, from_value, Value};
 
 #[cfg(not(feature = "liquid"))]
@@ -79,13 +81,13 @@ fn parse_error_code(err: &Value) -> Option<i64> {
 
 fn parse_jsonrpc_reply(mut reply: Value, method: &str, expected_id: u64) -> Result<Value> {
     if let Some(reply_obj) = reply.as_object_mut() {
-        if let Some(err) = reply_obj.get("error") {
+        if let Some(err) = reply_obj.get_mut("error") {
             if !err.is_null() {
                 if let Some(code) = parse_error_code(&err) {
                     match code {
                         // RPC_IN_WARMUP -> retry by later reconnection
                         -28 => bail!(ErrorKind::Connection(err.to_string())),
-                        _ => bail!("{} RPC error: {}", method, err),
+                        code => bail!(ErrorKind::RpcError(code, err.take(), method.to_string())),
                     }
                 }
             }
@@ -248,7 +250,7 @@ impl Connection {
         Ok(if status == "HTTP/1.1 200 OK" {
             contents
         } else if status == "HTTP/1.1 500 Internal Server Error" {
-            warn!("HTTP status: {}", status);
+            debug!("RPC HTTP 500 error: {}", contents);
             contents // the contents should have a JSONRPC error field
         } else {
             bail!(
@@ -287,6 +289,8 @@ pub struct Daemon {
     message_id: Counter, // for monotonic JSONRPC 'id'
     signal: Waiter,
 
+    rpc_threads: Arc<rayon::ThreadPool>,
+
     // monitoring
     latency: HistogramVec,
     size: HistogramVec,
@@ -297,6 +301,7 @@ impl Daemon {
         daemon_dir: &PathBuf,
         blocks_dir: &PathBuf,
         daemon_rpc_addr: SocketAddr,
+        daemon_parallelism: usize,
         cookie_getter: Arc<dyn CookieGetter>,
         network: Network,
         signal: Waiter,
@@ -313,6 +318,13 @@ impl Daemon {
            )?),
            message_id: Counter::new(),
            signal: signal.clone(),
+           rpc_threads: Arc::new(
+               rayon::ThreadPoolBuilder::new()
+                   .num_threads(daemon_parallelism)
+                   .thread_name(|i| format!("rpc-requests-{}", i))
+                   .build()
+                   .unwrap(),
+           ),
            latency: metrics.histogram_vec(
                HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"),
                &["method"],
@@ -361,6 +373,7 @@ impl Daemon {
             conn: Mutex::new(self.conn.lock().unwrap().reconnect()?),
             message_id: Counter::new(),
             signal: self.signal.clone(),
+            rpc_threads: self.rpc_threads.clone(),
             latency: self.latency.clone(),
             size: self.size.clone(),
         })
@@ -398,33 +411,18 @@ impl Daemon {
         Ok(result)
     }
 
-    fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
+    fn handle_request(&self, method: &str, params: &Value) -> Result<Value> {
         let id = self.message_id.next();
-        let chunks = params_list
-            .iter()
-            .map(|params| json!({"method": method, "params": params, "id": id}))
-            .chunks(50_000); // Max Amount of batched requests
-        let mut results = vec![];
-        for chunk in &chunks {
-            let reqs = chunk.collect();
-            let mut replies = self.call_jsonrpc(method, &reqs)?;
-            if let Some(replies_vec) = replies.as_array_mut() {
-                for reply in replies_vec {
-                    results.push(parse_jsonrpc_reply(reply.take(), method, id)?)
-                }
-            } else {
-                bail!("non-array replies: {:?}", replies);
-            }
-        }
-
-        Ok(results)
+        let req = json!({"method": method, "params": params, "id": id});
+        let reply = self.call_jsonrpc(method, &req)?;
+        parse_jsonrpc_reply(reply, method, id)
     }
 
-    fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
+    fn retry_request(&self, method: &str, params: &Value) -> Result<Value> {
         loop {
-            match self.handle_request_batch(method, params_list) {
-                Err(Error(ErrorKind::Connection(msg), _)) => {
-                    warn!("reconnecting to bitcoind: {}", msg);
+            match self.handle_request(method, &params) {
+                Err(e @ Error(ErrorKind::Connection(_), _)) => {
+                    warn!("reconnecting to bitcoind: {}", e.display_chain());
                     self.signal.wait(Duration::from_secs(3), false)?;
                     let mut conn = self.conn.lock().unwrap();
                     *conn = conn.reconnect()?;
@@ -436,13 +434,47 @@ impl Daemon {
     }
 
     fn request(&self, method: &str, params: Value) -> Result<Value> {
-        let mut values = self.retry_request_batch(method, &[params])?;
-        assert_eq!(values.len(), 1);
-        Ok(values.remove(0))
+        self.retry_request(method, &params)
     }
 
-    fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
-        self.retry_request_batch(method, params_list)
+    fn retry_reconnect(&self) -> Daemon {
+        // XXX add a max reconnection attempts limit?
+        loop {
+            match self.reconnect() {
+                Ok(daemon) => break daemon,
+                Err(e) => {
+                    warn!("failed connecting to RPC daemon: {}", e.display_chain());
+                }
+            }
+        }
+    }
+
+    // Send requests in parallel over multiple RPC connections as individual JSON-RPC requests (with no JSON-RPC batching),
+    // buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned.
+    fn requests(&self, method: &str, params_list: Vec<Value>) -> Result<Vec<Value>> {
+        self.requests_iter(method, params_list).collect()
+    }
+
+    // Send requests in parallel over multiple RPC connections, iterating over the results without buffering them.
+    // Errors are included in the iterator and do not terminate other pending requests.
+    fn requests_iter<'a>(
+        &'a self,
+        method: &'a str,
+        params_list: Vec<Value>,
+    ) -> impl ParallelIterator<Item = Result<Value>> + IndexedParallelIterator + 'a {
+        self.rpc_threads.install(move || {
+            params_list.into_par_iter().map(move |params| {
+                // Store a local per-thread Daemon, each with its own TCP connection. These will
+                // get initialized as necessary for the `rpc_threads` pool thread managed by rayon.
+                thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());
+
+                DAEMON_INSTANCE.with(|daemon| {
+                    daemon
+                        .get_or_init(|| self.retry_reconnect())
+                        .retry_request(&method, &params)
+                })
+            })
+        })
     }
 
     // bitcoind JSONRPC API:
@@ -468,12 +500,12 @@ impl Daemon {
     pub fn getblockheaders(&self, heights: &[usize]) -> Result<Vec<BlockHeader>> {
         let heights: Vec<Value> = heights.iter().map(|height| json!([height])).collect();
         let params_list: Vec<Value> = self
-            .requests("getblockhash", &heights)?
+            .requests("getblockhash", heights)?
             .into_iter()
             .map(|hash| json!([hash, /*verbose=*/ false]))
             .collect();
         let mut result = vec![];
-        for h in self.requests("getblockheader", &params_list)? {
+        for h in self.requests("getblockheader", params_list)? {
             result.push(header_from_value(h)?);
         }
         Ok(result)
@@ -495,7 +527,7 @@ impl Daemon {
             .iter()
             .map(|hash| json!([hash, /*verbose=*/ false]))
             .collect();
-        let values = self.requests("getblock", &params_list)?;
+        let values = self.requests("getblock", params_list)?;
         let mut blocks = vec![];
         for value in values {
             blocks.push(block_from_value(value)?);
@@ -503,19 +535,30 @@ impl Daemon {
         Ok(blocks)
     }
 
-    pub fn gettransactions(&self, txhashes: &[&Txid]) -> Result<Vec<Transaction>> {
-        let params_list: Vec<Value> = txhashes
+    /// Fetch the given transactions in parallel over multiple threads and RPC connections,
+    /// ignoring any missing ones and returning whatever is available.
+    pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<(Txid, Transaction)>> {
+        const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5;
+
+        let params_list: Vec<Value> = txids
             .iter()
             .map(|txhash| json!([txhash, /*verbose=*/ false]))
             .collect();
-        let values = self.requests("getrawtransaction", &params_list)?;
-        let mut txs = vec![];
-        for value in values {
-            txs.push(tx_from_value(value)?);
-        }
-        assert_eq!(txhashes.len(), txs.len());
-        Ok(txs)
+
+        self.requests_iter("getrawtransaction", params_list)
+            .zip(txids)
+            .filter_map(|(res, txid)| match res {
+                Ok(val) => Some(tx_from_value(val).map(|tx| (**txid, tx))),
+                // Ignore 'tx not found' errors
+                Err(Error(ErrorKind::RpcError(code, _, _), _))
+                    if code == RPC_INVALID_ADDRESS_OR_KEY =>
+                {
+                    None
+                }
+                // Terminate iteration if any other errors are encountered
+                Err(e) => Some(Err(e)),
+            })
+            .collect()
     }
 
     pub fn gettransaction_raw(
@@ -556,7 +599,7 @@ impl Daemon {
         let params_list: Vec<Value> = conf_targets.iter().map(|t| json!([t, "ECONOMICAL"])).collect();
 
         Ok(self
-            .requests("estimatesmartfee", &params_list)?
+            .requests("estimatesmartfee", params_list)?
            .iter()
            .zip(conf_targets)
            .filter_map(|(reply, target)| {
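The `requests_iter` machinery above combines a dedicated rayon pool with a `thread_local!` `OnceCell`: each worker thread lazily opens its own daemon connection on first use and reuses it for every request it later processes. A self-contained sketch of just that pattern (assuming rayon 1.x and Rust 1.70+ for `std::cell::OnceCell`; `Conn` and `connect` are illustrative stand-ins for the per-thread `Daemon`):

    use std::cell::OnceCell;

    use rayon::prelude::*;

    // Stand-in for a per-thread resource such as a TCP connection.
    struct Conn(Option<usize>);

    fn connect() -> Conn {
        // Runs at most once per pool thread.
        Conn(rayon::current_thread_index())
    }

    fn main() {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(4)
            .thread_name(|i| format!("rpc-requests-{}", i))
            .build()
            .unwrap();

        let results: Vec<usize> = pool.install(|| {
            (0..100usize)
                .into_par_iter()
                .map(|i| {
                    // One Conn per worker thread, created on first use, reused afterwards.
                    thread_local!(static CONN: OnceCell<Conn> = OnceCell::new());
                    CONN.with(|c| c.get_or_init(connect).0.unwrap_or(0) + i)
                })
                .collect()
        });

        assert_eq!(results.len(), 100);
    }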
diff --git a/src/errors.rs b/src/errors.rs
index cec50ccef..c708d7dda 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -9,6 +9,11 @@ error_chain! {
             display("Connection error: {}", msg)
         }
 
+        RpcError(code: i64, error: serde_json::Value, method: String) {
+            description("RPC error")
+            display("{} RPC error {}: {}", method, code, error)
+        }
+
         Interrupt(sig: i32) {
             description("Interruption by external signal")
             display("Iterrupted by signal {}", sig)
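Because `error_chain` generates `Error` as a tuple struct wrapping an `ErrorKind`, callers can destructure the new variant directly, which is what `gettransactions_available` does to detect code -5. A minimal standalone reproduction of that matching (assuming the error_chain and serde_json crates; the `fetch` function and its error payload are illustrative):

    #[macro_use]
    extern crate error_chain;

    error_chain! {
        errors {
            RpcError(code: i64, error: serde_json::Value, method: String) {
                description("RPC error")
                display("{} RPC error {}: {}", method, code, error)
            }
        }
    }

    fn fetch() -> Result<()> {
        bail!(ErrorKind::RpcError(
            -5,
            serde_json::json!({"message": "No such mempool or blockchain transaction"}),
            "getrawtransaction".to_string(),
        ))
    }

    fn main() {
        match fetch() {
            // Destructuring the tuple struct exposes the RPC error code.
            Err(Error(ErrorKind::RpcError(code, _, _), _)) => assert_eq!(code, -5),
            other => panic!("unexpected: {:?}", other),
        }
    }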
diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs
index 94ba7a41d..4ccd4cd59 100644
--- a/src/new_index/mempool.rs
+++ b/src/new_index/mempool.rs
@@ -11,7 +11,7 @@ use std::iter::FromIterator;
 use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant};
 
-use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid};
+use crate::chain::{deserialize, BlockHash, Network, OutPoint, Transaction, TxOut, Txid};
 use crate::config::Config;
 use crate::daemon::Daemon;
 use crate::errors::*;
@@ -21,7 +21,7 @@ use crate::new_index::{
     SpendingInfo, SpendingInput, TxHistoryInfo, Utxo,
 };
 use crate::util::fees::{make_fee_histogram, TxFeeInfo};
-use crate::util::{extract_tx_prevouts, full_hash, has_prevout, is_spendable, Bytes};
+use crate::util::{extract_tx_prevouts, full_hash, get_prev_outpoints, is_spendable, Bytes};
 
 #[cfg(feature = "liquid")]
 use crate::elements::asset;
@@ -276,7 +276,7 @@ impl Mempool {
         &self.backlog_stats.0
     }
 
-    pub fn old_txids(&self) -> HashSet<Txid> {
+    pub fn txids_set(&self) -> HashSet<Txid> {
         return HashSet::from_iter(self.txstore.keys().cloned());
     }
 
@@ -288,40 +288,55 @@ impl Mempool {
         self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now());
     }
 
-    pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) {
-        if self.txstore.get(txid).is_none() {
+    pub fn add_by_txid(&mut self, daemon: &Daemon, txid: Txid) -> Result<()> {
+        if self.txstore.get(&txid).is_none() {
             if let Ok(tx) = daemon.getmempooltx(&txid) {
-                self.add(vec![tx])
+                let mut txs_map = HashMap::new();
+                txs_map.insert(txid, tx);
+                self.add(txs_map)
+            } else {
+                bail!("add_by_txid cannot find {}", txid);
             }
+        } else {
+            Ok(())
         }
     }
 
-    fn add(&mut self, txs: Vec<Transaction>) {
+    fn add(&mut self, txs_map: HashMap<Txid, Transaction>) -> Result<()> {
         self.delta
             .with_label_values(&["add"])
-            .observe(txs.len() as f64);
+            .observe(txs_map.len() as f64);
         let _timer = self.latency.with_label_values(&["add"]).start_timer();
 
-        let mut txids = vec![];
-        // Phase 1: add to txstore
-        for tx in txs {
-            let txid = tx.txid();
-            txids.push(txid);
+        let spent_prevouts = get_prev_outpoints(txs_map.values());
+
+        // Lookup spent prevouts that were funded within the same `add` batch
+        let mut txos = HashMap::new();
+        let remain_prevouts = spent_prevouts
+            .into_iter()
+            .filter(|prevout| {
+                if let Some(prevtx) = txs_map.get(&prevout.txid) {
+                    if let Some(out) = prevtx.output.get(prevout.vout as usize) {
+                        txos.insert(prevout.clone(), out.clone());
+                        // remove from the list of remaining `prevouts`
+                        return false;
+                    }
+                }
+                true
+            })
+            .collect();
+
+        // Lookup remaining spent prevouts in mempool & on-chain
+        // Fails if any are missing.
+        txos.extend(self.lookup_txos(remain_prevouts)?);
+
+        // Add to txstore and indexes
+        for (txid, tx) in txs_map {
             self.txstore.insert(txid, tx);
-        }
-        // Phase 2: index history and spend edges (can fail if some txos cannot be found)
-        let txos = match self.lookup_txos(self.get_prevouts(&txids)) {
-            Ok(txos) => txos,
-            Err(err) => {
-                warn!("lookup txouts failed: {}", err);
-                // TODO: should we remove txids from txstore?
-                return;
-            }
-        };
-        for txid in txids {
-            let tx = self.txstore.get(&txid).expect("missing mempool tx");
-            let txid_bytes = full_hash(&txid[..]);
+            let tx = self.txstore.get(&txid).expect("was just added");
+
+            let prevouts = extract_tx_prevouts(&tx, &txos, false);
+            let txid_bytes = full_hash(&txid[..]);
 
             // Get feeinfo for caching and recent tx overview
             let feeinfo = TxFeeInfo::new(&tx, &prevouts, self.config.network_type);
@@ -395,6 +410,8 @@ impl Mempool {
                 &mut self.asset_issuance,
             );
         }
+
+        Ok(())
     }
 
     fn lookup_txo(&self, outpoint: &OutPoint) -> Option<TxOut> {
@@ -423,24 +440,6 @@ impl Mempool {
         Ok(txos)
     }
 
-    fn get_prevouts(&self, txids: &[Txid]) -> BTreeSet<OutPoint> {
-        let _timer = self
-            .latency
-            .with_label_values(&["get_prevouts"])
-            .start_timer();
-
-        txids
-            .iter()
-            .map(|txid| self.txstore.get(txid).expect("missing mempool tx"))
-            .flat_map(|tx| {
-                tx.input
-                    .iter()
-                    .filter(|txin| has_prevout(txin))
-                    .map(|txin| txin.previous_output)
-            })
-            .collect()
-    }
-
     fn remove(&mut self, to_remove: HashSet<&Txid>) {
         self.delta
             .with_label_values(&["remove"])
@@ -486,31 +485,74 @@ impl Mempool {
             .map_or_else(|| vec![], |entries| self._history(entries, limit))
     }
 
-    pub fn update(mempool: &Arc<RwLock<Mempool>>, daemon: &Daemon) -> Result<()> {
+    /// Sync our local view of the mempool with the bitcoind Daemon RPC. If the chain tip moves before
+    /// the mempool is fetched in full, syncing is aborted and Ok(false) is returned.
+    pub fn update(
+        mempool: &Arc<RwLock<Mempool>>,
+        daemon: &Daemon,
+        tip: &BlockHash,
+    ) -> Result<bool> {
         let _timer = mempool.read().unwrap().latency.with_label_values(&["update"]).start_timer();
 
-        // 1. Determine which transactions are no longer in the daemon's mempool and which ones have newly entered it
-        let old_txids = mempool.read().unwrap().old_txids();
-        let all_txids = daemon
-            .getmempooltxids()
-            .chain_err(|| "failed to update mempool from daemon")?;
-        let txids_to_remove: HashSet<&Txid> = old_txids.difference(&all_txids).collect();
+        // Continuously attempt to fetch mempool transactions until we're able to get them in full
+        let mut fetched_txs = HashMap::<Txid, Transaction>::new();
+        let mut indexed_txids = mempool.read().unwrap().txids_set();
+        loop {
+            // Get bitcoind's current list of mempool txids
+            let all_txids = daemon
+                .getmempooltxids()
+                .chain_err(|| "failed to update mempool from daemon")?;
+
+            // Remove evicted mempool transactions
+            mempool
+                .write()
+                .unwrap()
+                .remove(indexed_txids.difference(&all_txids).collect());
+
+            indexed_txids.retain(|txid| all_txids.contains(txid));
+            fetched_txs.retain(|txid, _| all_txids.contains(txid));
+
+            // Fetch missing transactions from bitcoind
+            let new_txids = all_txids
+                .iter()
+                .filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid))
+                .collect::<Vec<_>>();
+            if new_txids.is_empty() {
+                break;
+            }
+            debug!(
+                "mempool with total {} txs, {} fetched, {} missing",
+                all_txids.len(),
+                indexed_txids.len() + fetched_txs.len(),
+                new_txids.len()
+            );
+            let new_txs = daemon.gettransactions_available(&new_txids)?;
 
-        // 2. Remove missing transactions. Even if we are unable to download new transactions from
-        //    the daemon, we still want to remove the transactions that are no longer in the mempool.
-        mempool.write().unwrap().remove(txids_to_remove);
+            // Abort if the chain tip moved while fetching transactions
+            if daemon.getbestblockhash()? != *tip {
+                warn!("chain tip moved while updating mempool");
+                return Ok(false);
+            }
 
-        // 3. Download the new transactions from the daemon's mempool
-        let new_txids: Vec<&Txid> = all_txids.difference(&old_txids).collect();
-        let txs_to_add = daemon
-            .gettransactions(&new_txids)
-            .chain_err(|| format!("failed to get {} transactions", new_txids.len()))?;
+            let fetched_count = new_txs.len();
+            fetched_txs.extend(new_txs);
+
+            // Retry if any transactions were evicted from the mempool before we managed to get them
+            if fetched_count != new_txids.len() {
+                warn!(
+                    "failed to fetch {} mempool txs, retrying...",
+                    new_txids.len() - fetched_count
+                );
+            } else {
+                break;
+            }
+        }
 
-        // 4. Update local mempool to match daemon's state
+        // Add fetched transactions to our view of the mempool
         {
             let mut mempool = mempool.write().unwrap();
-            // Add new transactions
-            mempool.add(txs_to_add);
+
+            mempool.add(fetched_txs)?;
 
             mempool
                 .count
@@ -523,7 +565,9 @@ impl Mempool {
             }
         }
 
-        Ok(())
+        trace!("mempool is synced");
+
+        Ok(true)
     }
 }
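`Mempool::add` above now resolves prevouts in two steps: outpoints funded by transactions inside the same batch are satisfied from the batch itself, and only the remainder goes to the mempool/on-chain lookup, which fails the whole `add` if anything is missing. A self-contained sketch of the in-batch resolution step (with toy stand-ins for `OutPoint`, `TxOut`, and `Transaction`; txids are plain integers here):

    use std::collections::{BTreeSet, HashMap};

    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct OutPoint { txid: u32, vout: u32 }
    #[derive(Clone)]
    struct TxOut { value: u64 }
    struct Transaction { output: Vec<TxOut> }

    fn main() {
        // A batch where one tx spends an output created by another tx in the same batch.
        let mut txs_map = HashMap::new();
        txs_map.insert(1u32, Transaction { output: vec![TxOut { value: 50 }] });
        txs_map.insert(2u32, Transaction { output: vec![TxOut { value: 49 }] });

        let spent: BTreeSet<OutPoint> =
            [OutPoint { txid: 1, vout: 0 }, OutPoint { txid: 9, vout: 0 }].into_iter().collect();

        let mut txos = HashMap::new();
        let remain: BTreeSet<OutPoint> = spent
            .into_iter()
            .filter(|prevout| {
                if let Some(prevtx) = txs_map.get(&prevout.txid) {
                    if let Some(out) = prevtx.output.get(prevout.vout as usize) {
                        // Funded within the batch: resolve locally, drop from `remain`.
                        txos.insert(*prevout, out.clone());
                        return false;
                    }
                }
                true
            })
            .collect();

        assert_eq!(txos.values().next().unwrap().value, 50); // resolved in-batch
        assert_eq!(remain.len(), 1); // must be looked up in the mempool or on-chain
    }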
{ display("Connection error: {}", msg) } + RpcError(code: i64, error: serde_json::Value, method: String) { + description("RPC error") + display("{} RPC error {}: {}", method, code, error) + } + Interrupt(sig: i32) { description("Interruption by external signal") display("Iterrupted by signal {}", sig) diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 94ba7a41d..4ccd4cd59 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -11,7 +11,7 @@ use std::iter::FromIterator; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; -use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid}; +use crate::chain::{deserialize, BlockHash, Network, OutPoint, Transaction, TxOut, Txid}; use crate::config::Config; use crate::daemon::Daemon; use crate::errors::*; @@ -21,7 +21,7 @@ use crate::new_index::{ SpendingInfo, SpendingInput, TxHistoryInfo, Utxo, }; use crate::util::fees::{make_fee_histogram, TxFeeInfo}; -use crate::util::{extract_tx_prevouts, full_hash, has_prevout, is_spendable, Bytes}; +use crate::util::{extract_tx_prevouts, full_hash, get_prev_outpoints, is_spendable, Bytes}; #[cfg(feature = "liquid")] use crate::elements::asset; @@ -276,7 +276,7 @@ impl Mempool { &self.backlog_stats.0 } - pub fn old_txids(&self) -> HashSet { + pub fn txids_set(&self) -> HashSet { return HashSet::from_iter(self.txstore.keys().cloned()); } @@ -288,40 +288,55 @@ impl Mempool { self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now()); } - pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) { - if self.txstore.get(txid).is_none() { + pub fn add_by_txid(&mut self, daemon: &Daemon, txid: Txid) -> Result<()> { + if self.txstore.get(&txid).is_none() { if let Ok(tx) = daemon.getmempooltx(&txid) { - self.add(vec![tx]) + let mut txs_map = HashMap::new(); + txs_map.insert(txid, tx); + self.add(txs_map) + } else { + bail!("add_by_txid cannot find {}", txid); } + } else { + Ok(()) } } - fn add(&mut self, txs: Vec) { + fn add(&mut self, txs_map: HashMap) -> Result<()> { self.delta .with_label_values(&["add"]) - .observe(txs.len() as f64); + .observe(txs_map.len() as f64); let _timer = self.latency.with_label_values(&["add"]).start_timer(); - let mut txids = vec![]; - // Phase 1: add to txstore - for tx in txs { - let txid = tx.txid(); - txids.push(txid); + let spent_prevouts = get_prev_outpoints(txs_map.values()); + + // Lookup spent prevouts that were funded within the same `add` batch + let mut txos = HashMap::new(); + let remain_prevouts = spent_prevouts + .into_iter() + .filter(|prevout| { + if let Some(prevtx) = txs_map.get(&prevout.txid) { + if let Some(out) = prevtx.output.get(prevout.vout as usize) { + txos.insert(prevout.clone(), out.clone()); + // remove from the list of remaining `prevouts` + return false; + } + } + true + }) + .collect(); + + // Lookup remaining spent prevouts in mempool & on-chain + // Fails if any are missing. + txos.extend(self.lookup_txos(remain_prevouts)?); + + // Add to txstore and indexes + for (txid, tx) in txs_map { self.txstore.insert(txid, tx); - } - // Phase 2: index history and spend edges (can fail if some txos cannot be found) - let txos = match self.lookup_txos(self.get_prevouts(&txids)) { - Ok(txos) => txos, - Err(err) => { - warn!("lookup txouts failed: {}", err); - // TODO: should we remove txids from txstore? 
diff --git a/src/util/mod.rs b/src/util/mod.rs
index 3b03d41ce..0a9701b8e 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -12,8 +12,8 @@ pub use self::block::{
 pub use self::fees::get_tx_fee;
 pub use self::script::{get_innerscripts, ScriptToAddr, ScriptToAsm};
 pub use self::transaction::{
-    extract_tx_prevouts, has_prevout, is_coinbase, is_spendable, serialize_outpoint,
-    TransactionStatus, TxInput,
+    extract_tx_prevouts, get_prev_outpoints, has_prevout, is_coinbase, is_spendable,
+    serialize_outpoint, TransactionStatus, TxInput,
 };
 
 use std::collections::HashMap;
diff --git a/src/util/transaction.rs b/src/util/transaction.rs
index dda5e4f7c..4daa8547b 100644
--- a/src/util/transaction.rs
+++ b/src/util/transaction.rs
@@ -1,7 +1,7 @@
 use crate::chain::{BlockHash, OutPoint, Transaction, TxIn, TxOut, Txid};
 use crate::util::BlockId;
 
-use std::collections::HashMap;
+use std::collections::{BTreeSet, HashMap};
 
 #[cfg(feature = "liquid")]
 lazy_static! {
@@ -96,6 +96,16 @@ pub fn extract_tx_prevouts<'a>(
         .collect()
 }
 
+pub fn get_prev_outpoints<'a>(txs: impl Iterator<Item = &'a Transaction>) -> BTreeSet<OutPoint> {
+    txs.flat_map(|tx| {
+        tx.input
+            .iter()
+            .filter(|txin| has_prevout(txin))
+            .map(|txin| txin.previous_output)
+    })
+    .collect()
+}
+
 pub fn serialize_outpoint<S>(outpoint: &OutPoint, serializer: S) -> Result<S::Ok, S::Error>
 where
     S: serde::ser::Serializer,
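The `get_prev_outpoints` helper above gathers every non-coinbase prevout spent by a set of transactions into a `BTreeSet`, giving the subsequent lookup a deduplicated, deterministically ordered set. A self-contained illustration with toy chain types (`has_prevout` here mimics the real helper by treating a null-style outpoint as coinbase; all names besides `get_prev_outpoints` are illustrative):

    use std::collections::BTreeSet;

    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct OutPoint { txid: u32, vout: u32 }
    struct TxIn { previous_output: OutPoint }
    struct Transaction { input: Vec<TxIn> }

    // Coinbase inputs carry a null prevout that must not be looked up.
    fn has_prevout(txin: &TxIn) -> bool {
        txin.previous_output.vout != u32::MAX
    }

    fn get_prev_outpoints<'a>(txs: impl Iterator<Item = &'a Transaction>) -> BTreeSet<OutPoint> {
        txs.flat_map(|tx| {
            tx.input
                .iter()
                .filter(|txin| has_prevout(txin))
                .map(|txin| txin.previous_output)
        })
        .collect()
    }

    fn main() {
        let coinbase = Transaction {
            input: vec![TxIn { previous_output: OutPoint { txid: 0, vout: u32::MAX } }],
        };
        let spend = Transaction {
            input: vec![TxIn { previous_output: OutPoint { txid: 42, vout: 0 } }],
        };

        let prevouts = get_prev_outpoints([&coinbase, &spend].into_iter());
        assert_eq!(prevouts.len(), 1); // the coinbase input was filtered out
    }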
diff --git a/tests/common.rs b/tests/common.rs
index c85ebf7d9..1c06c9ed7 100644
--- a/tests/common.rs
+++ b/tests/common.rs
@@ -89,6 +89,7 @@ impl TestRunner {
             network_type,
             db_path: electrsdb.path().to_path_buf(),
             daemon_dir: daemon_subdir.clone(),
+            daemon_parallelism: 3,
             blocks_dir: daemon_subdir.join("blocks"),
             daemon_rpc_addr: params.rpc_socket.into(),
             cookie: None,
@@ -127,6 +128,7 @@ impl TestRunner {
             &config.daemon_dir,
             &config.blocks_dir,
             config.daemon_rpc_addr,
+            config.daemon_parallelism,
             config.cookie_getter(),
             config.network_type,
             signal.clone(),
@@ -147,7 +149,7 @@ impl TestRunner {
         };
         let mut indexer = Indexer::open(Arc::clone(&store), fetch_from, &config, &metrics);
-        indexer.update(&daemon)?;
+        let tip = indexer.update(&daemon)?;
         indexer.fetch_from(FetchFrom::Bitcoind);
 
         let chain = Arc::new(ChainQuery::new(
@@ -162,7 +164,7 @@ impl TestRunner {
             &metrics,
             Arc::clone(&config),
         )));
-        Mempool::update(&mempool, &daemon)?;
+        assert!(Mempool::update(&mempool, &daemon, &tip)?);
 
         let query = Arc::new(Query::new(
             Arc::clone(&chain),
@@ -193,8 +195,8 @@ impl TestRunner {
     }
 
     pub fn sync(&mut self) -> Result<()> {
-        self.indexer.update(&self.daemon)?;
-        Mempool::update(&self.mempool, &self.daemon)?;
+        let tip = self.indexer.update(&self.daemon)?;
+        assert!(Mempool::update(&self.mempool, &self.daemon, &tip)?);
         // force an update for the mempool stats, which are normally cached
         self.mempool.write().unwrap().update_backlog_stats();
         Ok(())