diff --git a/Cargo.lock b/Cargo.lock index 5b19bc139384..27c1abaf8935 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7934,6 +7934,7 @@ dependencies = [ "parity-scale-codec", "polkadot-parachain", "polkadot-primitives", + "rand 0.8.5", "sc-executor", "sc-executor-common", "sc-executor-wasmtime", diff --git a/node/core/pvf/common/Cargo.toml b/node/core/pvf/common/Cargo.toml index dfb490455b3d..8de983180851 100644 --- a/node/core/pvf/common/Cargo.toml +++ b/node/core/pvf/common/Cargo.toml @@ -29,6 +29,7 @@ sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master [target.'cfg(target_os = "linux")'.dependencies] landlock = "0.2.0" +rand = "0.8.5" [dev-dependencies] assert_matches = "1.4.0" diff --git a/node/core/pvf/common/src/execute.rs b/node/core/pvf/common/src/execute.rs index de5ce39f7838..c988b2123a55 100644 --- a/node/core/pvf/common/src/execute.rs +++ b/node/core/pvf/common/src/execute.rs @@ -26,6 +26,8 @@ use std::time::Duration; pub struct Handshake { /// The executor parameters. pub executor_params: ExecutorParams, + /// Whether the host has determined that landlock is enabled. + pub landlock_enabled: bool, } /// The response from an execution job on the worker. diff --git a/node/core/pvf/common/src/prepare.rs b/node/core/pvf/common/src/prepare.rs index c205eddfb8b1..e3ae1d7f9a46 100644 --- a/node/core/pvf/common/src/prepare.rs +++ b/node/core/pvf/common/src/prepare.rs @@ -55,3 +55,11 @@ pub enum PrepareJobKind { /// A prechecking job. Prechecking, } + +/// The payload of the one-time handshake that is done when a worker process is created. Carries +/// data from the host to the worker. +#[derive(Encode, Decode)] +pub struct Handshake { + /// Whether the host has determined that landlock is enabled. + pub landlock_enabled: bool, +} diff --git a/node/core/pvf/common/src/worker/mod.rs b/node/core/pvf/common/src/worker/mod.rs index 8b41cb82f73b..e40c03dcf7ae 100644 --- a/node/core/pvf/common/src/worker/mod.rs +++ b/node/core/pvf/common/src/worker/mod.rs @@ -23,7 +23,7 @@ use cpu_time::ProcessTime; use futures::never::Never; use std::{ any::Any, - path::PathBuf, + path::{Path, PathBuf}, sync::mpsc::{Receiver, RecvTimeoutError}, time::Duration, }; @@ -70,17 +70,23 @@ macro_rules! decl_worker_main { } let mut node_version = None; - let mut socket_path: &str = ""; + let mut socket_path = None; + let mut cache_path = None; for i in (2..args.len()).step_by(2) { match args[i].as_ref() { - "--socket-path" => socket_path = args[i + 1].as_str(), + "--socket-path" => socket_path = Some(args[i + 1].as_str()), "--node-impl-version" => node_version = Some(args[i + 1].as_str()), + "--cache-path" => cache_path = Some(args[i + 1].as_str()), arg => panic!("Unexpected argument found: {}", arg), } } + let socket_path = socket_path.expect("the --socket-path argument is required"); + let cache_path = cache_path.expect("the --cache-path argument is required"); - $entrypoint(&socket_path, node_version, Some($worker_version)); + let cache_path = &std::path::Path::new(cache_path); + + $entrypoint(&socket_path, node_version, Some($worker_version), cache_path); } }; } @@ -102,6 +108,7 @@ pub fn worker_event_loop( socket_path: &str, node_version: Option<&str>, worker_version: Option<&str>, + cache_path: &Path, mut event_loop: F, ) where F: FnMut(UnixStream) -> Fut, @@ -115,6 +122,7 @@ pub fn worker_event_loop( if node_version != worker_version { gum::error!( target: LOG_TARGET, + %debug_id, %worker_pid, %node_version, %worker_version, @@ -127,8 +135,28 @@ pub fn worker_event_loop( } } + #[cfg(target_os = "linux")] + { + if let Err(err_ctx) = change_root(cache_path) { + let err = io::Error::last_os_error(); + gum::error!( + target: LOG_TARGET, + %debug_id, + %worker_pid, + %err_ctx, + ?cache_path, + "Could not change root to be the cache path: {}", + err + ); + worker_shutdown_message(debug_id, worker_pid, err); + return + } + } + remove_env_vars(debug_id); + gum::info!(target: LOG_TARGET, "5. {:?}", std::fs::read_dir(".").unwrap().map(|entry| entry.unwrap().path()).collect::>()); + // Run the main worker loop. let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it."); let err = rt @@ -151,6 +179,87 @@ pub fn worker_event_loop( rt.shutdown_background(); } +/// Change root to be the artifact directory. +#[cfg(target_os = "linux")] +fn change_root(cache_path: &Path) -> Result<(), &'static str> { + use rand::{distributions::Alphanumeric, Rng}; + use std::{ffi::CString, os::unix::ffi::OsStrExt, ptr}; + + const RANDOM_LEN: usize = 10; + let mut buf = Vec::with_capacity(RANDOM_LEN); + buf.extend(rand::thread_rng().sample_iter(&Alphanumeric).take(RANDOM_LEN)); + let s = std::str::from_utf8(&buf) + .expect("the string is collected from a valid utf-8 sequence; qed"); + + let cache_path_str = match cache_path.to_str() { + Some(s) => s, + None => return Err("cache path is not valid UTF-8") + }; + let cache_path_c = CString::new(cache_path.as_os_str().as_bytes()).unwrap(); + let root_absolute_c = CString::new("/").unwrap(); + // Append a random string to prevent races and to avoid dealing with the dir already existing. + let oldroot_relative_c = CString::new(format!("{}/oldroot-{}", cache_path_str, s)).unwrap(); + let oldroot_absolute_c = CString::new(format!("/oldroot-{}", s)).unwrap(); + + // SAFETY: TODO + unsafe { + // 1. `unshare` the user and the mount namespaces. + if libc::unshare(libc::CLONE_NEWUSER) < 0 { + return Err("unshare user namespace") + } + if libc::unshare(libc::CLONE_NEWNS) < 0 { + return Err("unshare mount namespace") + } + + // 2. `pivot_root` to the artifact directory. + gum::info!(target: LOG_TARGET, "1. {:?}", std::env::current_dir()); + gum::info!(target: LOG_TARGET, "1.5. {:?}", std::fs::read_dir(".").unwrap().map(|entry| entry.unwrap().path()).collect::>()); + // TODO: Ensure that 'new_root' and its parent mount don't have shared propagation. + if libc::mount(ptr::null(), root_absolute_c.as_ptr(), ptr::null(), libc::MS_REC | libc::MS_PRIVATE, ptr::null()) < 0 { + return Err("mount MS_PRIVATE") + } + if libc::mount( + cache_path_c.as_ptr(), + cache_path_c.as_ptr(), + ptr::null(), // ignored when MS_BIND is used + libc::MS_BIND | libc::MS_REC | libc::MS_NOEXEC, + ptr::null(), // ignored when MS_BIND is used + ) < 0 + { + return Err("mount MS_BIND") + } + if libc::mkdir(oldroot_relative_c.as_ptr(), 0755) < 0 { + return Err("mkdir oldroot") + } + if libc::syscall( + libc::SYS_pivot_root, + cache_path_c.as_ptr(), + oldroot_relative_c.as_ptr(), + ) < 0 + { + return Err("pivot_root") + } + + // 3. Change to the new root, `unmount2` and remove the old root. + if libc::chdir(root_absolute_c.as_ptr()) < 0 { + return Err("chdir to new root") + } + gum::info!(target: LOG_TARGET, "2. {:?}", std::env::current_dir()); + gum::info!(target: LOG_TARGET, "3. {:?}", std::fs::read_dir(".").unwrap().map(|entry| entry.unwrap().path()).collect::>()); + if libc::umount2(oldroot_absolute_c.as_ptr(), libc::MNT_DETACH) < 0 { + return Err("umount2 the oldroot") + } + if libc::rmdir(oldroot_absolute_c.as_ptr()) < 0 { + return Err("rmdir the oldroot") + } + gum::info!(target: LOG_TARGET, "4. {:?}", std::fs::read_dir(".").unwrap().map(|entry| entry.unwrap().path()).collect::>()); + + // TODO: do some assertions + } + + Ok(()) +} + /// Delete all env vars to prevent malicious code from accessing them. fn remove_env_vars(debug_id: &'static str) { for (key, value) in std::env::vars_os() { @@ -299,7 +408,7 @@ pub mod thread { Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new())) } - /// Runs a worker thread. Will first enable security features, and afterwards notify the threads + /// Runs a worker thread. Will run the requested function, and afterwards notify the threads /// waiting on the condvar. Catches panics during execution and resumes the panics after /// triggering the condvar, so that the waiting thread is notified on panics. /// diff --git a/node/core/pvf/common/src/worker/security.rs b/node/core/pvf/common/src/worker/security.rs index 6c5f96e0b5db..36fe07d03aaf 100644 --- a/node/core/pvf/common/src/worker/security.rs +++ b/node/core/pvf/common/src/worker/security.rs @@ -20,6 +20,7 @@ /// To what degree landlock is enabled. It's a separate struct from `RulesetStatus` because that is /// only available on Linux, plus this has a nicer name. +#[derive(Debug)] pub enum LandlockStatus { FullyEnforced, PartiallyEnforced, @@ -52,14 +53,19 @@ impl LandlockStatus { /// [landlock]: https://docs.rs/landlock/latest/landlock/index.html #[cfg(target_os = "linux")] pub mod landlock { - use landlock::{Access, AccessFs, Ruleset, RulesetAttr, RulesetError, RulesetStatus, ABI}; + pub use landlock::{path_beneath_rules, Access, AccessFs}; + + use landlock::{ + PathBeneath, PathFd, Ruleset, RulesetAttr, RulesetCreatedAttr, RulesetError, RulesetStatus, + ABI, + }; /// Landlock ABI version. We use ABI V1 because: /// /// 1. It is supported by our reference kernel version. /// 2. Later versions do not (yet) provide additional security. /// - /// # Versions (June 2023) + /// # Versions (as of June 2023) /// /// - Polkadot reference kernel version: 5.16+ /// - ABI V1: 5.13 - introduces landlock, including full restrictions on file reads @@ -87,10 +93,10 @@ pub mod landlock { /// Returns to what degree landlock is enabled with the given ABI on the current Linux /// environment. pub fn get_status() -> Result> { - match std::thread::spawn(|| try_restrict_thread()).join() { + match std::thread::spawn(|| try_restrict(std::iter::empty())).join() { Ok(Ok(status)) => Ok(status), Ok(Err(ruleset_err)) => Err(ruleset_err.into()), - Err(_err) => Err("a panic occurred in try_restrict_thread".into()), + Err(_err) => Err("a panic occurred in try_restrict".into()), } } @@ -108,20 +114,24 @@ pub mod landlock { status_is_fully_enabled(&get_status()) } - /// Tries to restrict the current thread with the following landlock access controls: + /// Tries to restrict the current thread (should only be called in a process' main thread) with + /// the following landlock access controls: /// - /// 1. all global filesystem access - /// 2. ... more may be supported in the future. + /// 1. all global filesystem access restricted, with optional exceptions + /// 2. ... more sandbox types (e.g. networking) may be supported in the future. /// /// If landlock is not supported in the current environment this is simply a noop. /// /// # Returns /// /// The status of the restriction (whether it was fully, partially, or not-at-all enforced). - pub fn try_restrict_thread() -> Result { + pub fn try_restrict( + fs_exceptions: impl Iterator, RulesetError>>, + ) -> Result { let status = Ruleset::new() .handle_access(AccessFs::from_all(LANDLOCK_ABI))? .create()? + .add_rules(fs_exceptions)? .restrict_self()?; Ok(status.ruleset) } @@ -132,55 +142,169 @@ pub mod landlock { use std::{fs, io::ErrorKind, thread}; #[test] - fn restricted_thread_cannot_access_fs() { + fn restricted_thread_cannot_read_file() { // TODO: This would be nice: . if !check_is_fully_enabled() { return } // Restricted thread cannot read from FS. - let handle = thread::spawn(|| { - // Write to a tmp file, this should succeed before landlock is applied. - let text = "foo"; - let tmpfile = tempfile::NamedTempFile::new().unwrap(); - let path = tmpfile.path(); - fs::write(path, text).unwrap(); - let s = fs::read_to_string(path).unwrap(); - assert_eq!(s, text); - - let status = try_restrict_thread().unwrap(); - if !matches!(status, RulesetStatus::FullyEnforced) { - panic!("Ruleset should be enforced since we checked if landlock is enabled"); - } - - // Try to read from the tmp file after landlock. - let result = fs::read_to_string(path); - assert!(matches!( - result, - Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) - )); - }); + let handle = + thread::spawn(|| { + // Create, write, and read two tmp files. This should succeed before any + // landlock restrictions are applied. + const TEXT: &str = "foo"; + let tmpfile1 = tempfile::NamedTempFile::new().unwrap(); + let path1 = tmpfile1.path(); + let tmpfile2 = tempfile::NamedTempFile::new().unwrap(); + let path2 = tmpfile2.path(); + + fs::write(path1, TEXT).unwrap(); + let s = fs::read_to_string(path1).unwrap(); + assert_eq!(s, TEXT); + fs::write(path2, TEXT).unwrap(); + let s = fs::read_to_string(path2).unwrap(); + assert_eq!(s, TEXT); + + // Apply Landlock with a read exception for only one of the files. + let status = try_restrict(path_beneath_rules( + &[path1], + AccessFs::from_read(LANDLOCK_ABI), + )); + if !matches!(status, Ok(RulesetStatus::FullyEnforced)) { + panic!("Ruleset should be enforced since we checked if landlock is enabled: {:?}", status); + } + + // Try to read from both files, only tmpfile1 should succeed. + let result = fs::read_to_string(path1); + assert!(matches!( + result, + Ok(s) if s == TEXT + )); + let result = fs::read_to_string(path2); + assert!(matches!( + result, + Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) + )); + + // Apply Landlock for all files. + let status = try_restrict(std::iter::empty()); + if !matches!(status, Ok(RulesetStatus::FullyEnforced)) { + panic!("Ruleset should be enforced since we checked if landlock is enabled: {:?}", status); + } + + // Try to read from tmpfile1 after landlock, it should fail. + let result = fs::read_to_string(path1); + assert!(matches!( + result, + Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) + )); + }); assert!(handle.join().is_ok()); + } + + #[test] + fn restricted_thread_cannot_write_file() { + // TODO: This would be nice: . + if !check_is_fully_enabled() { + return + } // Restricted thread cannot write to FS. - let handle = thread::spawn(|| { - let text = "foo"; - let tmpfile = tempfile::NamedTempFile::new().unwrap(); - let path = tmpfile.path(); - - let status = try_restrict_thread().unwrap(); - if !matches!(status, RulesetStatus::FullyEnforced) { - panic!("Ruleset should be enforced since we checked if landlock is enabled"); - } - - // Try to write to the tmp file after landlock. - let result = fs::write(path, text); - assert!(matches!( - result, - Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) - )); - }); + let handle = + thread::spawn(|| { + // Create and write two tmp files. This should succeed before any landlock + // restrictions are applied. + const TEXT: &str = "foo"; + let tmpfile1 = tempfile::NamedTempFile::new().unwrap(); + let path1 = tmpfile1.path(); + let tmpfile2 = tempfile::NamedTempFile::new().unwrap(); + let path2 = tmpfile2.path(); + + fs::write(path1, TEXT).unwrap(); + fs::write(path2, TEXT).unwrap(); + + // Apply Landlock with a write exception for only one of the files. + let status = try_restrict(path_beneath_rules( + &[path1], + AccessFs::from_write(LANDLOCK_ABI), + )); + if !matches!(status, Ok(RulesetStatus::FullyEnforced)) { + panic!("Ruleset should be enforced since we checked if landlock is enabled: {:?}", status); + } + + // Try to write to both files, only tmpfile1 should succeed. + let result = fs::write(path1, TEXT); + assert!(matches!(result, Ok(_))); + let result = fs::write(path2, TEXT); + assert!(matches!( + result, + Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) + )); + + // Apply Landlock for all files. + let status = try_restrict(std::iter::empty()); + if !matches!(status, Ok(RulesetStatus::FullyEnforced)) { + panic!("Ruleset should be enforced since we checked if landlock is enabled: {:?}", status); + } + + // Try to write to tmpfile1 after landlock, it should fail. + let result = fs::write(path1, TEXT); + assert!(matches!( + result, + Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) + )); + }); + + assert!(handle.join().is_ok()); + } + + #[test] + fn restricted_thread_can_read_files_but_not_list_dir() { + // TODO: This would be nice: . + if !check_is_fully_enabled() { + return + } + + // Restricted thread can read files but not list directory contents. + let handle = + thread::spawn(|| { + // Create, write to and read a tmp file. This should succeed before any landlock + // restrictions are applied. + const TEXT: &str = "foo"; + let tmpfile = tempfile::NamedTempFile::new().unwrap(); + let filepath = tmpfile.path(); + let dirpath = filepath.parent().unwrap(); + + fs::write(filepath, TEXT).unwrap(); + let s = fs::read_to_string(filepath).unwrap(); + assert_eq!(s, TEXT); + + // Apply Landlock with a general read exception for the directory, *without* the + // `ReadDir` exception. + let status = try_restrict(path_beneath_rules( + &[dirpath], + AccessFs::from_read(LANDLOCK_ABI) ^ AccessFs::ReadDir, + )); + if !matches!(status, Ok(RulesetStatus::FullyEnforced)) { + panic!("Ruleset should be enforced since we checked if landlock is enabled: {:?}", status); + } + + // Try to read file, should still be able to. + let result = fs::read_to_string(filepath); + assert!(matches!( + result, + Ok(s) if s == TEXT + )); + + // Try to list dir contents, should fail. + let result = fs::read_dir(dirpath); + assert!(matches!( + result, + Err(err) if matches!(err.kind(), ErrorKind::PermissionDenied) + )); + }); assert!(handle.join().is_ok()); } diff --git a/node/core/pvf/execute-worker/src/lib.rs b/node/core/pvf/execute-worker/src/lib.rs index 7a14de18a82f..7c6e3bf5db70 100644 --- a/node/core/pvf/execute-worker/src/lib.rs +++ b/node/core/pvf/execute-worker/src/lib.rs @@ -39,7 +39,7 @@ use polkadot_node_core_pvf_common::{ }; use polkadot_parachain::primitives::ValidationResult; use std::{ - path::PathBuf, + path::{Path, PathBuf}, sync::{mpsc::channel, Arc}, time::Duration, }; @@ -119,28 +119,75 @@ async fn send_response(stream: &mut UnixStream, response: Response) -> io::Resul /// /// # Parameters /// -/// The `socket_path` specifies the path to the socket used to communicate with the host. The -/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in -/// immediate worker termination. `None` is used for tests and in other situations when version -/// check is not necessary. +/// - `socket_path` specifies the path to the socket used to communicate with the host. +/// +/// - `node_version`, if `Some`, is checked against the `worker_version`. A mismatch results in +/// immediate worker termination. `None` is used for tests and in other situations when version +/// check is not necessary. +/// +/// - `worker_version`: see above +/// +/// - `cache_path` contains the expected cache path for artifacts and is used to provide a sandbox +/// exception for landlock. pub fn worker_entrypoint( socket_path: &str, node_version: Option<&str>, worker_version: Option<&str>, + cache_path: &Path, ) { worker_event_loop( "execute", socket_path, node_version, worker_version, + cache_path, |mut stream| async move { let worker_pid = std::process::id(); - let handshake = recv_handshake(&mut stream).await?; - let executor = Executor::new(handshake.executor_params).map_err(|e| { + let Handshake { executor_params, landlock_enabled } = + recv_handshake(&mut stream).await?; + let executor = Executor::new(executor_params).map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) })?; + // Try to enable landlock. + { + #[cfg(target_os = "linux")] + let landlock_status = { + use polkadot_node_core_pvf_common::worker::security::landlock::{ + path_beneath_rules, try_restrict, Access, AccessFs, LANDLOCK_ABI, + }; + + // Allow an exception for reading from the artifact cache, but disallow listing + // the directory contents. Since we prepend artifact names with a random hash, + // this means attackers can't discover artifacts apart from the current job. + try_restrict(path_beneath_rules( + &[cache_path], + AccessFs::from_read(LANDLOCK_ABI) ^ AccessFs::ReadDir, + )) + .map(LandlockStatus::from_ruleset_status) + .map_err(|e| e.to_string()) + }; + #[cfg(not(target_os = "linux"))] + let landlock_status: Result = Ok(LandlockStatus::NotEnforced); + + // Error if the host determined that landlock is fully enabled and we couldn't fully + // enforce it here. + if landlock_enabled && !matches!(landlock_status, Ok(LandlockStatus::FullyEnforced)) + { + gum::warn!( + target: LOG_TARGET, + %worker_pid, + "could not fully enable landlock: {:?}", + landlock_status + ); + return Err(io::Error::new( + io::ErrorKind::Other, + format!("could not fully enable landlock: {:?}", landlock_status), + )) + } + } + loop { let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; gum::debug!( @@ -150,9 +197,11 @@ pub fn worker_entrypoint( artifact_path.display(), ); + if !artifact_path.starts_with(cache_path) { + return Err(io::Error::new(io::ErrorKind::Other, format!("received an artifact path {artifact_path:?} that does not belong to expected artifact dir {cache_path:?}"))) + } + // Get the artifact bytes. - // - // We do this outside the thread so that we can lock down filesystem access there. let compiled_artifact_blob = match std::fs::read(artifact_path) { Ok(bytes) => bytes, Err(err) => { @@ -187,22 +236,11 @@ pub fn worker_entrypoint( let execute_thread = thread::spawn_worker_thread_with_stack_size( "execute thread", move || { - // Try to enable landlock. - #[cfg(target_os = "linux")] - let landlock_status = polkadot_node_core_pvf_common::worker::security::landlock::try_restrict_thread() - .map(LandlockStatus::from_ruleset_status) - .map_err(|e| e.to_string()); - #[cfg(not(target_os = "linux"))] - let landlock_status: Result = Ok(LandlockStatus::NotEnforced); - - ( - validate_using_artifact( - &compiled_artifact_blob, - ¶ms, - executor_2, - cpu_time_start, - ), - landlock_status, + validate_using_artifact( + &compiled_artifact_blob, + ¶ms, + executor_2, + cpu_time_start, ) }, Arc::clone(&condvar), @@ -215,24 +253,9 @@ pub fn worker_entrypoint( let response = match outcome { WaitOutcome::Finished => { let _ = cpu_time_monitor_tx.send(()); - let (result, landlock_status) = execute_thread.join().unwrap_or_else(|e| { - ( - Response::Panic(stringify_panic_payload(e)), - Ok(LandlockStatus::Unavailable), - ) - }); - - // Log if landlock threw an error. - if let Err(err) = landlock_status { - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "error enabling landlock: {}", - err - ); - } - - result + execute_thread + .join() + .unwrap_or_else(|e| Response::Panic(stringify_panic_payload(e))) }, // If the CPU thread is not selected, we signal it to end, the join handle is // dropped and the thread will finish in the background. diff --git a/node/core/pvf/prepare-worker/src/lib.rs b/node/core/pvf/prepare-worker/src/lib.rs index caa7d33df12a..8eea18edb449 100644 --- a/node/core/pvf/prepare-worker/src/lib.rs +++ b/node/core/pvf/prepare-worker/src/lib.rs @@ -34,7 +34,7 @@ use polkadot_node_core_pvf_common::{ error::{PrepareError, PrepareResult}, executor_intf::Executor, framed_recv, framed_send, - prepare::{MemoryStats, PrepareJobKind, PrepareStats}, + prepare::{Handshake, MemoryStats, PrepareJobKind, PrepareStats}, pvf::PvfPrepData, worker::{ bytes_to_path, cpu_time_monitor_loop, @@ -47,7 +47,7 @@ use polkadot_node_core_pvf_common::{ }; use polkadot_primitives::ExecutorParams; use std::{ - path::PathBuf, + path::{Path, PathBuf}, sync::{mpsc::channel, Arc}, time::Duration, }; @@ -69,6 +69,17 @@ impl AsRef<[u8]> for CompiledArtifact { } } +async fn recv_handshake(stream: &mut UnixStream) -> io::Result { + let handshake_enc = framed_recv(stream).await?; + let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "prepare pvf recv_handshake: failed to decode Handshake".to_owned(), + ) + })?; + Ok(handshake) +} + async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> { let pvf = framed_recv(stream).await?; let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| { @@ -95,10 +106,16 @@ async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Re /// /// # Parameters /// -/// The `socket_path` specifies the path to the socket used to communicate with the host. The -/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in -/// immediate worker termination. `None` is used for tests and in other situations when version -/// check is not necessary. +/// - `socket_path` specifies the path to the socket used to communicate with the host. +/// +/// - `node_version`, if `Some`, is checked against the `worker_version`. A mismatch results in +/// immediate worker termination. `None` is used for tests and in other situations when version +/// check is not necessary. +/// +/// - `worker_version`: see above +/// +/// - `cache_path` contains the expected cache path for artifacts and is used to provide a sandbox +/// exception for landlock. /// /// # Flow /// @@ -122,15 +139,62 @@ pub fn worker_entrypoint( socket_path: &str, node_version: Option<&str>, worker_version: Option<&str>, + cache_path: &Path, ) { worker_event_loop( "prepare", socket_path, node_version, worker_version, + cache_path, |mut stream| async move { let worker_pid = std::process::id(); + gum::info!(target: LOG_TARGET, "10. {:?}", std::fs::read_dir(".").unwrap().map(|entry| entry.unwrap().path()).collect::>()); + + let Handshake { landlock_enabled } = recv_handshake(&mut stream).await?; + + gum::info!(target: LOG_TARGET, "11. {:?}", std::fs::read_dir(".").unwrap().map(|entry| entry.unwrap().path()).collect::>()); + + // Try to enable landlock. + // { + // #[cfg(target_os = "linux")] + // let landlock_status = { + // use polkadot_node_core_pvf_common::worker::security::landlock::{ + // path_beneath_rules, try_restrict, Access, AccessFs, LANDLOCK_ABI, + // }; + + // // Allow an exception for writing to the artifact cache, with no allowance for + // // listing the directory contents. Since we prepend artifact names with a random + // // hash, this means attackers can't discover artifacts apart from the current + // // job. + // try_restrict(path_beneath_rules( + // &[cache_path], + // AccessFs::from_write(LANDLOCK_ABI), + // )) + // .map(LandlockStatus::from_ruleset_status) + // .map_err(|e| e.to_string()) + // }; + // #[cfg(not(target_os = "linux"))] + // let landlock_status: Result = Ok(LandlockStatus::NotEnforced); + + // // Error if the host determined that landlock is fully enabled and we couldn't fully + // // enforce it here. + // if landlock_enabled && !matches!(landlock_status, Ok(LandlockStatus::FullyEnforced)) + // { + // gum::warn!( + // target: LOG_TARGET, + // %worker_pid, + // "could not fully enable landlock: {:?}", + // landlock_status + // ); + // return Err(io::Error::new( + // io::ErrorKind::Other, + // format!("could not fully enable landlock: {:?}", landlock_status), + // )) + // } + // } + loop { let (pvf, temp_artifact_dest) = recv_request(&mut stream).await?; gum::debug!( @@ -139,6 +203,10 @@ pub fn worker_entrypoint( "worker: preparing artifact", ); + // if !temp_artifact_dest.starts_with(cache_path) { + // return Err(io::Error::new(io::ErrorKind::Other, format!("received an artifact path {temp_artifact_dest:?} that does not belong to expected cache path {cache_path:?}"))) + // } + let preparation_timeout = pvf.prep_timeout(); let prepare_job_kind = pvf.prep_kind(); let executor_params = (*pvf.executor_params()).clone(); @@ -172,14 +240,6 @@ pub fn worker_entrypoint( let prepare_thread = thread::spawn_worker_thread( "prepare thread", move || { - // Try to enable landlock. - #[cfg(target_os = "linux")] - let landlock_status = polkadot_node_core_pvf_common::worker::security::landlock::try_restrict_thread() - .map(LandlockStatus::from_ruleset_status) - .map_err(|e| e.to_string()); - #[cfg(not(target_os = "linux"))] - let landlock_status: Result = Ok(LandlockStatus::NotEnforced); - #[allow(unused_mut)] let mut result = prepare_artifact(pvf, cpu_time_start); @@ -200,7 +260,7 @@ pub fn worker_entrypoint( }); } - (result, landlock_status) + result }, Arc::clone(&condvar), WaitOutcome::Finished, @@ -213,16 +273,13 @@ pub fn worker_entrypoint( let _ = cpu_time_monitor_tx.send(()); match prepare_thread.join().unwrap_or_else(|err| { - ( - Err(PrepareError::Panic(stringify_panic_payload(err))), - Ok(LandlockStatus::Unavailable), - ) + Err(PrepareError::Panic(stringify_panic_payload(err))) }) { - (Err(err), _) => { + Err(err) => { // Serialized error will be written into the socket. Err(err) }, - (Ok(ok), landlock_status) => { + Ok(ok) => { #[cfg(not(target_os = "linux"))] let (artifact, cpu_time_elapsed) = ok; #[cfg(target_os = "linux")] @@ -242,16 +299,6 @@ pub fn worker_entrypoint( max_rss: extract_max_rss_stat(max_rss, worker_pid), }; - // Log if landlock threw an error. - if let Err(err) = landlock_status { - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "error enabling landlock: {}", - err - ); - } - // Write the serialized artifact into a temp file. // // PVF host only keeps artifacts statuses in its memory, diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index acb260e25693..b6dcac9abf91 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -141,6 +141,8 @@ struct Queue { program_path: PathBuf, spawn_timeout: Duration, node_version: Option, + cache_path: PathBuf, + landlock_enabled: bool, /// The queue of jobs that are waiting for a worker to pick up. queue: VecDeque, @@ -155,6 +157,8 @@ impl Queue { worker_capacity: usize, spawn_timeout: Duration, node_version: Option, + cache_path: PathBuf, + landlock_enabled: bool, to_queue_rx: mpsc::Receiver, ) -> Self { Self { @@ -162,6 +166,8 @@ impl Queue { program_path, spawn_timeout, node_version, + cache_path, + landlock_enabled, to_queue_rx, queue: VecDeque::new(), mux: Mux::new(), @@ -408,6 +414,8 @@ fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) { job, queue.spawn_timeout, queue.node_version.clone(), + queue.cache_path.clone(), + queue.landlock_enabled, ) .boxed(), ); @@ -426,6 +434,8 @@ async fn spawn_worker_task( job: ExecuteJob, spawn_timeout: Duration, node_version: Option, + cache_path: PathBuf, + landlock_enabled: bool, ) -> QueueEvent { use futures_timer::Delay; @@ -435,6 +445,8 @@ async fn spawn_worker_task( job.executor_params.clone(), spawn_timeout, node_version.as_deref(), + &cache_path, + landlock_enabled, ) .await { @@ -499,6 +511,8 @@ pub fn start( worker_capacity: usize, spawn_timeout: Duration, node_version: Option, + cache_path: PathBuf, + landlock_enabled: bool, ) -> (mpsc::Sender, impl Future) { let (to_queue_tx, to_queue_rx) = mpsc::channel(20); let run = Queue::new( @@ -507,6 +521,8 @@ pub fn start( worker_capacity, spawn_timeout, node_version, + cache_path, + landlock_enabled, to_queue_rx, ) .run(); diff --git a/node/core/pvf/src/execute/worker_intf.rs b/node/core/pvf/src/execute/worker_intf.rs index 948abd2261d7..6e39c6a3eb9e 100644 --- a/node/core/pvf/src/execute/worker_intf.rs +++ b/node/core/pvf/src/execute/worker_intf.rs @@ -46,14 +46,21 @@ pub async fn spawn( executor_params: ExecutorParams, spawn_timeout: Duration, node_version: Option<&str>, + cache_path: &Path, + landlock_enabled: bool, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { - let mut extra_args = vec!["execute-worker"]; + let cache_path_str = match cache_path.to_str() { + Some(a) => a, + None => return Err(SpawnErr::InvalidCachePath(cache_path.to_owned())), + }; + let mut extra_args = vec!["execute-worker", "--cache-path", cache_path_str]; if let Some(node_version) = node_version { extra_args.extend_from_slice(&["--node-impl-version", node_version]); } + let (mut idle_worker, worker_handle) = - spawn_with_program_path("execute", program_path, &extra_args, spawn_timeout).await?; - send_handshake(&mut idle_worker.stream, Handshake { executor_params }) + spawn_with_program_path("execute", program_path, Some(cache_path), &extra_args, spawn_timeout).await?; + send_handshake(&mut idle_worker.stream, Handshake { executor_params, landlock_enabled }) .await .map_err(|error| { gum::warn!( diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 9f3b7e23fd89..0c15ecf14943 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -202,8 +202,8 @@ impl Config { pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future) { gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host"); - // Run checks for supported security features once per host startup. - warn_if_no_landlock(); + // Run checks for supported security features once per host startup. Warn here if not enabled. + let landlock_enabled = check_landlock(); let (to_host_tx, to_host_rx) = mpsc::channel(10); @@ -215,6 +215,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future (ValidationHost, impl Future impl futures::Stream } /// Check if landlock is supported and emit a warning if not. -fn warn_if_no_landlock() { +fn check_landlock() -> bool { #[cfg(target_os = "linux")] { use polkadot_node_core_pvf_common::worker::security::landlock; + let status = landlock::get_status(); if !landlock::status_is_fully_enabled(&status) { let abi = landlock::LANDLOCK_ABI as u8; @@ -887,14 +891,20 @@ fn warn_if_no_landlock() { %abi, "Cannot fully enable landlock, a Linux kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider upgrading the kernel version for maximum security." ); + false + } else { + true } } #[cfg(not(target_os = "linux"))] - gum::warn!( - target: LOG_TARGET, - "Cannot enable landlock, a Linux kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with landlock support for maximum security." - ); + { + gum::warn!( + target: LOG_TARGET, + "Cannot enable landlock, a Linux kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with landlock support for maximum security." + ); + false + } } #[cfg(test)] diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 92aa4896c263..a3b44642b0a0 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -110,10 +110,12 @@ enum PoolEvent { type Mux = FuturesUnordered>; struct Pool { + // Some variables related to the current session. program_path: PathBuf, cache_path: PathBuf, spawn_timeout: Duration, node_version: Option, + landlock_enabled: bool, to_pool: mpsc::Receiver, from_pool: mpsc::UnboundedSender, @@ -132,6 +134,7 @@ async fn run( cache_path, spawn_timeout, node_version, + landlock_enabled, to_pool, mut from_pool, mut spawned, @@ -160,6 +163,7 @@ async fn run( &cache_path, spawn_timeout, node_version.clone(), + landlock_enabled, &mut spawned, &mut mux, to_pool, @@ -207,6 +211,7 @@ fn handle_to_pool( cache_path: &Path, spawn_timeout: Duration, node_version: Option, + landlock_enabled: bool, spawned: &mut HopSlotMap, mux: &mut Mux, to_pool: ToPool, @@ -216,7 +221,14 @@ fn handle_to_pool( gum::debug!(target: LOG_TARGET, "spawning a new prepare worker"); metrics.prepare_worker().on_begin_spawn(); mux.push( - spawn_worker_task(program_path.to_owned(), spawn_timeout, node_version).boxed(), + spawn_worker_task( + program_path.to_owned(), + spawn_timeout, + node_version, + cache_path.to_owned(), + landlock_enabled, + ) + .boxed(), ); }, ToPool::StartWork { worker, pvf, artifact_path } => { @@ -260,11 +272,21 @@ async fn spawn_worker_task( program_path: PathBuf, spawn_timeout: Duration, node_version: Option, + cache_path: PathBuf, + landlock_enabled: bool, ) -> PoolEvent { use futures_timer::Delay; loop { - match worker_intf::spawn(&program_path, spawn_timeout, node_version.as_deref()).await { + match worker_intf::spawn( + &program_path, + spawn_timeout, + node_version.as_deref(), + &cache_path, + landlock_enabled, + ) + .await + { Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle), Err(err) => { gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: {:?}", err); @@ -434,6 +456,7 @@ pub fn start( cache_path: PathBuf, spawn_timeout: Duration, node_version: Option, + landlock_enabled: bool, ) -> (mpsc::Sender, mpsc::UnboundedReceiver, impl Future) { let (to_pool_tx, to_pool_rx) = mpsc::channel(10); let (from_pool_tx, from_pool_rx) = mpsc::unbounded(); @@ -444,6 +467,7 @@ pub fn start( cache_path, spawn_timeout, node_version, + landlock_enabled, to_pool: to_pool_rx, from_pool: from_pool_tx, spawned: HopSlotMap::with_capacity_and_key(20), diff --git a/node/core/pvf/src/prepare/worker_intf.rs b/node/core/pvf/src/prepare/worker_intf.rs index 5280ab6b42a2..52fb8b75a1f5 100644 --- a/node/core/pvf/src/prepare/worker_intf.rs +++ b/node/core/pvf/src/prepare/worker_intf.rs @@ -28,7 +28,7 @@ use parity_scale_codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ error::{PrepareError, PrepareResult}, framed_recv, framed_send, - prepare::PrepareStats, + prepare::{Handshake, PrepareStats}, pvf::PvfPrepData, }; @@ -46,12 +46,38 @@ pub async fn spawn( program_path: &Path, spawn_timeout: Duration, node_version: Option<&str>, + cache_path: &Path, + landlock_enabled: bool, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { - let mut extra_args = vec!["prepare-worker"]; + let cache_path_str = match cache_path.to_str() { + Some(a) => a, + None => return Err(SpawnErr::InvalidCachePath(cache_path.to_owned())), + }; + let mut extra_args = vec!["prepare-worker", "--cache-path", cache_path_str]; if let Some(node_version) = node_version { extra_args.extend_from_slice(&["--node-impl-version", node_version]); } - spawn_with_program_path("prepare", program_path, &extra_args, spawn_timeout).await + + let (mut idle_worker, worker_handle) = spawn_with_program_path( + "prepare", + program_path, + Some(cache_path), + &extra_args, + spawn_timeout, + ) + .await?; + send_handshake(&mut idle_worker.stream, Handshake { landlock_enabled }) + .await + .map_err(|error| { + gum::warn!( + target: LOG_TARGET, + worker_pid = %idle_worker.pid, + ?error, + "failed to send a handshake to the spawned worker", + ); + SpawnErr::Handshake + })?; + Ok((idle_worker, worker_handle)) } pub enum Outcome { @@ -97,6 +123,12 @@ pub async fn start_work( ); with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { + // Linux: Pass the socket path relative to the cache_path (what the child thinks is root). + #[cfg(target_os = "linux")] + let tmp_file = Path::new(".").with_file_name( + tmp_file.file_name().expect("tmp files are created with a filename; qed"), + ); + let preparation_timeout = pvf.prep_timeout(); if let Err(err) = send_request(&mut stream, pvf, &tmp_file).await { gum::warn!( @@ -278,6 +310,10 @@ async fn send_request( Ok(()) } +async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> { + framed_send(stream, &handshake.encode()).await +} + async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result { let result = framed_recv(stream).await?; let result = PrepareResult::decode(&mut &result[..]).map_err(|e| { diff --git a/node/core/pvf/src/testing.rs b/node/core/pvf/src/testing.rs index 980a28c01566..129d55337ca3 100644 --- a/node/core/pvf/src/testing.rs +++ b/node/core/pvf/src/testing.rs @@ -75,17 +75,23 @@ macro_rules! decl_puppet_worker_main { }; let mut node_version = None; - let mut socket_path: &str = ""; + let mut socket_path = None; + let mut cache_path = None; for i in (2..args.len()).step_by(2) { match args[i].as_ref() { - "--socket-path" => socket_path = args[i + 1].as_str(), + "--socket-path" => socket_path = Some(args[i + 1].as_str()), "--node-impl-version" => node_version = Some(args[i + 1].as_str()), + "--cache-path" => cache_path = Some(args[i + 1].as_str()), arg => panic!("Unexpected argument found: {}", arg), } } + let socket_path = socket_path.expect("the --socket-path argument is required"); + let cache_path = cache_path.expect("the --cache-path argument is required"); - entrypoint(&socket_path, node_version, None); + let cache_path = &std::path::Path::new(cache_path); + + entrypoint(&socket_path, node_version, None, cache_path); } }; } diff --git a/node/core/pvf/src/worker_intf.rs b/node/core/pvf/src/worker_intf.rs index 795ad4524443..81fe79ae4349 100644 --- a/node/core/pvf/src/worker_intf.rs +++ b/node/core/pvf/src/worker_intf.rs @@ -39,15 +39,31 @@ use tokio::{ pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4; /// This is publicly exposed only for integration tests. +/// +/// # Parameters +/// +/// - `debug_id`: An identifier for the process (e.g. "execute" or "prepare"). +/// +/// - `program_path`: The path to the program. +/// +/// - `socket_dir_path`: An optional path to the dir where the socket should be created, if `None` +/// use a temp dir. +/// +/// - `extra_args`: Optional extra CLI arguments to the program. NOTE: Should only contain data +/// required before the handshake, like node/worker versions for the version check. Other data +/// should go through the handshake. +/// +/// - `spawn_timeout`: The amount of time to wait for the child process to spawn. #[doc(hidden)] pub async fn spawn_with_program_path( debug_id: &'static str, program_path: impl Into, + socket_dir_path: Option<&Path>, extra_args: &[&str], spawn_timeout: Duration, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { let program_path = program_path.into(); - with_transient_socket_path(debug_id, |socket_path| { + with_transient_socket_path(debug_id, socket_dir_path, |socket_path| { let socket_path = socket_path.to_owned(); let extra_args: Vec = extra_args.iter().map(|arg| arg.to_string()).collect(); @@ -109,14 +125,23 @@ pub async fn spawn_with_program_path( .await } -async fn with_transient_socket_path(debug_id: &'static str, f: F) -> Result +async fn with_transient_socket_path( + debug_id: &'static str, + socket_dir_path: Option<&Path>, + f: F, +) -> Result where F: FnOnce(&Path) -> Fut, Fut: futures::Future> + 'static, { - let socket_path = tmpfile(&format!("pvf-host-{}", debug_id)) - .await - .map_err(|_| SpawnErr::TmpFile)?; + let socket_prefix = format!("pvf-host-{}-", debug_id); + let socket_path = if let Some(socket_dir_path) = socket_dir_path { + tmpfile_in(&socket_prefix, socket_dir_path).await + } else { + tmpfile(&socket_prefix).await + } + .map_err(|_| SpawnErr::TmpFile)?; + let result = f(&socket_path).await; // Best effort to remove the socket file. Under normal circumstances the socket will be removed @@ -194,6 +219,8 @@ pub enum SpawnErr { AcceptTimeout, /// Failed to send handshake after successful spawning was signaled Handshake, + /// Cache path is not a valid UTF-8 str. + InvalidCachePath(PathBuf), } /// This is a representation of a potentially running worker. Drop it and the process will be @@ -221,10 +248,22 @@ impl WorkerHandle { extra_args: &[String], socket_path: impl AsRef, ) -> io::Result { + // Linux: Pass the socket path relative to the cache_path (what the child thinks is root). + #[cfg(target_os = "linux")] + let socket_path = Path::new(".").with_file_name( + socket_path + .as_ref() + .file_name() + .expect("socket paths are created with a filename; qed"), + ); + // Non-Linux: We are only able to pivot-root on Linux, so pass the socket path as-is. + #[cfg(not(target_os = "linux"))] + let socket_path = socket_path.as_ref().as_os_str(); + let mut child = process::Command::new(program.as_ref()) .args(extra_args) .arg("--socket-path") - .arg(socket_path.as_ref().as_os_str()) + .arg(socket_path) .stdout(std::process::Stdio::piped()) .kill_on_drop(true) .spawn()?; diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index 72c459c2f632..0f30efefc4cd 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -258,7 +258,7 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() { #[tokio::test] async fn deleting_prepared_artifact_does_not_dispute() { let host = TestHost::new(); - let cache_dir = host.cache_dir.path().clone(); + let cache_dir = host.cache_dir.path(); let result = host .validate_candidate( diff --git a/node/core/pvf/tests/it/worker_common.rs b/node/core/pvf/tests/it/worker_common.rs index a3bf552e894a..9fb8be3fc08a 100644 --- a/node/core/pvf/tests/it/worker_common.rs +++ b/node/core/pvf/tests/it/worker_common.rs @@ -24,7 +24,7 @@ use crate::PUPPET_EXE; #[tokio::test] async fn spawn_immediate_exit() { let result = - spawn_with_program_path("integration-test", PUPPET_EXE, &["exit"], Duration::from_secs(2)) + spawn_with_program_path("integration-test", PUPPET_EXE, None, &["exit"], Duration::from_secs(2)) .await; assert!(matches!(result, Err(SpawnErr::AcceptTimeout))); } @@ -32,17 +32,35 @@ async fn spawn_immediate_exit() { #[tokio::test] async fn spawn_timeout() { let result = - spawn_with_program_path("integration-test", PUPPET_EXE, &["sleep"], Duration::from_secs(2)) + spawn_with_program_path("integration-test", PUPPET_EXE, None, &["sleep"], Duration::from_secs(2)) .await; assert!(matches!(result, Err(SpawnErr::AcceptTimeout))); } +#[tokio::test] +async fn should_fail_without_cache_path() { + // --socket-path is handled by `spawn_with_program_path` so we don't pass it here. + let result = spawn_with_program_path( + "integration-test", + PUPPET_EXE, + None, + &["prepare-worker"], + Duration::from_secs(2), + ) + .await; + assert!(matches!(result, Err(SpawnErr::AcceptTimeout))); +} + #[tokio::test] async fn should_connect() { + let cache_path = tempfile::tempdir().unwrap(); + let cache_path_str = cache_path.path().to_str().unwrap(); + let _ = spawn_with_program_path( "integration-test", PUPPET_EXE, - &["prepare-worker"], + Some(cache_path.path()), + &["prepare-worker", "--cache-path", cache_path_str], Duration::from_secs(2), ) .await