From 125fa2235865766c97a25c6c69db1a3210015984 Mon Sep 17 00:00:00 2001 From: nabil salah Date: Thu, 28 Nov 2024 13:59:10 +0200 Subject: [PATCH] fix: add exponential backoff and catch missed events when connection is down Co-authored-by: Salma Elsoly --- src/events/mod.rs | 102 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 20 deletions(-) diff --git a/src/events/mod.rs b/src/events/mod.rs index b98540e..8263665 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -4,7 +4,8 @@ use crate::{cache::Cache, tfchain::tfchain, twin::Twin}; use anyhow::Result; use futures::StreamExt; use log; -use subxt::{OnlineClient, PolkadotConfig}; +use subxt::{config::WithExtrinsicParams, OnlineClient, PolkadotConfig}; +use tfchain_client::client::Hash; #[derive(Clone)] pub struct Listener @@ -61,45 +62,106 @@ where } pub async fn listen(&mut self) -> Result<()> { + let mut last_proccessed_block_hash:Option = None; + let mut backoff_delay = Duration::from_millis(500); + loop { // always flush in case some blocks were finalized before reconnecting if let Err(err) = self.cache.flush().await { log::error!("failed to flush redis cache {}", err); - tokio::time::sleep(Duration::from_millis(500)).await; + tokio::time::sleep(backoff_delay).await; + backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120)); continue; } - if let Err(err) = self.handle_events().await { + if let Err(err) = self.handle_blocks(&mut last_proccessed_block_hash).await { log::error!("error listening to events {}", err); if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::() { - self.api = Self::connect(&mut self.substrate_urls).await?; + match Self::connect(&mut self.substrate_urls).await { + Ok(val) => { + self.api = val; + backoff_delay = Duration::from_millis(500); + }, + Err(err) => { + log::error!("failed to reconnect to substrate: {}", err); + tokio::time::sleep(backoff_delay).await; + backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120)); + //return OnlineClient>; + continue; + } + }; } } } } - async fn handle_events(&self) -> Result<()> { + async fn handle_blocks(&self,last_proccessed_block_hash: &mut Option) -> Result<()> { log::info!("started chain events listener"); let mut blocks_sub = self.api.blocks().subscribe_finalized().await?; - while let Some(block) = blocks_sub.next().await { - let events = block?.events().await?; - for evt in events.iter() { - let evt = match evt { + let mut flag_first = true; + while let Some(block_result) = blocks_sub.next().await { + let block = block_result?; + let events = block.events().await?; + if flag_first { + flag_first = false; + let parent_hash: Hash = block.header().parent_hash; + let mut missed_hashed = Vec::new(); + + match self.handle_missed_blocks(parent_hash, last_proccessed_block_hash).await { + Ok(val) => missed_hashed = val, Err(err) => { - log::error!("failed to decode event {}", err); - continue; - } - Ok(e) => e, + log::error!("failed to handle missed blocks: {}", err); + }, }; - if let Ok(Some(twin)) = evt.as_event::() - { - self.cache.set(twin.0.id, twin.0.into()).await?; - } else if let Ok(Some(twin)) = - evt.as_event::() - { - self.cache.set(twin.0.id, twin.0.into()).await?; + for hash in missed_hashed { + let block_missed = self.api.blocks().at(hash).await?; + let events_missed = block_missed.events().await?; + self.handle_events(events_missed).await?; } } + self.handle_events(events).await?; + *last_proccessed_block_hash = Some(block.header().parent_hash); } Ok(()) } + + async fn handle_missed_blocks(&self, from_block: Hash, last_proccessed_block_hash: &mut Option) -> Result> { + let mut missed_blocks = Vec::new(); + let mut current_hash = from_block; + + if *last_proccessed_block_hash == None { + return Ok(missed_blocks); + } + loop { + let block = self.api.blocks().at(current_hash).await?; + if let Some(hash) = last_proccessed_block_hash { + if *hash == block.header().parent_hash { + break; + } + } + current_hash = block.header().parent_hash; + missed_blocks.push(current_hash); + } + // Reverse to get chronological order + missed_blocks.reverse(); + Ok(missed_blocks) + } + + async fn handle_events(&self, events: subxt::events::Events>>) -> Result<()>{ + for evt in events.iter() { + let evt = match evt { + Err(err) => { + log::error!("failed to decode event {}", err); + continue; + } + Ok(e) => e, + }; + if let Ok(Some(twin)) = evt.as_event::(){ + self.cache.set(twin.0.id, twin.0.into()).await?; + } else if let Ok(Some(twin)) = evt.as_event::(){ + self.cache.set(twin.0.id, twin.0.into()).await?; + } + } + + Ok(()) + } }