From ee8bcfdd9c9948888fe12d3e6a593c548a26092c Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Tue, 9 Mar 2021 02:45:44 +0200 Subject: [PATCH] Don't use RPC batching with bitcoind This actually hurts performance because the batched response has to be bueffered, as @TheBlueMatt explains at https://github.com/romanz/electrs/issues/373#issuecomment-785533444 Instead, send multiple RPC requests in parallel using a thread pool. Also see https://github.com/romanz/electrs/pull/374 --- src/daemon.rs | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 529369954..3f1835ffe 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -11,6 +11,7 @@ use bitcoin::util::hash::BitcoinHash; use bitcoin::{BlockHash, Txid}; use glob; use hex; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use serde_json::{from_str, from_value, Value}; #[cfg(not(feature = "liquid"))] @@ -378,26 +379,16 @@ impl Daemon { Ok(result) } - fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { + fn handle_request(&self, method: &str, params: &Value) -> Result { let id = self.message_id.next(); - let reqs = params_list - .iter() - .map(|params| json!({"method": method, "params": params, "id": id})) - .collect(); - let mut results = vec![]; - let mut replies = self.call_jsonrpc(method, &reqs)?; - if let Some(replies_vec) = replies.as_array_mut() { - for reply in replies_vec { - results.push(parse_jsonrpc_reply(reply.take(), method, id)?) - } - return Ok(results); - } - bail!("non-array replies: {:?}", replies); + let req = json!({"method": method, "params": params, "id": id}); + let reply = self.call_jsonrpc(method, &req)?; + parse_jsonrpc_reply(reply, method, id) } - fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { + fn retry_request(&self, method: &str, params: &Value) -> Result { loop { - match self.handle_request_batch(method, params_list) { + match self.handle_request(method, ¶ms) { Err(Error(ErrorKind::Connection(msg), _)) => { warn!("reconnecting to bitcoind: {}", msg); self.signal.wait(Duration::from_secs(3), false)?; @@ -411,13 +402,14 @@ impl Daemon { } fn request(&self, method: &str, params: Value) -> Result { - let mut values = self.retry_request_batch(method, &[params])?; - assert_eq!(values.len(), 1); - Ok(values.remove(0)) + self.retry_request(method, ¶ms) } fn requests(&self, method: &str, params_list: &[Value]) -> Result> { - self.retry_request_batch(method, params_list) + params_list + .par_iter() + .map(|params| self.retry_request(method, params)) + .collect::>>() } // bitcoind JSONRPC API: