Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize: refine the scope of the lock. #350

Merged
merged 3 commits into from
Dec 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 62 additions & 58 deletions crates/runc-shim/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,17 @@ where
info!("Create request for {:?}", &req);
// Note: Get containers here is for getting the lock,
// to make sure no other threads manipulate the containers metadata;
let mut containers = self.containers.write().await;

let ns = self.namespace.as_str();
let id = req.id.as_str();

let container = self.factory.create(ns, &req).await?;
let mut resp = CreateTaskResponse::new();
let pid = container.pid().await as u32;
resp.pid = pid;

containers.insert(id.to_string(), container);
let pid = {
let mut containers = self.containers.write().await;
let container = self.factory.create(ns, &req).await?;
let pid = container.pid().await as u32;
resp.pid = pid;
containers.insert(id.to_string(), container);
pid
};

self.send_event(TaskCreate {
container_id: req.id.to_string(),
Expand All @@ -226,17 +226,19 @@ where

async fn start(&self, _ctx: &TtrpcContext, req: StartRequest) -> TtrpcResult<StartResponse> {
info!("Start request for {:?}", &req);
let mut container = self.container_mut(req.id()).await?;
// Prevent the init process from exiting and continuing with start
// Return early to reduce the time it takes to return only when runc encounters an error
if container.init_state().await == EnumOrUnknown::new(Status::STOPPED) {
debug!("container init process has exited, start process should not continue");
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::FAILED_PRECONDITION,
format!("container init process has exited {}", container.id().await),
)));
}
let pid = container.start(req.exec_id.as_str().as_option()).await?;
let pid = {
let mut container = self.container_mut(req.id()).await?;
// Prevent the init process from exiting and continuing with start
// Return early to reduce the time it takes to return only when runc encounters an error
if container.init_state().await == EnumOrUnknown::new(Status::STOPPED) {
debug!("container init process has exited, start process should not continue");
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::FAILED_PRECONDITION,
format!("container init process has exited {}", container.id().await),
)));
}
container.start(req.exec_id.as_str().as_option()).await?
};

let mut resp = StartResponse::new();
resp.pid = pid as u32;
Expand Down Expand Up @@ -268,19 +270,17 @@ where

async fn delete(&self, _ctx: &TtrpcContext, req: DeleteRequest) -> TtrpcResult<DeleteResponse> {
info!("Delete request for {:?}", &req);
let mut containers = self.containers.write().await;
let container = containers.get_mut(req.id()).ok_or_else(|| {
ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::NOT_FOUND,
format!("can not find container by id {}", req.id()),
))
})?;
let id = container.id().await;
let exec_id_opt = req.exec_id().as_option();
let (pid, exit_status, exited_at) = container.delete(exec_id_opt).await?;
self.factory.cleanup(&self.namespace, container).await?;
let (id, pid, exit_status, exited_at) = {
let mut container = self.container_mut(req.id()).await?;
let id = container.id().await;
let exec_id_opt = req.exec_id().as_option();
let (pid, exit_status, exited_at) = container.delete(exec_id_opt).await?;
self.factory.cleanup(&self.namespace, &container).await?;
(id, pid, exit_status, exited_at)
};

if req.exec_id().is_empty() {
containers.remove(req.id());
self.containers.write().await.remove(req.id());
}

let ts = convert_to_timestamp(exited_at);
Expand Down Expand Up @@ -308,8 +308,7 @@ where

async fn pids(&self, _ctx: &TtrpcContext, req: PidsRequest) -> TtrpcResult<PidsResponse> {
debug!("Pids request for {:?}", req);
let container = self.container(req.id()).await?;
let processes = container.all_processes().await?;
let processes = self.container(req.id()).await?.all_processes().await?;
debug!("Pids request for {:?} returns successfully", req);
Ok(PidsResponse {
processes,
Expand All @@ -319,8 +318,7 @@ where

async fn pause(&self, _ctx: &TtrpcContext, req: PauseRequest) -> TtrpcResult<Empty> {
info!("pause request for {:?}", req);
let mut container = self.container_mut(req.id()).await?;
container.pause().await?;
self.container_mut(req.id()).await?.pause().await?;
self.send_event(TaskPaused {
container_id: req.id.to_string(),
..Default::default()
Expand All @@ -332,8 +330,7 @@ where

async fn resume(&self, _ctx: &TtrpcContext, req: ResumeRequest) -> TtrpcResult<Empty> {
info!("resume request for {:?}", req);
let mut container = self.container_mut(req.id()).await?;
container.resume().await?;
self.container_mut(req.id()).await?.resume().await?;
self.send_event(TaskResumed {
container_id: req.id.to_string(),
..Default::default()
Expand All @@ -345,8 +342,8 @@ where

async fn kill(&self, _ctx: &TtrpcContext, req: KillRequest) -> TtrpcResult<Empty> {
info!("Kill request for {:?}", req);
let mut container = self.container_mut(req.id()).await?;
container
self.container_mut(req.id())
.await?
.kill(req.exec_id().as_option(), req.signal, req.all)
.await?;
info!("Kill request for {:?} returns successfully", req);
Expand All @@ -356,11 +353,15 @@ where
async fn exec(&self, _ctx: &TtrpcContext, req: ExecProcessRequest) -> TtrpcResult<Empty> {
info!("Exec request for {:?}", req);
let exec_id = req.exec_id().to_string();
let mut container = self.container_mut(req.id()).await?;
container.exec(req).await?;

let container_id = {
let mut container = self.container_mut(req.id()).await?;
container.exec(req).await?;
container.id().await
};

self.send_event(TaskExecAdded {
container_id: container.id().await,
container_id,
exec_id,
..Default::default()
})
Expand All @@ -374,16 +375,18 @@ where
"Resize pty request for container {}, exec_id: {}",
&req.id, &req.exec_id
);
let mut container = self.container_mut(req.id()).await?;
container
self.container_mut(req.id())
.await?
.resize_pty(req.exec_id().as_option(), req.height, req.width)
.await?;
Ok(Empty::new())
}

async fn close_io(&self, _ctx: &TtrpcContext, req: CloseIORequest) -> TtrpcResult<Empty> {
let mut container = self.container_mut(req.id()).await?;
container.close_io(req.exec_id().as_option()).await?;
self.container_mut(req.id())
.await?
.close_io(req.exec_id().as_option())
.await?;
Ok(Empty::new())
}

Expand All @@ -404,9 +407,7 @@ where
format!("failed to parse resource spec: {}", e),
))
})?;

let mut container = self.container_mut(&id).await?;
container.update(&resources).await?;
self.container_mut(&id).await?.update(&resources).await?;
Ok(Empty::new())
}

Expand All @@ -428,8 +429,11 @@ where

wait_rx.await.unwrap_or_default();
// get lock again.
let container = self.container(req.id()).await?;
let (_, code, exited_at) = container.get_exit_info(exec_id).await?;
let (_, code, exited_at) = self
.container(req.id())
.await?
.get_exit_info(exec_id)
.await?;
let mut resp = WaitResponse::new();
resp.set_exit_status(code as u32);
let ts = convert_to_timestamp(exited_at);
Expand All @@ -440,9 +444,7 @@ where

async fn stats(&self, _ctx: &TtrpcContext, req: StatsRequest) -> TtrpcResult<StatsResponse> {
debug!("Stats request for {:?}", req);
let container = self.container(req.id()).await?;
let stats = container.stats().await?;

let stats = self.container(req.id()).await?.stats().await?;
let mut resp = StatsResponse::new();
resp.set_stats(convert_to_any(Box::new(stats))?);
Ok(resp)
Expand All @@ -454,10 +456,12 @@ where
req: ConnectRequest,
) -> TtrpcResult<ConnectResponse> {
info!("Connect request for {:?}", req);
let mut pid: u32 = 0;
if let Ok(container) = self.container(req.id()).await {
pid = container.pid().await as u32;
}

let pid = if let Ok(container) = self.container(req.id()).await {
container.pid().await as u32
} else {
0
};

Ok(ConnectResponse {
shim_pid: std::process::id(),
Expand Down
Loading