diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index fb25e68a8..ee92d7e0d 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -49,6 +49,7 @@ fn run_server(config: Arc) -> Result<()> { &config.daemon_dir, &config.blocks_dir, config.daemon_rpc_addr, + config.daemon_parallelism, config.cookie_getter(), config.network_type, signal.clone(), diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 8e4c35e30..3dd9de999 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -33,6 +33,7 @@ fn main() { &config.daemon_dir, &config.blocks_dir, config.daemon_rpc_addr, + config.daemon_parallelism, config.cookie_getter(), config.network_type, signal, diff --git a/src/config.rs b/src/config.rs index 8696ecf8f..e740e99e5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -25,6 +25,7 @@ pub struct Config { pub daemon_dir: PathBuf, pub blocks_dir: PathBuf, pub daemon_rpc_addr: SocketAddr, + pub daemon_parallelism: usize, pub cookie: Option, pub electrum_rpc_addr: SocketAddr, pub http_addr: SocketAddr, @@ -132,6 +133,12 @@ impl Config { .help("Bitcoin daemon JSONRPC 'addr:port' to connect (default: 127.0.0.1:8332 for mainnet, 127.0.0.1:18332 for testnet and 127.0.0.1:18443 for regtest)") .takes_value(true), ) + .arg( + Arg::with_name("daemon_parallelism") + .long("daemon-parallelism") + .help("Number of JSONRPC requests to send in parallel") + .default_value("8") + ) .arg( Arg::with_name("monitoring_addr") .long("monitoring-addr") @@ -386,6 +393,7 @@ impl Config { daemon_dir, blocks_dir, daemon_rpc_addr, + daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize), cookie, utxos_limit: value_t_or_exit!(m, "utxos_limit", usize), electrum_rpc_addr, diff --git a/src/daemon.rs b/src/daemon.rs index 90e38a931..efd990887 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -8,7 +8,7 @@ use std::time::Duration; use base64::prelude::{Engine, BASE64_STANDARD}; use hex::FromHex; -use itertools::Itertools; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use serde_json::{from_str, from_value, Value}; #[cfg(not(feature = "liquid"))] @@ -260,6 +260,7 @@ impl Counter { pub struct Daemon { daemon_dir: PathBuf, + daemon_parallelism: usize, blocks_dir: PathBuf, network: Network, conn: Mutex, @@ -276,6 +277,7 @@ impl Daemon { daemon_dir: &PathBuf, blocks_dir: &PathBuf, daemon_rpc_addr: SocketAddr, + daemon_parallelism: usize, cookie_getter: Arc, network: Network, signal: Waiter, @@ -283,6 +285,7 @@ impl Daemon { ) -> Result { let daemon = Daemon { daemon_dir: daemon_dir.clone(), + daemon_parallelism, blocks_dir: blocks_dir.clone(), network, conn: Mutex::new(Connection::new( @@ -335,6 +338,7 @@ impl Daemon { pub fn reconnect(&self) -> Result { Ok(Daemon { daemon_dir: self.daemon_dir.clone(), + daemon_parallelism: self.daemon_parallelism, blocks_dir: self.blocks_dir.clone(), network: self.network, conn: Mutex::new(self.conn.lock().unwrap().reconnect()?), @@ -377,31 +381,16 @@ impl Daemon { Ok(result) } - fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { + fn handle_request(&self, method: &str, params: &Value) -> Result { let id = self.message_id.next(); - let chunks = params_list - .iter() - .map(|params| json!({"method": method, "params": params, "id": id})) - .chunks(50_000); // Max Amount of batched requests - let mut results = vec![]; - for chunk in &chunks { - let reqs = chunk.collect(); - let mut replies = self.call_jsonrpc(method, &reqs)?; - if let Some(replies_vec) = replies.as_array_mut() { - for reply in replies_vec { - results.push(parse_jsonrpc_reply(reply.take(), method, id)?) - } - } else { - bail!("non-array replies: {:?}", replies); - } - } - - Ok(results) + let req = json!({"method": method, "params": params, "id": id}); + let reply = self.call_jsonrpc(method, &req)?; + parse_jsonrpc_reply(reply, method, id) } - fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { + fn retry_request(&self, method: &str, params: &Value) -> Result { loop { - match self.handle_request_batch(method, params_list) { + match self.handle_request(method, ¶ms) { Err(Error(ErrorKind::Connection(msg), _)) => { warn!("reconnecting to bitcoind: {}", msg); self.signal.wait(Duration::from_secs(3), false)?; @@ -415,13 +404,23 @@ impl Daemon { } fn request(&self, method: &str, params: Value) -> Result { - let mut values = self.retry_request_batch(method, &[params])?; - assert_eq!(values.len(), 1); - Ok(values.remove(0)) + self.retry_request(method, ¶ms) } fn requests(&self, method: &str, params_list: &[Value]) -> Result> { - self.retry_request_batch(method, params_list) + // Send in parallel as individual JSONRPC requests, with no batching. + // See https://github.com/Blockstream/electrs/pull/33 + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(self.daemon_parallelism) + .thread_name(|i| format!("rpc-requests-{}", i)) + .build() + .unwrap(); + pool.install(|| { + params_list + .par_iter() + .map(|params| self.retry_request(method, params)) + .collect() + }) } // bitcoind JSONRPC API: diff --git a/tests/common.rs b/tests/common.rs index 78932aa50..6aa4af19d 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -89,6 +89,7 @@ impl TestRunner { network_type, db_path: electrsdb.path().to_path_buf(), daemon_dir: daemon_subdir.clone(), + daemon_parallelism: 3, blocks_dir: daemon_subdir.join("blocks"), daemon_rpc_addr: params.rpc_socket.into(), cookie: None, @@ -127,6 +128,7 @@ impl TestRunner { &config.daemon_dir, &config.blocks_dir, config.daemon_rpc_addr, + config.daemon_parallelism, config.cookie_getter(), config.network_type, signal.clone(),