From 241c5c55080f143062604d89f736e44726f2b95f Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Wed, 5 Jun 2024 11:36:54 +0200 Subject: [PATCH] implement in-mem cache with ttl for peers (#199) * implement in-mem cache with ttl for peers - The rmb peer should use an inmemory cache instead of redis - it also must have a ttl for twins - also run clippy * remove a debug test --- Cargo.lock | 16 ++++++++++++++++ Cargo.toml | 1 + src/bins/rmb-peer.rs | 11 +++++------ src/cache/memory.rs | 23 +++++++++++++++-------- src/peer/storage/mod.rs | 2 +- src/relay/federation/mod.rs | 2 +- src/relay/federation/router.rs | 2 +- src/relay/switch/session.rs | 2 +- src/twin/substrate.rs | 2 +- 9 files changed, 42 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e790fc..0c6f53c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2657,6 +2657,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linregress" version = "0.4.4" @@ -3870,6 +3876,7 @@ dependencies = [ "tokio-retry", "tokio-stream", "tokio-tungstenite", + "ttl_cache", "url", "uuid", "workers", @@ -5949,6 +5956,15 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4f195fd851901624eee5a58c4bb2b4f06399148fcd0ed336e6f1cb60a9881df" +[[package]] +name = "ttl_cache" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4189890526f0168710b6ee65ceaedf1460c48a14318ceec933cb26baa492096a" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "tungstenite" version = "0.20.1" diff --git a/Cargo.toml b/Cargo.toml index adf8287..0aa61f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ jwt = "0.16" subxt = { version = "0.28.0", features = ["substrate-compat"]} codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] } itertools = "0.11" +ttl_cache = "0.5" # for static build openssl = { version = "0.10", features = ["vendored"] } diff --git a/src/bins/rmb-peer.rs b/src/bins/rmb-peer.rs index 8c39e06..376b23e 100644 --- a/src/bins/rmb-peer.rs +++ b/src/bins/rmb-peer.rs @@ -3,7 +3,7 @@ use std::str::FromStr; use anyhow::{Context, Result}; use clap::{builder::ArgAction, Args, Parser}; -use rmb::cache::RedisCache; +use rmb::cache::MemCache; use rmb::identity::KeyType; use rmb::identity::{Identity, Signer}; use rmb::peer::Pair; @@ -149,11 +149,10 @@ async fn app(args: Params) -> Result<()> { .context("failed to initialize redis pool")?; // cache is a little bit tricky because while it improves performance it - // makes changes to twin data takes at least 5 min before they are detected - let db = - SubstrateTwinDB::::new(args.substrate, RedisCache::new(pool.clone(), "twin")) - .await - .context("cannot create substrate twin db object")?; + // makes changes to twin data takes at least 2 min before they are detected + let db = SubstrateTwinDB::new(args.substrate, MemCache::default()) + .await + .context("cannot create substrate twin db object")?; let id = db .get_twin_with_account(signer.account()) diff --git a/src/cache/memory.rs b/src/cache/memory.rs index 9523b06..78b978e 100644 --- a/src/cache/memory.rs +++ b/src/cache/memory.rs @@ -1,27 +1,34 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; +use std::time::Duration; use tokio::sync::RwLock; use super::Cache; use anyhow::Result; use async_trait::async_trait; +use ttl_cache::TtlCache; + +static IN_MEMORY_CAP: usize = 500; +static IN_MEMORY_TTL_SEC: u64 = 5 * 60; #[derive(Clone)] pub struct MemCache { - mem: Arc>>, + mem: Arc>>, + ttl: Duration, } impl Default for MemCache { fn default() -> Self { - Self::new() + Self::new(IN_MEMORY_CAP, Duration::from_secs(IN_MEMORY_TTL_SEC)) } } impl MemCache { #[allow(unused)] - pub fn new() -> Self { + pub fn new(capacity: usize, ttl: Duration) -> Self { Self { - mem: Arc::new(RwLock::new(HashMap::new())), + mem: Arc::new(RwLock::new(TtlCache::new(capacity))), + ttl, } } } @@ -33,7 +40,7 @@ where { async fn set(&self, key: K, obj: T) -> Result<()> { let mut mem = self.mem.write().await; - mem.insert(key.to_string(), obj); + mem.insert(key.to_string(), obj, self.ttl); Ok(()) } @@ -62,7 +69,7 @@ mod tests { #[tokio::test] async fn test_success_set_get_string() { - let cache = MemCache::new(); + let cache = MemCache::default(); cache .set("k".to_string(), "v".to_string()) .await @@ -86,7 +93,7 @@ mod tests { let some_val = DummyStruct { k: "v".to_string() }; - let cache = MemCache::new(); + let cache = MemCache::default(); cache .set("k".to_string(), some_val.clone()) diff --git a/src/peer/storage/mod.rs b/src/peer/storage/mod.rs index 5f11641..32a75cd 100644 --- a/src/peer/storage/mod.rs +++ b/src/peer/storage/mod.rs @@ -137,7 +137,7 @@ impl JsonOutgoingRequest { backlog.reference = self.reference; let mut env = Envelope::new(); - env.uid = backlog.uid.clone(); + env.uid.clone_from(&backlog.uid); env.set_plain(base64::decode(self.data).context("invalid data base64 encoding")?); env.tags = self.tags; env.timestamp = self.timestamp; diff --git a/src/relay/federation/mod.rs b/src/relay/federation/mod.rs index 67100f9..87cd11a 100644 --- a/src/relay/federation/mod.rs +++ b/src/relay/federation/mod.rs @@ -179,7 +179,7 @@ mod test { let reg = prometheus::Registry::new(); let pool = redis::pool("redis://localhost:6379", 10).await.unwrap(); let sink = Sink::new(pool.clone()); - let mem: MemCache = MemCache::new(); + let mem: MemCache = MemCache::default(); let account_id: AccountId32 = "5EyHmbLydxX7hXTX7gQqftCJr2e57Z3VNtgd6uxJzZsAjcPb" .parse() .unwrap(); diff --git a/src/relay/federation/router.rs b/src/relay/federation/router.rs index 126fea2..4d5d474 100644 --- a/src/relay/federation/router.rs +++ b/src/relay/federation/router.rs @@ -150,7 +150,7 @@ mod test { // Start a lightweight mock server. let server = MockServer::start(); - let mem: MemCache = MemCache::new(); + let mem: MemCache = MemCache::default(); let account_id: AccountId32 = "5EyHmbLydxX7hXTX7gQqftCJr2e57Z3VNtgd6uxJzZsAjcPb" .parse() .unwrap(); diff --git a/src/relay/switch/session.rs b/src/relay/switch/session.rs index 72ee0d1..cb7d458 100644 --- a/src/relay/switch/session.rs +++ b/src/relay/switch/session.rs @@ -68,7 +68,7 @@ impl From<&StreamID> for Address { fn from(value: &StreamID) -> Self { let mut address = Address::new(); address.twin = value.0; - address.connection = value.1.clone(); + address.connection.clone_from(&value.1); address } diff --git a/src/twin/substrate.rs b/src/twin/substrate.rs index 7ac9d21..c402eca 100644 --- a/src/twin/substrate.rs +++ b/src/twin/substrate.rs @@ -195,7 +195,7 @@ mod tests { #[tokio::test] async fn test_get_twin_with_mem_cache() { - let mem: MemCache = MemCache::new(); + let mem: MemCache = MemCache::default(); let db = SubstrateTwinDB::new( vec![String::from("wss://tfchain.dev.grid.tf:443")],