Skip to content

Commit

Permalink
Merge pull request #25 from ckb-cell/refactor/cache-spv-cells
Browse files Browse the repository at this point in the history
      perf: cache the SPV instance to improve the performance of the JSON-RPC APIs
  • Loading branch information
yangby-cryptape authored Apr 18, 2024
2 parents 01772d2 + 309b2c1 commit 0ab417f
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 46 deletions.
133 changes: 121 additions & 12 deletions src/components/api_service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! JSON-RPC APIs service.
use std::net::SocketAddr;
use std::{net::SocketAddr, sync::RwLock, time::SystemTime};

use bitcoin::Txid;
use ckb_bitcoin_spv_verifier::types::{
Expand All @@ -16,7 +16,7 @@ use jsonrpc_server_utils::{cors::AccessControlAllowOrigin, hosts::DomainsValidat
use serde::Serialize;

use crate::{
components::{SpvClientCell, SpvService},
components::{SpvInstance, SpvService},
prelude::*,
result::{Error, Result},
};
Expand All @@ -25,6 +25,9 @@ mod error;

pub use error::ApiErrorCode;

// Bitcoin target block time is 10 minutes.
const SPV_INSTANCE_CACHED_SECS: u64 = 60 * 10;

pub struct ApiServiceConfig {
listen_address: SocketAddr,
}
Expand All @@ -48,6 +51,13 @@ pub trait SpvRpc {

pub struct SpvRpcImpl {
spv_service: SpvService,
cached_spv_instance: RwLock<Option<CachedSpvInstance>>,
}

#[derive(Clone)]
struct CachedSpvInstance {
instance: SpvInstance,
expired_timestamp: u64,
}

impl ApiServiceConfig {
Expand All @@ -74,7 +84,69 @@ impl ApiServiceConfig {

impl SpvRpcImpl {
pub fn new(spv_service: SpvService) -> Self {
Self { spv_service }
Self {
spv_service,
cached_spv_instance: RwLock::new(None),
}
}

fn load_spv_instance(&self) -> Option<SpvInstance> {
if let Some(cached) = self
.cached_spv_instance
.read()
.ok()
.and_then(|locked| locked.as_ref().cloned())
{
if let Ok(dur) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
let current_timestamp = dur.as_secs();
if current_timestamp > cached.expired_timestamp {
log::trace!(
"cached SPV instance is expired, expired at {}, now is {current_timestamp}",
cached.expired_timestamp
);
None
} else {
log::trace!(
"cached SPV instance is loaded, will be expired at {}, now is {current_timestamp}",
cached.expired_timestamp
);
Some(cached.instance)
}
} else {
log::warn!("failed to read current timestamp for load the cached SPV instance");
None
}
} else {
log::debug!("failed to load cached SPV instance: not existed or lock error");
None
}
}

fn update_spv_instance(&self, instance: SpvInstance) {
match self.cached_spv_instance.write() {
Ok(mut locked) => {
if let Ok(dur) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
let current_timestamp = dur.as_secs();
let expired_timestamp = current_timestamp + SPV_INSTANCE_CACHED_SECS;
let cache = CachedSpvInstance {
instance,
expired_timestamp,
};
*locked = Some(cache);
log::debug!(
"refreshed the cached SPV instance, it will be expired at {expired_timestamp}, \
now is {current_timestamp}",
);
} else {
log::warn!(
"failed to read current timestamp for update the cached SPV instance"
);
}
}
Err(err) => {
log::debug!("failed to update the cached SPV instance since {err}");
}
}
}
}

Expand All @@ -85,7 +157,7 @@ impl SpvRpc for SpvRpcImpl {
tx_index: u32,
confirmations: u32,
) -> RpcResult<BitcoinTxProof> {
log::trace!("Call getTxProof with params [{txid:#x}, {confirmations}]");
log::debug!("Call getTxProof with params [{txid:#x}, {confirmations}]");
let spv = &self.spv_service;

let (target_height, target_hash, raw_tx_out_proof) =
Expand Down Expand Up @@ -126,7 +198,7 @@ impl SpvRpc for SpvRpcImpl {
data: None,
}
})?;
log::trace!(">>> tip height in local storage is {stg_tip_height}");
log::debug!(">>> tip height in local storage is {stg_tip_height}");

if stg_tip_height < target_height {
let desc = format!(
Expand Down Expand Up @@ -160,19 +232,56 @@ impl SpvRpc for SpvRpcImpl {
return Err(ApiErrorCode::StorageHeaderUnmatched.with_desc(desc));
}

let spv_client_cell = tokio::task::block_in_place(|| -> RpcResult<SpvClientCell> {
spv.find_best_spv_client(stg_tip_height).map_err(|err| {
let message =
format!("failed to get SPV cell base on height {stg_tip_height} from chain");
let spv_type_script = spv.storage.spv_contract_type_script().map_err(|err| {
let message = "failed to get SPV contract type script from storage".to_owned();
log::error!("{message} since {err}");
RpcError {
code: RpcErrorCode::InternalError,
message,
data: None,
}
})?;

log::debug!(">>> try the cached SPV instance at first");

let spv_instance = if let Some(spv_instance) = self.load_spv_instance() {
log::debug!(">>> the cached SPV instance is {spv_instance}");
spv_instance
} else {
log::debug!(">>> fetch SPV instance from remote since cached is not satisfied");
let spv_instance = tokio::task::block_in_place(|| -> RpcResult<SpvInstance> {
spv.ckb_cli.find_spv_cells(spv_type_script).map_err(|err| {
let message = format!(
"failed to get SPV cell base on height {stg_tip_height} from chain"
);
log::error!("{message} since {err}");
RpcError {
code: RpcErrorCode::InternalError,
message,
data: None,
}
})
})?;
log::debug!(">>> the fetched SPV instance is {spv_instance}");
self.update_spv_instance(spv_instance.clone());
spv_instance
};

let spv_client_cell = spv_instance
.find_best_spv_client_include_height(stg_tip_height)
.map_err(|err| {
let message = format!(
"failed to get SPV cell base on height {stg_tip_height} from fetched data"
);
log::error!("{message} since {err}");
RpcError {
code: RpcErrorCode::InternalError,
message,
data: None,
}
})
})?;
log::trace!(">>> the best SPV client is {}", spv_client_cell.client);
})?;

log::debug!(">>> the best SPV client is {}", spv_client_cell.client);

let spv_header_root = &spv_client_cell.client.headers_mmr_root;

Expand Down
79 changes: 51 additions & 28 deletions src/components/ckb_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Expand the functionality of the original CKB RPC client.
use std::collections::HashMap;
use std::{collections::HashMap, fmt};

use ckb_bitcoin_spv_verifier::types::{
core::{SpvClient, SpvInfo},
Expand Down Expand Up @@ -36,6 +36,7 @@ pub struct SpvClientCell {
pub(crate) cell: LiveCell,
}

#[derive(Clone)]
pub struct SpvInstance {
pub(crate) info: SpvInfoCell,
pub(crate) clients: HashMap<u8, SpvClientCell>,
Expand Down Expand Up @@ -76,35 +77,11 @@ pub trait CkbRpcClientExtension {
spv_type_script: Script,
height_opt: Option<u32>,
) -> Result<SpvClientCell> {
let SpvInstance { mut info, clients } = self.find_spv_cells(spv_type_script)?;
let instance = self.find_spv_cells(spv_type_script)?;
if let Some(height) = height_opt {
for _ in 0..clients.len() {
let cell = clients.get(&info.info.tip_client_id).ok_or_else(|| {
let msg = format!(
"the SPV client (id={}) is not found",
info.info.tip_client_id
);
Error::other(msg)
})?;
if cell.client.headers_mmr_root.max_height <= height {
return Ok(cell.to_owned());
}
info.info.tip_client_id = info.prev_tip_client_id();
}
let msg =
format!("all SPV clients have better heights than server has (height: {height})");
Err(Error::other(msg))
instance.find_best_spv_client_include_height(height)
} else {
clients
.get(&info.info.tip_client_id)
.ok_or_else(|| {
let msg = format!(
"the SPV client (id={}) is not found",
info.info.tip_client_id
);
Error::other(msg)
})
.cloned()
instance.find_tip_spv_client()
}
}
}
Expand Down Expand Up @@ -183,6 +160,52 @@ impl CkbRpcClientExtension for CkbRpcClient {
}
}

impl SpvInstance {
pub(crate) fn find_tip_spv_client(&self) -> Result<SpvClientCell> {
self.clients
.get(&self.info.info.tip_client_id)
.ok_or_else(|| {
let msg = format!(
"the SPV client (id={}) is not found",
self.info.info.tip_client_id
);
Error::other(msg)
})
.cloned()
}

pub(crate) fn find_best_spv_client_include_height(&self, height: u32) -> Result<SpvClientCell> {
let SpvInstance { ref info, clients } = self;
let mut info = info.to_owned();
for _ in 0..clients.len() {
let cell = clients.get(&info.info.tip_client_id).ok_or_else(|| {
let msg = format!(
"the SPV client (id={}) is not found",
info.info.tip_client_id
);
Error::other(msg)
})?;
if cell.client.headers_mmr_root.max_height <= height {
return Ok(cell.to_owned());
}
info.info.tip_client_id = info.prev_tip_client_id();
}
let msg = format!("all SPV clients have better heights than server has (height: {height})");
Err(Error::other(msg))
}
}

impl fmt::Display for SpvInstance {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{{ tip: {}, clients-count: {} }}",
self.info.info.tip_client_id,
self.clients.len()
)
}
}

fn parse_raw_spv_cells(cells: Vec<LiveCell>) -> Result<SpvInstance> {
let mut spv_info_opt = None;
let mut spv_clients = HashMap::new();
Expand Down
6 changes: 0 additions & 6 deletions src/components/spv_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ pub enum SpvOperation {
}

impl SpvService {
pub(crate) fn find_best_spv_client(&self, height: u32) -> Result<SpvClientCell> {
let spv_type_script = self.storage.spv_contract_type_script()?;
self.ckb_cli
.find_best_spv_client(spv_type_script, Some(height))
}

pub(crate) fn select_operation(&self) -> Result<SpvOperation> {
let spv_type_script = self.storage.spv_contract_type_script()?;
let ins = self.ckb_cli.find_spv_cells(spv_type_script)?;
Expand Down

0 comments on commit 0ab417f

Please sign in to comment.