diff --git a/src/bins/rmb-relay.rs b/src/bins/rmb-relay.rs index 222a64b..f95bfb0 100644 --- a/src/bins/rmb-relay.rs +++ b/src/bins/rmb-relay.rs @@ -93,7 +93,7 @@ fn set_limits() -> Result<()> { Ok(()) } -async fn app(args: Args, tx: tokio::sync::oneshot::Sender<()>) -> Result<()> { +async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> { if args.workers == 0 { anyhow::bail!("number of workers cannot be zero"); } @@ -173,29 +173,39 @@ async fn app(args: Args, tx: tokio::sync::oneshot::Sender<()>) -> Result<()> { let mut l = events::Listener::new(args.substrate, redis_cache).await?; tokio::spawn(async move { - let max_retries = 7; // max wait is 2^7 = 128 seconds ( 2 minutes) + let max_retries = 9; // max wait is 2^9 = 512 seconds ( 5 minutes ) let mut attempt = 0; let mut backoff = Duration::from_secs(1); + let mut got_hit = false; loop { - match l.listen().await.context("failed to listen to chain events") { + match l + .listen(&mut got_hit) + .await + .context("failed to listen to chain events") + { Ok(_) => break, Err(e) => { + if got_hit { + log::warn!("Listener got a hit, but failed to listen to chain events before no attempts will be reset"); + got_hit = false; + attempt = 0; + backoff = Duration::from_secs(1); + } attempt += 1; if attempt > max_retries { - log::error!("Listener failed after {} attempts: {:?}", attempt, e); + log::error!("Listener failed after {} attempts: {:?}", attempt - 1, e); let _ = tx.send(()); break; - } else { - log::warn!( - "Listener failed on attempt {}: {:?}. Retrying in {:?}...", - attempt, - e, - backoff - ); - tokio::time::sleep(backoff).await; - backoff *= 2; } + log::warn!( + "Listener failed on attempt {}: {:?}. Retrying in {:?}...", + attempt, + e, + backoff + ); + tokio::time::sleep(backoff).await; + backoff *= 2; } } } diff --git a/src/events/mod.rs b/src/events/mod.rs index b98540e..8a69542 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -60,7 +60,7 @@ where anyhow::bail!("failed to connect to substrate using the provided urls") } - pub async fn listen(&mut self) -> Result<()> { + pub async fn listen(&mut self, got_hit: &mut bool) -> Result<()> { loop { // always flush in case some blocks were finalized before reconnecting if let Err(err) = self.cache.flush().await { @@ -73,6 +73,8 @@ where if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::() { self.api = Self::connect(&mut self.substrate_urls).await?; } + } else { + *got_hit = true } } }