Skip to content

Commit

Permalink
use GracefulShutdown util (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
lperlaki authored May 28, 2024
1 parent 030c22c commit b089118
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 46 deletions.
5 changes: 3 additions & 2 deletions crates/s3s-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ clap = { version = "4.3.21", optional = true, features = ["derive"] }
crc32c = "0.6.4"
futures = "0.3.28"
hex-simd = "0.8.0"
hyper-util = { version = "0.1.3", optional = true, features = [
"server",
hyper-util = { version = "0.1.5", optional = true, features = [
"server-auto",
"server-graceful",
"http1",
"http2",
"tokio",
Expand Down
56 changes: 35 additions & 21 deletions crates/s3s-fs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,30 +116,44 @@ async fn run(opt: Opt) -> Result {

let hyper_service = service.into_shared();

let connection = ConnBuilder::new(TokioExecutor::new());

let server = async move {
loop {
let (socket, _) = match listener.accept().await {
Ok(ok) => ok,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
};
let service = hyper_service.clone();
let conn = connection.clone();
tokio::spawn(async move {
let _ = conn.serve_connection(TokioIo::new(socket), service).await;
});
}
};
let http_server = ConnBuilder::new(TokioExecutor::new());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();

let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());

let task = tokio::spawn(server);
info!("server is running at http://{local_addr}");

tokio::signal::ctrl_c().await?;
task.abort();
loop {
let (socket, _) = tokio::select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
}
}
_ = ctrl_c.as_mut() => {
break;
}
};

let conn = http_server.serve_connection(TokioIo::new(socket), hyper_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
let _ = conn.await;
});
}

tokio::select! {
() = graceful.shutdown() => {
tracing::debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
}
}

info!("server is stopped");
Ok(())
Expand Down
5 changes: 3 additions & 2 deletions crates/s3s-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ aws-config = { version = "1.1.7", default-features = false, features = [
aws-credential-types = "1.1.7"
aws-sdk-s3 = "1.17.0"
clap = { version = "4.3.21", features = ["derive"] }
hyper-util = { version = "0.1.3", features = [
"server",
hyper-util = { version = "0.1.5", features = [
"server-auto",
"server-graceful",
"http1",
"http2",
"tokio",
Expand Down
58 changes: 37 additions & 21 deletions crates/s3s-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,31 +77,47 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

let hyper_service = service.into_shared();

let connection = ConnBuilder::new(TokioExecutor::new());

let server = async move {
loop {
let (socket, _) = match listener.accept().await {
Ok(ok) => ok,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
};
let service = hyper_service.clone();
let conn = connection.clone();
tokio::spawn(async move {
let _ = conn.serve_connection(TokioIo::new(socket), service).await;
});
}
};
let http_server = ConnBuilder::new(TokioExecutor::new());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();

let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());

info!("server is running at http://{}:{}/", opt.host, opt.port);
info!("server is forwarding requests to {}", opt.endpoint_url);
let task = tokio::spawn(server);

tokio::signal::ctrl_c().await?;
task.abort();
loop {
let (socket, _) = tokio::select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(err) => {
tracing::error!("error accepting connection: {err}");
continue;
}
}
}
_ = ctrl_c.as_mut() => {
break;
}
};

let conn = http_server.serve_connection(TokioIo::new(socket), hyper_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
let _ = conn.await;
});
}

tokio::select! {
() = graceful.shutdown() => {
tracing::debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
}
}

info!("server is stopped");

Ok(())
}

0 comments on commit b089118

Please sign in to comment.