Skip to content

Refactor cbf, better rpc error logging and hard code checkpoints #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions client/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ impl App {
}

pub async fn run(&mut self, args: Vec<String>) -> anyhow::Result<()> {
let shutdown_receiver = self.shutdown.subscribe();
let spaced = Args::configure(args, shutdown_receiver).await?;
let spaced = Args::configure(args).await?;
self.setup_rpc_services(&spaced).await;
self.setup_sync_service(spaced).await;

Expand Down
7 changes: 7 additions & 0 deletions client/src/cbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct CompactFilterSync {
total_filters: u32,
wait: Option<Instant>,
state: SyncState,
filters_queued: bool
}

enum SyncState {
Expand Down Expand Up @@ -63,6 +64,7 @@ impl CompactFilterSync {
total_filters: 0,
wait: None,
state: SyncState::SyncChecks,
filters_queued: false,
};
cbf.load_scripts(wallet);
cbf
Expand Down Expand Up @@ -123,6 +125,11 @@ impl CompactFilterSync {
return Ok(());
}
if info.filters != info.filter_headers {
if !self.filters_queued {
source.queue_filters()?;
self.filters_queued = true;
}

info!("Filters syncing, retrying...");
*progress = WalletProgressUpdate::CbfFilterSync {
total: info.filter_headers.unwrap_or(0),
Expand Down
1 change: 1 addition & 0 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub trait BlockSource {
fn get_blockchain_info(&self) -> Result<BlockchainInfo, BitcoinRpcError>;
fn get_block_filter_by_height(&self, height: u32) -> Result<Option<BlockFilterRpc>, BitcoinRpcError>;
fn queue_blocks(&self, heights: Vec<u32>) -> Result<(), BitcoinRpcError>;
fn queue_filters(&self) -> Result<(), BitcoinRpcError>;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
4 changes: 2 additions & 2 deletions client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl ExtendedNetwork {
impl Args {
/// Configures spaced node by processing command line arguments
/// and configuration files
pub async fn configure(args: Vec<String>, shutdown: tokio::sync::broadcast::Receiver<()>) -> anyhow::Result<Spaced> {
pub async fn configure(args: Vec<String>) -> anyhow::Result<Spaced> {
let mut args = Args::try_parse_from(args)?;
let default_dirs = get_default_node_dirs();

Expand Down Expand Up @@ -147,7 +147,7 @@ impl Args {
!args.bitcoin_rpc_light
);

let genesis = Spaced::genesis(&rpc, args.chain, shutdown).await?;
let genesis = Spaced::genesis(args.chain);

fs::create_dir_all(data_dir.clone())?;

Expand Down
2 changes: 1 addition & 1 deletion client/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ impl WalletManager {
.rpc
.send_json(&client, &self.rpc.get_block_count())
.await?;
let height = std::cmp::max(count - 20, 0) as u32;
let height = std::cmp::max(count - 1, 0) as u32;

let hash = self
.rpc
Expand Down
80 changes: 73 additions & 7 deletions client/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
},
time::Duration,
};

use std::error::Error;
use base64::Engine;
use bitcoin::{Block, BlockHash, Txid};
use hex::FromHexError;
Expand Down Expand Up @@ -190,6 +190,11 @@ impl BitcoinRpc {
self.make_request("queueblocks", params)
}

pub fn queue_filters(&self) -> BitcoinRpcRequest {
let params = serde_json::json!([]);
self.make_request("queuefilters", params)
}

pub fn get_mempool_entry(&self, txid: &Txid) -> BitcoinRpcRequest {
let params = serde_json::json!([txid]);

Expand Down Expand Up @@ -276,7 +281,7 @@ impl BitcoinRpc {
{
Ok(res) => return Self::clean_rpc_response(res).await,
Err(e) if e.is_temporary() && attempt < max_retries - 1 => {
error!("Rpc: {} - retrying in {:?}...", e, delay);
log_rpc_error(&request.body, &e, delay);
last_error = Some(e.into());
tokio::time::sleep(delay).await;
delay *= 2;
Expand All @@ -302,15 +307,14 @@ impl BitcoinRpc {
if let Some(auth) = self.auth_token.as_ref() {
builder = builder.header("Authorization", format!("Basic {}", auth));
}

match builder
.json(&request.body)
.send()
.map_err(BitcoinRpcError::from)
{
Ok(res) => return Self::clean_rpc_response_blocking(res),
Err(e) if e.is_temporary() && attempt < max_retries - 1 => {
error!("Rpc: {} - retrying in {:?}...", e, delay);
log_rpc_error(&request.body, &e, delay);
last_error = Some(e.into());
std::thread::sleep(delay);
delay *= 2;
Expand Down Expand Up @@ -811,7 +815,28 @@ impl std::error::Error for BitcoinRpcError {

impl ErrorForRpc for reqwest::Response {
async fn error_for_rpc<T: DeserializeOwned>(self) -> Result<T, BitcoinRpcError> {
let rpc_res: JsonRpcResponse<T> = self.json().await?;
let text = self
.text()
.await
.map_err(|e| BitcoinRpcError::Other(format!("Could not read response body: {}", e)))?;

// Try to deserialize as JsonRpcResponse<T>
let rpc_res: JsonRpcResponse<T> = match serde_json::from_str(&text) {
Ok(rpc_res) => rpc_res,
Err(e) => {
// Try to decode without result
let error_res: Option<JsonRpcResponse<Option<String>>> = serde_json::from_str(&text).ok();
if let Some(error_res) = error_res {
if let Some(error) = error_res.error {
return Err(BitcoinRpcError::Rpc(error));
}
}
return Err(BitcoinRpcError::Other(
format!("Expected a JSON response, got '{}': {}", text, e),
));
}
};

if let Some(e) = rpc_res.error {
return Err(BitcoinRpcError::Rpc(e));
}
Expand All @@ -822,7 +847,28 @@ impl ErrorForRpc for reqwest::Response {

impl ErrorForRpcBlocking for reqwest::blocking::Response {
fn error_for_rpc<T: DeserializeOwned>(self) -> Result<T, BitcoinRpcError> {
let rpc_res: JsonRpcResponse<T> = self.json()?;
let text = self.text().map_err(|e| BitcoinRpcError::Other(
format!("Could not read response body: {}", e),
))?;

// Attempt to deserialize the text as JSON
let rpc_res: JsonRpcResponse<T> = match serde_json::from_str(&text)
{
Ok(rpc_res) => rpc_res,
Err(e) => {
// try to decode without result
let error_res : Option<JsonRpcResponse<Option<String>>> = serde_json::from_str(&text).ok();
if let Some(error_res) = error_res {
if let Some(error) = error_res.error {
return Err(BitcoinRpcError::Rpc(error));
}
}
return Err(BitcoinRpcError::Other(
format!("Expected a JSON response, got '{}': {}", text, e),
))
}
};

if let Some(e) = rpc_res.error {
return Err(BitcoinRpcError::Rpc(e));
}
Expand Down Expand Up @@ -905,7 +951,6 @@ impl BlockSource for BitcoinBlockSource {
.send_json_blocking(&self.client, &self.rpc.get_block_count())?)
}


fn get_best_chain(&self, tip: Option<u32>, expected_chain: Network) -> Result<Option<ChainAnchor>, BitcoinRpcError> {
#[derive(Deserialize)]
struct Info {
Expand Down Expand Up @@ -981,4 +1026,25 @@ impl BlockSource for BitcoinBlockSource {
.send_json_blocking::<()>(&self.client, &self.rpc.queue_blocks(heights))?;
Ok(())
}

fn queue_filters(&self) -> anyhow::Result<(), BitcoinRpcError> {
self
.rpc
.send_json_blocking::<()>(&self.client, &self.rpc.queue_filters())?;
Ok(())
}
}

fn log_rpc_error(request: &Value, e: &BitcoinRpcError, delay: Duration) {
let rpc_method = serde_json::to_string(&request.get("method"))
.unwrap_or("".to_string());
let rpc_params = serde_json::to_string(&request.get("params"))
.unwrap_or("".to_string());
let src = match e {
BitcoinRpcError::Transport(e) =>
e.source().map(|s| format!("({:?})", s)),
_ => None
}.unwrap_or("".to_string());

error!("Rpc {}{}: {}{} - retrying in {:?}...", rpc_method, rpc_params, e, src, delay);
}
79 changes: 4 additions & 75 deletions client/src/spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{net::SocketAddr, path::PathBuf, time::Duration};
use anyhow::{anyhow, Context};
use log::{info, warn};
use spaces_protocol::{
bitcoin::{hashes::Hash, Block, BlockHash},
bitcoin::{Block, BlockHash},
constants::ChainAnchor,
hasher::BaseHash,
};
Expand Down Expand Up @@ -255,86 +255,15 @@ impl Spaced {
Ok(())
}

pub async fn genesis(
rpc: &BitcoinRpc,
pub fn genesis(
network: ExtendedNetwork,
mut shutdown: tokio::sync::broadcast::Receiver<()>
) -> anyhow::Result<ChainAnchor> {
let mut anchor = match network {
) -> ChainAnchor {
match network {
ExtendedNetwork::Testnet => ChainAnchor::TESTNET(),
ExtendedNetwork::Testnet4 => ChainAnchor::TESTNET4(),
ExtendedNetwork::Regtest => ChainAnchor::REGTEST(),
ExtendedNetwork::Mainnet => ChainAnchor::MAINNET(),
_ => panic!("unsupported network"),
};



// Wait for the RPC node to be ready
let mut attempts = 0;
let mut last_error = BitcoinRpcError::Other("Unknown error".to_string());
loop {
if shutdown.try_recv().is_ok() {
return Err(anyhow!("Fetching activation height terminated: shutdown requested"))
}
if attempts > 5 {
return Err(anyhow!(
"Could not retrieve activation height: {}",
last_error
));
}

let rpc_task = rpc.clone();
let net_task = network.fallback_network();
let best_chain = tokio::task::spawn_blocking(move || {
let source = BitcoinBlockSource::new(rpc_task);
source.get_best_chain(Some(anchor.height), net_task)
}).await.expect("join");

match best_chain {
Ok(Some(tip)) => {
info!("Connect to RPC node (tip: {})", tip.height);
if anchor.hash != BlockHash::all_zeros() {
break;
}

// Pull the activation block hash
let client = reqwest::Client::new();
anchor.hash = match rpc
.send_json(&client, &rpc.get_block_hash(anchor.height))
.await
{
Ok(hash) => hash,
Err(e) => {
warn!("Fetching height {}:{}, retrying in 1s ...", anchor.height, e);
last_error = e;
match &last_error {
BitcoinRpcError::Rpc(_) => {}
_ => attempts += 1,
}
continue;
}
};

break;
}
Ok(None) => {
warn!("Connected RPC node is still syncing, waiting 5s ...");
tokio::time::sleep(Duration::from_secs(5)).await;
}
Err(e) => {
warn!("Error fetching blockchain info: {}, retrying in 1s ...", e);
last_error = e;
tokio::time::sleep(Duration::from_secs(1)).await;
match &last_error {
BitcoinRpcError::Rpc(_) => {}
_ => attempts += 1,
}
}
}
}


Ok(anchor)
}
}
Loading
Loading