diff --git a/Cargo.toml b/Cargo.toml index b9c380663a..e279c0f8f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ camino = "1.0.9" cap-std-ext = "0.26" cap-std = { version = "0.25", features = ["fs_utf8"] } # Explicitly force on libc -rustix = { version = "0.35", features = ["use-libc"] } +rustix = { version = "0.35", features = ["use-libc", "net"] } cap-primitives = "0.25.2" cap-tempfile = "0.25.2" chrono = { version = "0.4.19", features = ["serde"] } diff --git a/Makefile-daemon.am b/Makefile-daemon.am index 25e753ee39..95e0f55bcb 100644 --- a/Makefile-daemon.am +++ b/Makefile-daemon.am @@ -69,7 +69,7 @@ systemdunit_other_files = \ $(NULL) if CLIENT_SOCKET - systemdunit_other_files += $(srcdir)/src/daemon/rpm-ostreed.socket +systemdunit_other_files += $(srcdir)/src/daemon/rpm-ostreed.socket endif systemdunit_DATA = \ diff --git a/configure.ac b/configure.ac index a3347e4b21..62ad3582b5 100644 --- a/configure.ac +++ b/configure.ac @@ -69,7 +69,6 @@ AC_ARG_ENABLE(featuresrs, AS_HELP_STRING([--enable-featuresrs], [Rust features, see Cargo.toml for more information]),, [enable_featuresrs=]) -AC_SUBST([RUST_FEATURES], $enable_featuresrs) AC_ARG_ENABLE(client-socket, AS_HELP_STRING([--enable-client-socket], @@ -78,6 +77,8 @@ AC_ARG_ENABLE(client-socket, AS_IF([test x$enable_client_socket = xyes], [enable_featuresrs="$enable_featuresrs client-socket"]) AM_CONDITIONAL(CLIENT_SOCKET, [echo $enable_featuresrs | grep -q 'client-socket']) +AC_SUBST([RUST_FEATURES], $enable_featuresrs) + # Initialize libtool LT_PREREQ([2.2.4]) LT_INIT([disable-static]) diff --git a/rust/src/client.rs b/rust/src/client.rs index fb00639f62..e01868601c 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -6,12 +6,11 @@ use crate::core::OSTREE_BOOTED; use crate::cxxrsutil::*; use crate::ffi::SystemHostType; use crate::utils; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use cap_std_ext::rustix; use gio::prelude::*; use ostree_ext::{gio, glib}; use std::os::unix::io::IntoRawFd; -use std::process::Command; /// The well-known bus name. const BUS_NAME: &str = "org.projectatomic.rpmostree1"; @@ -48,7 +47,8 @@ impl ClientConnection { SYSROOT_PATH, "org.projectatomic.rpmostree1.Sysroot", gio::NONE_CANCELLABLE, - )?; + ) + .context("Initializing sysroot proxy")?; // Today the daemon mode requires running inside a booted deployment. let booted = sysroot_proxy .cached_property("Booted") @@ -159,21 +159,40 @@ pub(crate) fn client_handle_fd_argument( /// returned cleanly. #[cfg(feature = "client-socket")] fn start_daemon_via_socket() -> Result<()> { - let address = "/run/rpm-ostree/client.sock"; - tracing::debug!("Starting daemon via {address}"); - let s = std::os::unix::net::UnixStream::connect(address) - .with_context(|| anyhow!("Failed to connect to {}", address))?; - let mut s = std::io::BufReader::new(s); - let mut r = String::new(); - s.read_to_string(&mut r) - .context("Reading from client socket")?; - if r.is_empty() { - Ok(()) - } else { - Err(anyhow!("{r}").into()) + use cap_std::io_lifetimes::IntoSocketlike; + + let address = sockaddr()?; + let socket = rustix::net::socket( + rustix::net::AddressFamily::UNIX, + rustix::net::SocketType::SEQPACKET, + rustix::net::Protocol::from_raw(0), + )?; + let addr = crate::client::sockaddr()?; + tracing::debug!("Starting daemon via {address:?}"); + rustix::net::connect_unix(&socket, &addr) + .with_context(|| anyhow!("Failed to connect to {address:?}"))?; + let socket = socket.into_socketlike(); + crate::daemon::write_message( + &socket, + crate::daemon::SocketMessage::ClientHello { + selfid: crate::core::self_id()?, + }, + )?; + let resp = crate::daemon::recv_message(&socket)?; + match resp { + crate::daemon::SocketMessage::ServerOk => Ok(()), + crate::daemon::SocketMessage::ServerError { msg } => { + Err(anyhow!("server error: {msg}").into()) + } + o => Err(anyhow!("unexpected message: {o:?}").into()), } } +/// Returns the address of the client socket. +pub(crate) fn sockaddr() -> Result { + rustix::net::SocketAddrUnix::new("/run/rpm-ostree/client.sock").map_err(anyhow::Error::msg) +} + /// Explicitly ensure the daemon is started via systemd, if possible. /// /// This works around bugs from DBus activation, see @@ -186,6 +205,8 @@ fn start_daemon_via_socket() -> Result<()> { /// What we really should do probably is use native socket activation. #[cfg(not(feature = "client-socket"))] fn start_daemon_via_systemctl() -> Result<()> { + use std::process::Command; + let service = "rpm-ostreed.service"; // Assume non-root can't use systemd right now. if rustix::process::getuid().as_raw() != 0 { diff --git a/rust/src/core.rs b/rust/src/core.rs index fa2c6ad2c9..2beac9e630 100644 --- a/rust/src/core.rs +++ b/rust/src/core.rs @@ -329,6 +329,14 @@ fn stage_container_rpm_files(rpms: Vec) -> CxxResult> { Ok(r) } +/// Return an opaque identifier for the current executing binary. This can +/// be passed via IPC to verify that client and server are running the same code. +pub(crate) fn self_id() -> Result { + use std::os::unix::fs::MetadataExt; + let metadata = std::fs::metadata("/proc/self/exe").context("Failed to read /proc/self/exe")?; + Ok(format!("dev={};inode={}", metadata.dev(), metadata.ino())) +} + #[cfg(test)] mod test { use super::*; diff --git a/rust/src/daemon.rs b/rust/src/daemon.rs index 334370ee09..b40a80b655 100644 --- a/rust/src/daemon.rs +++ b/rust/src/daemon.rs @@ -10,6 +10,7 @@ use crate::ffi::{ }; use anyhow::{anyhow, format_err, Context, Result}; use cap_std::fs::Dir; +use cap_std::io_lifetimes::{IntoSocketlike, OwnedFd, OwnedSocketlike}; use cap_std_ext::dirext::CapStdExtDirExt; use cap_std_ext::{cap_std, rustix}; use fn_error_context::context; @@ -18,16 +19,29 @@ use once_cell::sync::Lazy; use ostree_ext::{gio, glib, ostree}; use rustix::fd::{BorrowedFd, FromRawFd}; use rustix::fs::MetadataExt; +use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, BTreeSet}; -use std::io::{Read, Write}; +use std::io::Read; use std::os::unix::fs::PermissionsExt; use std::path::Path; use std::sync::Mutex; -use tokio::net::{UnixListener, UnixStream}; -use tokio::sync::oneshot::{Receiver, Sender}; +use tokio::sync::oneshot::Sender; const RPM_OSTREED_COMMIT_VERIFICATION_CACHE: &str = "rpm-ostree/gpgcheck-cache"; +// Messages sent across the socket +#[derive(Debug, Serialize, Deserialize)] +pub(crate) enum SocketMessage { + ClientHello { selfid: String }, + ServerOk, + ServerError { msg: String }, +} + +impl SocketMessage { + // Maximum size of a message. + pub(crate) const BUFSIZE: usize = 8192; +} + /// Validate basic assumptions on daemon startup. pub(crate) fn daemon_sanitycheck_environment(sysroot: &crate::FFIOstreeSysroot) -> CxxResult<()> { let sysroot = &sysroot.glib_reborrow(); @@ -135,57 +149,83 @@ fn deployment_populate_variant_origin( Ok(()) } -async fn send_ok_result_to_client(_client: UnixStream) { - // On success we close the stream without writing anything, - // which acknowledges successful startup to the client. - // In the future we may actually implement a protocol here, so this - // is stubbed out as a full async fn in preparation for that. +pub(crate) fn write_message(conn: &OwnedFd, message: SocketMessage) -> Result<()> { + let sendbuf = serde_json::to_vec(&message)?; + rustix::net::send(conn, &sendbuf, rustix::net::SendFlags::empty())?; + Ok(()) +} + +pub(crate) fn recv_message(conn: &OwnedFd) -> Result { + let mut buf = [0u8; SocketMessage::BUFSIZE]; + let n = rustix::net::recv(&conn, &mut buf, rustix::net::RecvFlags::empty())?; + if n == SocketMessage::BUFSIZE { + anyhow::bail!("Buffer filled to {n} bytes when reading"); + } + assert!(n < SocketMessage::BUFSIZE); + let buf = &buf[0..n]; + let msg: SocketMessage = + serde_json::from_slice(buf).context("Parsing client message")?; + Ok(msg) +} + +fn client_hello(client: OwnedSocketlike, e: anyhow::Result<()>) -> Result<()> { + let msg = recv_message(&client)?; + let reply = match msg { + SocketMessage::ClientHello { selfid } => { + let myid = crate::core::self_id()?; + if selfid != myid { + // For now, make this not an error + tracing::warn!("Client reported id: {selfid} different from mine: {myid}"); + } + match e { + Ok(()) => SocketMessage::ServerOk, + Err(e) => SocketMessage::ServerError { + msg: format!("{e}"), + }, + } + } + o => SocketMessage::ServerError { + msg: format!("Unexpected message: {o:?}"), + }, + }; + write_message(&client, reply).context("Writing client reply")?; tracing::debug!("Acknowleged client"); + Ok(()) } static SHUTDOWN_SIGNAL: Lazy>>> = Lazy::new(|| Mutex::new(None)); -async fn process_clients_with_ok(listener: UnixListener, mut cancel: Receiver<()>) { +fn run_acknowledgement_worker(listener: OwnedSocketlike) { tracing::debug!("Processing clients..."); loop { - tokio::select! { - _ = &mut cancel => { - tracing::debug!("Got cancellation event"); - return + let sock = match rustix::net::accept(&listener) { + Ok(s) => s, + Err(e) => { + tracing::warn!("Failed to accept client: {e}"); + continue; } - r = listener.accept() => { - match r { - Ok((stream, _addr)) => { - send_ok_result_to_client(stream).await; - }, - Err(e) => { - tracing::debug!("failed to accept client: {e}") - } - } + }; + std::thread::spawn(move || { + if let Err(e) = client_hello(sock.into_socketlike(), Ok(())) { + tracing::warn!("error acknowledging client: {e}"); } - } + }); } } /// Ensure all asynchronous tasks in this Rust half of the daemon code are stopped. /// Called from C++. pub(crate) fn daemon_terminate() { - let chan = (*SHUTDOWN_SIGNAL).lock().unwrap().take().unwrap(); - let _ = chan.send(()); + if let Some(chan) = (*SHUTDOWN_SIGNAL).lock().unwrap().take() { + let _ = chan.send(()); + } } -fn process_one_client(listener: std::os::unix::net::UnixListener, e: anyhow::Error) -> Result<()> { - let mut incoming = match listener.incoming().next() { - Some(r) => r?, - None => { - anyhow::bail!("Expected to find client socket from activation"); - } - }; - - let buf = format!("{e}"); - incoming.write_all(buf.as_bytes())?; - - todo!() +fn process_one_client(listener: OwnedSocketlike, e: anyhow::Error) -> Result<()> { + let incoming = rustix::net::accept(&listener)?; + client_hello(incoming.into_socketlike(), Err(e))?; + // Now that we've acknowledged one client, exit the process with an error + Ok(()) } /// Perform initialization steps required by systemd service activation. @@ -195,7 +235,6 @@ fn process_one_client(listener: std::os::unix::net::UnixListener, e: anyhow::Err pub(crate) fn daemon_main(debug: bool) -> Result<()> { let handle = tokio::runtime::Handle::current(); let _tokio_guard = handle.enter(); - use std::os::unix::net::UnixListener as StdUnixListener; if !systemd::daemon::booted()? { return Err(anyhow!("not running as a systemd service")); } @@ -204,54 +243,65 @@ pub(crate) fn daemon_main(debug: bool) -> Result<()> { tracing::debug!("Initialization result: {init_res:?}"); let mut fds = systemd::daemon::listen_fds(false)?.iter(); - let listener = match fds.next() { + let (listener, init_res) = match fds.next() { None => { // If started directly via `systemctl start` or DBus activation, we - // directly propagate the error back to our exit code. + // directly propagate the error back to our exit code without even bothering + // with a socket. init_res?; tracing::debug!("Initializing directly (not socket activated)"); - cfg!(feature = "client-socket") - .then(|| StdUnixListener::bind("/run/rpm-ostree/client.sock")) + let listener = cfg!(feature = "client-socket") + .then(|| { + let socket = rustix::net::socket( + rustix::net::AddressFamily::UNIX, + rustix::net::SocketType::SEQPACKET, + rustix::net::Protocol::from_raw(0), + )?; + let addr = crate::client::sockaddr()?; + rustix::net::bind_unix(&socket, &addr)?; + Ok::<_, anyhow::Error>(socket.into_socketlike()) + }) .transpose() - .context("Binding to socket")? + .context("Binding to socket")?; + (listener, Ok(())) } Some(fd) => { if fds.next().is_some() { return Err(anyhow!("Expected exactly 1 fd from systemd activation")); } tracing::debug!("Initializing from socket activation; fd={fd}"); - let listener = unsafe { StdUnixListener::from_raw_fd(fd) }; - - match init_res { - Ok(_) => Some(listener), - Err(e) => { - let err_copy = anyhow!("{e}"); - tracing::debug!("Reporting initialization error: {e}"); - match process_one_client(listener, err_copy) { - Ok(()) => { - tracing::debug!("Acknowleged initial client"); - } - Err(e) => { - tracing::debug!("Caught error while processing client {e}"); - } - } - return Err(e); - } - } + let listener = unsafe { OwnedFd::from_raw_fd(fd) }.into_socketlike(); + // In the socket case, we will process the initialization error later. + (Some(listener), init_res) } }; if let Some(listener) = listener { - let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel(); - (*SHUTDOWN_SIGNAL).lock().unwrap().replace(shutdown_send); - - let listener = UnixListener::from_std(listener)?; - // On success, we spawn a helper task that just responds with // sucess to clients that connect via the socket. In the future, // perhaps we'll expose an API here. tracing::debug!("Spawning acknowledgement task"); - tokio::task::spawn(async { process_clients_with_ok(listener, shutdown_recv).await }); + match init_res { + Ok(()) => { + std::thread::spawn(move || run_acknowledgement_worker(listener)); + } + Err(e) => { + let err_copy = anyhow::format_err!("{e}"); + let r = std::thread::spawn(move || { + if let Err(suberr) = process_one_client(listener, err_copy) { + tracing::warn!("Failed to respond to client: {suberr}") + } + }); + // Block until we've written the reply to the client; + if let Err(e) = r.join() { + tracing::warn!("Failed to join response thread: {e:?}"); + } + // And finally propagate out the error + return Err(e); + } + }; + } else { + init_res?; } tracing::debug!("Entering daemon mainloop"); diff --git a/src/daemon/rpm-ostreed.socket b/src/daemon/rpm-ostreed.socket index 020c640360..ba05a151fc 100644 --- a/src/daemon/rpm-ostreed.socket +++ b/src/daemon/rpm-ostreed.socket @@ -2,7 +2,7 @@ ConditionKernelCommandLine=ostree [Socket] -ListenStream=/run/rpm-ostree/client.sock +ListenSequentialPacket=/run/rpm-ostree/client.sock SocketMode=0600 [Install]