Skip to content

Commit

Permalink
feat: graceful shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: nabil salah <[email protected]>
  • Loading branch information
Nabil-Salah committed Dec 11, 2024
1 parent 0ef17e7 commit 6e64155
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 47 deletions.
39 changes: 27 additions & 12 deletions src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::Result;
use futures::StreamExt;
use log;
use subxt::{OnlineClient, PolkadotConfig};
use tokio::select;

#[derive(Clone)]
pub struct Listener<C>
Expand Down Expand Up @@ -62,21 +63,35 @@ where

pub async fn listen(&mut self, got_hit: &mut bool) -> Result<()> {
loop {
// always flush in case some blocks were finalized before reconnecting
if let Err(err) = self.cache.flush().await {
log::error!("failed to flush redis cache {}", err);
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
if let Err(err) = self.handle_events().await {
log::error!("error listening to events {}", err);
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
select! {
_ = tokio::signal::ctrl_c() => {
log::info!("shutting down listener gracefully");
if let Err(err) = self.cache.flush().await {
log::error!("failed to flush redis cache {}", err);
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
break;
},
result = self.cache.flush() => {
// always flush in case some blocks were finalized before reconnecting
if let Err(err) = result {
log::error!("failed to flush redis cache {}", err);
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
if let Err(err) = self.handle_events().await {
log::error!("error listening to events {}", err);
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
}
} else {
*got_hit = true
}
}
} else {
*got_hit = true
}
}
Ok(())
}

async fn handle_events(&self) -> Result<()> {
Expand Down
57 changes: 34 additions & 23 deletions src/relay/federation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use crate::twin::TwinDB;
use anyhow::Result;
use bb8_redis::{
bb8::{Pool, RunError},
redis::{cmd, RedisError},
redis::{aio::ConnectionLike, cmd, RedisError},

Check warning on line 9 in src/relay/federation/mod.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

unused import: `aio::ConnectionLike`

Check warning on line 9 in src/relay/federation/mod.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

unused import: `aio::ConnectionLike`

Check warning on line 9 in src/relay/federation/mod.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

unused import: `aio::ConnectionLike`
RedisConnectionManager,
};
use prometheus::{IntCounterVec, Opts, Registry};
use tokio::select;
use workers::WorkerPool;

mod router;
Expand Down Expand Up @@ -127,28 +128,38 @@ where
let mut workers = self.workers;

loop {
let mut con = match self.pool.get().await {
Ok(con) => con,
Err(err) => {
log::error!("could not get redis connection from pool, {}", err);
continue;
}
};
let worker_handler = workers.get().await;
let (_, msg): (String, Vec<u8>) = match cmd("BRPOP")
.arg(FEDERATION_QUEUE)
.arg(0.0)
.query_async(&mut *con)
.await
{
Ok(msg) => msg,
Err(err) => {
log::error!("could not get message from redis {}", err);
continue;
}
};
if let Err(err) = worker_handler.send(msg) {
log::error!("failed to send job to worker: {}", err);
select! {
_ = tokio::signal::ctrl_c() => {
log::info!("shutting down fedartor gracefully");
workers.close().await;
break;
},
result = self.pool.get() => {
let mut con = match result {
Ok(con) => con,
Err(err) => {
log::error!("could not get redis connection from pool, {}", err);
continue;
}
};
let worker_handler = workers.get().await;
let (_, msg): (String, Vec<u8>) = match cmd("BRPOP")
.arg(FEDERATION_QUEUE)
.arg(0.0)
.query_async(&mut *con)
.await
{
Ok(msg) => msg,
Err(err) => {
log::error!("could not get message from redis {}", err);
continue;
}
};
if let Err(err) = worker_handler.send(msg) {
log::error!("failed to send job to worker: {}", err);
}

},
}
}
}
Expand Down
35 changes: 23 additions & 12 deletions src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use hyper::server::conn::Http;
use hyper_tungstenite::tungstenite::error::ProtocolError;
use tokio::net::TcpListener;
use tokio::net::ToSocketAddrs;
use tokio::select;
use tokio::signal;

mod api;
mod federation;
Expand Down Expand Up @@ -63,19 +65,28 @@ where
self.limiter,
));
loop {
let (tcp_stream, _) = tcp_listener.accept().await?;
let http = http.clone();
tokio::task::spawn(async move {
if let Err(http_err) = Http::new()
.http1_keep_alive(true)
.serve_connection(tcp_stream, http)
.with_upgrades()
.await
{
eprintln!("Error while serving HTTP connection: {}", http_err);
}
});
select! {
_ = signal::ctrl_c() => {
log::info!("shutting down relay gracefully");
break;
},
result = tcp_listener.accept() => {
let (tcp_stream, _) = result?;
let http = http.clone();
tokio::task::spawn(async move {
if let Err(http_err) = Http::new()
.http1_keep_alive(true)
.serve_connection(tcp_stream, http)
.with_upgrades()
.await
{
eprintln!("Error while serving HTTP connection: {}", http_err);
}
});
},
}
}
Ok(())
}
}

Expand Down

0 comments on commit 6e64155

Please sign in to comment.