Listen to chain events and update twin on requests #194

Merged
merged 6 commits on Apr 5, 2024
1 change: 1 addition & 0 deletions Cargo.lock


3 changes: 2 additions & 1 deletion Cargo.toml
@@ -38,7 +38,8 @@ url = "2.3.1"
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
futures-util = "0.3.25"
jwt = "0.16"
subxt = "0.28.0"
subxt = { version = "0.28.0", features = ["substrate-compat"]}
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
itertools = "0.11"

# for static build
3 changes: 3 additions & 0 deletions _tests/e2e_tests.rs
@@ -44,6 +44,9 @@ impl TwinDB for InMemoryDB {
) -> anyhow::Result<Option<u32>> {
unimplemented!()
}
async fn set_twin(&self, twin: Twin) -> anyhow::Result<()> {
unimplemented!()
}
}

fn new_message(
Binary file added artifacts/network.scale
2 changes: 2 additions & 0 deletions proto/types.proto
@@ -71,4 +71,6 @@ message Envelope {
bytes plain = 13;
bytes cipher = 14;
}

optional string relays = 17;
Member commented:
why is this just a single string, not a list of strings? it's better to send it as a []string imho

@sameh-farouk (Member) commented on Mar 25, 2024:

if we're going to re-introduce this again, why not reuse the old federation field?
is that to keep compatibility with old peers that may interpret it differently?
Also, a delay of a few seconds after switching the relay before it can receive messages is expected.
That's not as bad as DNS propagation time, which can take anywhere from a few hours up to 48 hours to propagate a new domain name for a website worldwide :D

so even if we didn't go with this path (adding the relays field) we would still be fine. How often would a node switch its relays, given that we store multiple relays per twin on-chain? is that even supported in zos nodes?

@muhamadazmy @AbdelrahmanElawady

Contributor (Author) commented:

yes, it is currently used to set the relay of the twin destination in rmb-sdk-go here, so removing it would break compatibility with older versions.

Contributor (Author) commented:

also, for the delay part: for clients using this new feature there should be no delay, as the cache will be updated with the requests made. For clients that are not using this field and are changing relays, there will be a delay of up to 6 seconds until new blocks are produced.

This is already an improvement over the currently used relay warmer, which adds a delay of up to 10 minutes; before the warmer it could take up to 60 seconds.

}
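As context for the single-string-vs-list debate above: a delimited encoding could carry several relays inside one string field without breaking peers that expect a plain string. A hedged sketch — the comma separator and the `parse_relays` name are assumptions for illustration, not the protocol's actual encoding:

```rust
/// Hypothetical helper: split a single `relays` string field into a list.
/// The comma delimiter is an assumption, not part of the RMB protocol.
fn parse_relays(field: &str) -> Vec<&str> {
    field
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .collect()
}

fn main() {
    let relays = parse_relays("relay1.example.com, relay2.example.com");
    assert_eq!(relays, vec!["relay1.example.com", "relay2.example.com"]);
    // an empty field yields no relays rather than an empty entry
    assert!(parse_relays("").is_empty());
}
```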
11 changes: 4 additions & 7 deletions src/bins/rmb-peer.rs
@@ -1,6 +1,5 @@
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;

use anyhow::{Context, Result};
use clap::{builder::ArgAction, Args, Parser};
@@ -151,12 +150,10 @@ async fn app(args: Params) -> Result<()> {

// cache is a little bit tricky because while it improves performance it
// makes changes to twin data takes at least 5 min before they are detected
let db = SubstrateTwinDB::<RedisCache>::new(
args.substrate,
RedisCache::new(pool.clone(), "twin", Duration::from_secs(60)),
)
.await
.context("cannot create substrate twin db object")?;
let db =
SubstrateTwinDB::<RedisCache>::new(args.substrate, RedisCache::new(pool.clone(), "twin"))
.await
.context("cannot create substrate twin db object")?;

let id = db
.get_twin_with_account(signer.account())
22 changes: 14 additions & 8 deletions src/bins/rmb-relay.rs
@@ -4,6 +4,7 @@ use std::time::Duration;
use anyhow::{Context, Result};
use clap::{builder::ArgAction, Parser};
use rmb::cache::RedisCache;
use rmb::events;
use rmb::redis;
use rmb::relay::{
self,
@@ -142,14 +143,11 @@ async fn app(args: Args) -> Result<()> {
.await
.context("failed to initialize redis pool")?;

// we use 6 hours cache for twin information because twin id will not change anyway
// and we only need twin public key for validation only.
let twins = SubstrateTwinDB::<RedisCache>::new(
args.substrate,
RedisCache::new(pool.clone(), "twin", Duration::from_secs(args.cache * 60)),
)
.await
.context("cannot create substrate twin db object")?;
let redis_cache = RedisCache::new(pool.clone(), "twin");

let twins = SubstrateTwinDB::<RedisCache>::new(args.substrate.clone(), redis_cache.clone())
.await
.context("cannot create substrate twin db object")?;

let max_users = args.workers as usize * args.user_per_worker as usize;
let opt = relay::SwitchOptions::new(pool.clone())
@@ -175,6 +173,14 @@
let r = relay::Relay::new(&args.domain, twins, opt, federation, limiter, ranker)
.await
.unwrap();

let mut l = events::Listener::new(args.substrate, redis_cache).await?;
tokio::spawn(async move {
if let Err(e) = l.listen().await {
Member commented:
i see a problem here. If the listener fails for any reason (and returns an error here), it's not good for the system that the relay keeps running, because it would rely on outdated data forever.

So what we need to do is:

  • Events listener failure is fatal and the entire system should then exit with an error
  • Or the event system never fails; it needs to retry forever
    • If the failure persists for some time, the relay should then return errors to senders saying it can't guarantee delivery to destinations

I checked the listener code and it seems there are many points where the listener can return an error, for example cache.flush(). But what if redis was temporarily unavailable or restarting for some reason? This would cause the system to continue working with a cache that will never be updated.

IMHO the best approach is to make the listener infallible. This can be accomplished by basically never giving up on errors.

log::error!("failed to listen to events: {:#}", e);
}
});
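The review point above — make the listener never give up — can be sketched as a retry loop with capped backoff. A minimal, synchronous sketch; `retry_forever` and the backoff values are illustrative, not part of the PR:

```rust
use std::thread::sleep;
use std::time::Duration;

/// Keep retrying `op` with capped exponential backoff until it succeeds.
/// Sketch of the reviewer's "never give up" suggestion; the name and the
/// backoff numbers are assumptions for illustration.
fn retry_forever<T, E: std::fmt::Display, F: FnMut() -> Result<T, E>>(mut op: F) -> T {
    let mut delay = Duration::from_millis(10);
    loop {
        match op() {
            Ok(v) => return v,
            Err(e) => {
                eprintln!("operation failed, retrying in {:?}: {}", delay, e);
                sleep(delay);
                // cap the backoff so retries keep a steady cadence
                delay = (delay * 2).min(Duration::from_secs(2));
            }
        }
    }
}

fn main() {
    // simulate a dependency (e.g. redis) that recovers on the third attempt
    let mut attempts = 0;
    let value = retry_forever(|| {
        attempts += 1;
        if attempts < 3 {
            Err("redis temporarily unavailable")
        } else {
            Ok(attempts)
        }
    });
    assert_eq!(value, 3);
}
```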

r.start(&args.listen).await.unwrap();
Ok(())
}
Expand Down
6 changes: 6 additions & 0 deletions src/cache/memory.rs
@@ -45,6 +45,12 @@ where
Some(v) => Ok(Some(v.clone())),
}
}
async fn flush(&self) -> Result<()> {
let mut mem = self.mem.write().await;
mem.clear();

Ok(())
}
}

#[cfg(test)]
10 changes: 10 additions & 0 deletions src/cache/mod.rs
@@ -11,6 +11,7 @@ use std::marker::{Send, Sync};
pub trait Cache<T>: Send + Sync + 'static {
async fn set<S: ToString + Send + Sync>(&self, id: S, obj: T) -> Result<()>;
async fn get<S: ToString + Send + Sync>(&self, id: S) -> Result<Option<T>>;
async fn flush(&self) -> Result<()>;
}

#[async_trait]
@@ -31,6 +32,12 @@
None => Ok(None),
}
}
async fn flush(&self) -> Result<()> {
match self {
Some(cache) => cache.flush().await,
None => Ok(()),
}
}
}

#[derive(Clone, Copy)]
@@ -47,4 +54,7 @@
async fn get<S: ToString + Send + Sync>(&self, _id: S) -> Result<Option<T>> {
Ok(None)
}
async fn flush(&self) -> Result<()> {
Ok(())
}
}
34 changes: 16 additions & 18 deletions src/cache/redis.rs
@@ -1,5 +1,3 @@
use std::time::Duration;

use super::Cache;

use anyhow::{Context, Result};
@@ -22,19 +20,13 @@ use serde::{de::DeserializeOwned, Serialize};
pub struct RedisCache {
pool: Pool<RedisConnectionManager>,
prefix: String,
ttl: Duration,
}

impl RedisCache {
pub fn new<P: Into<String>>(
pool: Pool<RedisConnectionManager>,
prefix: P,
ttl: Duration,
) -> Self {
pub fn new<P: Into<String>>(pool: Pool<RedisConnectionManager>, prefix: P) -> Self {
Self {
pool,
prefix: prefix.into(),
ttl,
}
}

@@ -57,22 +49,23 @@
async fn set<S: ToString + Send + Sync>(&self, key: S, obj: T) -> Result<()> {
let mut conn = self.get_connection().await?;
let obj = serde_json::to_vec(&obj).context("unable to serialize twin object for redis")?;
let key = format!("{}.{}", self.prefix, key.to_string());
cmd("SET")
.arg(key)
cmd("HSET")
.arg(&self.prefix)
.arg(key.to_string())
.arg(obj)
.arg("EX")
.arg(self.ttl.as_secs())
.query_async(&mut *conn)
.await?;

Ok(())
}
async fn get<S: ToString + Send + Sync>(&self, key: S) -> Result<Option<T>> {
let mut conn = self.get_connection().await?;
let key = format!("{}.{}", self.prefix, key.to_string());

let ret: Option<Vec<u8>> = cmd("GET").arg(key).query_async(&mut *conn).await?;
let ret: Option<Vec<u8>> = cmd("HGET")
.arg(&self.prefix)
.arg(key.to_string())
.query_async(&mut *conn)
.await?;

match ret {
Some(val) => {
@@ -84,6 +77,12 @@
None => Ok(None),
}
}
async fn flush(&self) -> Result<()> {
let mut conn = self.get_connection().await?;
cmd("DEL").arg(&self.prefix).query_async(&mut *conn).await?;

Ok(())
}
}
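The switch from per-key `SET … EX` to a single hash (`HSET`/`HGET`) is what lets `flush` become one `DEL` of a single key instead of per-twin expiry. An in-memory sketch of that layout; `HashCache` is a stand-in for illustration, not the PR's type:

```rust
use std::collections::HashMap;

// In-memory stand-in for the new Redis layout: every twin is a field
// inside one hash named after the prefix, so flushing drops the whole
// hash at once (the DEL equivalent). Under the old per-key SET+EX layout,
// entries could only expire individually.
struct HashCache {
    store: HashMap<String, HashMap<String, Vec<u8>>>, // hash name -> field -> value
    prefix: String,
}

impl HashCache {
    fn new(prefix: &str) -> Self {
        Self { store: HashMap::new(), prefix: prefix.to_string() }
    }
    fn set(&mut self, id: u32, val: Vec<u8>) {
        // HSET prefix id val
        self.store
            .entry(self.prefix.clone())
            .or_default()
            .insert(id.to_string(), val);
    }
    fn get(&self, id: u32) -> Option<&Vec<u8>> {
        // HGET prefix id
        self.store.get(&self.prefix)?.get(&id.to_string())
    }
    fn flush(&mut self) {
        // DEL prefix -- removes every twin in one operation
        self.store.remove(&self.prefix);
    }
}

fn main() {
    let mut cache = HashCache::new("twin");
    cache.set(1, b"twin-1".to_vec());
    cache.set(2, b"twin-2".to_vec());
    assert!(cache.get(1).is_some());
    cache.flush();
    assert!(cache.get(1).is_none() && cache.get(2).is_none());
}
```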

#[cfg(test)]
@@ -93,7 +92,6 @@
use super::*;

const PREFIX: &str = "twin";
const TTL: u64 = 20;

async fn create_redis_cache() -> RedisCache {
let manager = RedisConnectionManager::new("redis://127.0.0.1/")
@@ -105,7 +103,7 @@
.context("unable to build pool or redis connection manager")
.unwrap();

RedisCache::new(pool, PREFIX, Duration::from_secs(TTL))
RedisCache::new(pool, PREFIX)
}

#[tokio::test]
105 changes: 105 additions & 0 deletions src/events/events.rs
@@ -0,0 +1,105 @@
use std::collections::LinkedList;

use crate::{cache::Cache, tfchain::tfchain, twin::Twin};
use anyhow::Result;
use futures::StreamExt;
use log;
use subxt::{OnlineClient, PolkadotConfig};

#[derive(Clone)]
pub struct Listener<C>
where
C: Cache<Twin>,
{
cache: C,
api: OnlineClient<PolkadotConfig>,
substrate_urls: LinkedList<String>,
}

impl<C> Listener<C>
where
C: Cache<Twin> + Clone,
{
pub async fn new(substrate_urls: Vec<String>, cache: C) -> Result<Self> {
let mut urls = LinkedList::new();
Member commented:
I am sure you can create a linked list directly from a Vec.
It will probably be something like

LinkedList::from_iter(substrate_urls)

just saying

for url in substrate_urls {
urls.push_back(url);
}

let api = Self::connect(&mut urls).await?;

cache.flush().await?;
Ok(Listener {
api,
cache,
substrate_urls: urls,
})
}
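The reviewer's `LinkedList::from_iter` suggestion is indeed equivalent to the `push_back` loop in the PR, which a quick check confirms:

```rust
use std::collections::LinkedList;
use std::iter::FromIterator;

fn main() {
    let urls = vec!["wss://a.example".to_string(), "wss://b.example".to_string()];

    // Reviewer's suggestion: build the list directly from the Vec
    let direct: LinkedList<String> = LinkedList::from_iter(urls.clone());

    // The PR's explicit push_back loop
    let mut looped = LinkedList::new();
    for url in urls {
        looped.push_back(url);
    }

    // both constructions yield the same list in the same order
    assert_eq!(direct, looped);
}
```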

async fn connect(urls: &mut LinkedList<String>) -> Result<OnlineClient<PolkadotConfig>> {
let trials = urls.len() * 2;
for _ in 0..trials {
let url = match urls.front() {
Some(url) => url,
None => anyhow::bail!("substrate urls list is empty"),
};

match OnlineClient::<PolkadotConfig>::from_url(url).await {
Ok(client) => return Ok(client),
Err(err) => {
log::error!(
"failed to create substrate client with url \"{}\": {}",
url,
err
);
}
}

if let Some(front) = urls.pop_front() {
urls.push_back(front);
}
}

anyhow::bail!("failed to connect to substrate using the provided urls")
}
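The rotation logic inside `connect` can be isolated and exercised without a chain connection. A standalone sketch — `pick_url` and `is_reachable` are illustrative stand-ins for the real subxt call, not the PR's API:

```rust
use std::collections::LinkedList;

// Try the front URL; on failure rotate it to the back so the next attempt
// uses the next endpoint. Mirrors `connect`'s bound of trials = len * 2.
fn pick_url<F: Fn(&str) -> bool>(
    urls: &mut LinkedList<String>,
    is_reachable: F,
) -> Option<String> {
    let trials = urls.len() * 2;
    for _ in 0..trials {
        let url = urls.front()?.clone();
        if is_reachable(&url) {
            return Some(url);
        }
        // rotate the failing URL to the back of the list
        if let Some(front) = urls.pop_front() {
            urls.push_back(front);
        }
    }
    None
}

fn main() {
    let mut urls: LinkedList<String> = ["wss://down.example", "wss://up.example"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    let picked = pick_url(&mut urls, |u| u == "wss://up.example");
    assert_eq!(picked.as_deref(), Some("wss://up.example"));
    // after one rotation, the working URL sits at the front for next time
    assert_eq!(urls.front().map(|s| s.as_str()), Some("wss://up.example"));
}
```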

pub async fn listen(&mut self) -> Result<()> {
loop {
// always flush in case some blocks were finalized before reconnecting
self.cache.flush().await?;
match self.handle_events().await {
Err(err) => {
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
} else {
return Err(err);
}
}
Ok(_) => {
// reconnect here too?
self.api = Self::connect(&mut self.substrate_urls).await?;
}
}
}
}

async fn handle_events(&self) -> Result<()> {
log::info!("started chain events listener");
let mut blocks_sub = self.api.blocks().subscribe_finalized().await?;
while let Some(block) = blocks_sub.next().await {
let events = block?.events().await?;
for evt in events.iter() {
let evt = evt?;
Member commented:

Why do you assume this is a connection error here? Although it could be an encoding error (most probably), this means you are flushing the entire db and reconnecting whenever you receive a bad event type.

I am wondering if we'd be better off logging this error and continuing instead? What do you think?

if let Ok(Some(twin)) = evt.as_event::<tfchain::tfgrid_module::events::TwinStored>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
} else if let Ok(Some(twin)) =
evt.as_event::<tfchain::tfgrid_module::events::TwinUpdated>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
}
}
}
Ok(())
}
}
3 changes: 3 additions & 0 deletions src/events/mod.rs
@@ -0,0 +1,3 @@
mod events;

Clippy warning on line 1 in src/events/mod.rs (GitHub Actions / Test-Clippy-Build): module has the same name as its containing module

pub use events::Listener;
Member commented:

Check the warning from clippy. I think you don't need a submodule here; just move the Listener into mod.rs directly.

2 changes: 2 additions & 0 deletions src/lib.rs
@@ -3,10 +3,12 @@ extern crate anyhow;
extern crate mime;

pub mod cache;
pub mod events;
pub mod identity;
pub mod peer;
pub mod redis;
pub mod relay;
pub mod tfchain;
pub mod token;
pub mod twin;
pub mod types;