Skip to content

Commit

Permalink
feat: app kill on listener thread panic with backoff stratagy
Browse files Browse the repository at this point in the history
Signed-off-by: nabil salah <[email protected]>
  • Loading branch information
Nabil-Salah committed Dec 10, 2024
1 parent 11b1c05 commit 649e2e0
Showing 1 changed file with 45 additions and 8 deletions.
53 changes: 45 additions & 8 deletions src/bins/rmb-relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rmb::relay::{
limiter::{FixedWindowOptions, Limiters},
};
use rmb::twin::SubstrateTwinDB;
use tokio::sync::oneshot;

/// A peer requires only which rely to connect to, and
/// which identity (mnemonics)
Expand Down Expand Up @@ -92,7 +93,7 @@ fn set_limits() -> Result<()> {
Ok(())
}

async fn app(args: Args) -> Result<()> {
async fn app(args: Args, tx: tokio::sync::oneshot::Sender<()>) -> Result<()> {
if args.workers == 0 {
anyhow::bail!("number of workers cannot be zero");
}
Expand Down Expand Up @@ -172,10 +173,32 @@ async fn app(args: Args) -> Result<()> {

let mut l = events::Listener::new(args.substrate, redis_cache).await?;
tokio::spawn(async move {
l.listen()
.await
.context("failed to listen to chain events")
.unwrap();
let max_retries = 7;// max wait is 2^7 = 128 seconds ( 2 minutes)
let mut attempt = 0;
let mut backoff = Duration::from_secs(1);

loop {
match l.listen().await.context("failed to listen to chain events") {
Ok(_) => break,
Err(e) => {
attempt += 1;
if attempt > max_retries {
log::error!("Listener failed after {} attempts: {:?}", attempt, e);
let _ = tx.send(());
break;
} else {
log::warn!(
"Listener failed on attempt {}: {:?}. Retrying in {:?}...",
attempt,
e,
backoff
);
tokio::time::sleep(backoff).await;
backoff *= 2;
}
}
}
}
});

r.start(&args.listen).await.unwrap();
Expand All @@ -185,8 +208,22 @@ async fn app(args: Args) -> Result<()> {
#[tokio::main]
async fn main() {
let args = Args::parse();
if let Err(e) = app(args).await {
eprintln!("{:#}", e);
std::process::exit(1);
let (tx, rx) = oneshot::channel();
let app_handle = tokio::spawn(async move {
if let Err(e) = app(args, tx).await {
eprintln!("{:#}", e);
std::process::exit(1);
}
});

tokio::select! {
_ = app_handle => {
log::info!("Application is closing successfully.");
}
_ = rx => {
log::error!("Listener shutdown signal received. Exiting application.");
std::process::exit(1);
}
}

}

0 comments on commit 649e2e0

Please sign in to comment.