From 0e1085ed41d334aa380a69639de90e9d250d0bf0 Mon Sep 17 00:00:00 2001 From: nabil salah Date: Thu, 28 Nov 2024 14:27:14 +0200 Subject: [PATCH] fix: linting Co-authored-by: Salma Elsoly --- src/events/mod.rs | 54 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/src/events/mod.rs b/src/events/mod.rs index 8263665..affb3a0 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -62,15 +62,15 @@ where } pub async fn listen(&mut self) -> Result<()> { - let mut last_proccessed_block_hash:Option = None; + let mut last_proccessed_block_hash: Option = None; let mut backoff_delay = Duration::from_millis(500); - + loop { // always flush in case some blocks were finalized before reconnecting if let Err(err) = self.cache.flush().await { log::error!("failed to flush redis cache {}", err); tokio::time::sleep(backoff_delay).await; - backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120)); + backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120)); continue; } if let Err(err) = self.handle_blocks(&mut last_proccessed_block_hash).await { @@ -78,13 +78,13 @@ where if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::() { match Self::connect(&mut self.substrate_urls).await { Ok(val) => { - self.api = val; - backoff_delay = Duration::from_millis(500); - }, + self.api = val; + backoff_delay = Duration::from_millis(500); + } Err(err) => { log::error!("failed to reconnect to substrate: {}", err); tokio::time::sleep(backoff_delay).await; - backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120)); + backoff_delay = (backoff_delay * 2).min(Duration::from_secs(120)); //return OnlineClient>; continue; } @@ -94,7 +94,7 @@ where } } - async fn handle_blocks(&self,last_proccessed_block_hash: &mut Option) -> Result<()> { + async fn handle_blocks(&self, last_proccessed_block_hash: &mut Option) -> Result<()> { log::info!("started chain events listener"); let mut blocks_sub = self.api.blocks().subscribe_finalized().await?; let mut flag_first = true; @@ -105,12 +105,15 @@ where flag_first = false; let parent_hash: Hash = block.header().parent_hash; let mut missed_hashed = Vec::new(); - - match self.handle_missed_blocks(parent_hash, last_proccessed_block_hash).await { + + match self + .handle_missed_blocks(parent_hash, last_proccessed_block_hash) + .await + { Ok(val) => missed_hashed = val, Err(err) => { log::error!("failed to handle missed blocks: {}", err); - }, + } }; for hash in missed_hashed { let block_missed = self.api.blocks().at(hash).await?; @@ -124,10 +127,14 @@ where Ok(()) } - async fn handle_missed_blocks(&self, from_block: Hash, last_proccessed_block_hash: &mut Option) -> Result> { + async fn handle_missed_blocks( + &self, + from_block: Hash, + last_proccessed_block_hash: &mut Option, + ) -> Result> { let mut missed_blocks = Vec::new(); let mut current_hash = from_block; - + if *last_proccessed_block_hash == None { return Ok(missed_blocks); } @@ -145,8 +152,19 @@ where missed_blocks.reverse(); Ok(missed_blocks) } - - async fn handle_events(&self, events: subxt::events::Events>>) -> Result<()>{ + + async fn handle_events( + &self, + events: subxt::events::Events< + WithExtrinsicParams< + subxt::SubstrateConfig, + subxt::config::extrinsic_params::BaseExtrinsicParams< + subxt::SubstrateConfig, + subxt::config::polkadot::PlainTip, + >, + >, + >, + ) -> Result<()> { for evt in events.iter() { let evt = match evt { Err(err) => { @@ -155,9 +173,11 @@ where } Ok(e) => e, }; - if let Ok(Some(twin)) = evt.as_event::(){ + if let Ok(Some(twin)) = evt.as_event::() { self.cache.set(twin.0.id, twin.0.into()).await?; - } else if let Ok(Some(twin)) = evt.as_event::(){ + } else if let Ok(Some(twin)) = + evt.as_event::() + { self.cache.set(twin.0.id, twin.0.into()).await?; } }