Skip to content

Commit cb4d0f5

Browse files
authored
Merge pull request #90 from buffrr/cbf-constants
Refactor cbf, better rpc error logging and hard code checkpoints
2 parents 33f6c96 + 09913cf commit cb4d0f5

File tree

9 files changed

+146
-122
lines changed

9 files changed

+146
-122
lines changed

client/src/app.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,7 @@ impl App {
100100
}
101101

102102
pub async fn run(&mut self, args: Vec<String>) -> anyhow::Result<()> {
103-
let shutdown_receiver = self.shutdown.subscribe();
104-
let spaced = Args::configure(args, shutdown_receiver).await?;
103+
let spaced = Args::configure(args).await?;
105104
self.setup_rpc_services(&spaced).await;
106105
self.setup_sync_service(spaced).await;
107106

client/src/cbf.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub struct CompactFilterSync {
2929
total_filters: u32,
3030
wait: Option<Instant>,
3131
state: SyncState,
32+
filters_queued: bool
3233
}
3334

3435
enum SyncState {
@@ -63,6 +64,7 @@ impl CompactFilterSync {
6364
total_filters: 0,
6465
wait: None,
6566
state: SyncState::SyncChecks,
67+
filters_queued: false,
6668
};
6769
cbf.load_scripts(wallet);
6870
cbf
@@ -123,6 +125,11 @@ impl CompactFilterSync {
123125
return Ok(());
124126
}
125127
if info.filters != info.filter_headers {
128+
if !self.filters_queued {
129+
source.queue_filters()?;
130+
self.filters_queued = true;
131+
}
132+
126133
info!("Filters syncing, retrying...");
127134
*progress = WalletProgressUpdate::CbfFilterSync {
128135
total: info.filter_headers.unwrap_or(0),

client/src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub trait BlockSource {
3333
fn get_blockchain_info(&self) -> Result<BlockchainInfo, BitcoinRpcError>;
3434
fn get_block_filter_by_height(&self, height: u32) -> Result<Option<BlockFilterRpc>, BitcoinRpcError>;
3535
fn queue_blocks(&self, heights: Vec<u32>) -> Result<(), BitcoinRpcError>;
36+
fn queue_filters(&self) -> Result<(), BitcoinRpcError>;
3637
}
3738

3839
#[derive(Debug, Clone, Serialize, Deserialize)]

client/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl ExtendedNetwork {
101101
impl Args {
102102
/// Configures spaced node by processing command line arguments
103103
/// and configuration files
104-
pub async fn configure(args: Vec<String>, shutdown: tokio::sync::broadcast::Receiver<()>) -> anyhow::Result<Spaced> {
104+
pub async fn configure(args: Vec<String>) -> anyhow::Result<Spaced> {
105105
let mut args = Args::try_parse_from(args)?;
106106
let default_dirs = get_default_node_dirs();
107107

@@ -147,7 +147,7 @@ impl Args {
147147
!args.bitcoin_rpc_light
148148
);
149149

150-
let genesis = Spaced::genesis(&rpc, args.chain, shutdown).await?;
150+
let genesis = Spaced::genesis(args.chain);
151151

152152
fs::create_dir_all(data_dir.clone())?;
153153

client/src/rpc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ impl WalletManager {
642642
.rpc
643643
.send_json(&client, &self.rpc.get_block_count())
644644
.await?;
645-
let height = std::cmp::max(count - 20, 0) as u32;
645+
let height = std::cmp::max(count - 1, 0) as u32;
646646

647647
let hash = self
648648
.rpc

client/src/source.rs

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88
},
99
time::Duration,
1010
};
11-
11+
use std::error::Error;
1212
use base64::Engine;
1313
use bitcoin::{Block, BlockHash, Txid};
1414
use hex::FromHexError;
@@ -190,6 +190,11 @@ impl BitcoinRpc {
190190
self.make_request("queueblocks", params)
191191
}
192192

193+
pub fn queue_filters(&self) -> BitcoinRpcRequest {
194+
let params = serde_json::json!([]);
195+
self.make_request("queuefilters", params)
196+
}
197+
193198
pub fn get_mempool_entry(&self, txid: &Txid) -> BitcoinRpcRequest {
194199
let params = serde_json::json!([txid]);
195200

@@ -276,7 +281,7 @@ impl BitcoinRpc {
276281
{
277282
Ok(res) => return Self::clean_rpc_response(res).await,
278283
Err(e) if e.is_temporary() && attempt < max_retries - 1 => {
279-
error!("Rpc: {} - retrying in {:?}...", e, delay);
284+
log_rpc_error(&request.body, &e, delay);
280285
last_error = Some(e.into());
281286
tokio::time::sleep(delay).await;
282287
delay *= 2;
@@ -302,15 +307,14 @@ impl BitcoinRpc {
302307
if let Some(auth) = self.auth_token.as_ref() {
303308
builder = builder.header("Authorization", format!("Basic {}", auth));
304309
}
305-
306310
match builder
307311
.json(&request.body)
308312
.send()
309313
.map_err(BitcoinRpcError::from)
310314
{
311315
Ok(res) => return Self::clean_rpc_response_blocking(res),
312316
Err(e) if e.is_temporary() && attempt < max_retries - 1 => {
313-
error!("Rpc: {} - retrying in {:?}...", e, delay);
317+
log_rpc_error(&request.body, &e, delay);
314318
last_error = Some(e.into());
315319
std::thread::sleep(delay);
316320
delay *= 2;
@@ -811,7 +815,28 @@ impl std::error::Error for BitcoinRpcError {
811815

812816
impl ErrorForRpc for reqwest::Response {
813817
async fn error_for_rpc<T: DeserializeOwned>(self) -> Result<T, BitcoinRpcError> {
814-
let rpc_res: JsonRpcResponse<T> = self.json().await?;
818+
let text = self
819+
.text()
820+
.await
821+
.map_err(|e| BitcoinRpcError::Other(format!("Could not read response body: {}", e)))?;
822+
823+
// Try to deserialize as JsonRpcResponse<T>
824+
let rpc_res: JsonRpcResponse<T> = match serde_json::from_str(&text) {
825+
Ok(rpc_res) => rpc_res,
826+
Err(e) => {
827+
// Try to decode without result
828+
let error_res: Option<JsonRpcResponse<Option<String>>> = serde_json::from_str(&text).ok();
829+
if let Some(error_res) = error_res {
830+
if let Some(error) = error_res.error {
831+
return Err(BitcoinRpcError::Rpc(error));
832+
}
833+
}
834+
return Err(BitcoinRpcError::Other(
835+
format!("Expected a JSON response, got '{}': {}", text, e),
836+
));
837+
}
838+
};
839+
815840
if let Some(e) = rpc_res.error {
816841
return Err(BitcoinRpcError::Rpc(e));
817842
}
@@ -822,7 +847,28 @@ impl ErrorForRpc for reqwest::Response {
822847

823848
impl ErrorForRpcBlocking for reqwest::blocking::Response {
824849
fn error_for_rpc<T: DeserializeOwned>(self) -> Result<T, BitcoinRpcError> {
825-
let rpc_res: JsonRpcResponse<T> = self.json()?;
850+
let text = self.text().map_err(|e| BitcoinRpcError::Other(
851+
format!("Could not read response body: {}", e),
852+
))?;
853+
854+
// Attempt to deserialize the text as JSON
855+
let rpc_res: JsonRpcResponse<T> = match serde_json::from_str(&text)
856+
{
857+
Ok(rpc_res) => rpc_res,
858+
Err(e) => {
859+
// try to decode without result
860+
let error_res : Option<JsonRpcResponse<Option<String>>> = serde_json::from_str(&text).ok();
861+
if let Some(error_res) = error_res {
862+
if let Some(error) = error_res.error {
863+
return Err(BitcoinRpcError::Rpc(error));
864+
}
865+
}
866+
return Err(BitcoinRpcError::Other(
867+
format!("Expected a JSON response, got '{}': {}", text, e),
868+
))
869+
}
870+
};
871+
826872
if let Some(e) = rpc_res.error {
827873
return Err(BitcoinRpcError::Rpc(e));
828874
}
@@ -905,7 +951,6 @@ impl BlockSource for BitcoinBlockSource {
905951
.send_json_blocking(&self.client, &self.rpc.get_block_count())?)
906952
}
907953

908-
909954
fn get_best_chain(&self, tip: Option<u32>, expected_chain: Network) -> Result<Option<ChainAnchor>, BitcoinRpcError> {
910955
#[derive(Deserialize)]
911956
struct Info {
@@ -981,4 +1026,25 @@ impl BlockSource for BitcoinBlockSource {
9811026
.send_json_blocking::<()>(&self.client, &self.rpc.queue_blocks(heights))?;
9821027
Ok(())
9831028
}
1029+
1030+
fn queue_filters(&self) -> anyhow::Result<(), BitcoinRpcError> {
1031+
self
1032+
.rpc
1033+
.send_json_blocking::<()>(&self.client, &self.rpc.queue_filters())?;
1034+
Ok(())
1035+
}
1036+
}
1037+
1038+
fn log_rpc_error(request: &Value, e: &BitcoinRpcError, delay: Duration) {
1039+
let rpc_method = serde_json::to_string(&request.get("method"))
1040+
.unwrap_or("".to_string());
1041+
let rpc_params = serde_json::to_string(&request.get("params"))
1042+
.unwrap_or("".to_string());
1043+
let src = match e {
1044+
BitcoinRpcError::Transport(e) =>
1045+
e.source().map(|s| format!("({:?})", s)),
1046+
_ => None
1047+
}.unwrap_or("".to_string());
1048+
1049+
error!("Rpc {}{}: {}{} - retrying in {:?}...", rpc_method, rpc_params, e, src, delay);
9841050
}

client/src/spaces.rs

Lines changed: 4 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{net::SocketAddr, path::PathBuf, time::Duration};
33
use anyhow::{anyhow, Context};
44
use log::{info, warn};
55
use spaces_protocol::{
6-
bitcoin::{hashes::Hash, Block, BlockHash},
6+
bitcoin::{Block, BlockHash},
77
constants::ChainAnchor,
88
hasher::BaseHash,
99
};
@@ -255,86 +255,15 @@ impl Spaced {
255255
Ok(())
256256
}
257257

258-
pub async fn genesis(
259-
rpc: &BitcoinRpc,
258+
pub fn genesis(
260259
network: ExtendedNetwork,
261-
mut shutdown: tokio::sync::broadcast::Receiver<()>
262-
) -> anyhow::Result<ChainAnchor> {
263-
let mut anchor = match network {
260+
) -> ChainAnchor {
261+
match network {
264262
ExtendedNetwork::Testnet => ChainAnchor::TESTNET(),
265263
ExtendedNetwork::Testnet4 => ChainAnchor::TESTNET4(),
266264
ExtendedNetwork::Regtest => ChainAnchor::REGTEST(),
267265
ExtendedNetwork::Mainnet => ChainAnchor::MAINNET(),
268266
_ => panic!("unsupported network"),
269-
};
270-
271-
272-
273-
// Wait for the RPC node to be ready
274-
let mut attempts = 0;
275-
let mut last_error = BitcoinRpcError::Other("Unknown error".to_string());
276-
loop {
277-
if shutdown.try_recv().is_ok() {
278-
return Err(anyhow!("Fetching activation height terminated: shutdown requested"))
279-
}
280-
if attempts > 5 {
281-
return Err(anyhow!(
282-
"Could not retrieve activation height: {}",
283-
last_error
284-
));
285-
}
286-
287-
let rpc_task = rpc.clone();
288-
let net_task = network.fallback_network();
289-
let best_chain = tokio::task::spawn_blocking(move || {
290-
let source = BitcoinBlockSource::new(rpc_task);
291-
source.get_best_chain(Some(anchor.height), net_task)
292-
}).await.expect("join");
293-
294-
match best_chain {
295-
Ok(Some(tip)) => {
296-
info!("Connect to RPC node (tip: {})", tip.height);
297-
if anchor.hash != BlockHash::all_zeros() {
298-
break;
299-
}
300-
301-
// Pull the activation block hash
302-
let client = reqwest::Client::new();
303-
anchor.hash = match rpc
304-
.send_json(&client, &rpc.get_block_hash(anchor.height))
305-
.await
306-
{
307-
Ok(hash) => hash,
308-
Err(e) => {
309-
warn!("Fetching height {}:{}, retrying in 1s ...", anchor.height, e);
310-
last_error = e;
311-
match &last_error {
312-
BitcoinRpcError::Rpc(_) => {}
313-
_ => attempts += 1,
314-
}
315-
continue;
316-
}
317-
};
318-
319-
break;
320-
}
321-
Ok(None) => {
322-
warn!("Connected RPC node is still syncing, waiting 5s ...");
323-
tokio::time::sleep(Duration::from_secs(5)).await;
324-
}
325-
Err(e) => {
326-
warn!("Error fetching blockchain info: {}, retrying in 1s ...", e);
327-
last_error = e;
328-
tokio::time::sleep(Duration::from_secs(1)).await;
329-
match &last_error {
330-
BitcoinRpcError::Rpc(_) => {}
331-
_ => attempts += 1,
332-
}
333-
}
334-
}
335267
}
336-
337-
338-
Ok(anchor)
339268
}
340269
}

0 commit comments

Comments
 (0)