Skip to content

Commit

Permalink
implement in-mem cache with ttl for peers
Browse files Browse the repository at this point in the history
- The rmb peer should use an inmemory cache instead of redis
- it also must have a ttl for twins
- also run clippy
  • Loading branch information
muhamadazmy committed Jun 5, 2024
1 parent 90b7078 commit e80f815
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 19 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ jwt = "0.16"
subxt = { version = "0.28.0", features = ["substrate-compat"]}
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
itertools = "0.11"
ttl_cache = "0.5"

# for static build
openssl = { version = "0.10", features = ["vendored"] }
Expand Down
11 changes: 5 additions & 6 deletions src/bins/rmb-peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::str::FromStr;

use anyhow::{Context, Result};
use clap::{builder::ArgAction, Args, Parser};
use rmb::cache::RedisCache;
use rmb::cache::MemCache;
use rmb::identity::KeyType;
use rmb::identity::{Identity, Signer};
use rmb::peer::Pair;
Expand Down Expand Up @@ -149,11 +149,10 @@ async fn app(args: Params) -> Result<()> {
.context("failed to initialize redis pool")?;

// cache is a little bit tricky because while it improves performance it
// makes changes to twin data takes at least 5 min before they are detected
let db =
SubstrateTwinDB::<RedisCache>::new(args.substrate, RedisCache::new(pool.clone(), "twin"))
.await
.context("cannot create substrate twin db object")?;
// makes changes to twin data takes at least 2 min before they are detected
let db = SubstrateTwinDB::new(args.substrate, MemCache::default())
.await
.context("cannot create substrate twin db object")?;

let id = db
.get_twin_with_account(signer.account())
Expand Down
23 changes: 15 additions & 8 deletions src/cache/memory.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use std::time::Duration;
use tokio::sync::RwLock;

use super::Cache;
use anyhow::Result;
use async_trait::async_trait;
use ttl_cache::TtlCache;

static IN_MEMORY_CAP: usize = 500;
static IN_MEMORY_TTL_SEC: u64 = 5 * 60;

#[derive(Clone)]
pub struct MemCache<V> {
mem: Arc<RwLock<HashMap<String, V>>>,
mem: Arc<RwLock<TtlCache<String, V>>>,
ttl: Duration,
}

impl<V> Default for MemCache<V> {
fn default() -> Self {
Self::new()
Self::new(IN_MEMORY_CAP, Duration::from_secs(IN_MEMORY_TTL_SEC))
}
}

impl<V> MemCache<V> {
#[allow(unused)]
pub fn new() -> Self {
pub fn new(capacity: usize, ttl: Duration) -> Self {
Self {
mem: Arc::new(RwLock::new(HashMap::new())),
mem: Arc::new(RwLock::new(TtlCache::new(capacity))),
ttl,
}
}
}
Expand All @@ -33,7 +40,7 @@ where
{
async fn set<K: ToString + Send + Sync>(&self, key: K, obj: T) -> Result<()> {
let mut mem = self.mem.write().await;
mem.insert(key.to_string(), obj);
mem.insert(key.to_string(), obj, self.ttl);

Ok(())
}
Expand Down Expand Up @@ -62,7 +69,7 @@ mod tests {

#[tokio::test]
async fn test_success_set_get_string() {
let cache = MemCache::new();
let cache = MemCache::default();
cache
.set("k".to_string(), "v".to_string())
.await
Expand All @@ -86,7 +93,7 @@ mod tests {

let some_val = DummyStruct { k: "v".to_string() };

let cache = MemCache::new();
let cache = MemCache::default();

cache
.set("k".to_string(), some_val.clone())
Expand Down
2 changes: 1 addition & 1 deletion src/peer/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl JsonOutgoingRequest {
backlog.reference = self.reference;

let mut env = Envelope::new();
env.uid = backlog.uid.clone();
env.uid.clone_from(&backlog.uid);
env.set_plain(base64::decode(self.data).context("invalid data base64 encoding")?);
env.tags = self.tags;
env.timestamp = self.timestamp;
Expand Down
2 changes: 1 addition & 1 deletion src/relay/federation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ mod test {
let reg = prometheus::Registry::new();
let pool = redis::pool("redis://localhost:6379", 10).await.unwrap();
let sink = Sink::new(pool.clone());
let mem: MemCache<Twin> = MemCache::new();
let mem: MemCache<Twin> = MemCache::default();
let account_id: AccountId32 = "5EyHmbLydxX7hXTX7gQqftCJr2e57Z3VNtgd6uxJzZsAjcPb"
.parse()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/relay/federation/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ mod test {

// Start a lightweight mock server.
let server = MockServer::start();
let mem: MemCache<Twin> = MemCache::new();
let mem: MemCache<Twin> = MemCache::default();
let account_id: AccountId32 = "5EyHmbLydxX7hXTX7gQqftCJr2e57Z3VNtgd6uxJzZsAjcPb"
.parse()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/relay/switch/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl From<&StreamID> for Address {
fn from(value: &StreamID) -> Self {
let mut address = Address::new();
address.twin = value.0;
address.connection = value.1.clone();
address.connection.clone_from(&value.1);

address
}
Expand Down
33 changes: 32 additions & 1 deletion src/twin/substrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ mod tests {

#[tokio::test]
async fn test_get_twin_with_mem_cache() {
let mem: MemCache<Twin> = MemCache::new();
let mem: MemCache<Twin> = MemCache::default();

let db = SubstrateTwinDB::new(
vec![String::from("wss://tfchain.dev.grid.tf:443")],
Expand Down Expand Up @@ -258,6 +258,37 @@ mod tests {
);
}

#[tokio::test]
async fn test_get_ashraf_twin() {
let db = SubstrateTwinDB::new(vec![String::from("wss://tfchain.grid.tf:443")], NoCache)
.await
.context("cannot create substrate twin db object")
.unwrap();

let twin = db
.get_twin(4307)
.await
.context("can't get twin from substrate")
.unwrap()
.unwrap();

// NOTE: this currently checks against devnet substrate
// as provided by the url wss://tfchain.dev.grid.tf.
// if this environment was reset at some point. those
// values won't match anymore.

assert!(!matches!(twin.relay, None));
let relay = twin.relay.unwrap();
assert_eq!(relay.0.len(), 1);
assert!(relay.0.contains("relay.02.grid.tf"));
assert!(twin.pk.is_some());

assert_eq!(
twin.account.to_string(),
"5GYtsF9XyaWUEa1zZMhZRe1p9XRMkF21wGyg4G7pPrJok942"
);
}

#[tokio::test]
async fn test_get_twin_id() {
let db = SubstrateTwinDB::new(vec![String::from("wss://tfchain.dev.grid.tf:443")], NoCache)
Expand Down

0 comments on commit e80f815

Please sign in to comment.