-
Summary: I'm new to async Rust and, using the example in this repo plus this reference, I've come up with the following: ..
lazy_static! {
/// Global slot for the shutdown trigger.
///
/// A oneshot `Sender` is consumed by `send`, so it is stored inside an
/// `Option` that a handler can `take` by value; the surrounding
/// `Arc<Mutex<..>>` lets the slot be shared safely between threads/tasks.
pub static ref SHUTDOWN_TX: Arc<Mutex<Option<Sender<()>>>> = Arc::new(Mutex::new(None));
}
..
// Bind the listener up front; `unwrap` is acceptable here since failing to
// bind at startup should abort the server.
let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
// Application-level shutdown signal: the sender is stashed in the global
// `SHUTDOWN_TX` so a request handler can trigger shutdown from anywhere.
let (terminate_tx, terminate_rx) = tokio::sync::oneshot::channel::<()>();
SHUTDOWN_TX.lock().await.replace(terminate_tx);
// Fused so the receiver can be polled again after completion without
// panicking; after it has yielded once it stays Pending forever.
let mut terminate_rx = terminate_rx.fuse();
// Create a watch channel to track tasks that are handling connections and wait for them to
// complete.
let (close_tx, close_rx) = watch::channel(());
// Continuously accept new connections.
loop {
let (socket, remote_addr) = tokio::select! {
// Either accept a new connection...
result = listener.accept() => {
result.unwrap()
}
// ...or wait to receive a shutdown signal and stop the accept loop.
_ = shutdown_signal(&mut terminate_rx) => {
debug!("signal received, not accepting new connections");
break;
}
};
debug!("connection {remote_addr} accepted");
// We don't need to call `poll_ready` because `Router` is always ready.
let tower_service = app.clone();
// Clone the watch receiver and move it into the task.
let close_rx = close_rx.clone();
// Spawn a task to handle the connection. That way we can serve multiple connections
// concurrently.
// NOTE(review): `tokio_scoped::scope` blocks until every task spawned inside
// it has completed, so despite the comment above this loop serves one
// connection at a time and cannot accept while a connection is open — confirm
// this is intended; `tokio::spawn` would be the non-blocking alternative.
tokio_scoped::scope(|scope| {
scope.spawn(async {
// Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use tokio.
// `TokioIo` converts between them.
let socket = TokioIo::new(socket);
// Hyper also has its own `Service` trait and doesn't use tower. We can use
// `hyper::service::service_fn` to create a hyper `Service` that calls our app through
// `tower::Service::call`.
let hyper_service =
hyper::service::service_fn(move |request: Request<Incoming>| {
// We have to clone `tower_service` because hyper's `Service` uses `&self` whereas
// tower's `Service` requires `&mut self`.
//
// We don't need to call `poll_ready` since `Router` is always ready.
tower_service.clone().call(request)
});
// `hyper_util::server::conn::auto::Builder` supports both http1 and http2 but doesn't
// support graceful so we have to use hyper directly and unfortunately pick between
// http1 and http2.
let conn = hyper::server::conn::http1::Builder::new()
.serve_connection(socket, hyper_service)
// `with_upgrades` is required for websockets.
.with_upgrades();
// `graceful_shutdown` requires a pinned connection.
let mut conn = std::pin::pin!(conn);
loop {
tokio::select! {
// Poll the connection. This completes when the client has closed the
// connection, graceful shutdown has completed, or we encounter a TCP error.
result = conn.as_mut() => {
if let Err(err) = result {
debug!("failed to serve connection: {err:#}");
}
break;
}
// Start graceful shutdown when we receive a shutdown signal.
//
// We use a loop to continue polling the connection to allow requests to finish
// after starting graceful shutdown. Our `Router` has `TimeoutLayer` so
// requests will finish after at most 10 seconds.
// NOTE(review): the oneshot fires exactly once. If this arm consumes the
// signal, the fused receiver is permanently Pending afterwards, so the
// accept-loop's own `shutdown_signal` select arm can never observe it —
// which would explain the loop continuing to accept connections. A
// `watch`/`broadcast` channel (or one oneshot per consumer) avoids this.
_ = shutdown_signal(&mut terminate_rx) => {
debug!("signal received, starting graceful shutdown");
conn.as_mut().graceful_shutdown();
}
}
}
debug!("connection {remote_addr} closed");
// Drop the watch receiver to signal to `main` that this task is done.
drop(close_rx);
});
});
}
// We only care about the watch receivers that were moved into the tasks so close the residual
// receiver.
drop(close_rx);
// Close the listener to stop accepting new connections.
drop(listener);
// Wait for all tasks to complete.
debug!("waiting for {} tasks to finish", close_tx.receiver_count());
close_tx.closed().await;
}
/// Resolves when any shutdown trigger fires: Ctrl+C (SIGINT), SIGTERM
/// (unix only), or the application-level oneshot signal in `terminate_rx`.
///
/// The receiver is taken as `&mut Fuse<..>` so the caller keeps ownership and
/// can call this function repeatedly. NOTE(review): once the oneshot has
/// yielded, the fused receiver stays Pending forever, so any later call can
/// only complete via the OS signals — confirm that is the intended behavior.
pub async fn shutdown_signal(terminate_rx: &mut Fuse<Receiver<()>>) {
// Completes on Ctrl+C.
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
// Completes on SIGTERM (unix targets only).
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
// Non-unix targets have no SIGTERM; substitute a future that never resolves.
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
// Completes when the in-process shutdown sender fires.
// NOTE(review): awaiting the receiver yields Err if the sender is dropped
// without sending, so this `expect` message ("install ... handler") is
// misleading — it actually signals a dropped sender, not an install failure.
let terminate_rx = async {
terminate_rx
.await
.expect("failed to install oneshot channel handler");
};
// Race all three triggers; return on whichever fires first.
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
_ = terminate_rx => {}
}
}

Calling the following in a handler:

    // Attempt to send a shutdown signal, if one hasn't already been sent
    if let Some(tx) = SHUTDOWN_TX.lock().await.take() {
        let _ = tx.send(());
    }

actually closes the connection, but I can't break out of the main loop, meaning it keeps accepting new connections. Any help would be appreciated.

axum version: 0.7.2 |
Beta Was this translation helpful? Give feedback.
Answered by
yuezk
Dec 25, 2024
Replies: 1 comment 1 reply
-
Looking at it again, I realized there are two distinct shutdown signals needed for the server to exit. I probably don't need this multi-connection setup; one oneshot channel for the connection shutdown and one oneshot channel for the main task do the job for now. |
Beta Was this translation helpful? Give feedback.
1 reply
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I implemented this with the tiny_http crate. Thus, we can easily control the main loop.