diff --git a/Cargo.lock b/Cargo.lock index f9116da..6e790fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3850,6 +3850,7 @@ dependencies = [ "mpart-async", "nix", "openssl", + "parity-scale-codec", "prometheus", "protobuf 3.3.0", "protobuf-codegen", diff --git a/Cargo.toml b/Cargo.toml index 41cb188..adf8287 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,8 @@ url = "2.3.1" tokio-tungstenite = { version = "0.20", features = ["native-tls"] } futures-util = "0.3.25" jwt = "0.16" -subxt = "0.28.0" +subxt = { version = "0.28.0", features = ["substrate-compat"]} +codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] } itertools = "0.11" # for static build diff --git a/_tests/e2e_tests.rs b/_tests/e2e_tests.rs index 90eecf5..685aa0f 100644 --- a/_tests/e2e_tests.rs +++ b/_tests/e2e_tests.rs @@ -44,6 +44,9 @@ impl TwinDB for InMemoryDB { ) -> anyhow::Result> { unimplemented!() } + async fn set_twin(&self, twin: Twin) -> anyhow::Result<()> { + unimplemented!() + } } fn new_message( diff --git a/artifacts/network.scale b/artifacts/network.scale new file mode 100644 index 0000000..d9891b7 Binary files /dev/null and b/artifacts/network.scale differ diff --git a/proto/types.proto b/proto/types.proto index 8e9cedb..e7ba3bf 100644 --- a/proto/types.proto +++ b/proto/types.proto @@ -71,4 +71,6 @@ message Envelope { bytes plain = 13; bytes cipher = 14; } + + optional string relays = 17; } diff --git a/src/bins/rmb-peer.rs b/src/bins/rmb-peer.rs index c62c4dc..8c39e06 100644 --- a/src/bins/rmb-peer.rs +++ b/src/bins/rmb-peer.rs @@ -1,6 +1,5 @@ use std::path::PathBuf; use std::str::FromStr; -use std::time::Duration; use anyhow::{Context, Result}; use clap::{builder::ArgAction, Args, Parser}; @@ -151,12 +150,10 @@ async fn app(args: Params) -> Result<()> { // cache is a little bit tricky because while it improves performance it // makes changes to twin data takes at least 5 min before they are detected - let db = SubstrateTwinDB::::new( - args.substrate, - RedisCache::new(pool.clone(), "twin", Duration::from_secs(60)), - ) - .await - .context("cannot create substrate twin db object")?; + let db = + SubstrateTwinDB::::new(args.substrate, RedisCache::new(pool.clone(), "twin")) + .await + .context("cannot create substrate twin db object")?; let id = db .get_twin_with_account(signer.account()) diff --git a/src/bins/rmb-relay.rs b/src/bins/rmb-relay.rs index 1732bed..a00230c 100644 --- a/src/bins/rmb-relay.rs +++ b/src/bins/rmb-relay.rs @@ -4,6 +4,7 @@ use std::time::Duration; use anyhow::{Context, Result}; use clap::{builder::ArgAction, Parser}; use rmb::cache::RedisCache; +use rmb::events; use rmb::redis; use rmb::relay::{ self, @@ -142,14 +143,13 @@ async fn app(args: Args) -> Result<()> { .await .context("failed to initialize redis pool")?; - // we use 6 hours cache for twin information because twin id will not change anyway - // and we only need twin public key for validation only. - let twins = SubstrateTwinDB::::new( - args.substrate, - RedisCache::new(pool.clone(), "twin", Duration::from_secs(args.cache * 60)), - ) - .await - .context("cannot create substrate twin db object")?; + let redis_cache = RedisCache::new(pool.clone(), "twin"); + + redis_cache.flush().await?; + + let twins = SubstrateTwinDB::::new(args.substrate.clone(), redis_cache.clone()) + .await + .context("cannot create substrate twin db object")?; let max_users = args.workers as usize * args.user_per_worker as usize; let opt = relay::SwitchOptions::new(pool.clone()) @@ -175,6 +175,14 @@ async fn app(args: Args) -> Result<()> { let r = relay::Relay::new(&args.domain, twins, opt, federation, limiter, ranker) .await .unwrap(); + + let l = events::Listener::new(args.substrate[0].as_str(), redis_cache).await?; + tokio::spawn(async move { + if let Err(e) = l.listen().await { + log::error!("failed to listen to events: {:#}", e); + } + }); + r.start(&args.listen).await.unwrap(); Ok(()) } diff --git a/src/cache/redis.rs b/src/cache/redis.rs index 13320f2..33d9b31 100644 --- a/src/cache/redis.rs +++ b/src/cache/redis.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use super::Cache; use anyhow::{Context, Result}; @@ -22,19 +20,13 @@ use serde::{de::DeserializeOwned, Serialize}; pub struct RedisCache { pool: Pool, prefix: String, - ttl: Duration, } impl RedisCache { - pub fn new>( - pool: Pool, - prefix: P, - ttl: Duration, - ) -> Self { + pub fn new>(pool: Pool, prefix: P) -> Self { Self { pool, prefix: prefix.into(), - ttl, } } @@ -47,6 +39,12 @@ impl RedisCache { Ok(conn) } + pub async fn flush(&self) -> Result<()> { + let mut conn = self.get_connection().await?; + cmd("DEL").arg(&self.prefix).query_async(&mut *conn).await?; + + Ok(()) + } } #[async_trait] @@ -57,12 +55,10 @@ where async fn set(&self, key: S, obj: T) -> Result<()> { let mut conn = self.get_connection().await?; let obj = serde_json::to_vec(&obj).context("unable to serialize twin object for redis")?; - let key = format!("{}.{}", self.prefix, key.to_string()); - cmd("SET") - .arg(key) + cmd("HSET") + .arg(&self.prefix) + .arg(key.to_string()) .arg(obj) - .arg("EX") - .arg(self.ttl.as_secs()) .query_async(&mut *conn) .await?; @@ -70,9 +66,12 @@ where } async fn get(&self, key: S) -> Result> { let mut conn = self.get_connection().await?; - let key = format!("{}.{}", self.prefix, key.to_string()); - let ret: Option> = cmd("GET").arg(key).query_async(&mut *conn).await?; + let ret: Option> = cmd("HGET") + .arg(&self.prefix) + .arg(key.to_string()) + .query_async(&mut *conn) + .await?; match ret { Some(val) => { @@ -93,7 +92,6 @@ mod tests { use super::*; const PREFIX: &str = "twin"; - const TTL: u64 = 20; async fn create_redis_cache() -> RedisCache { let manager = RedisConnectionManager::new("redis://127.0.0.1/") @@ -105,7 +103,7 @@ mod tests { .context("unable to build pool or redis connection manager") .unwrap(); - RedisCache::new(pool, PREFIX, Duration::from_secs(TTL)) + RedisCache::new(pool, PREFIX) } #[tokio::test] diff --git a/src/events/events.rs b/src/events/events.rs new file mode 100644 index 0000000..cd8fffb --- /dev/null +++ b/src/events/events.rs @@ -0,0 +1,43 @@ +use crate::{cache::Cache, tfchain::tfchain, twin::Twin}; +use anyhow::Result; +use futures::StreamExt; +use log; +use subxt::{OnlineClient, PolkadotConfig}; + +#[derive(Clone)] +pub struct Listener +where + C: Cache, +{ + cache: C, + api: OnlineClient, +} + +impl Listener +where + C: Cache + Clone, +{ + pub async fn new(url: &str, cache: C) -> Result { + let api = OnlineClient::::from_url(url).await?; + Ok(Listener { api, cache }) + } + pub async fn listen(&self) -> Result<()> { + log::info!("started chain events listener"); + let mut blocks_sub = self.api.blocks().subscribe_finalized().await?; + while let Some(block) = blocks_sub.next().await { + let events = block?.events().await?; + for evt in events.iter() { + let evt = evt?; + if let Ok(Some(twin)) = evt.as_event::() + { + self.cache.set(twin.0.id, twin.0.into()).await?; + } else if let Ok(Some(twin)) = + evt.as_event::() + { + self.cache.set(twin.0.id, twin.0.into()).await?; + } + } + } + Ok(()) + } +} diff --git a/src/events/mod.rs b/src/events/mod.rs new file mode 100644 index 0000000..3a34494 --- /dev/null +++ b/src/events/mod.rs @@ -0,0 +1,3 @@ +mod events; + +pub use events::Listener; diff --git a/src/lib.rs b/src/lib.rs index 71e0a38..e3e063e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,10 +3,12 @@ extern crate anyhow; extern crate mime; pub mod cache; +pub mod events; pub mod identity; pub mod peer; pub mod redis; pub mod relay; +pub mod tfchain; pub mod token; pub mod twin; pub mod types; diff --git a/src/relay/api.rs b/src/relay/api.rs index 0c86fbd..c8e61db 100644 --- a/src/relay/api.rs +++ b/src/relay/api.rs @@ -1,5 +1,5 @@ use crate::token::{self, Claims}; -use crate::twin::TwinDB; +use crate::twin::{RelayDomains, TwinDB}; use crate::types::{Envelope, EnvelopeExt, Pong}; use anyhow::{Context, Result}; use futures::stream::SplitSink; @@ -17,6 +17,7 @@ use prometheus::TextEncoder; use protobuf::Message as ProtoMessage; use std::fmt::Display; use std::pin::Pin; +use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; @@ -203,6 +204,27 @@ async fn federation( let envelope = Envelope::parse_from_bytes(&body).map_err(|err| HttpError::BadRequest(err.to_string()))?; + if let Some(relays) = &envelope.relays { + let mut twin = data + .twins + .get_twin(envelope.source.twin) + .await + .map_err(|err| HttpError::FailedToGetTwin(err.to_string()))? + .ok_or_else(|| HttpError::TwinNotFound(envelope.source.twin))?; + let envelope_relays = match RelayDomains::from_str(relays) { + Ok(r) => r, + Err(_) => return Err(HttpError::BadRequest("invalid relays".to_string())), + }; + if let Some(twin_relays) = twin.relay { + if twin_relays != envelope_relays { + twin.relay = Some(envelope_relays); + data.twins + .set_twin(twin) + .await + .map_err(|err| HttpError::FailedToSetTwin(err.to_string()))?; + } + } + } let dst: StreamID = (&envelope.destination).into(); data.switch.send(&dst, &body).await?; @@ -309,6 +331,25 @@ impl Stream { .await? .ok_or_else(|| anyhow::Error::msg("unknown twin destination"))?; + if let Some(relays) = &envelope.relays { + let mut twin = self + .twins + .get_twin(envelope.source.twin) + .await? + .ok_or_else(|| anyhow::Error::msg("unknown twin source"))?; + + let envelope_relays = match RelayDomains::from_str(relays) { + Ok(r) => r, + Err(_) => anyhow::bail!("invalid relays"), + }; + if let Some(twin_relays) = twin.relay { + if twin_relays != envelope_relays { + twin.relay = Some(envelope_relays); + self.twins.set_twin(twin.clone()).await?; + } + } + } + if !twin .relay .ok_or_else(|| anyhow::Error::msg("relay info is not set for this twin"))? diff --git a/src/relay/mod.rs b/src/relay/mod.rs index df6532b..15db835 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -87,6 +87,8 @@ pub enum HttpError { InvalidJWT(#[from] token::Error), #[error("failed to get twin: {0}")] FailedToGetTwin(String), + #[error("failed to set twin: {0}")] + FailedToSetTwin(String), #[error("twin not found {0}")] TwinNotFound(u32), #[error("{0}")] @@ -109,6 +111,7 @@ impl HttpError { Self::MissingJWT => Codes::BAD_REQUEST, Self::InvalidJWT(_) => Codes::UNAUTHORIZED, Self::FailedToGetTwin(_) => Codes::INTERNAL_SERVER_ERROR, + Self::FailedToSetTwin(_) => Codes::INTERNAL_SERVER_ERROR, Self::TwinNotFound(_) => Codes::UNAUTHORIZED, Self::WebsocketError(_) => Codes::INTERNAL_SERVER_ERROR, Self::NotFound => Codes::NOT_FOUND, diff --git a/src/tfchain/mod.rs b/src/tfchain/mod.rs new file mode 100644 index 0000000..0832754 --- /dev/null +++ b/src/tfchain/mod.rs @@ -0,0 +1,6 @@ +#[subxt::subxt(runtime_metadata_path = "artifacts/network.scale")] +mod tfchain {} + +use subxt::utils::AccountId32; +pub use tfchain::runtime_types::pallet_tfgrid::types::Twin as TwinData; +pub type Twin = TwinData; diff --git a/src/twin/mod.rs b/src/twin/mod.rs index 17f2e63..a91edf1 100644 --- a/src/twin/mod.rs +++ b/src/twin/mod.rs @@ -15,10 +15,13 @@ use subxt::utils::AccountId32; pub trait TwinDB: Send + Sync + Clone + 'static { async fn get_twin(&self, twin_id: u32) -> Result>; async fn get_twin_with_account(&self, account_id: AccountId32) -> Result>; + async fn set_twin(&self, twin: Twin) -> Result<()>; } use tfchain_client::client::Twin as TwinData; +use crate::tfchain; + #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] pub struct Twin { pub id: u32, @@ -41,6 +44,20 @@ impl From for Twin { } } +impl From for Twin { + fn from(twin: tfchain::Twin) -> Self { + Twin { + id: twin.id, + account: twin.account_id, + relay: twin.relay.map(|v| { + let string: String = String::from_utf8_lossy(&v.0).into(); + RelayDomains::from_str(&string).unwrap_or_default() + }), + pk: twin.pk.map(|v| v.0), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default, Eq)] pub struct RelayDomains(HashSet); diff --git a/src/twin/substrate.rs b/src/twin/substrate.rs index cf7b018..7ac9d21 100644 --- a/src/twin/substrate.rs +++ b/src/twin/substrate.rs @@ -78,6 +78,11 @@ where Ok(id) } + + async fn set_twin(&self, twin: Twin) -> Result<()> { + self.cache.set(twin.id, twin).await?; + Ok(()) + } } /// ClientWrapper is basically a substrate client. diff --git a/src/types/mod.rs b/src/types/mod.rs index ab26689..71dc005 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -338,6 +338,9 @@ impl Challengeable for Envelope { hash.write_all(data)?; } } + if let Some(ref relays) = self.relays { + write!(hash, "{}", relays)?; + } Ok(()) }