Skip to content
This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

Commit

Permalink
feat(gateway): cache indexer response (#389)
Browse files Browse the repository at this point in the history
* Add cache in resolver

* Make cache configurable

* Use workspace dep
  • Loading branch information
kckeiks authored Feb 13, 2023
1 parent 1f9ffbb commit fe215f7
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 48 deletions.
1 change: 1 addition & 0 deletions crates/ursa-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ libp2p.workspace = true
ursa-telemetry = { path = "../ursa-telemetry" }
tracing-opentelemetry.workspace = true
axum-prometheus.workspace = true
moka.workspace = true
10 changes: 5 additions & 5 deletions crates/ursa-gateway/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ pub struct DaemonCmdOpts {
/// indexer cid url
#[arg(long)]
pub indexer_cid_url: Option<String>,
/// max cache size (bytes)
/// max cache capacity (number of entries)
#[arg(long)]
pub max_cache_size: Option<u64>,
pub cache_max_capacity: Option<u64>,
/// cache ttl (ms)
#[arg(long)]
pub ttl_buf: Option<u64>,
/// ttl cache interval (ms)
pub cache_time_to_live: Option<u64>,
/// time to idle cache interval (ms)
#[arg(long)]
pub ttl_cache_interval: Option<u64>,
pub cache_time_to_idle: Option<u64>,
}
17 changes: 16 additions & 1 deletion crates/ursa-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ pub struct ServerConfig {
pub cert_path: PathBuf,
pub key_path: PathBuf,
pub stream_buf: u64,
pub cache_max_capacity: u64,
pub cache_time_to_idle: u64,
pub cache_time_to_live: u64,
}

#[derive(Deserialize, Serialize)]
Expand All @@ -100,7 +103,10 @@ impl Default for GatewayConfig {
key_path: PathBuf::from(env!("HOME"))
.join(DEFAULT_URSA_GATEWAY_PATH)
.join("key.pem"),
stream_buf: 2_000_000, // 2MB
stream_buf: 2_000_000, // 2MB
cache_max_capacity: 100_000, // Number of entries.
cache_time_to_idle: 5 * 60 * 1000, // 5 mins.
cache_time_to_live: 5 * 60 * 1000, // 5 mins.
},
indexer: IndexerConfig {
cid_url: "https://cid.contact/cid".into(),
Expand Down Expand Up @@ -141,5 +147,14 @@ impl GatewayConfig {
if let Some(indexer_cid_url) = config.indexer_cid_url {
self.indexer.cid_url = indexer_cid_url;
}
if let Some(cache_max_capacity) = config.cache_max_capacity {
self.server.cache_max_capacity = cache_max_capacity;
}
if let Some(cache_time_to_live) = config.cache_time_to_live {
self.server.cache_time_to_live = cache_time_to_live;
}
if let Some(cache_time_to_idle) = config.cache_time_to_idle {
self.server.cache_time_to_idle = cache_time_to_idle;
}
}
}
92 changes: 54 additions & 38 deletions crates/ursa-gateway/src/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use hyper::{
use hyper_tls::HttpsConnector;
use libp2p::multiaddr::Protocol;
use model::IndexerResponse;
use moka::sync::Cache;
use serde_json::from_slice;
use tracing::{debug, error, info, warn};

use crate::{
resolver::model::{Metadata, ProviderResult},
resolver::model::{Metadata, MultihashResult, ProviderResult},
util::error::Error,
};

Expand All @@ -28,13 +29,19 @@ type Client = client::Client<HttpsConnector<HttpConnector>, Body>;
pub struct Resolver {
indexer_cid_url: String,
client: Client,
cache: Cache<String, Vec<MultihashResult>>,
}

impl Resolver {
pub fn new(indexer_cid_url: String, client: Client) -> Self {
pub fn new(
indexer_cid_url: String,
client: Client,
cache: Cache<String, Vec<MultihashResult>>,
) -> Self {
Self {
indexer_cid_url,
client,
cache,
}
}

Expand All @@ -46,46 +53,55 @@ impl Resolver {
anyhow!("Error parsed uri: {endpoint}")
})?;

let body = match self
.client
.get(uri)
.await
.map_err(|e| {
error!("Error requested indexer: {endpoint} {e:?}");
anyhow!("Error requested indexer: {endpoint}")
})?
.into_parts()
{
(
Parts {
status: StatusCode::OK,
..
},
body,
) => body,
(parts, body) => {
error!("Error requested indexer {endpoint} with parts {parts:?} and body {body:?}");
return Err(Error::Upstream(
parts.status,
format!("Error requested indexer: {endpoint}"),
));
}
};
let multihash_results = match self.cache.get(cid) {
None => {
let body = match self
.client
.get(uri)
.await
.map_err(|e| {
error!("Error requested indexer: {endpoint} {e:?}");
anyhow!("Error requested indexer: {endpoint}")
})?
.into_parts()
{
(
Parts {
status: StatusCode::OK,
..
},
body,
) => body,
(parts, body) => {
error!("Error requested indexer {endpoint} with parts {parts:?} and body {body:?}");
return Err(Error::Upstream(
parts.status,
format!("Error requested indexer: {endpoint}"),
));
}
};

let bytes = to_bytes(body).await.map_err(|e| {
error!("Error read data from indexer: {endpoint} {e:?}");
anyhow!("Error read data from indexer {endpoint}")
})?;
let bytes = to_bytes(body).await.map_err(|e| {
error!("Error read data from indexer: {endpoint} {e:?}");
anyhow!("Error read data from indexer {endpoint}")
})?;

let indexer_response: IndexerResponse = from_slice(&bytes).map_err(|e| {
error!("Error parsed indexer response from indexer: {endpoint} {e:?}");
anyhow!("Error parsed indexer response from indexer: {endpoint}")
})?;
let indexer_response: IndexerResponse = from_slice(&bytes).map_err(|e| {
error!("Error parsed indexer response from indexer: {endpoint} {e:?}");
anyhow!("Error parsed indexer response from indexer: {endpoint}")
})?;

debug!("Received indexer response for {cid}: {indexer_response:?}");

self.cache
.insert(cid.to_string(), indexer_response.multihash_results.clone());

debug!("Received indexer response for {cid}: {indexer_response:?}");
indexer_response.multihash_results
}
Some(multihash_results) => multihash_results,
};

let providers: Vec<(&ProviderResult, Metadata)> = indexer_response
.multihash_results
let providers: Vec<(&ProviderResult, Metadata)> = multihash_results
.first()
.context("Indexer result did not contain a multi-hash result")?
.provider_results
Expand Down
8 changes: 4 additions & 4 deletions crates/ursa-gateway/src/resolver/model.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use libp2p::Multiaddr;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug)]
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct IndexerResponse {
#[serde(rename = "MultihashResults")]
pub multihash_results: Vec<MultihashResult>,
}

#[derive(Deserialize, Serialize, Debug)]
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct MultihashResult {
#[serde(rename = "Multihash")]
multihash: String,
#[serde(rename = "ProviderResults")]
pub provider_results: Vec<ProviderResult>,
}

#[derive(Deserialize, Serialize, Debug)]
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ProviderResult {
#[serde(rename = "ContextID")]
context_id: String,
Expand All @@ -25,7 +25,7 @@ pub struct ProviderResult {
pub provider: AddrInfo,
}

#[derive(Deserialize, Serialize, Debug)]
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct AddrInfo {
#[serde(rename = "ID")]
id: String,
Expand Down
11 changes: 11 additions & 0 deletions crates/ursa-gateway/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use axum_prometheus::PrometheusMetricLayerBuilder;
use axum_server::{tls_rustls::RustlsConfig, Handle};
use axum_tracing_opentelemetry::{find_current_trace_id, opentelemetry_tracing_layer};
use hyper_tls::HttpsConnector;
use moka::sync::Cache;
use route::api::v1::get::get_car_handler;
use serde_json::json;
use tokio::{
Expand Down Expand Up @@ -57,6 +58,9 @@ pub async fn start(config: Arc<RwLock<GatewayConfig>>, shutdown_rx: Receiver<()>
key_path,
concurrency_limit,
request_timeout,
cache_max_capacity,
cache_time_to_idle,
cache_time_to_live,
..
},
indexer: IndexerConfig { cid_url },
Expand All @@ -80,9 +84,16 @@ pub async fn start(config: Arc<RwLock<GatewayConfig>>, shutdown_rx: Receiver<()>
.with_default_metrics()
.build_pair();

let cache = Cache::builder()
.max_capacity(*cache_max_capacity)
.time_to_idle(Duration::from_millis(*cache_time_to_idle))
.time_to_live(Duration::from_millis(*cache_time_to_live))
.build();

let resolver = Arc::new(Resolver::new(
String::from(cid_url),
hyper::Client::builder().build::<_, Body>(HttpsConnector::new()),
cache,
));

let app = NormalizePath::trim_trailing_slash(
Expand Down

0 comments on commit fe215f7

Please sign in to comment.