From 309b2c1d1ff23384edd10656aa16a0074e8249a1 Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Thu, 11 Apr 2024 16:09:26 +0800 Subject: [PATCH] perf: cache the SPV instance to improve the performance of the JSON-RPC APIs --- src/components/api_service/mod.rs | 133 +++++++++++++++++++++++++++--- src/components/ckb_client.rs | 79 +++++++++++------- src/components/spv_service.rs | 6 -- 3 files changed, 172 insertions(+), 46 deletions(-) diff --git a/src/components/api_service/mod.rs b/src/components/api_service/mod.rs index 52271bd..7222d66 100644 --- a/src/components/api_service/mod.rs +++ b/src/components/api_service/mod.rs @@ -1,6 +1,6 @@ //! JSON-RPC APIs service. -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::RwLock, time::SystemTime}; use bitcoin::Txid; use ckb_bitcoin_spv_verifier::types::{ @@ -16,7 +16,7 @@ use jsonrpc_server_utils::{cors::AccessControlAllowOrigin, hosts::DomainsValidat use serde::Serialize; use crate::{ - components::{SpvClientCell, SpvService}, + components::{SpvInstance, SpvService}, prelude::*, result::{Error, Result}, }; @@ -25,6 +25,9 @@ mod error; pub use error::ApiErrorCode; +// Bitcoin target block time is 10 minutes. +const SPV_INSTANCE_CACHED_SECS: u64 = 60 * 10; + pub struct ApiServiceConfig { listen_address: SocketAddr, } @@ -48,6 +51,13 @@ pub trait SpvRpc { pub struct SpvRpcImpl { spv_service: SpvService, + cached_spv_instance: RwLock>, +} + +#[derive(Clone)] +struct CachedSpvInstance { + instance: SpvInstance, + expired_timestamp: u64, } impl ApiServiceConfig { @@ -74,7 +84,69 @@ impl ApiServiceConfig { impl SpvRpcImpl { pub fn new(spv_service: SpvService) -> Self { - Self { spv_service } + Self { + spv_service, + cached_spv_instance: RwLock::new(None), + } + } + + fn load_spv_instance(&self) -> Option { + if let Some(cached) = self + .cached_spv_instance + .read() + .ok() + .and_then(|locked| locked.as_ref().cloned()) + { + if let Ok(dur) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { + let current_timestamp = dur.as_secs(); + if current_timestamp > cached.expired_timestamp { + log::trace!( + "cached SPV instance is expired, expired at {}, now is {current_timestamp}", + cached.expired_timestamp + ); + None + } else { + log::trace!( + "cached SPV instance is loaded, will be expired at {}, now is {current_timestamp}", + cached.expired_timestamp + ); + Some(cached.instance) + } + } else { + log::warn!("failed to read current timestamp for load the cached SPV instance"); + None + } + } else { + log::debug!("failed to load cached SPV instance: not existed or lock error"); + None + } + } + + fn update_spv_instance(&self, instance: SpvInstance) { + match self.cached_spv_instance.write() { + Ok(mut locked) => { + if let Ok(dur) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { + let current_timestamp = dur.as_secs(); + let expired_timestamp = current_timestamp + SPV_INSTANCE_CACHED_SECS; + let cache = CachedSpvInstance { + instance, + expired_timestamp, + }; + *locked = Some(cache); + log::debug!( + "refreshed the cached SPV instance, it will be expired at {expired_timestamp}, \ + now is {current_timestamp}", + ); + } else { + log::warn!( + "failed to read current timestamp for update the cached SPV instance" + ); + } + } + Err(err) => { + log::debug!("failed to update the cached SPV instance since {err}"); + } + } } } @@ -85,7 +157,7 @@ impl SpvRpc for SpvRpcImpl { tx_index: u32, confirmations: u32, ) -> RpcResult { - log::trace!("Call getTxProof with params [{txid:#x}, {confirmations}]"); + log::debug!("Call getTxProof with params [{txid:#x}, {confirmations}]"); let spv = &self.spv_service; let (target_height, target_hash, raw_tx_out_proof) = @@ -126,7 +198,7 @@ impl SpvRpc for SpvRpcImpl { data: None, } })?; - log::trace!(">>> tip height in local storage is {stg_tip_height}"); + log::debug!(">>> tip height in local storage is {stg_tip_height}"); if stg_tip_height < target_height { let desc = format!( @@ -160,19 +232,56 @@ impl SpvRpc for SpvRpcImpl { return Err(ApiErrorCode::StorageHeaderUnmatched.with_desc(desc)); } - let spv_client_cell = tokio::task::block_in_place(|| -> RpcResult { - spv.find_best_spv_client(stg_tip_height).map_err(|err| { - let message = - format!("failed to get SPV cell base on height {stg_tip_height} from chain"); + let spv_type_script = spv.storage.spv_contract_type_script().map_err(|err| { + let message = "failed to get SPV contract type script from storage".to_owned(); + log::error!("{message} since {err}"); + RpcError { + code: RpcErrorCode::InternalError, + message, + data: None, + } + })?; + + log::debug!(">>> try the cached SPV instance at first"); + + let spv_instance = if let Some(spv_instance) = self.load_spv_instance() { + log::debug!(">>> the cached SPV instance is {spv_instance}"); + spv_instance + } else { + log::debug!(">>> fetch SPV instance from remote since cached is not satisfied"); + let spv_instance = tokio::task::block_in_place(|| -> RpcResult { + spv.ckb_cli.find_spv_cells(spv_type_script).map_err(|err| { + let message = format!( + "failed to get SPV cell base on height {stg_tip_height} from chain" + ); + log::error!("{message} since {err}"); + RpcError { + code: RpcErrorCode::InternalError, + message, + data: None, + } + }) + })?; + log::debug!(">>> the fetched SPV instance is {spv_instance}"); + self.update_spv_instance(spv_instance.clone()); + spv_instance + }; + + let spv_client_cell = spv_instance + .find_best_spv_client_include_height(stg_tip_height) + .map_err(|err| { + let message = format!( + "failed to get SPV cell base on height {stg_tip_height} from fetched data" + ); log::error!("{message} since {err}"); RpcError { code: RpcErrorCode::InternalError, message, data: None, } - }) - })?; - log::trace!(">>> the best SPV client is {}", spv_client_cell.client); + })?; + + log::debug!(">>> the best SPV client is {}", spv_client_cell.client); let spv_header_root = &spv_client_cell.client.headers_mmr_root; diff --git a/src/components/ckb_client.rs b/src/components/ckb_client.rs index 20a5df5..b735e65 100644 --- a/src/components/ckb_client.rs +++ b/src/components/ckb_client.rs @@ -1,6 +1,6 @@ //! Expand the functionality of the original CKB RPC client. -use std::collections::HashMap; +use std::{collections::HashMap, fmt}; use ckb_bitcoin_spv_verifier::types::{ core::{SpvClient, SpvInfo}, @@ -36,6 +36,7 @@ pub struct SpvClientCell { pub(crate) cell: LiveCell, } +#[derive(Clone)] pub struct SpvInstance { pub(crate) info: SpvInfoCell, pub(crate) clients: HashMap, @@ -76,35 +77,11 @@ pub trait CkbRpcClientExtension { spv_type_script: Script, height_opt: Option, ) -> Result { - let SpvInstance { mut info, clients } = self.find_spv_cells(spv_type_script)?; + let instance = self.find_spv_cells(spv_type_script)?; if let Some(height) = height_opt { - for _ in 0..clients.len() { - let cell = clients.get(&info.info.tip_client_id).ok_or_else(|| { - let msg = format!( - "the SPV client (id={}) is not found", - info.info.tip_client_id - ); - Error::other(msg) - })?; - if cell.client.headers_mmr_root.max_height <= height { - return Ok(cell.to_owned()); - } - info.info.tip_client_id = info.prev_tip_client_id(); - } - let msg = - format!("all SPV clients have better heights than server has (height: {height})"); - Err(Error::other(msg)) + instance.find_best_spv_client_include_height(height) } else { - clients - .get(&info.info.tip_client_id) - .ok_or_else(|| { - let msg = format!( - "the SPV client (id={}) is not found", - info.info.tip_client_id - ); - Error::other(msg) - }) - .cloned() + instance.find_tip_spv_client() } } } @@ -183,6 +160,52 @@ impl CkbRpcClientExtension for CkbRpcClient { } } +impl SpvInstance { + pub(crate) fn find_tip_spv_client(&self) -> Result { + self.clients + .get(&self.info.info.tip_client_id) + .ok_or_else(|| { + let msg = format!( + "the SPV client (id={}) is not found", + self.info.info.tip_client_id + ); + Error::other(msg) + }) + .cloned() + } + + pub(crate) fn find_best_spv_client_include_height(&self, height: u32) -> Result { + let SpvInstance { ref info, clients } = self; + let mut info = info.to_owned(); + for _ in 0..clients.len() { + let cell = clients.get(&info.info.tip_client_id).ok_or_else(|| { + let msg = format!( + "the SPV client (id={}) is not found", + info.info.tip_client_id + ); + Error::other(msg) + })?; + if cell.client.headers_mmr_root.max_height <= height { + return Ok(cell.to_owned()); + } + info.info.tip_client_id = info.prev_tip_client_id(); + } + let msg = format!("all SPV clients have better heights than server has (height: {height})"); + Err(Error::other(msg)) + } +} + +impl fmt::Display for SpvInstance { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{{ tip: {}, clients-count: {} }}", + self.info.info.tip_client_id, + self.clients.len() + ) + } +} + fn parse_raw_spv_cells(cells: Vec) -> Result { let mut spv_info_opt = None; let mut spv_clients = HashMap::new(); diff --git a/src/components/spv_service.rs b/src/components/spv_service.rs index b848ce7..607da98 100644 --- a/src/components/spv_service.rs +++ b/src/components/spv_service.rs @@ -39,12 +39,6 @@ pub enum SpvOperation { } impl SpvService { - pub(crate) fn find_best_spv_client(&self, height: u32) -> Result { - let spv_type_script = self.storage.spv_contract_type_script()?; - self.ckb_cli - .find_best_spv_client(spv_type_script, Some(height)) - } - pub(crate) fn select_operation(&self) -> Result { let spv_type_script = self.storage.spv_contract_type_script()?; let ins = self.ckb_cli.find_spv_cells(spv_type_script)?;