Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: cache the SPV instance to improve the performance of the JSON-RPC APIs #25

Merged
merged 1 commit into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 121 additions & 12 deletions src/components/api_service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! JSON-RPC APIs service.
use std::net::SocketAddr;
use std::{net::SocketAddr, sync::RwLock, time::SystemTime};

use bitcoin::Txid;
use ckb_bitcoin_spv_verifier::types::{
Expand All @@ -16,7 +16,7 @@ use jsonrpc_server_utils::{cors::AccessControlAllowOrigin, hosts::DomainsValidat
use serde::Serialize;

use crate::{
components::{SpvClientCell, SpvService},
components::{SpvInstance, SpvService},
prelude::*,
result::{Error, Result},
};
Expand All @@ -25,6 +25,9 @@ mod error;

pub use error::ApiErrorCode;

// Bitcoin target block time is 10 minutes.
const SPV_INSTANCE_CACHED_SECS: u64 = 60 * 10;

pub struct ApiServiceConfig {
listen_address: SocketAddr,
}
Expand All @@ -48,6 +51,13 @@ pub trait SpvRpc {

pub struct SpvRpcImpl {
spv_service: SpvService,
cached_spv_instance: RwLock<Option<CachedSpvInstance>>,
}

#[derive(Clone)]
struct CachedSpvInstance {
instance: SpvInstance,
expired_timestamp: u64,
}

impl ApiServiceConfig {
Expand All @@ -74,7 +84,69 @@ impl ApiServiceConfig {

impl SpvRpcImpl {
pub fn new(spv_service: SpvService) -> Self {
Self { spv_service }
Self {
spv_service,
cached_spv_instance: RwLock::new(None),
}
}

fn load_spv_instance(&self) -> Option<SpvInstance> {
if let Some(cached) = self
.cached_spv_instance
.read()
.ok()
.and_then(|locked| locked.as_ref().cloned())
{
if let Ok(dur) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
let current_timestamp = dur.as_secs();
if current_timestamp > cached.expired_timestamp {
log::trace!(
"cached SPV instance is expired, expired at {}, now is {current_timestamp}",
cached.expired_timestamp
);
None
} else {
log::trace!(
"cached SPV instance is loaded, will be expired at {}, now is {current_timestamp}",
cached.expired_timestamp
);
Some(cached.instance)
}
} else {
log::warn!("failed to read current timestamp for load the cached SPV instance");
None
}
} else {
log::debug!("failed to load cached SPV instance: not existed or lock error");
None
}
}

fn update_spv_instance(&self, instance: SpvInstance) {
match self.cached_spv_instance.write() {
Ok(mut locked) => {
if let Ok(dur) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
let current_timestamp = dur.as_secs();
let expired_timestamp = current_timestamp + SPV_INSTANCE_CACHED_SECS;
let cache = CachedSpvInstance {
instance,
expired_timestamp,
};
*locked = Some(cache);
log::debug!(
"refreshed the cached SPV instance, it will be expired at {expired_timestamp}, \
now is {current_timestamp}",
);
} else {
log::warn!(
"failed to read current timestamp for update the cached SPV instance"
);
}
}
Err(err) => {
log::debug!("failed to update the cached SPV instance since {err}");
}
}
}
}

Expand All @@ -85,7 +157,7 @@ impl SpvRpc for SpvRpcImpl {
tx_index: u32,
confirmations: u32,
) -> RpcResult<BitcoinTxProof> {
log::trace!("Call getTxProof with params [{txid:#x}, {confirmations}]");
log::debug!("Call getTxProof with params [{txid:#x}, {confirmations}]");
let spv = &self.spv_service;

let (target_height, target_hash, raw_tx_out_proof) =
Expand Down Expand Up @@ -126,7 +198,7 @@ impl SpvRpc for SpvRpcImpl {
data: None,
}
})?;
log::trace!(">>> tip height in local storage is {stg_tip_height}");
log::debug!(">>> tip height in local storage is {stg_tip_height}");

if stg_tip_height < target_height {
let desc = format!(
Expand Down Expand Up @@ -160,19 +232,56 @@ impl SpvRpc for SpvRpcImpl {
return Err(ApiErrorCode::StorageHeaderUnmatched.with_desc(desc));
}

let spv_client_cell = tokio::task::block_in_place(|| -> RpcResult<SpvClientCell> {
spv.find_best_spv_client(stg_tip_height).map_err(|err| {
let message =
format!("failed to get SPV cell base on height {stg_tip_height} from chain");
let spv_type_script = spv.storage.spv_contract_type_script().map_err(|err| {
let message = "failed to get SPV contract type script from storage".to_owned();
log::error!("{message} since {err}");
RpcError {
code: RpcErrorCode::InternalError,
message,
data: None,
}
})?;

log::debug!(">>> try the cached SPV instance at first");

let spv_instance = if let Some(spv_instance) = self.load_spv_instance() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if spv_best_height(from cache) < target_height + confirmations, the service rpc get_tx_proof will keep returning errors until the cache expires.

In other words, if the current cache doesn't hit, the api doesn't go through ckb_cli to get the latest spv client instance, but just waits for the cache to expire.

log::debug!(">>> the cached SPV instance is {spv_instance}");
spv_instance
} else {
log::debug!(">>> fetch SPV instance from remote since cached is not satisfied");
let spv_instance = tokio::task::block_in_place(|| -> RpcResult<SpvInstance> {
spv.ckb_cli.find_spv_cells(spv_type_script).map_err(|err| {
let message = format!(
"failed to get SPV cell base on height {stg_tip_height} from chain"
);
log::error!("{message} since {err}");
RpcError {
code: RpcErrorCode::InternalError,
message,
data: None,
}
})
})?;
log::debug!(">>> the fetched SPV instance is {spv_instance}");
self.update_spv_instance(spv_instance.clone());
spv_instance
};

let spv_client_cell = spv_instance
.find_best_spv_client_include_height(stg_tip_height)
.map_err(|err| {
let message = format!(
"failed to get SPV cell base on height {stg_tip_height} from fetched data"
);
log::error!("{message} since {err}");
RpcError {
code: RpcErrorCode::InternalError,
message,
data: None,
}
})
})?;
log::trace!(">>> the best SPV client is {}", spv_client_cell.client);
})?;

log::debug!(">>> the best SPV client is {}", spv_client_cell.client);

let spv_header_root = &spv_client_cell.client.headers_mmr_root;

Expand Down
79 changes: 51 additions & 28 deletions src/components/ckb_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Expand the functionality of the original CKB RPC client.

use std::collections::HashMap;
use std::{collections::HashMap, fmt};

use ckb_bitcoin_spv_verifier::types::{
core::{SpvClient, SpvInfo},
Expand Down Expand Up @@ -36,6 +36,7 @@ pub struct SpvClientCell {
pub(crate) cell: LiveCell,
}

#[derive(Clone)]
pub struct SpvInstance {
pub(crate) info: SpvInfoCell,
pub(crate) clients: HashMap<u8, SpvClientCell>,
Expand Down Expand Up @@ -76,35 +77,11 @@ pub trait CkbRpcClientExtension {
spv_type_script: Script,
height_opt: Option<u32>,
) -> Result<SpvClientCell> {
let SpvInstance { mut info, clients } = self.find_spv_cells(spv_type_script)?;
let instance = self.find_spv_cells(spv_type_script)?;
if let Some(height) = height_opt {
for _ in 0..clients.len() {
let cell = clients.get(&info.info.tip_client_id).ok_or_else(|| {
let msg = format!(
"the SPV client (id={}) is not found",
info.info.tip_client_id
);
Error::other(msg)
})?;
if cell.client.headers_mmr_root.max_height <= height {
return Ok(cell.to_owned());
}
info.info.tip_client_id = info.prev_tip_client_id();
}
let msg =
format!("all SPV clients have better heights than server has (height: {height})");
Err(Error::other(msg))
instance.find_best_spv_client_include_height(height)
} else {
clients
.get(&info.info.tip_client_id)
.ok_or_else(|| {
let msg = format!(
"the SPV client (id={}) is not found",
info.info.tip_client_id
);
Error::other(msg)
})
.cloned()
instance.find_tip_spv_client()
}
}
}
Expand Down Expand Up @@ -183,6 +160,52 @@ impl CkbRpcClientExtension for CkbRpcClient {
}
}

impl SpvInstance {
pub(crate) fn find_tip_spv_client(&self) -> Result<SpvClientCell> {
self.clients
.get(&self.info.info.tip_client_id)
.ok_or_else(|| {
let msg = format!(
"the SPV client (id={}) is not found",
self.info.info.tip_client_id
);
Error::other(msg)
})
.cloned()
}

pub(crate) fn find_best_spv_client_include_height(&self, height: u32) -> Result<SpvClientCell> {
let SpvInstance { ref info, clients } = self;
let mut info = info.to_owned();
for _ in 0..clients.len() {
let cell = clients.get(&info.info.tip_client_id).ok_or_else(|| {
let msg = format!(
"the SPV client (id={}) is not found",
info.info.tip_client_id
);
Error::other(msg)
})?;
if cell.client.headers_mmr_root.max_height <= height {
Flouse marked this conversation as resolved.
Show resolved Hide resolved
return Ok(cell.to_owned());
}
info.info.tip_client_id = info.prev_tip_client_id();
}
let msg = format!("all SPV clients have better heights than server has (height: {height})");
Err(Error::other(msg))
}
}

impl fmt::Display for SpvInstance {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{{ tip: {}, clients-count: {} }}",
self.info.info.tip_client_id,
self.clients.len()
)
}
}

fn parse_raw_spv_cells(cells: Vec<LiveCell>) -> Result<SpvInstance> {
let mut spv_info_opt = None;
let mut spv_clients = HashMap::new();
Expand Down
6 changes: 0 additions & 6 deletions src/components/spv_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ pub enum SpvOperation {
}

impl SpvService {
pub(crate) fn find_best_spv_client(&self, height: u32) -> Result<SpvClientCell> {
let spv_type_script = self.storage.spv_contract_type_script()?;
self.ckb_cli
.find_best_spv_client(spv_type_script, Some(height))
}

pub(crate) fn select_operation(&self) -> Result<SpvOperation> {
let spv_type_script = self.storage.spv_contract_type_script()?;
let ins = self.ckb_cli.find_spv_cells(spv_type_script)?;
Expand Down