diff --git a/rust/src/daemon.rs b/rust/src/daemon.rs index 242b825d88..2b952c0ce9 100644 --- a/rust/src/daemon.rs +++ b/rust/src/daemon.rs @@ -8,20 +8,23 @@ use crate::cxxrsutil::*; use crate::ffi::{ OverrideReplacementSource, OverrideReplacementType, ParsedRevision, ParsedRevisionKind, }; -use anyhow::{anyhow, format_err, Result, Context}; +use anyhow::{anyhow, format_err, Context, Result}; use cap_std::fs::Dir; use cap_std_ext::dirext::CapStdExtDirExt; use cap_std_ext::{cap_std, rustix}; use fn_error_context::context; use glib::prelude::*; +use once_cell::sync::Lazy; use ostree_ext::{gio, glib, ostree}; use rustix::fd::{BorrowedFd, FromRawFd}; use rustix::fs::MetadataExt; use std::collections::{BTreeMap, BTreeSet}; use std::io::{Read, Write}; use std::os::unix::fs::PermissionsExt; -use std::os::unix::net::{UnixStream, UnixListener}; use std::path::Path; +use std::sync::Mutex; +use tokio::net::{UnixListener, UnixStream}; +use tokio::sync::oneshot::{Receiver, Sender}; const RPM_OSTREED_COMMIT_VERIFICATION_CACHE: &str = "rpm-ostree/gpgcheck-cache"; @@ -137,7 +140,8 @@ fn deployment_populate_variant_origin( /// returned cleanly. pub(crate) fn start_daemon_via_socket() -> CxxResult<()> { let address = "/run/rpm-ostree/client.sock"; - let s = UnixStream::connect(address) + tracing::debug!("Starting daemon via {address}"); + let s = std::os::unix::net::UnixStream::connect(address) .with_context(|| anyhow!("Failed to connect to {}", address))?; let mut s = std::io::BufReader::new(s); let mut r = String::new(); @@ -150,44 +154,57 @@ pub(crate) fn start_daemon_via_socket() -> CxxResult<()> { } } -fn send_init_result_to_client(client: &UnixStream, err: &Result<()>) { - let mut client = std::io::BufWriter::new(client); - match err { - Ok(_) => { - // On successwe close the stream without writing anything, - // which acknowledges successful startup to the client. - } - Err(e) => { - let msg = e.to_string(); - match client - .write_all(msg.as_bytes()) - .and_then(|_| client.flush()) - { - Ok(_) => {} - Err(inner_err) => { - eprintln!( - "Failed to write error message to client socket (original error: {}): {}", - e, inner_err - ); +async fn send_ok_result_to_client(_client: UnixStream) { + // On success we close the stream without writing anything, + // which acknowledges successful startup to the client. + // In the future we may actually implement a protocol here, so this + // is stubbed out as a full async fn in preparation for that. + tracing::debug!("Acknowleged client"); +} + +static SHUTDOWN_SIGNAL: Lazy>>> = Lazy::new(|| Mutex::new(None)); + +async fn process_clients_with_ok(listener: UnixListener, mut cancel: Receiver<()>) { + tracing::debug!("Processing clients..."); + loop { + tokio::select! { + _ = &mut cancel => { + tracing::debug!("Got cancellation event"); + return + } + r = listener.accept() => { + match r { + Ok((stream, _addr)) => { + send_ok_result_to_client(stream).await; + }, + Err(e) => { + tracing::debug!("failed to accept client: {e}") + } } } } } } -fn process_clients(listener: UnixListener, res: &Result<()>) { - for stream in listener.incoming() { - match stream { - Ok(stream) => send_init_result_to_client(&stream, res), - Err(e) => { - // This shouldn't be fatal, we continue to start up. - eprintln!("Failed to listen for client stream: {}", e); - } - } - if res.is_err() { - break; +/// Ensure all asynchronous tasks in this Rust half of the daemon code are stopped. +/// Called from C++. +pub(crate) fn daemon_terminate() { + let chan = (*SHUTDOWN_SIGNAL).lock().unwrap().take().unwrap(); + let _ = chan.send(()); +} + +fn process_one_client(listener: std::os::unix::net::UnixListener, e: anyhow::Error) -> Result<()> { + let mut incoming = match listener.incoming().next() { + Some(r) => r?, + None => { + anyhow::bail!("Expected to find client socket from activation"); } - } + }; + + let buf = format!("{e}"); + incoming.write_all(buf.as_bytes())?; + + todo!() } /// Perform initialization steps required by systemd service activation. @@ -195,11 +212,15 @@ fn process_clients(listener: UnixListener, res: &Result<()>) { /// This ensures that the system is running under systemd, then receives the /// socket-FD for main IPC logic, and notifies systemd about ready-state. pub(crate) fn daemon_main(debug: bool) -> Result<()> { + let handle = tokio::runtime::Handle::current(); + let _tokio_guard = handle.enter(); + use std::os::unix::net::UnixListener as StdUnixListener; if !systemd::daemon::booted()? { return Err(anyhow!("not running as a systemd service")); } let init_res: Result<()> = crate::ffi::daemon_init_inner(debug).map_err(|e| e.into()); + tracing::debug!("Initialization result: {init_res:?}"); let mut fds = systemd::daemon::listen_fds(false)?.iter(); let listener = match fds.next() { @@ -207,29 +228,47 @@ pub(crate) fn daemon_main(debug: bool) -> Result<()> { // If started directly via `systemctl start` or DBus activation, we // directly propagate the error back to our exit code. init_res?; - UnixListener::bind("/run/rpmostreed.socket")? + tracing::debug!("Initializing directly (not socket activated)"); + StdUnixListener::bind("/run/rpm-ostree/client.sock")? } Some(fd) => { if fds.next().is_some() { return Err(anyhow!("Expected exactly 1 fd from systemd activation")); } - let listener = unsafe { UnixListener::from_raw_fd(fd) }; + tracing::debug!("Initializing from socket activation; fd={fd}"); + let listener = unsafe { StdUnixListener::from_raw_fd(fd) }; + match init_res { Ok(_) => listener, Err(e) => { - let err_copy = Err(anyhow!("{}", e)); - process_clients(listener, &err_copy); + let err_copy = anyhow!("{e}"); + tracing::debug!("Reporting initialization error: {e}"); + match process_one_client(listener, err_copy) { + Ok(()) => { + tracing::debug!("Acknowleged initial client"); + } + Err(e) => { + tracing::debug!("Caught error while processing client {e}"); + } + } return Err(e); } } } }; - // On success, we spawn a helper thread that just responds with + let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel(); + (*SHUTDOWN_SIGNAL).lock().unwrap().replace(shutdown_send); + + let listener = UnixListener::from_std(listener)?; + + // On success, we spawn a helper task that just responds with // sucess to clients that connect via the socket. In the future, // perhaps we'll expose an API here. - std::thread::spawn(move || process_clients(listener, &Ok(()))); + tracing::debug!("Spawning acknowledgement task"); + tokio::task::spawn(async { process_clients_with_ok(listener, shutdown_recv).await }); + tracing::debug!("Entering daemon mainloop"); // And now, enter the main loop. Ok(crate::ffi::daemon_main_inner()?) } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index ca3d046d6f..4e33c68d69 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -301,6 +301,7 @@ pub mod ffi { extern "Rust" { fn daemon_main(debug: bool) -> Result<()>; fn start_daemon_via_socket() -> Result<()>; + fn daemon_terminate(); fn daemon_sanitycheck_environment(sysroot: &OstreeSysroot) -> Result<()>; fn deployment_generate_id(deployment: &OstreeDeployment) -> String; fn deployment_populate_variant( diff --git a/rust/src/main.rs b/rust/src/main.rs index 4fb6b913fb..faf4fa305f 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -133,7 +133,9 @@ fn inner_main() -> Result { .enable_all() .build() .context("Failed to build tokio runtime")?; - runtime.block_on(dispatch_multicall(callname, args)) + let r = runtime.block_on(dispatch_multicall(callname, args)); + tracing::debug!("Exiting inner main with result: {r:?}"); + r } fn print_error(e: anyhow::Error) { diff --git a/src/app/libmain.cxx b/src/app/libmain.cxx index 676a11bdfd..161dfec268 100644 --- a/src/app/libmain.cxx +++ b/src/app/libmain.cxx @@ -281,7 +281,7 @@ rpmostree_option_context_parse (GOptionContext *context, const GOptionEntry *mai return rpmostreecxx::client_throw_non_ostree_host_error (error); } - rpmostreecxx::start_daemon_via_socket(); + CXX_TRY (rpmostreecxx::start_daemon_via_socket (), error); /* root never needs to auth */ if (getuid () != 0) diff --git a/src/app/rpmostree-builtin-start-daemon.cxx b/src/app/rpmostree-builtin-start-daemon.cxx index 0150437b13..79aacc3f84 100644 --- a/src/app/rpmostree-builtin-start-daemon.cxx +++ b/src/app/rpmostree-builtin-start-daemon.cxx @@ -34,8 +34,8 @@ #include "rpmostree-builtins.h" #include "rpmostree-libbuiltin.h" #include "rpmostree-util.h" -#include "rpmostreed-utils.h" #include "rpmostreed-daemon.h" +#include "rpmostreed-utils.h" typedef enum { @@ -213,13 +213,14 @@ on_log_handler (const gchar *log_domain, GLogLevelFlags log_level, const gchar * sd_journal_print (priority, "%s", message); } -namespace rpmostreecxx { +namespace rpmostreecxx +{ // This function is always called from the Rust side. Hopefully // soon we'll move more of this code into daemon.rs. void -daemon_init_inner (bool debug) +daemon_init_inner (bool debug) { - g_autoptr(GError) local_error = NULL; + g_autoptr (GError) local_error = NULL; if (debug) { g_autoptr (GIOChannel) channel = NULL; @@ -244,17 +245,17 @@ daemon_init_inner (bool debug) /* Get an explicit ref to the bus so we can use it later */ g_autoptr (GDBusConnection) bus = g_bus_get_sync (G_BUS_TYPE_SYSTEM, NULL, &local_error); if (!bus) - util::throw_gerror(local_error); + util::throw_gerror (local_error); if (!start_daemon (bus, &local_error)) { sd_notifyf (0, "STATUS=error: %s", local_error->message); - util::throw_gerror(local_error); + util::throw_gerror (local_error); } } // Called from rust side to enter mainloop. void -daemon_main_inner () +daemon_main_inner () { state_transition (APPSTATE_RUNNING); @@ -293,13 +294,10 @@ daemon_main_inner () } /* namespace */ gboolean -rpmostree_builtin_start_daemon (int argc, - char **argv, - RpmOstreeCommandInvocation *invocation, - GCancellable *cancellable, - GError **error) +rpmostree_builtin_start_daemon (int argc, char **argv, RpmOstreeCommandInvocation *invocation, + GCancellable *cancellable, GError **error) { - g_autoptr(GOptionContext) opt_context = g_option_context_new (" - start the daemon process"); + g_autoptr (GOptionContext) opt_context = g_option_context_new (" - start the daemon process"); g_option_context_add_main_entries (opt_context, opt_entries, NULL); if (!g_option_context_parse (opt_context, &argc, &argv, error)) diff --git a/src/daemon/rpmostreed-daemon.cxx b/src/daemon/rpmostreed-daemon.cxx index 74d46528c4..ffe2fa846c 100644 --- a/src/daemon/rpmostreed-daemon.cxx +++ b/src/daemon/rpmostreed-daemon.cxx @@ -799,6 +799,7 @@ rpmostreed_daemon_run_until_idle_exit (RpmostreedDaemon *self) update_status (self); while (self->running) g_main_context_iteration (NULL, TRUE); + rpmostreecxx::daemon_terminate (); } void