From 652639d450097a98d601c189967d3227d35fb60c Mon Sep 17 00:00:00 2001 From: jokemanfire Date: Wed, 4 Dec 2024 15:33:25 +0800 Subject: [PATCH 1/2] Prevent start from continuing after the init process has exited No need to wait for runc to return an error Signed-off-by: jokemanfire --- crates/runc-shim/src/container.rs | 9 ++++++++- crates/runc-shim/src/task.rs | 14 +++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/crates/runc-shim/src/container.rs b/crates/runc-shim/src/container.rs index bd3bc5ab..21a2c88e 100644 --- a/crates/runc-shim/src/container.rs +++ b/crates/runc-shim/src/container.rs @@ -18,11 +18,12 @@ use std::collections::HashMap; use async_trait::async_trait; use containerd_shim::{ + api::Status, error::Result, protos::{ api::{CreateTaskRequest, ExecProcessRequest, ProcessInfo, StateResponse}, cgroups::metrics::Metrics, - protobuf::{well_known_types::any::Any, Message, MessageDyn}, + protobuf::{well_known_types::any::Any, EnumOrUnknown, Message, MessageDyn}, shim::oci::ProcessDetails, }, Error, @@ -58,6 +59,7 @@ pub trait Container { async fn close_io(&mut self, exec_id: Option<&str>) -> Result<()>; async fn pause(&mut self) -> Result<()>; async fn resume(&mut self) -> Result<()>; + async fn init_state(&self) -> EnumOrUnknown; } #[async_trait] @@ -95,6 +97,11 @@ where E: Process + Send + Sync, P: ProcessFactory + Send + Sync, { + async fn init_state(&self) -> EnumOrUnknown { + // Default should be unknown + self.init.state().await.unwrap_or_default().status + } + async fn start(&mut self, exec_id: Option<&str>) -> Result { let process = self.get_mut_process(exec_id)?; process.start().await?; diff --git a/crates/runc-shim/src/task.rs b/crates/runc-shim/src/task.rs index 97e2e941..9679c3fc 100644 --- a/crates/runc-shim/src/task.rs +++ b/crates/runc-shim/src/task.rs @@ -30,10 +30,9 @@ use containerd_shim::{ PidsResponse, StatsRequest, StatsResponse, UpdateTaskRequest, }, events::task::{TaskCreate, TaskDelete, TaskExecAdded, 
TaskExecStarted, TaskIO, TaskStart}, - protobuf::MessageDyn, + protobuf::{EnumOrUnknown, MessageDyn}, shim_async::Task, - ttrpc, - ttrpc::r#async::TtrpcContext, + ttrpc::{self, r#async::TtrpcContext}, }, util::{convert_to_any, convert_to_timestamp, AsOption}, TtrpcResult, @@ -228,6 +227,15 @@ where async fn start(&self, _ctx: &TtrpcContext, req: StartRequest) -> TtrpcResult { info!("Start request for {:?}", &req); let mut container = self.container_mut(req.id()).await?; + // Prevent the init process from exiting and continuing with start + // Return early to reduce the time it takes to return only when runc encounters an error + if container.init_state().await == EnumOrUnknown::new(Status::STOPPED) { + debug!("container init process has exited, start process should not continue"); + return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( + ttrpc::Code::FAILED_PRECONDITION, + format!("container init process has exited {}", container.id().await), + ))); + } let pid = container.start(req.exec_id.as_str().as_option()).await?; let mut resp = StartResponse::new(); From 6121223615f583b8cdf26f5de462f13dea5ed059 Mon Sep 17 00:00:00 2001 From: jokemanfire Date: Wed, 4 Dec 2024 10:18:21 +0800 Subject: [PATCH 2/2] Optimize: refine the scope of the lock. We can release the lock before publishing the event, as the Go shim does. 
Signed-off-by: jokemanfire --- crates/runc-shim/src/task.rs | 120 ++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 58 deletions(-) diff --git a/crates/runc-shim/src/task.rs b/crates/runc-shim/src/task.rs index 9679c3fc..7f1d8562 100644 --- a/crates/runc-shim/src/task.rs +++ b/crates/runc-shim/src/task.rs @@ -191,17 +191,17 @@ where info!("Create request for {:?}", &req); // Note: Get containers here is for getting the lock, // to make sure no other threads manipulate the containers metadata; - let mut containers = self.containers.write().await; - let ns = self.namespace.as_str(); let id = req.id.as_str(); - - let container = self.factory.create(ns, &req).await?; let mut resp = CreateTaskResponse::new(); - let pid = container.pid().await as u32; - resp.pid = pid; - - containers.insert(id.to_string(), container); + let pid = { + let mut containers = self.containers.write().await; + let container = self.factory.create(ns, &req).await?; + let pid = container.pid().await as u32; + resp.pid = pid; + containers.insert(id.to_string(), container); + pid + }; self.send_event(TaskCreate { container_id: req.id.to_string(), @@ -226,17 +226,19 @@ where async fn start(&self, _ctx: &TtrpcContext, req: StartRequest) -> TtrpcResult { info!("Start request for {:?}", &req); - let mut container = self.container_mut(req.id()).await?; - // Prevent the init process from exiting and continuing with start - // Return early to reduce the time it takes to return only when runc encounters an error - if container.init_state().await == EnumOrUnknown::new(Status::STOPPED) { - debug!("container init process has exited, start process should not continue"); - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::FAILED_PRECONDITION, - format!("container init process has exited {}", container.id().await), - ))); - } - let pid = container.start(req.exec_id.as_str().as_option()).await?; + let pid = { + let mut container = self.container_mut(req.id()).await?; + // 
Prevent the init process from exiting and continuing with start + // Return early to reduce the time it takes to return only when runc encounters an error + if container.init_state().await == EnumOrUnknown::new(Status::STOPPED) { + debug!("container init process has exited, start process should not continue"); + return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( + ttrpc::Code::FAILED_PRECONDITION, + format!("container init process has exited {}", container.id().await), + ))); + } + container.start(req.exec_id.as_str().as_option()).await? + }; let mut resp = StartResponse::new(); resp.pid = pid as u32; @@ -268,19 +270,17 @@ where async fn delete(&self, _ctx: &TtrpcContext, req: DeleteRequest) -> TtrpcResult { info!("Delete request for {:?}", &req); - let mut containers = self.containers.write().await; - let container = containers.get_mut(req.id()).ok_or_else(|| { - ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::NOT_FOUND, - format!("can not find container by id {}", req.id()), - )) - })?; - let id = container.id().await; - let exec_id_opt = req.exec_id().as_option(); - let (pid, exit_status, exited_at) = container.delete(exec_id_opt).await?; - self.factory.cleanup(&self.namespace, container).await?; + let (id, pid, exit_status, exited_at) = { + let mut container = self.container_mut(req.id()).await?; + let id = container.id().await; + let exec_id_opt = req.exec_id().as_option(); + let (pid, exit_status, exited_at) = container.delete(exec_id_opt).await?; + self.factory.cleanup(&self.namespace, &container).await?; + (id, pid, exit_status, exited_at) + }; + if req.exec_id().is_empty() { - containers.remove(req.id()); + self.containers.write().await.remove(req.id()); } let ts = convert_to_timestamp(exited_at); @@ -308,8 +308,7 @@ where async fn pids(&self, _ctx: &TtrpcContext, req: PidsRequest) -> TtrpcResult { debug!("Pids request for {:?}", req); - let container = self.container(req.id()).await?; - let processes = container.all_processes().await?; + let 
processes = self.container(req.id()).await?.all_processes().await?; debug!("Pids request for {:?} returns successfully", req); Ok(PidsResponse { processes, @@ -319,8 +318,7 @@ where async fn pause(&self, _ctx: &TtrpcContext, req: PauseRequest) -> TtrpcResult { info!("pause request for {:?}", req); - let mut container = self.container_mut(req.id()).await?; - container.pause().await?; + self.container_mut(req.id()).await?.pause().await?; self.send_event(TaskPaused { container_id: req.id.to_string(), ..Default::default() @@ -332,8 +330,7 @@ where async fn resume(&self, _ctx: &TtrpcContext, req: ResumeRequest) -> TtrpcResult { info!("resume request for {:?}", req); - let mut container = self.container_mut(req.id()).await?; - container.resume().await?; + self.container_mut(req.id()).await?.resume().await?; self.send_event(TaskResumed { container_id: req.id.to_string(), ..Default::default() @@ -345,8 +342,8 @@ where async fn kill(&self, _ctx: &TtrpcContext, req: KillRequest) -> TtrpcResult { info!("Kill request for {:?}", req); - let mut container = self.container_mut(req.id()).await?; - container + self.container_mut(req.id()) + .await? 
.kill(req.exec_id().as_option(), req.signal, req.all) .await?; info!("Kill request for {:?} returns successfully", req); @@ -356,11 +353,15 @@ where async fn exec(&self, _ctx: &TtrpcContext, req: ExecProcessRequest) -> TtrpcResult { info!("Exec request for {:?}", req); let exec_id = req.exec_id().to_string(); - let mut container = self.container_mut(req.id()).await?; - container.exec(req).await?; + + let container_id = { + let mut container = self.container_mut(req.id()).await?; + container.exec(req).await?; + container.id().await + }; self.send_event(TaskExecAdded { - container_id: container.id().await, + container_id, exec_id, ..Default::default() }) @@ -374,16 +375,18 @@ where "Resize pty request for container {}, exec_id: {}", &req.id, &req.exec_id ); - let mut container = self.container_mut(req.id()).await?; - container + self.container_mut(req.id()) + .await? .resize_pty(req.exec_id().as_option(), req.height, req.width) .await?; Ok(Empty::new()) } async fn close_io(&self, _ctx: &TtrpcContext, req: CloseIORequest) -> TtrpcResult { - let mut container = self.container_mut(req.id()).await?; - container.close_io(req.exec_id().as_option()).await?; + self.container_mut(req.id()) + .await? + .close_io(req.exec_id().as_option()) + .await?; Ok(Empty::new()) } @@ -404,9 +407,7 @@ where format!("failed to parse resource spec: {}", e), )) })?; - - let mut container = self.container_mut(&id).await?; - container.update(&resources).await?; + self.container_mut(&id).await?.update(&resources).await?; Ok(Empty::new()) } @@ -428,8 +429,11 @@ where wait_rx.await.unwrap_or_default(); // get lock again. - let container = self.container(req.id()).await?; - let (_, code, exited_at) = container.get_exit_info(exec_id).await?; + let (_, code, exited_at) = self + .container(req.id()) + .await? 
+ .get_exit_info(exec_id) + .await?; let mut resp = WaitResponse::new(); resp.set_exit_status(code as u32); let ts = convert_to_timestamp(exited_at); @@ -440,9 +444,7 @@ where async fn stats(&self, _ctx: &TtrpcContext, req: StatsRequest) -> TtrpcResult { debug!("Stats request for {:?}", req); - let container = self.container(req.id()).await?; - let stats = container.stats().await?; - + let stats = self.container(req.id()).await?.stats().await?; let mut resp = StatsResponse::new(); resp.set_stats(convert_to_any(Box::new(stats))?); Ok(resp) @@ -454,10 +456,12 @@ where req: ConnectRequest, ) -> TtrpcResult { info!("Connect request for {:?}", req); - let mut pid: u32 = 0; - if let Ok(container) = self.container(req.id()).await { - pid = container.pid().await as u32; - } + + let pid = if let Ok(container) = self.container(req.id()).await { + container.pid().await as u32 + } else { + 0 + }; Ok(ConnectResponse { shim_pid: std::process::id(),