From 70bab3e048bbd345dcc2de04e6e5fdf44f3d022e Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Tue, 3 Dec 2024 11:47:59 +0200 Subject: [PATCH] refactor: remove unnessecary missed events handling --- src/events/mod.rs | 95 ++++++++++++----------------------------------- 1 file changed, 24 insertions(+), 71 deletions(-) diff --git a/src/events/mod.rs b/src/events/mod.rs index 8263665..00ee2f5 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -4,8 +4,7 @@ use crate::{cache::Cache, tfchain::tfchain, twin::Twin}; use anyhow::Result; use futures::StreamExt; use log; -use subxt::{config::WithExtrinsicParams, OnlineClient, PolkadotConfig}; -use tfchain_client::client::Hash; +use subxt::{OnlineClient, PolkadotConfig}; #[derive(Clone)] pub struct Listener @@ -62,29 +61,28 @@ where } pub async fn listen(&mut self) -> Result<()> { - let mut last_proccessed_block_hash:Option = None; let mut backoff_delay = Duration::from_millis(500); - + loop { // always flush in case some blocks were finalized before reconnecting if let Err(err) = self.cache.flush().await { log::error!("failed to flush redis cache {}", err); tokio::time::sleep(backoff_delay).await; - backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120)); + backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120)); continue; } - if let Err(err) = self.handle_blocks(&mut last_proccessed_block_hash).await { + if let Err(err) = self.handle_events().await { log::error!("error listening to events {}", err); if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::() { match Self::connect(&mut self.substrate_urls).await { Ok(val) => { - self.api = val; - backoff_delay = Duration::from_millis(500); - }, + self.api = val; + backoff_delay = Duration::from_millis(500); + } Err(err) => { log::error!("failed to reconnect to substrate: {}", err); tokio::time::sleep(backoff_delay).await; - backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120)); + backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120)); //return OnlineClient>; continue; } @@ -94,74 +92,29 @@ where } } - async fn handle_blocks(&self,last_proccessed_block_hash: &mut Option) -> Result<()> { + async fn handle_events(&self) -> Result<()> { log::info!("started chain events listener"); let mut blocks_sub = self.api.blocks().subscribe_finalized().await?; - let mut flag_first = true; - while let Some(block_result) = blocks_sub.next().await { - let block = block_result?; - let events = block.events().await?; - if flag_first { - flag_first = false; - let parent_hash: Hash = block.header().parent_hash; - let mut missed_hashed = Vec::new(); - - match self.handle_missed_blocks(parent_hash, last_proccessed_block_hash).await { - Ok(val) => missed_hashed = val, + while let Some(block) = blocks_sub.next().await { + let events = block?.events().await?; + for evt in events.iter() { + let evt = match evt { Err(err) => { - log::error!("failed to handle missed blocks: {}", err); - }, + log::error!("failed to decode event {}", err); + continue; + } + Ok(e) => e, }; - for hash in missed_hashed { - let block_missed = self.api.blocks().at(hash).await?; - let events_missed = block_missed.events().await?; - self.handle_events(events_missed).await?; + if let Ok(Some(twin)) = evt.as_event::() + { + self.cache.set(twin.0.id, twin.0.into()).await?; + } else if let Ok(Some(twin)) = + evt.as_event::() + { + self.cache.set(twin.0.id, twin.0.into()).await?; } } - self.handle_events(events).await?; - *last_proccessed_block_hash = Some(block.header().parent_hash); } Ok(()) } - - async fn handle_missed_blocks(&self, from_block: Hash, last_proccessed_block_hash: &mut Option) -> Result> { - let mut missed_blocks = Vec::new(); - let mut current_hash = from_block; - - if *last_proccessed_block_hash == None { - return Ok(missed_blocks); - } - loop { - let block = self.api.blocks().at(current_hash).await?; - if let Some(hash) = last_proccessed_block_hash { - if *hash == block.header().parent_hash { - break; - } - } - current_hash = block.header().parent_hash; - missed_blocks.push(current_hash); - } - // Reverse to get chronological order - missed_blocks.reverse(); - Ok(missed_blocks) - } - - async fn handle_events(&self, events: subxt::events::Events>>) -> Result<()>{ - for evt in events.iter() { - let evt = match evt { - Err(err) => { - log::error!("failed to decode event {}", err); - continue; - } - Ok(e) => e, - }; - if let Ok(Some(twin)) = evt.as_event::(){ - self.cache.set(twin.0.id, twin.0.into()).await?; - } else if let Ok(Some(twin)) = evt.as_event::(){ - self.cache.set(twin.0.id, twin.0.into()).await?; - } - } - - Ok(()) - } }