Skip to content

Commit bfe86cd

Browse files
committed
Retry failed RPC calls on error
Whenever the transport errors with "WouldBlock" we wait a bit and retry the call.
1 parent 9f9ffd0 commit bfe86cd

File tree

1 file changed

+78
-1
lines changed

1 file changed

+78
-1
lines changed

src/blockchain/rpc.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,17 @@ use bitcoincore_rpc::json::{
4545
ListUnspentResultEntry, ScanningDetails,
4646
};
4747
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
48+
use bitcoincore_rpc::jsonrpc::{
49+
self, simple_http::SimpleHttpTransport, Error as JsonRpcError, Request, Response, Transport,
50+
};
4851
use bitcoincore_rpc::Auth as RpcAuth;
4952
use bitcoincore_rpc::{Client, RpcApi};
5053
use log::{debug, info};
5154
use serde::{Deserialize, Serialize};
5255
use std::collections::{HashMap, HashSet};
56+
use std::fmt;
5357
use std::path::PathBuf;
58+
use std::sync::atomic::{AtomicU8, Ordering};
5459
use std::thread;
5560
use std::time::Duration;
5661

@@ -80,6 +85,10 @@ pub struct RpcConfig {
8085
pub wallet_name: String,
8186
/// Sync parameters
8287
pub sync_params: Option<RpcSyncParams>,
88+
/// Max number of attempts before giving up and returning an error
89+
///
90+
/// Set to `0` preserve the old behavior of erroring immediately
91+
pub max_tries: u8,
8392
}
8493

8594
/// Sync parameters for Bitcoin Core RPC.
@@ -195,6 +204,51 @@ impl WalletSync for RpcBlockchain {
195204
}
196205
}
197206

207+
struct SimpleHttpWithRetry {
208+
inner: SimpleHttpTransport,
209+
attempts: AtomicU8,
210+
limit: u8,
211+
}
212+
213+
macro_rules! impl_inner {
214+
($self:expr, $method:ident, $req:expr) => {{
215+
while $self.attempts.load(Ordering::Relaxed) < $self.limit {
216+
match $self.inner.$method($req.clone()) {
217+
Ok(r) => {
218+
$self.attempts.store(0, Ordering::Relaxed);
219+
return Ok(r);
220+
}
221+
Err(JsonRpcError::Transport(e)) if &e.to_string() == "Would block" => {
222+
let attempt = $self.attempts.fetch_add(1, Ordering::Relaxed);
223+
let delay = std::cmp::min(1000, 100 << attempt as u64);
224+
std::thread::sleep(std::time::Duration::from_millis(delay));
225+
}
226+
Err(e) => {
227+
$self.attempts.store(0, Ordering::Relaxed);
228+
return Err(e);
229+
}
230+
}
231+
}
232+
233+
$self.attempts.store(0, Ordering::Relaxed);
234+
Err(JsonRpcError::Transport("All attempts errored".into()))
235+
}};
236+
}
237+
238+
impl Transport for SimpleHttpWithRetry {
239+
fn send_request(&self, req: Request) -> Result<Response, JsonRpcError> {
240+
impl_inner!(self, send_request, req)
241+
}
242+
243+
fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, JsonRpcError> {
244+
impl_inner!(self, send_batch, reqs)
245+
}
246+
247+
fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result {
248+
self.inner.fmt_target(f)
249+
}
250+
}
251+
198252
impl ConfigurableBlockchain for RpcBlockchain {
199253
type Config = RpcConfig;
200254

@@ -203,7 +257,23 @@ impl ConfigurableBlockchain for RpcBlockchain {
203257
fn from_config(config: &Self::Config) -> Result<Self, Error> {
204258
let wallet_url = format!("{}/wallet/{}", config.url, &config.wallet_name);
205259

206-
let client = Client::new(wallet_url.as_str(), config.auth.clone().into())?;
260+
let mut builder = SimpleHttpTransport::builder()
261+
.url(&wallet_url)
262+
.map_err(|e| bitcoincore_rpc::Error::JsonRpc(e.into()))?;
263+
264+
let (user, pass) = bitcoincore_rpc::Auth::from(config.auth.clone()).get_user_pass()?;
265+
if let Some(user) = user {
266+
builder = builder.auth(user, pass);
267+
}
268+
269+
let transport = SimpleHttpWithRetry {
270+
inner: builder.build(),
271+
attempts: AtomicU8::new(0),
272+
limit: config.max_tries,
273+
};
274+
let jsonrpc_client = jsonrpc::client::Client::with_transport(transport);
275+
276+
let client = Client::from_jsonrpc(jsonrpc_client);
207277
let rpc_version = client.version()?;
208278

209279
info!("connected to '{}' with auth: {:?}", wallet_url, config.auth);
@@ -835,6 +905,10 @@ pub struct RpcBlockchainFactory {
835905
pub default_skip_blocks: u32,
836906
/// Sync parameters
837907
pub sync_params: Option<RpcSyncParams>,
908+
/// Max number of attempts before giving up and returning an error
909+
///
910+
/// Set to `0` preserve the old behavior of erroring immediately
911+
pub max_tries: u8,
838912
}
839913

840914
impl BlockchainFactory for RpcBlockchainFactory {
@@ -855,6 +929,7 @@ impl BlockchainFactory for RpcBlockchainFactory {
855929
checksum
856930
),
857931
sync_params: self.sync_params.clone(),
932+
max_tries: self.max_tries,
858933
})
859934
}
860935
}
@@ -882,6 +957,7 @@ mod test {
882957
network: Network::Regtest,
883958
wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ),
884959
sync_params: None,
960+
max_tries: 5,
885961
};
886962
RpcBlockchain::from_config(&config).unwrap()
887963
}
@@ -899,6 +975,7 @@ mod test {
899975
wallet_name_prefix: Some("prefix-".into()),
900976
default_skip_blocks: 0,
901977
sync_params: None,
978+
max_tries: 3,
902979
};
903980

904981
(test_client, factory)

0 commit comments

Comments
 (0)