Skip to content

Commit

Permalink
Merge pull request #57 from cgwalters/rustix
Browse files Browse the repository at this point in the history
Port fully to rustix
  • Loading branch information
jmarrero authored Sep 19, 2023
2 parents d641f50 + 3de4bc3 commit 702dafb
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ nix = "0.26"
oci-spec = "0.5.5"
once_cell = "1.9.0"
libc = "0.2"
rustix = { version = "0.37", features = ["process"] }
rustix = { version = "0.38", features = ["process", "net"] }
serde = { features = ["derive"], version = "1.0.125" }
serde_json = "1.0.64"
semver = "1.0.4"
Expand Down
91 changes: 38 additions & 53 deletions src/imageproxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ use anyhow::{anyhow, Context, Result};
use cap_std_ext::prelude::CapStdExtCommandExt;
use cap_std_ext::{cap_std, cap_tempfile};
use futures_util::Future;
use nix::sys::socket::{self as nixsocket, ControlMessageOwned};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::ops::Range;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::{CommandExt, FromRawFd, RawFd};
use std::os::fd::OwnedFd;
use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::{Command, Stdio};
Expand Down Expand Up @@ -86,7 +85,7 @@ type ChildFuture = Pin<

/// Manage a child process proxy to fetch container images.
pub struct ImageProxy {
sockfd: Arc<Mutex<File>>,
sockfd: Arc<Mutex<OwnedFd>>,
childwait: Arc<AsyncMutex<ChildFuture>>,
protover: semver::Version,
}
Expand All @@ -101,30 +100,6 @@ impl std::fmt::Debug for ImageProxy {
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);

#[allow(unsafe_code)]
fn new_seqpacket_pair() -> Result<(File, File)> {
let (mysock, theirsock) = nixsocket::socketpair(
nixsocket::AddressFamily::Unix,
nixsocket::SockType::SeqPacket,
None,
nixsocket::SockFlag::SOCK_CLOEXEC,
)?;
// Convert to owned values
let mysock = unsafe { std::fs::File::from_raw_fd(mysock) };
let theirsock = unsafe { std::fs::File::from_raw_fd(theirsock) };
Ok((mysock, theirsock))
}

#[allow(unsafe_code)]
fn file_from_scm_rights(cmsg: ControlMessageOwned) -> Option<File> {
if let nixsocket::ControlMessageOwned::ScmRights(fds) = cmsg {
fds.first()
.map(|&fd| unsafe { std::fs::File::from_raw_fd(fd) })
} else {
None
}
}

/// Configuration for the proxy.
#[derive(Debug, Default)]
pub struct ImageProxyConfig {
Expand Down Expand Up @@ -265,7 +240,12 @@ impl ImageProxy {
#[instrument]
pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
let mut c = Command::try_from(config)?;
let (mysock, theirsock) = new_seqpacket_pair()?;
let (mysock, theirsock) = rustix::net::socketpair(
rustix::net::AddressFamily::UNIX,
rustix::net::SocketType::SEQPACKET,
rustix::net::SocketFlags::CLOEXEC,
None,
)?;
c.stdin(Stdio::from(theirsock));
let child = c.spawn().context("Failed to spawn skopeo")?;
tracing::debug!("Spawned skopeo pid={:?}", child.id());
Expand Down Expand Up @@ -301,36 +281,39 @@ impl ImageProxy {
}

async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static>(
sockfd: Arc<Mutex<File>>,
sockfd: Arc<Mutex<OwnedFd>>,
req: Request,
) -> Result<(T, Option<(File, u32)>)> {
) -> Result<(T, Option<(OwnedFd, u32)>)> {
tracing::trace!("sending request {}", req.method.as_str());
// TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
let r = tokio::task::spawn_blocking(move || {
let sockfd = sockfd.lock().unwrap();
let sendbuf = serde_json::to_vec(&req)?;
nixsocket::send(sockfd.as_raw_fd(), &sendbuf, nixsocket::MsgFlags::empty())?;
let sockfd = &*sockfd;
rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
drop(sendbuf);
let mut buf = [0u8; MAX_MSG_SIZE];
let mut cmsg_buffer = nix::cmsg_space!([RawFd; 1]);
let mut cmsg_space = vec![0; rustix::cmsg_space!(ScmRights(1))];
let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(&mut cmsg_space);
let iov = std::io::IoSliceMut::new(buf.as_mut());
let mut iov = [iov];
let r = nixsocket::recvmsg::<()>(
sockfd.as_raw_fd(),
let nread = rustix::net::recvmsg(
sockfd,
&mut iov,
Some(&mut cmsg_buffer),
nixsocket::MsgFlags::MSG_CMSG_CLOEXEC,
)?;
// SAFETY: We provided a buffer
let iov = r.iovs().next().unwrap();
let mut fdret: Option<File> = None;
for cmsg in r.cmsgs() {
if let Some(f) = file_from_scm_rights(cmsg) {
fdret = Some(f);
break;
}
}
let reply: Reply = serde_json::from_slice(iov).context("Deserializing reply")?;
&mut cmsg_buffer,
rustix::net::RecvFlags::CMSG_CLOEXEC,
)?
.bytes;
let fdret = cmsg_buffer
.drain()
.filter_map(|m| match m {
rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
_ => None,
})
.flatten()
.next();
let buf = &buf[..nread];
let reply: Reply = serde_json::from_slice(buf).context("Deserializing reply")?;
if !reply.success {
return Err(anyhow!("remote error: {}", reply.error));
}
Expand Down Expand Up @@ -361,7 +344,7 @@ impl ImageProxy {
&self,
method: &str,
args: T,
) -> Result<(R, Option<(File, u32)>)>
) -> Result<(R, Option<(OwnedFd, u32)>)>
where
T: IntoIterator<Item = I>,
I: Into<serde_json::Value>,
Expand Down Expand Up @@ -420,9 +403,10 @@ impl ImageProxy {
Ok(r)
}

async fn read_all_fd(&self, fd: Option<(File, u32)>) -> Result<Vec<u8>> {
async fn read_all_fd(&self, fd: Option<(OwnedFd, u32)>) -> Result<Vec<u8>> {
let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?;
let mut fd = tokio::io::BufReader::new(tokio::fs::File::from_std(fd));
let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
let mut fd = tokio::io::BufReader::new(fd);
let mut r = Vec::new();
let reader = fd.read_to_end(&mut r);
let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipeid));
Expand Down Expand Up @@ -491,7 +475,8 @@ impl ImageProxy {
vec![img.0.into(), digest.to_string().into(), size.into()];
let (_bloblen, fd) = self.impl_request::<i64, _, _>("GetBlob", args).await?;
let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?;
let fd = tokio::io::BufReader::new(tokio::fs::File::from_std(fd));
let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
let fd = tokio::io::BufReader::new(fd);
let finish = Box::pin(self.finish_pipe(pipeid));
Ok((fd, finish))
}
Expand Down Expand Up @@ -519,7 +504,7 @@ impl ImageProxy {
let sendbuf = serde_json::to_vec(&req)?;
// SAFETY: Only panics if a worker thread already panic'd
let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
nixsocket::send(sockfd.as_raw_fd(), &sendbuf, nixsocket::MsgFlags::empty())?;
rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
drop(sendbuf);
tracing::debug!("sent shutdown request");
let mut childwait = self.childwait.lock().await;
Expand Down

0 comments on commit 702dafb

Please sign in to comment.