Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: graceful shutdown #204

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::Result;
use futures::StreamExt;
use log;
use subxt::{OnlineClient, PolkadotConfig};
use tokio::select;

#[derive(Clone)]
pub struct Listener<C>
Expand Down Expand Up @@ -68,15 +69,29 @@ where
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
if let Err(err) = self.handle_events().await {
log::error!("error listening to events {}", err);
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
select! {
_ = tokio::signal::ctrl_c() => {
log::info!("shutting down listener gracefully");
if let Err(err) = self.cache.flush().await {
log::error!("failed to flush redis cache {}", err);
}else {
log::info!("Succesful flush of redis cache");
}
break;
},
result = self.handle_events() => {
if let Err(err) = result {
log::error!("error listening to events {}", err);
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
}
} else {
*got_hit = true
}
}
} else {
*got_hit = true
}
}
Ok(())
}

async fn handle_events(&self) -> Result<()> {
Expand Down
57 changes: 34 additions & 23 deletions src/relay/federation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
use anyhow::Result;
use bb8_redis::{
bb8::{Pool, RunError},
redis::{cmd, RedisError},
redis::{aio::ConnectionLike, cmd, RedisError},

Check warning on line 9 in src/relay/federation/mod.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

unused import: `aio::ConnectionLike`

Check warning on line 9 in src/relay/federation/mod.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

unused import: `aio::ConnectionLike`

Check warning on line 9 in src/relay/federation/mod.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

unused import: `aio::ConnectionLike`
RedisConnectionManager,
};
use prometheus::{IntCounterVec, Opts, Registry};
use tokio::select;
use workers::WorkerPool;

mod router;
Expand Down Expand Up @@ -127,28 +128,38 @@
let mut workers = self.workers;

loop {
let mut con = match self.pool.get().await {
Ok(con) => con,
Err(err) => {
log::error!("could not get redis connection from pool, {}", err);
continue;
}
};
let worker_handler = workers.get().await;
let (_, msg): (String, Vec<u8>) = match cmd("BRPOP")
.arg(FEDERATION_QUEUE)
.arg(0.0)
.query_async(&mut *con)
.await
{
Ok(msg) => msg,
Err(err) => {
log::error!("could not get message from redis {}", err);
continue;
}
};
if let Err(err) = worker_handler.send(msg) {
log::error!("failed to send job to worker: {}", err);
select! {
_ = tokio::signal::ctrl_c() => {
log::info!("shutting down fedartor gracefully");
workers.close().await;
break;
},
result = self.pool.get() => {
let mut con = match result {
Ok(con) => con,
Err(err) => {
log::error!("could not get redis connection from pool, {}", err);
continue;
}
};
let worker_handler = workers.get().await;
let (_, msg): (String, Vec<u8>) = match cmd("BRPOP")
.arg(FEDERATION_QUEUE)
.arg(0.0)
.query_async(&mut *con)
.await
{
Ok(msg) => msg,
Err(err) => {
log::error!("could not get message from redis {}", err);
continue;
}
};
if let Err(err) = worker_handler.send(msg) {
log::error!("failed to send job to worker: {}", err);
}

},
}
}
}
Expand Down
35 changes: 23 additions & 12 deletions src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use hyper::server::conn::Http;
use hyper_tungstenite::tungstenite::error::ProtocolError;
use tokio::net::TcpListener;
use tokio::net::ToSocketAddrs;
use tokio::select;
use tokio::signal;

mod api;
mod federation;
Expand Down Expand Up @@ -63,19 +65,28 @@ where
self.limiter,
));
loop {
let (tcp_stream, _) = tcp_listener.accept().await?;
let http = http.clone();
tokio::task::spawn(async move {
if let Err(http_err) = Http::new()
.http1_keep_alive(true)
.serve_connection(tcp_stream, http)
.with_upgrades()
.await
{
eprintln!("Error while serving HTTP connection: {}", http_err);
}
});
select! {
_ = signal::ctrl_c() => {
log::info!("shutting down relay gracefully");
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to close the listener here?
something like

tcp_lister.close()...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokio tcp_listener doesn't implement close or shutdown function to terminate the listener.
but using this tokio select simultaneously awaits for both the two actions and whenever one hits first it goes with it
So once the Ctrl+C signal is received, the listener stops accepting new TCP connections, and the application exits the loop

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So once the Ctrl+C signal is received, the listener stops accepting new TCP connections, and the application exits the loop

Yes, but this is not graceful. All in-flight requests will be forcefully closed as well.

There are two alternatives i can think of:

  1. see the example at tokio mini-redis https://github.com/tokio-rs/mini-redis/blob/e186482ca00f8d884ddcbe20417f3654d03315a4/src/server.rs#L172-L197

  2. If client can handle it, then it is probably OK to not shutdown it gracefully.

},
result = tcp_listener.accept() => {
let (tcp_stream, _) = result?;
let http = http.clone();
tokio::task::spawn(async move {
if let Err(http_err) = Http::new()
.http1_keep_alive(true)
.serve_connection(tcp_stream, http)
.with_upgrades()
.await
{
eprintln!("Error while serving HTTP connection: {}", http_err);
}
});
},
}
}
Ok(())
}
}

Expand Down
Loading