From 6e641554c4657f7dc771807f86b84c25218fddc4 Mon Sep 17 00:00:00 2001 From: nabil salah Date: Wed, 11 Dec 2024 17:08:07 +0200 Subject: [PATCH] feat: graceful shutdown Signed-off-by: nabil salah --- src/events/mod.rs | 39 +++++++++++++++++-------- src/relay/federation/mod.rs | 57 ++++++++++++++++++++++--------------- src/relay/mod.rs | 35 +++++++++++++++-------- 3 files changed, 84 insertions(+), 47 deletions(-) diff --git a/src/events/mod.rs b/src/events/mod.rs index 8a69542..e46db99 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -5,6 +5,7 @@ use anyhow::Result; use futures::StreamExt; use log; use subxt::{OnlineClient, PolkadotConfig}; +use tokio::select; #[derive(Clone)] pub struct Listener @@ -62,21 +63,35 @@ where pub async fn listen(&mut self, got_hit: &mut bool) -> Result<()> { loop { - // always flush in case some blocks were finalized before reconnecting - if let Err(err) = self.cache.flush().await { - log::error!("failed to flush redis cache {}", err); - tokio::time::sleep(Duration::from_millis(500)).await; - continue; - } - if let Err(err) = self.handle_events().await { - log::error!("error listening to events {}", err); - if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::() { - self.api = Self::connect(&mut self.substrate_urls).await?; + select! { + _ = tokio::signal::ctrl_c() => { + log::info!("shutting down listener gracefully"); + if let Err(err) = self.cache.flush().await { + log::error!("failed to flush redis cache {}", err); + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + } + break; + }, + result = self.cache.flush() => { + // always flush in case some blocks were finalized before reconnecting + if let Err(err) = result { + log::error!("failed to flush redis cache {}", err); + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + } + if let Err(err) = self.handle_events().await { + log::error!("error listening to events {}", err); + if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::() { + self.api = Self::connect(&mut self.substrate_urls).await?; + } + } else { + *got_hit = true + } } - } else { - *got_hit = true } } + Ok(()) } async fn handle_events(&self) -> Result<()> { diff --git a/src/relay/federation/mod.rs b/src/relay/federation/mod.rs index 87cd11a..4461db0 100644 --- a/src/relay/federation/mod.rs +++ b/src/relay/federation/mod.rs @@ -6,10 +6,11 @@ use crate::twin::TwinDB; use anyhow::Result; use bb8_redis::{ bb8::{Pool, RunError}, - redis::{cmd, RedisError}, + redis::{aio::ConnectionLike, cmd, RedisError}, RedisConnectionManager, }; use prometheus::{IntCounterVec, Opts, Registry}; +use tokio::select; use workers::WorkerPool; mod router; @@ -127,28 +128,38 @@ where let mut workers = self.workers; loop { - let mut con = match self.pool.get().await { - Ok(con) => con, - Err(err) => { - log::error!("could not get redis connection from pool, {}", err); - continue; - } - }; - let worker_handler = workers.get().await; - let (_, msg): (String, Vec) = match cmd("BRPOP") - .arg(FEDERATION_QUEUE) - .arg(0.0) - .query_async(&mut *con) - .await - { - Ok(msg) => msg, - Err(err) => { - log::error!("could not get message from redis {}", err); - continue; - } - }; - if let Err(err) = worker_handler.send(msg) { - log::error!("failed to send job to worker: {}", err); + select! { + _ = tokio::signal::ctrl_c() => { + log::info!("shutting down fedartor gracefully"); + workers.close().await; + break; + }, + result = self.pool.get() => { + let mut con = match result { + Ok(con) => con, + Err(err) => { + log::error!("could not get redis connection from pool, {}", err); + continue; + } + }; + let worker_handler = workers.get().await; + let (_, msg): (String, Vec) = match cmd("BRPOP") + .arg(FEDERATION_QUEUE) + .arg(0.0) + .query_async(&mut *con) + .await + { + Ok(msg) => msg, + Err(err) => { + log::error!("could not get message from redis {}", err); + continue; + } + }; + if let Err(err) = worker_handler.send(msg) { + log::error!("failed to send job to worker: {}", err); + } + + }, } } } diff --git a/src/relay/mod.rs b/src/relay/mod.rs index 15db835..db81a38 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -5,6 +5,8 @@ use hyper::server::conn::Http; use hyper_tungstenite::tungstenite::error::ProtocolError; use tokio::net::TcpListener; use tokio::net::ToSocketAddrs; +use tokio::select; +use tokio::signal; mod api; mod federation; @@ -63,19 +65,28 @@ where self.limiter, )); loop { - let (tcp_stream, _) = tcp_listener.accept().await?; - let http = http.clone(); - tokio::task::spawn(async move { - if let Err(http_err) = Http::new() - .http1_keep_alive(true) - .serve_connection(tcp_stream, http) - .with_upgrades() - .await - { - eprintln!("Error while serving HTTP connection: {}", http_err); - } - }); + select! { + _ = signal::ctrl_c() => { + log::info!("shutting down relay gracefully"); + break; + }, + result = tcp_listener.accept() => { + let (tcp_stream, _) = result?; + let http = http.clone(); + tokio::task::spawn(async move { + if let Err(http_err) = Http::new() + .http1_keep_alive(true) + .serve_connection(tcp_stream, http) + .with_upgrades() + .await + { + eprintln!("Error while serving HTTP connection: {}", http_err); + } + }); + }, + } } + Ok(()) } }