Listen to chain events and update twin on requests
AbdelrahmanElawady committed Mar 21, 2024
1 parent f35d685 commit fae3a21
Showing 17 changed files with 168 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -38,7 +38,8 @@ url = "2.3.1"
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
futures-util = "0.3.25"
jwt = "0.16"
subxt = "0.28.0"
subxt = { version = "0.28.0", features = ["substrate-compat"]}
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
itertools = "0.11"

# for static build
3 changes: 3 additions & 0 deletions _tests/e2e_tests.rs
@@ -44,6 +44,9 @@ impl TwinDB for InMemoryDB {
) -> anyhow::Result<Option<u32>> {
unimplemented!()
}
async fn set_twin(&self, twin: Twin) -> anyhow::Result<()> {
unimplemented!()
}
}

fn new_message(
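
For context, `TwinDB` (see src/twin/mod.rs further down) now carries three methods, and this test double stubs the new `set_twin` with `unimplemented!()`. A minimal in-memory implementation that actually stores twins could look like the sketch below; the `MemoryTwinDB` name and its `HashMap` layout are illustrative and not part of this commit, and it assumes the crate's `async_trait`-based trait declaration.

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use subxt::utils::AccountId32;
use tokio::sync::RwLock;

use rmb::twin::{Twin, TwinDB};

// Illustrative test double: twins live in a shared, lock-protected map.
#[derive(Clone, Default)]
struct MemoryTwinDB {
    twins: Arc<RwLock<HashMap<u32, Twin>>>,
}

#[async_trait]
impl TwinDB for MemoryTwinDB {
    async fn get_twin(&self, twin_id: u32) -> Result<Option<Twin>> {
        Ok(self.twins.read().await.get(&twin_id).cloned())
    }

    async fn get_twin_with_account(&self, account_id: AccountId32) -> Result<Option<u32>> {
        // A linear scan is fine for a test helper.
        Ok(self
            .twins
            .read()
            .await
            .values()
            .find(|twin| twin.account == account_id)
            .map(|twin| twin.id))
    }

    async fn set_twin(&self, twin: Twin) -> Result<()> {
        self.twins.write().await.insert(twin.id, twin);
        Ok(())
    }
}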
Binary file added artifacts/network.scale
Binary file not shown.
2 changes: 2 additions & 0 deletions proto/types.proto
@@ -71,4 +71,6 @@ message Envelope {
bytes plain = 13;
bytes cipher = 14;
}

optional string relays = 17;
}
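
The relay consumes this new field in src/relay/api.rs below; the peer side (not shown in this diff) is expected to fill it on outgoing envelopes. A hedged sketch, assuming the generated Envelope in rmb::types exposes the field as a plain Option<String>; the exact string format is whatever RelayDomains::from_str on the relay side accepts, which this diff does not show.

use rmb::types::Envelope;

// Illustrative only: advertise the sender's relay domains on an outgoing envelope.
fn set_relays(envelope: &mut Envelope, relays: String) {
    envelope.relays = Some(relays);
}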
11 changes: 4 additions & 7 deletions src/bins/rmb-peer.rs
@@ -1,6 +1,5 @@
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;

use anyhow::{Context, Result};
use clap::{builder::ArgAction, Args, Parser};
@@ -151,12 +150,10 @@ async fn app(args: Params) -> Result<()> {

// cache is a little bit tricky because while it improves performance it
// makes changes to twin data take at least 5 min before they are detected
let db = SubstrateTwinDB::<RedisCache>::new(
args.substrate,
RedisCache::new(pool.clone(), "twin", Duration::from_secs(60)),
)
.await
.context("cannot create substrate twin db object")?;
let db =
SubstrateTwinDB::<RedisCache>::new(args.substrate, RedisCache::new(pool.clone(), "twin"))
.await
.context("cannot create substrate twin db object")?;

let id = db
.get_twin_with_account(signer.account())
24 changes: 16 additions & 8 deletions src/bins/rmb-relay.rs
@@ -4,6 +4,7 @@ use std::time::Duration;
use anyhow::{Context, Result};
use clap::{builder::ArgAction, Parser};
use rmb::cache::RedisCache;
use rmb::events;
use rmb::redis;
use rmb::relay::{
self,
@@ -142,14 +143,13 @@ async fn app(args: Args) -> Result<()> {
.await
.context("failed to initialize redis pool")?;

// we use 6 hours cache for twin information because twin id will not change anyway
// and we only need twin public key for validation only.
let twins = SubstrateTwinDB::<RedisCache>::new(
args.substrate,
RedisCache::new(pool.clone(), "twin", Duration::from_secs(args.cache * 60)),
)
.await
.context("cannot create substrate twin db object")?;
let redis_cache = RedisCache::new(pool.clone(), "twin");

redis_cache.flush().await?;

let twins = SubstrateTwinDB::<RedisCache>::new(args.substrate.clone(), redis_cache.clone())
.await
.context("cannot create substrate twin db object")?;

let max_users = args.workers as usize * args.user_per_worker as usize;
let opt = relay::SwitchOptions::new(pool.clone())
@@ -175,6 +175,14 @@ async fn app(args: Args) -> Result<()> {
let r = relay::Relay::new(&args.domain, twins, opt, federation, limiter, ranker)
.await
.unwrap();

let l = events::Listener::new(args.substrate[0].as_str(), redis_cache).await?;
tokio::spawn(async move {
if let Err(e) = l.listen().await {
log::error!("failed to listen to events: {:#}", e);
}
});

r.start(&args.listen).await.unwrap();
Ok(())
}
34 changes: 16 additions & 18 deletions src/cache/redis.rs
@@ -1,5 +1,3 @@
use std::time::Duration;

use super::Cache;

use anyhow::{Context, Result};
@@ -22,19 +20,13 @@ use serde::{de::DeserializeOwned, Serialize};
pub struct RedisCache {
pool: Pool<RedisConnectionManager>,
prefix: String,
ttl: Duration,
}

impl RedisCache {
pub fn new<P: Into<String>>(
pool: Pool<RedisConnectionManager>,
prefix: P,
ttl: Duration,
) -> Self {
pub fn new<P: Into<String>>(pool: Pool<RedisConnectionManager>, prefix: P) -> Self {
Self {
pool,
prefix: prefix.into(),
ttl,
}
}

@@ -47,6 +39,12 @@ impl RedisCache {

Ok(conn)
}
pub async fn flush(&self) -> Result<()> {
let mut conn = self.get_connection().await?;
cmd("DEL").arg(&self.prefix).query_async(&mut *conn).await?;

Ok(())
}
}

#[async_trait]
@@ -57,22 +55,23 @@ where
async fn set<S: ToString + Send + Sync>(&self, key: S, obj: T) -> Result<()> {
let mut conn = self.get_connection().await?;
let obj = serde_json::to_vec(&obj).context("unable to serialize twin object for redis")?;
let key = format!("{}.{}", self.prefix, key.to_string());
cmd("SET")
.arg(key)
cmd("HSET")
.arg(&self.prefix)
.arg(key.to_string())
.arg(obj)
.arg("EX")
.arg(self.ttl.as_secs())
.query_async(&mut *conn)
.await?;

Ok(())
}
async fn get<S: ToString + Send + Sync>(&self, key: S) -> Result<Option<T>> {
let mut conn = self.get_connection().await?;
let key = format!("{}.{}", self.prefix, key.to_string());

let ret: Option<Vec<u8>> = cmd("GET").arg(key).query_async(&mut *conn).await?;
let ret: Option<Vec<u8>> = cmd("HGET")
.arg(&self.prefix)
.arg(key.to_string())
.query_async(&mut *conn)
.await?;

match ret {
Some(val) => {
@@ -93,7 +92,6 @@ mod tests {
use super::*;

const PREFIX: &str = "twin";
const TTL: u64 = 20;

async fn create_redis_cache() -> RedisCache {
let manager = RedisConnectionManager::new("redis://127.0.0.1/")
@@ -105,7 +103,7 @@
.context("unable to build pool or redis connection manager")
.unwrap();

RedisCache::new(pool, PREFIX, Duration::from_secs(TTL))
RedisCache::new(pool, PREFIX)
}

#[tokio::test]
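
With this change the cache no longer keeps one expiring key per twin: all twins live as fields of a single Redis hash named after the prefix (HSET/HGET against "twin"), and the new flush() deletes that hash with one DEL. A minimal round-trip sketch, assuming the bb8-redis pool type used elsewhere in the crate, a local Redis instance, and that the `Cache` trait is exported from `rmb::cache`.

use anyhow::Result;
use bb8_redis::{bb8::Pool, RedisConnectionManager};

use rmb::cache::{Cache, RedisCache};
use rmb::twin::Twin;

async fn cache_round_trip(twin: Twin) -> Result<()> {
    let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
    let pool = Pool::builder().build(manager).await?;
    let cache = RedisCache::new(pool, "twin");

    // Stored as a field of the "twin" hash: HSET twin <id> <json>.
    cache.set(twin.id, twin.clone()).await?;

    // Read back with HGET twin <id>; entries no longer expire on their own.
    let cached: Option<Twin> = cache.get(twin.id).await?;
    assert_eq!(cached.as_ref(), Some(&twin));

    // Drop every cached twin at once (DEL twin), e.g. on relay startup.
    cache.flush().await?;
    Ok(())
}

Because entries never expire on their own, callers are expected to flush the hash on startup (as rmb-relay now does) and rely on the chain events listener below to keep it current.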
43 changes: 43 additions & 0 deletions src/events/events.rs
@@ -0,0 +1,43 @@
use crate::{cache::Cache, tfchain::tfchain, twin::Twin};
use anyhow::Result;
use futures::StreamExt;
use log;
use subxt::{OnlineClient, PolkadotConfig};

#[derive(Clone)]
pub struct Listener<C>
where
C: Cache<Twin>,
{
cache: C,
api: OnlineClient<PolkadotConfig>,
}

impl<C> Listener<C>
where
C: Cache<Twin> + Clone,
{
pub async fn new(url: &str, cache: C) -> Result<Self> {
let api = OnlineClient::<PolkadotConfig>::from_url(url).await?;
Ok(Listener { api, cache })
}
pub async fn listen(&self) -> Result<()> {
log::info!("started chain events listener");
let mut blocks_sub = self.api.blocks().subscribe_finalized().await?;
while let Some(block) = blocks_sub.next().await {
let events = block?.events().await?;
for evt in events.iter() {
let evt = evt?;
if let Ok(Some(twin)) = evt.as_event::<tfchain::tfgrid_module::events::TwinStored>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
} else if let Ok(Some(twin)) =
evt.as_event::<tfchain::tfgrid_module::events::TwinUpdated>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
}
}
}
Ok(())
}
}
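
The listener subscribes to finalized blocks and mirrors every TwinStored / TwinUpdated event into the twin cache, which is what lets the cache drop its TTL. A minimal wiring sketch outside the relay binary; the websocket URL is a placeholder and the Redis setup mirrors the test helper above.

use anyhow::Result;
use bb8_redis::{bb8::Pool, RedisConnectionManager};

use rmb::cache::RedisCache;
use rmb::events::Listener;

async fn run_listener() -> Result<()> {
    let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
    let pool = Pool::builder().build(manager).await?;
    let cache = RedisCache::new(pool, "twin");

    // Subscribes to finalized blocks and writes TwinStored/TwinUpdated twins
    // into the cache as they appear on chain.
    let listener = Listener::new("wss://tfchain.example.com:443", cache).await?;
    listener.listen().await
}

In rmb-relay this listener is spawned with tokio::spawn so the relay keeps serving websocket and federation traffic while events are consumed in the background.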
3 changes: 3 additions & 0 deletions src/events/mod.rs
@@ -0,0 +1,3 @@
mod events;

Check warning on line 1 in src/events/mod.rs (GitHub Actions / Test-Clippy-Build): module has the same name as its containing module

pub use events::Listener;
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -3,10 +3,12 @@ extern crate anyhow;
extern crate mime;

pub mod cache;
pub mod events;
pub mod identity;
pub mod peer;
pub mod redis;
pub mod relay;
pub mod tfchain;
pub mod token;
pub mod twin;
pub mod types;
43 changes: 42 additions & 1 deletion src/relay/api.rs
@@ -1,5 +1,5 @@
use crate::token::{self, Claims};
use crate::twin::TwinDB;
use crate::twin::{RelayDomains, TwinDB};
use crate::types::{Envelope, EnvelopeExt, Pong};
use anyhow::{Context, Result};
use futures::stream::SplitSink;
@@ -17,6 +17,7 @@ use prometheus::TextEncoder;
use protobuf::Message as ProtoMessage;
use std::fmt::Display;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;

@@ -203,6 +204,27 @@ async fn federation<D: TwinDB, R: RateLimiter>(
let envelope =
Envelope::parse_from_bytes(&body).map_err(|err| HttpError::BadRequest(err.to_string()))?;

if let Some(relays) = &envelope.relays {
let mut twin = data
.twins
.get_twin(envelope.source.twin)
.await
.map_err(|err| HttpError::FailedToGetTwin(err.to_string()))?
.ok_or_else(|| HttpError::TwinNotFound(envelope.source.twin))?;
let envelope_relays = match RelayDomains::from_str(relays) {
Ok(r) => r,
Err(_) => return Err(HttpError::BadRequest("invalid relays".to_string())),
};
if let Some(twin_relays) = twin.relay {
if twin_relays != envelope_relays {
twin.relay = Some(envelope_relays);
data.twins
.set_twin(twin)
.await
.map_err(|err| HttpError::FailedToSetTwin(err.to_string()))?;
}
}
}
let dst: StreamID = (&envelope.destination).into();
data.switch.send(&dst, &body).await?;

@@ -309,6 +331,25 @@ impl<M: Metrics, D: TwinDB> Stream<M, D> {
.await?
.ok_or_else(|| anyhow::Error::msg("unknown twin destination"))?;

if let Some(relays) = &envelope.relays {
let mut twin = self
.twins
.get_twin(envelope.source.twin)
.await?
.ok_or_else(|| anyhow::Error::msg("unknown twin source"))?;

let envelope_relays = match RelayDomains::from_str(relays) {
Ok(r) => r,
Err(_) => anyhow::bail!("invalid relays"),
};
if let Some(twin_relays) = twin.relay {
if twin_relays != envelope_relays {
twin.relay = Some(envelope_relays);
self.twins.set_twin(twin.clone()).await?;
}
}
}

if !twin
.relay
.ok_or_else(|| anyhow::Error::msg("relay info is not set for this twin"))?
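
The relay-update logic above is duplicated between the federation HTTP handler and the websocket stream. Purely as a sketch (not part of this commit), it could be factored into one helper over the TwinDB trait; the function name is illustrative and the behaviour mirrors the code above, including only updating a twin whose relay set is already present and differs.

use std::str::FromStr;

use anyhow::{Context, Result};

use rmb::twin::{RelayDomains, TwinDB};

// Illustrative helper: if an envelope advertises relay domains that differ from
// the relays cached for its source twin, persist the new set via set_twin.
async fn sync_source_relays<D: TwinDB>(twins: &D, source_twin: u32, relays: &str) -> Result<()> {
    let mut twin = twins
        .get_twin(source_twin)
        .await?
        .context("unknown twin source")?;

    let envelope_relays =
        RelayDomains::from_str(relays).map_err(|_| anyhow::anyhow!("invalid relays"))?;

    // Only an existing, different relay set triggers an update, as in the code above.
    if matches!(&twin.relay, Some(current) if *current != envelope_relays) {
        twin.relay = Some(envelope_relays);
        twins.set_twin(twin).await?;
    }

    Ok(())
}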
3 changes: 3 additions & 0 deletions src/relay/mod.rs
@@ -87,6 +87,8 @@ pub enum HttpError {
InvalidJWT(#[from] token::Error),
#[error("failed to get twin: {0}")]
FailedToGetTwin(String),
#[error("failed to set twin: {0}")]
FailedToSetTwin(String),
#[error("twin not found {0}")]
TwinNotFound(u32),
#[error("{0}")]
@@ -109,6 +111,7 @@ impl HttpError {
Self::MissingJWT => Codes::BAD_REQUEST,
Self::InvalidJWT(_) => Codes::UNAUTHORIZED,
Self::FailedToGetTwin(_) => Codes::INTERNAL_SERVER_ERROR,
Self::FailedToSetTwin(_) => Codes::INTERNAL_SERVER_ERROR,
Self::TwinNotFound(_) => Codes::UNAUTHORIZED,
Self::WebsocketError(_) => Codes::INTERNAL_SERVER_ERROR,
Self::NotFound => Codes::NOT_FOUND,
6 changes: 6 additions & 0 deletions src/tfchain/mod.rs
@@ -0,0 +1,6 @@
#[subxt::subxt(runtime_metadata_path = "artifacts/network.scale")]
mod tfchain {}

use subxt::utils::AccountId32;
pub use tfchain::runtime_types::pallet_tfgrid::types::Twin as TwinData;
pub type Twin = TwinData<AccountId32>;
17 changes: 17 additions & 0 deletions src/twin/mod.rs
@@ -15,10 +15,13 @@ use subxt::utils::AccountId32;
pub trait TwinDB: Send + Sync + Clone + 'static {
async fn get_twin(&self, twin_id: u32) -> Result<Option<Twin>>;
async fn get_twin_with_account(&self, account_id: AccountId32) -> Result<Option<u32>>;
async fn set_twin(&self, twin: Twin) -> Result<()>;
}

use tfchain_client::client::Twin as TwinData;

use crate::tfchain;

#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct Twin {
pub id: u32,
@@ -41,6 +44,20 @@ impl From<TwinData> for Twin {
}
}

impl From<tfchain::Twin> for Twin {
fn from(twin: tfchain::Twin) -> Self {
Twin {
id: twin.id,
account: twin.account_id,
relay: twin.relay.map(|v| {
let string: String = String::from_utf8_lossy(&v.0).into();
RelayDomains::from_str(&string).unwrap_or_default()
}),
pk: twin.pk.map(|v| v.0),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default, Eq)]
pub struct RelayDomains(HashSet<String>);

5 changes: 5 additions & 0 deletions src/twin/substrate.rs
@@ -78,6 +78,11 @@ where

Ok(id)
}

async fn set_twin(&self, twin: Twin) -> Result<()> {
self.cache.set(twin.id, twin).await?;
Ok(())
}
}

/// ClientWrapper is basically a substrate client.
(remaining file diffs not loaded)
