Skip to content

Commit

Permalink
fix: add exponential backoff and catch missed events when connection …
Browse files Browse the repository at this point in the history
…is down

Co-authored-by: Salma Elsoly <[email protected]>
  • Loading branch information
Nabil-Salah and SalmaElsoly committed Nov 28, 2024
1 parent 11b1c05 commit 125fa22
Showing 1 changed file with 82 additions and 20 deletions.
102 changes: 82 additions & 20 deletions src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use crate::{cache::Cache, tfchain::tfchain, twin::Twin};
use anyhow::Result;
use futures::StreamExt;
use log;
use subxt::{OnlineClient, PolkadotConfig};
use subxt::{config::WithExtrinsicParams, OnlineClient, PolkadotConfig};
use tfchain_client::client::Hash;

#[derive(Clone)]
pub struct Listener<C>
Expand Down Expand Up @@ -61,45 +62,106 @@ where
}

pub async fn listen(&mut self) -> Result<()> {
let mut last_proccessed_block_hash:Option<Hash> = None;
let mut backoff_delay = Duration::from_millis(500);

loop {
// always flush in case some blocks were finalized before reconnecting
if let Err(err) = self.cache.flush().await {
log::error!("failed to flush redis cache {}", err);
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(backoff_delay).await;
backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120));
continue;
}
if let Err(err) = self.handle_events().await {
if let Err(err) = self.handle_blocks(&mut last_proccessed_block_hash).await {
log::error!("error listening to events {}", err);
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
match Self::connect(&mut self.substrate_urls).await {
Ok(val) => {
self.api = val;
backoff_delay = Duration::from_millis(500);
},
Err(err) => {
log::error!("failed to reconnect to substrate: {}", err);
tokio::time::sleep(backoff_delay).await;
backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120));
//return OnlineClient<WithExtrinsicParams<,>>;
continue;
}
};
}
}
}
}

async fn handle_events(&self) -> Result<()> {
async fn handle_blocks(&self,last_proccessed_block_hash: &mut Option<Hash>) -> Result<()> {
log::info!("started chain events listener");
let mut blocks_sub = self.api.blocks().subscribe_finalized().await?;
while let Some(block) = blocks_sub.next().await {
let events = block?.events().await?;
for evt in events.iter() {
let evt = match evt {
let mut flag_first = true;
while let Some(block_result) = blocks_sub.next().await {
let block = block_result?;
let events = block.events().await?;
if flag_first {
flag_first = false;
let parent_hash: Hash = block.header().parent_hash;
let mut missed_hashed = Vec::new();

match self.handle_missed_blocks(parent_hash, last_proccessed_block_hash).await {
Ok(val) => missed_hashed = val,
Err(err) => {
log::error!("failed to decode event {}", err);
continue;
}
Ok(e) => e,
log::error!("failed to handle missed blocks: {}", err);
},
};
if let Ok(Some(twin)) = evt.as_event::<tfchain::tfgrid_module::events::TwinStored>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
} else if let Ok(Some(twin)) =
evt.as_event::<tfchain::tfgrid_module::events::TwinUpdated>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
for hash in missed_hashed {
let block_missed = self.api.blocks().at(hash).await?;
let events_missed = block_missed.events().await?;
self.handle_events(events_missed).await?;
}
}
self.handle_events(events).await?;
*last_proccessed_block_hash = Some(block.header().parent_hash);
}
Ok(())
}

async fn handle_missed_blocks(&self, from_block: Hash, last_proccessed_block_hash: &mut Option<Hash>) -> Result<Vec<Hash>> {
let mut missed_blocks = Vec::new();
let mut current_hash = from_block;

if *last_proccessed_block_hash == None {
return Ok(missed_blocks);
}
loop {
let block = self.api.blocks().at(current_hash).await?;
if let Some(hash) = last_proccessed_block_hash {
if *hash == block.header().parent_hash {
break;
}
}
current_hash = block.header().parent_hash;
missed_blocks.push(current_hash);
}
// Reverse to get chronological order
missed_blocks.reverse();
Ok(missed_blocks)
}

async fn handle_events(&self, events: subxt::events::Events<WithExtrinsicParams<subxt::SubstrateConfig, subxt::config::extrinsic_params::BaseExtrinsicParams<subxt::SubstrateConfig, subxt::config::polkadot::PlainTip>>>) -> Result<()>{
for evt in events.iter() {
let evt = match evt {
Err(err) => {
log::error!("failed to decode event {}", err);
continue;
}
Ok(e) => e,
};
if let Ok(Some(twin)) = evt.as_event::<tfchain::tfgrid_module::events::TwinStored>(){
self.cache.set(twin.0.id, twin.0.into()).await?;
} else if let Ok(Some(twin)) = evt.as_event::<tfchain::tfgrid_module::events::TwinUpdated>(){
self.cache.set(twin.0.id, twin.0.into()).await?;
}
}

Ok(())
}
}

0 comments on commit 125fa22

Please sign in to comment.