diff --git a/lib/types/src/compilation/target.rs b/lib/types/src/compilation/target.rs index 672867c616c..176e1907cce 100644 --- a/lib/types/src/compilation/target.rs +++ b/lib/types/src/compilation/target.rs @@ -29,7 +29,7 @@ pub use target_lexicon::{ /// /// [`cpuid` crate]: https://docs.rs/cpuid/0.1.1/cpuid/enum.CpuFeature.html /// [`cranelift-native`]: https://github.com/bytecodealliance/cranelift/blob/6988545fd20249b084c53f4761b8c861266f5d31/cranelift-native/src/lib.rs#L51-L92 -#[allow(missing_docs, clippy::derive_hash_xor_eq)] +#[allow(missing_docs, clippy::derived_hash_with_manual_eq)] #[derive(EnumSetType, Debug, Hash)] pub enum CpuFeature { // X86 features diff --git a/lib/vm/src/trap/trap.rs b/lib/vm/src/trap/trap.rs index 54239fcd924..fa8de9c9390 100644 --- a/lib/vm/src/trap/trap.rs +++ b/lib/vm/src/trap/trap.rs @@ -91,7 +91,7 @@ impl Trap { pub fn downcast(self) -> Result { match self { // We only try to downcast user errors - Trap::User(err) if err.is::() => Ok(*err.downcast::().unwrap()), + Self::User(err) if err.is::() => Ok(*err.downcast::().unwrap()), _ => Err(self), } } @@ -100,7 +100,7 @@ impl Trap { pub fn downcast_ref(&self) -> Option<&T> { match &self { // We only try to downcast user errors - Trap::User(err) if err.is::() => err.downcast_ref::(), + Self::User(err) if err.is::() => err.downcast_ref::(), _ => None, } } @@ -108,7 +108,7 @@ impl Trap { /// Returns true if the `Trap` is the same as T pub fn is(&self) -> bool { match self { - Trap::User(err) => err.is::(), + Self::User(err) => err.is::(), _ => false, } } @@ -117,7 +117,7 @@ impl Trap { impl std::error::Error for Trap { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match &self { - Trap::User(err) => Some(&**err), + Self::User(err) => Some(&**err), _ => None, } } diff --git a/lib/wasix/src/os/task/control_plane.rs b/lib/wasix/src/os/task/control_plane.rs index 96d5575e62e..c141796e39c 100644 --- a/lib/wasix/src/os/task/control_plane.rs +++ 
b/lib/wasix/src/os/task/control_plane.rs @@ -133,12 +133,11 @@ impl WasiControlPlane { } // Create the process first to do all the allocations before locking. - let mut proc = WasiProcess::new(WasiProcessId::from(0), module_hash, self.handle()); let mut mutable = self.state.mutable.write().unwrap(); let pid = mutable.next_process_id()?; - proc.set_pid(pid); + let proc = WasiProcess::new(pid, module_hash, self.handle()); mutable.processes.insert(pid, proc.clone()); Ok(proc) } diff --git a/lib/wasix/src/os/task/process.rs b/lib/wasix/src/os/task/process.rs index d4501248f0e..790fcdb2473 100644 --- a/lib/wasix/src/os/task/process.rs +++ b/lib/wasix/src/os/task/process.rs @@ -8,13 +8,13 @@ use std::{ collections::HashMap, convert::TryInto, sync::{ - atomic::{AtomicU32, Ordering}, + atomic::{AtomicI32, AtomicU32, Ordering}, Arc, Condvar, Mutex, MutexGuard, RwLock, Weak, }, time::Duration, }; use tracing::trace; -use wasmer::FunctionEnvMut; +use wasmer::{FromToNativeWasmType, FunctionEnvMut}; use wasmer_wasix_types::{ types::Signal, wasi::{Errno, ExitCode, Snapshot0Clockid}, @@ -78,29 +78,41 @@ impl std::fmt::Debug for WasiProcessId { } } -pub type LockableWasiProcessInner = Arc<(Mutex, Condvar)>; - /// Represents a process running within the compute state /// TODO: fields should be private and only accessed via methods. #[derive(Debug, Clone)] pub struct WasiProcess { + state: Arc, +} + +#[derive(Debug)] +struct State { /// Unique ID of this process - pub(crate) pid: WasiProcessId, + pid: WasiProcessId, /// Hash of the module that this process is using - pub(crate) module_hash: ModuleHash, + module_hash: ModuleHash, /// List of all the children spawned from this thread - pub(crate) parent: Option>>, - /// The inner protected region of the process with a conditional - /// variable that is used for coordination such as checksums. 
- pub(crate) inner: LockableWasiProcessInner, + parent: Option>>, /// Reference back to the compute engine // TODO: remove this reference, access should happen via separate state instead // (we don't want cyclical references) - pub(crate) compute: WasiControlPlaneHandle, + compute: WasiControlPlaneHandle, + + /// This process was instructed to terminate and should stop executing + /// immediately. + /// The value is either 0 (= should not terminate), or an exit code. + should_terminate_with_code: AtomicI32, + /// Reference to the exit code for the main thread - pub(crate) finished: Arc, + status: Arc, + /// Number of threads waiting for children to exit - pub(crate) waiting: Arc, + waiting: Arc, + + /// The inner protected region of the process with a conditional + /// variable that is used for coordination such as checksums. + lock_condvar: Condvar, + inner: Mutex, } /// Represents a freeze of all threads to perform some action @@ -108,7 +120,7 @@ pub struct WasiProcess { /// things like snapshots which require the memory to remain /// stable while it performs a diff. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub enum WasiProcessCheckpoint { +pub enum ProcessCheckpoint { /// No checkpoint will take place and the process /// should just execute as per normal Execute, @@ -117,7 +129,7 @@ pub enum WasiProcessCheckpoint { Snapshot { trigger: SnapshotTrigger }, } -// TODO: fields should be private and only accessed via methods. +// TODO(theduke): this struct should be private! #[derive(Debug)] pub struct WasiProcessInner { /// Unique ID of this process @@ -128,11 +140,11 @@ pub struct WasiProcessInner { pub thread_count: u32, /// Signals that will be triggered at specific intervals pub signal_intervals: HashMap, - /// List of all the children spawned from this thread + /// Child processes.
pub children: Vec, /// Represents a checkpoint which blocks all the threads /// and then executes some maintenance action - pub checkpoint: WasiProcessCheckpoint, + pub checkpoint: ProcessCheckpoint, } pub enum MaybeCheckpointResult<'a> { @@ -140,182 +152,87 @@ pub enum MaybeCheckpointResult<'a> { Unwinding, } -impl WasiProcessInner { - /// Checkpoints the process which will cause all other threads to - /// pause and for the thread and memory state to be saved - #[cfg(feature = "journal")] - pub fn checkpoint( - inner: LockableWasiProcessInner, - ctx: FunctionEnvMut<'_, WasiEnv>, - for_what: WasiProcessCheckpoint, - ) -> WasiResult> { - // Set the checkpoint flag and then enter the normal processing loop - { - let mut inner = inner.0.lock().unwrap(); - inner.checkpoint = for_what; - } - - Self::maybe_checkpoint::(inner, ctx) - } - - /// If a checkpoint has been started this will block the current process - /// until the checkpoint operation has completed - #[cfg(feature = "journal")] - pub fn maybe_checkpoint( - inner: LockableWasiProcessInner, - ctx: FunctionEnvMut<'_, WasiEnv>, - ) -> WasiResult> { - // Enter the lock which will determine if we are in a checkpoint or not - - use bytes::Bytes; - use wasmer::AsStoreMut; - use wasmer_types::OnCalledAction; - - use crate::{rewind_ext, WasiError}; - let guard = inner.0.lock().unwrap(); - if guard.checkpoint == WasiProcessCheckpoint::Execute { - // No checkpoint so just carry on - return Ok(Ok(MaybeCheckpointResult::NotThisTime(ctx))); - } - trace!("checkpoint capture"); - drop(guard); - - // Perform the unwind action - unwind::(ctx, move |mut ctx, memory_stack, rewind_stack| { - // Grab all the globals and serialize them - let store_data = - crate::utils::store::capture_instance_snapshot(&mut ctx.as_store_mut()) - .serialize() - .unwrap(); - let memory_stack = memory_stack.freeze(); - let rewind_stack = rewind_stack.freeze(); - let store_data = Bytes::from(store_data); - - tracing::debug!( - "stack snapshot unwind 
(memory_stack={}, rewind_stack={}, store_data={})", - memory_stack.len(), - rewind_stack.len(), - store_data.len(), - ); - - // Write our thread state to the snapshot - let tid = ctx.data().thread.tid(); - if let Err(err) = JournalEffector::save_thread_state::( - &mut ctx, - tid, - memory_stack.clone(), - rewind_stack.clone(), - store_data.clone(), - ) { - return wasmer_types::OnCalledAction::Trap(err.into()); - } - - let mut guard = inner.0.lock().unwrap(); - - // Wait for the checkpoint to finish (or if we are the last thread - // to freeze then we have to execute the checksum operation) - loop { - if let WasiProcessCheckpoint::Snapshot { trigger } = guard.checkpoint { - ctx.data().thread.set_check_pointing(true); - - // Now if we are the last thread we also write the memory - let is_last_thread = guard.threads.values().all(WasiThread::is_check_pointing); - if is_last_thread { - if let Err(err) = - JournalEffector::save_memory_and_snapshot(&mut ctx, &mut guard, trigger) - { - inner.1.notify_all(); - return wasmer_types::OnCalledAction::Trap(err.into()); - } - - // Clear the checkpointing flag and notify everyone to wake up - ctx.data().thread.set_check_pointing(false); - guard.checkpoint = WasiProcessCheckpoint::Execute; - trace!("checkpoint complete"); - inner.1.notify_all(); - } else { - guard = inner.1.wait(guard).unwrap(); - } - continue; - } - - ctx.data().thread.set_check_pointing(false); - trace!("checkpoint finished"); - - // Rewind the stack and carry on - return match rewind_ext::(&mut ctx, memory_stack, rewind_stack, store_data, None) - { - Errno::Success => OnCalledAction::InvokeAgain, - err => { - tracing::warn!( - "snapshot resumption failed - could not rewind the stack - errno={}", - err - ); - OnCalledAction::Trap(Box::new(WasiError::Exit(err.into()))) - } - }; - } - })?; - - Ok(Ok(MaybeCheckpointResult::Unwinding)) - } -} - -// TODO: why do we need this, how is it used? 
-pub(crate) struct WasiProcessWait { - waiting: Arc, -} - -impl WasiProcessWait { - pub fn new(process: &WasiProcess) -> Self { - process.waiting.fetch_add(1, Ordering::AcqRel); - Self { - waiting: process.waiting.clone(), - } - } -} - -impl Drop for WasiProcessWait { - fn drop(&mut self) { - self.waiting.fetch_sub(1, Ordering::AcqRel); - } -} - impl WasiProcess { pub fn new(pid: WasiProcessId, module_hash: ModuleHash, plane: WasiControlPlaneHandle) -> Self { WasiProcess { - pid, - module_hash, - parent: None, - compute: plane, - inner: Arc::new(( - Mutex::new(WasiProcessInner { + state: Arc::new(State { + pid, + module_hash, + parent: None, + compute: plane, + lock_condvar: Condvar::new(), + inner: Mutex::new(WasiProcessInner { pid, threads: Default::default(), thread_count: Default::default(), signal_intervals: Default::default(), children: Default::default(), - checkpoint: WasiProcessCheckpoint::Execute, + checkpoint: ProcessCheckpoint::Execute, }), - Condvar::new(), - )), - finished: Arc::new(OwnedTaskStatus::default()), - waiting: Arc::new(AtomicU32::new(0)), + status: Arc::new(OwnedTaskStatus::default()), + should_terminate_with_code: AtomicI32::new(0), + waiting: Arc::new(AtomicU32::new(0)), + }), } } - pub(super) fn set_pid(&mut self, pid: WasiProcessId) { - self.pid = pid; + pub fn handle(&self) -> WasiProcessHandle { + WasiProcessHandle::new(&self.state) + } + + #[inline] + pub fn module_hash(&self) -> &ModuleHash { + &self.state.module_hash + } + + #[inline] + pub fn status(&self) -> &Arc { + &self.state.status + } + + #[inline] + pub fn control_plane(&self) -> &WasiControlPlaneHandle { + &self.state.compute } /// Gets the process ID of this process + #[inline] pub fn pid(&self) -> WasiProcessId { - self.pid + self.state.pid + } + + /// Notify the shared lock condvar to trigger waiters. 
+ pub fn lock_notify_all(&self) { + self.state.lock_condvar.notify_all(); + } + + pub fn lock_wait<'a>( + &self, + guard: MutexGuard<'a, WasiProcessInner>, + ) -> MutexGuard<'a, WasiProcessInner> { + self.state.lock_condvar.wait(guard).unwrap() + } + + /// Whether the process should terminate. + /// + /// Returns `None` if the process should not terminate, or `Some(code)` if + /// the process should terminate with the given exit code. + pub fn should_terminate_with_code(&self) -> Option { + let v = self + .state + .should_terminate_with_code + .load(Ordering::Acquire); + if v == 0 { + None + } else { + Some(ExitCode::from_native(v)) + } } /// Gets the process ID of the parent process pub fn ppid(&self) -> WasiProcessId { - self.parent + self.state + .parent .iter() .filter_map(|parent| parent.upgrade()) .map(|parent| parent.read().unwrap().pid) @@ -326,7 +243,7 @@ impl WasiProcess { /// Gains access to the process internals // TODO: Make this private, all inner access should be exposed with methods. 
pub fn lock(&self) -> MutexGuard<'_, WasiProcessInner> { - self.inner.0.lock().unwrap() + self.state.inner.lock().unwrap() } /// Creates a a thread and returns it @@ -334,12 +251,12 @@ impl WasiProcess { &self, layout: WasiMemoryLayout, ) -> Result { - let control_plane = self.compute.must_upgrade(); + let control_plane = self.state.compute.must_upgrade(); let task_count_guard = control_plane.register_task()?; // Determine if its the main thread or not let is_main = { - let inner = self.inner.0.lock().unwrap(); + let inner = self.lock(); inner.thread_count == 0 }; @@ -353,9 +270,9 @@ impl WasiProcess { }; // The wait finished should be the process version if its the main thread - let mut inner = self.inner.0.lock().unwrap(); + let mut inner = self.lock(); let finished = if is_main { - self.finished.clone() + self.state.status.clone() } else { Arc::new(OwnedTaskStatus::default()) }; @@ -365,13 +282,12 @@ impl WasiProcess { inner.threads.insert(tid, ctrl.clone()); inner.thread_count += 1; - Ok(WasiThreadHandle::new(ctrl, &self.inner)) + Ok(WasiThreadHandle::new(ctrl, self.handle())) } /// Gets a reference to a particular thread pub fn get_thread(&self, tid: &WasiThreadId) -> Option { - let inner = self.inner.0.lock().unwrap(); - inner.threads.get(tid).cloned() + self.lock().threads.get(tid).cloned() } /// Signals a particular thread in the process @@ -386,7 +302,7 @@ impl WasiProcess { let pid = self.pid(); tracing::trace!(%pid, %tid, "signal-thread({:?})", signal); - let inner = self.inner.0.lock().unwrap(); + let inner = self.lock(); if let Some(thread) = inner.threads.get(&tid) { thread.signal(signal); } else { @@ -405,8 +321,8 @@ impl WasiProcess { tracing::trace!(%pid, "signal-process({:?})", signal); { - let inner = self.inner.0.lock().unwrap(); - if self.waiting.load(Ordering::Acquire) > 0 { + let inner = self.lock(); + if self.state.waiting.load(Ordering::Acquire) > 0 { let mut triggered = false; for child in inner.children.iter() { 
child.signal_process(signal); @@ -417,7 +333,7 @@ impl WasiProcess { } } } - let inner = self.inner.0.lock().unwrap(); + let inner = self.lock(); for thread in inner.threads.values() { thread.signal(signal); } @@ -425,7 +341,7 @@ impl WasiProcess { /// Signals one of the threads every interval pub fn signal_interval(&self, signal: Signal, interval: Option, repeat: bool) { - let mut inner = self.inner.0.lock().unwrap(); + let mut inner = self.lock(); let interval = match interval { None => { @@ -449,26 +365,26 @@ impl WasiProcess { /// Returns the number of active threads for this process pub fn active_threads(&self) -> u32 { - let inner = self.inner.0.lock().unwrap(); + let inner = self.lock(); inner.thread_count } /// Waits until the process is finished. pub async fn join(&self) -> Result> { let _guard = WasiProcessWait::new(self); - self.finished.await_termination().await + self.state.status.await_termination().await } /// Attempts to join on the process pub fn try_join(&self) -> Option>> { - self.finished.status().into_finished() + self.state.status.status().into_finished() } /// Waits for all the children to be finished - pub async fn join_children(&mut self) -> Option>> { + pub async fn join_children(&self) -> Option>> { let _guard = WasiProcessWait::new(self); let children: Vec<_> = { - let inner = self.inner.0.lock().unwrap(); + let inner = self.lock(); inner.children.clone() }; if children.is_empty() { @@ -476,12 +392,12 @@ impl WasiProcess { } let mut waits = Vec::new(); for child in children { - if let Some(process) = self.compute.must_upgrade().get_process(child.pid) { - let inner = self.inner.clone(); + if let Some(process) = self.state.compute.must_upgrade().get_process(child.pid()) { + let self_ = self.clone(); waits.push(async move { let join = process.join().await; - let mut inner = inner.0.lock().unwrap(); - inner.children.retain(|a| a.pid != child.pid); + let mut inner = self_.lock(); + inner.children.retain(|a| a.pid() != child.pid()); join }) } 
@@ -493,10 +409,10 @@ impl WasiProcess { } /// Waits for any of the children to finished - pub async fn join_any_child(&mut self) -> Result, Errno> { + pub async fn join_any_child(&self) -> Result, Errno> { let _guard = WasiProcessWait::new(self); let children: Vec<_> = { - let inner = self.inner.0.lock().unwrap(); + let inner = self.lock(); inner.children.clone() }; if children.is_empty() { @@ -505,12 +421,12 @@ impl WasiProcess { let mut waits = Vec::new(); for child in children { - if let Some(process) = self.compute.must_upgrade().get_process(child.pid) { - let inner = self.inner.clone(); + if let Some(process) = self.state.compute.must_upgrade().get_process(child.pid()) { + let self_ = self.clone(); waits.push(async move { let join = process.join().await; - let mut inner = inner.0.lock().unwrap(); - inner.children.retain(|a| a.pid != child.pid); + let mut inner = self_.lock(); + inner.children.retain(|a| a.pid() != child.pid()); (child, join) }) } @@ -522,17 +438,184 @@ impl WasiProcess { let code = res.unwrap_or_else(|e| e.as_exit_code().unwrap_or_else(|| Errno::Canceled.into())); - Ok(Some((child.pid, code))) + Ok(Some((child.pid(), code))) } /// Terminate the process and all its threads pub fn terminate(&self, exit_code: ExitCode) { - // FIXME: this is wrong, threads might still be running! - // Need special logic for the main thread. - let guard = self.inner.0.lock().unwrap(); - for thread in guard.threads.values() { - thread.set_status_finished(Ok(exit_code)) + self.state + .should_terminate_with_code + .store(exit_code.raw(), Ordering::Release); + self.signal_process(Signal::Sigkill); + } + + /// Terminate the process and wait for all its threads to finish. 
+ pub async fn terminate_wait(&self, exit_code: ExitCode) { + self.terminate(exit_code); + self.join_children().await; + self.join().await.ok(); + } +} + +impl WasiProcessInner { + /// Checkpoints the process which will cause all other threads to + /// pause and for the thread and memory state to be saved + #[cfg(feature = "journal")] + pub fn checkpoint( + process: WasiProcess, + ctx: FunctionEnvMut<'_, WasiEnv>, + for_what: ProcessCheckpoint, + ) -> WasiResult> { + // Set the checkpoint flag and then enter the normal processing loop + { + // TODO: add set_checkpoint method + let mut inner = process.lock(); + inner.checkpoint = for_what; + } + + Self::maybe_checkpoint::(process, ctx) + } + + /// If a checkpoint has been started this will block the current process + /// until the checkpoint operation has completed + #[cfg(feature = "journal")] + pub fn maybe_checkpoint( + process: WasiProcess, + ctx: FunctionEnvMut<'_, WasiEnv>, + ) -> WasiResult> { + // Enter the lock which will determine if we are in a checkpoint or not + + use bytes::Bytes; + use wasmer::AsStoreMut; + use wasmer_types::OnCalledAction; + + use crate::{rewind_ext, WasiError}; + { + let guard = process.lock(); + if guard.checkpoint == ProcessCheckpoint::Execute { + // No checkpoint so just carry on + return Ok(Ok(MaybeCheckpointResult::NotThisTime(ctx))); + } + trace!("checkpoint capture"); + drop(guard); } + + // Perform the unwind action + unwind::(ctx, move |mut ctx, memory_stack, rewind_stack| { + // Grab all the globals and serialize them + let store_data = + crate::utils::store::capture_instance_snapshot(&mut ctx.as_store_mut()) + .serialize() + .unwrap(); + let memory_stack = memory_stack.freeze(); + let rewind_stack = rewind_stack.freeze(); + let store_data = Bytes::from(store_data); + + tracing::debug!( + "stack snapshot unwind (memory_stack={}, rewind_stack={}, store_data={})", + memory_stack.len(), + rewind_stack.len(), + store_data.len(), + ); + + // Write our thread state to the 
snapshot + let tid = ctx.data().thread.tid(); + if let Err(err) = JournalEffector::save_thread_state::( + &mut ctx, + tid, + memory_stack.clone(), + rewind_stack.clone(), + store_data.clone(), + ) { + return wasmer_types::OnCalledAction::Trap(err.into()); + } + + let mut guard = process.lock(); + + // Wait for the checkpoint to finish (or if we are the last thread + // to freeze then we have to execute the checksum operation) + loop { + if let ProcessCheckpoint::Snapshot { trigger } = guard.checkpoint { + ctx.data().thread.set_check_pointing(true); + + // Now if we are the last thread we also write the memory + let is_last_thread = guard.threads.values().all(WasiThread::is_check_pointing); + if is_last_thread { + if let Err(err) = + JournalEffector::save_memory_and_snapshot(&mut ctx, &mut guard, trigger) + { + process.lock_notify_all(); + return wasmer_types::OnCalledAction::Trap(err.into()); + } + + // Clear the checkpointing flag and notify everyone to wake up + ctx.data().thread.set_check_pointing(false); + guard.checkpoint = ProcessCheckpoint::Execute; + trace!("checkpoint complete"); + process.lock_notify_all(); + } else { + guard = process.lock_wait(guard); + } + continue; + } + + ctx.data().thread.set_check_pointing(false); + trace!("checkpoint finished"); + + // Rewind the stack and carry on + return match rewind_ext::(&mut ctx, memory_stack, rewind_stack, store_data, None) + { + Errno::Success => OnCalledAction::InvokeAgain, + err => { + tracing::warn!( + "snapshot resumption failed - could not rewind the stack - errno={}", + err + ); + OnCalledAction::Trap(Box::new(WasiError::Exit(err.into()))) + } + }; + } + })?; + + Ok(Ok(MaybeCheckpointResult::Unwinding)) + } +} + +/// Weak handle to a process. 
+#[derive(Debug, Clone)] +pub struct WasiProcessHandle { + process: Weak, +} + +impl WasiProcessHandle { + fn new(process: &Arc) -> Self { + Self { + process: Arc::downgrade(process), + } + } + + pub fn upgrade(&self) -> Option { + self.process.upgrade().map(|state| WasiProcess { state }) + } +} + +// TODO: why do we need this, how is it used? +pub(crate) struct WasiProcessWait { + waiting: Arc, +} + +impl WasiProcessWait { + pub fn new(process: &WasiProcess) -> Self { + process.state.waiting.fetch_add(1, Ordering::AcqRel); + Self { + waiting: process.state.waiting.clone(), + } + } +} + +impl Drop for WasiProcessWait { + fn drop(&mut self) { + self.waiting.fetch_sub(1, Ordering::AcqRel); } } diff --git a/lib/wasix/src/os/task/thread.rs b/lib/wasix/src/os/task/thread.rs index 94ee92a4ec4..ea1cfc661fd 100644 --- a/lib/wasix/src/os/task/thread.rs +++ b/lib/wasix/src/os/task/thread.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::{ collections::HashMap, ops::{Deref, DerefMut}, - sync::{Arc, Condvar, Mutex, Weak}, + sync::{Arc, Mutex}, task::Waker, }; @@ -15,14 +15,11 @@ use wasmer_wasix_types::{ wasi::{Errno, ExitCode}, }; -use crate::{ - os::task::process::{WasiProcessId, WasiProcessInner}, - syscalls::HandleRewindType, - WasiRuntimeError, -}; +use crate::{os::task::process::WasiProcessId, syscalls::HandleRewindType, WasiRuntimeError}; use super::{ control_plane::TaskCountGuard, + process::WasiProcessHandle, task_join_handle::{OwnedTaskStatus, TaskJoinHandle}, }; @@ -278,21 +275,6 @@ impl WasiThread { self.state.status.set_running(); } - /// Gets or sets the exit code based of a signal that was received - /// Note: if the exit code was already set earlier this method will - /// just return that earlier set exit code - pub fn set_or_get_exit_code_for_signal(&self, sig: Signal) -> ExitCode { - let default_exitcode: ExitCode = match sig { - Signal::Sigquit | Signal::Sigabrt => Errno::Success.into(), - _ => Errno::Intr.into(), - }; - // This will 
only set the status code if its not already set - self.set_status_finished(Ok(default_exitcode)); - self.try_join() - .map(|r| r.unwrap_or(default_exitcode)) - .unwrap_or(default_exitcode) - } - /// Marks the thread as finished (which will cause anyone that /// joined on it to wake up) pub fn set_status_finished(&self, res: Result) { @@ -494,7 +476,7 @@ impl WasiThread { #[derive(Debug)] pub struct WasiThreadHandleProtected { thread: WasiThread, - inner: Weak<(Mutex, Condvar)>, + process: WasiProcessHandle, } #[derive(Debug, Clone)] @@ -503,14 +485,11 @@ pub struct WasiThreadHandle { } impl WasiThreadHandle { - pub(crate) fn new( - thread: WasiThread, - inner: &Arc<(Mutex, Condvar)>, - ) -> WasiThreadHandle { + pub(crate) fn new(thread: WasiThread, proc: WasiProcessHandle) -> WasiThreadHandle { Self { protected: Arc::new(WasiThreadHandleProtected { thread, - inner: Arc::downgrade(inner), + process: proc, }), } } @@ -527,8 +506,8 @@ impl WasiThreadHandle { impl Drop for WasiThreadHandleProtected { fn drop(&mut self) { let id = self.thread.tid(); - if let Some(inner) = Weak::upgrade(&self.inner) { - let mut inner = inner.0.lock().unwrap(); + if let Some(proc) = self.process.upgrade() { + let mut inner = proc.lock(); if let Some(ctrl) = inner.threads.remove(&id) { ctrl.set_status_finished(Ok(Errno::Success.into())); } diff --git a/lib/wasix/src/runners/wcgi/handler.rs b/lib/wasix/src/runners/wcgi/handler.rs index 6f0eb012bc8..85c2cd8c16b 100644 --- a/lib/wasix/src/runners/wcgi/handler.rs +++ b/lib/wasix/src/runners/wcgi/handler.rs @@ -98,7 +98,7 @@ impl Handler { drop(token); } }; - let finished = env.process.finished.clone(); + let finished = env.process.status().clone(); /* * TODO: Reusing memory for DCGI calls and not just the file system diff --git a/lib/wasix/src/state/env.rs b/lib/wasix/src/state/env.rs index 71c331dbf43..9985b52dc33 100644 --- a/lib/wasix/src/state/env.rs +++ b/lib/wasix/src/state/env.rs @@ -372,7 +372,9 @@ impl WasiEnv { /// Forking the 
WasiState is used when either fork or vfork is called pub fn fork(&self) -> Result<(Self, WasiThreadHandle), ControlPlaneError> { - let process = self.control_plane.new_process(self.process.module_hash)?; + let process = self + .control_plane + .new_process(*self.process.module_hash())?; let handle = process.new_thread(self.layout.clone())?; let thread = handle.as_thread(); @@ -447,16 +449,19 @@ impl WasiEnv { // The process and thread state need to be reset self.process = WasiProcess::new( - self.process.pid, - self.process.module_hash, - self.process.compute.clone(), + self.process.pid(), + *self.process.module_hash(), + self.process.control_plane().clone(), ); self.thread = WasiThread::new( self.thread.pid(), self.thread.tid(), self.thread.is_main(), - self.process.finished.clone(), - self.process.compute.must_upgrade().register_task()?, + self.process.status().clone(), + self.process + .control_plane() + .must_upgrade() + .register_task()?, self.thread.memory_layout().clone(), ); @@ -670,6 +675,12 @@ impl WasiEnv { // If a signal handler has never been set then we need to handle signals // differently let env = ctx.data(); + + // Check for forced exit + if let Some(forced_exit) = env.process.should_terminate_with_code() { + return Err(WasiError::Exit(forced_exit)); + } + let inner = env .try_inner() .ok_or_else(|| WasiError::Exit(Errno::Fault.into()))?; @@ -682,8 +693,7 @@ impl WasiEnv { || sig == Signal::Sigkill || sig == Signal::Sigabrt { - let exit_code = env.thread.set_or_get_exit_code_for_signal(sig); - return Err(WasiError::Exit(exit_code)); + return Err(WasiError::Exit(ExitCode::Errno(Errno::Intr))); } else { tracing::trace!(pid=%env.pid(), ?sig, "Signal ignored"); } @@ -692,16 +702,11 @@ impl WasiEnv { } } - // Check for forced exit - if let Some(forced_exit) = env.should_exit() { - return Err(WasiError::Exit(forced_exit)); - } - Self::process_signals(ctx) } /// Porcesses any signals that are batched up - pub(crate) fn process_signals(ctx: &mut 
FunctionEnvMut<'_, Self>) -> WasiResult { + fn process_signals(ctx: &mut FunctionEnvMut<'_, Self>) -> WasiResult { // If a signal handler has never been set then we need to handle signals // differently let env = ctx.data(); @@ -737,7 +742,7 @@ impl WasiEnv { let mut now = 0; { let mut has_signal_interval = false; - let inner = env.process.inner.0.lock().unwrap(); + let inner = env.process.lock(); if !inner.signal_intervals.is_empty() { now = platform_clock_time_get(Snapshot0Clockid::Monotonic, 1_000_000).unwrap() as u128; @@ -750,7 +755,7 @@ impl WasiEnv { } } if has_signal_interval { - let mut inner = env.process.inner.0.lock().unwrap(); + let mut inner = env.process.lock(); for signal in inner.signal_intervals.values_mut() { let elapsed = now - signal.last_signal; if elapsed >= signal.interval.as_nanos() { @@ -798,30 +803,6 @@ impl WasiEnv { } } - /// Returns an exit code if the thread or process has been forced to exit - pub fn should_exit(&self) -> Option { - // Check for forced exit - if let Some(forced_exit) = self.thread.try_join() { - return Some(forced_exit.unwrap_or_else(|err| { - tracing::debug!( - error = &*err as &dyn std::error::Error, - "exit runtime error", - ); - Errno::Child.into() - })); - } - if let Some(forced_exit) = self.process.try_join() { - return Some(forced_exit.unwrap_or_else(|err| { - tracing::debug!( - error = &*err as &dyn std::error::Error, - "exit runtime error", - ); - Errno::Child.into() - })); - } - None - } - /// Accesses the virtual networking implementation pub fn net(&self) -> &DynVirtualNetworking { self.runtime.networking() diff --git a/lib/wasix/src/state/func_env.rs b/lib/wasix/src/state/func_env.rs index cff6ed1918a..668c630cb55 100644 --- a/lib/wasix/src/state/func_env.rs +++ b/lib/wasix/src/state/func_env.rs @@ -278,7 +278,7 @@ impl WasiFunctionEnv { // The first event we save is an event that records the module hash. 
// Note: This is used to detect if an incorrect journal is used on the wrong // process or if a process has been recompiled - let wasm_hash = self.data(&store).process.module_hash.as_bytes(); + let wasm_hash = self.data(&store).process.module_hash().as_bytes(); let mut ctx = self.env.clone().into_mut(&mut store); crate::journal::JournalEffector::save_event( &mut ctx, diff --git a/lib/wasix/src/syscalls/journal.rs b/lib/wasix/src/syscalls/journal.rs index c2040111291..daf9e704736 100644 --- a/lib/wasix/src/syscalls/journal.rs +++ b/lib/wasix/src/syscalls/journal.rs @@ -14,7 +14,7 @@ pub fn maybe_snapshot_once( mut ctx: FunctionEnvMut<'_, WasiEnv>, trigger: crate::journal::SnapshotTrigger, ) -> WasiResult> { - use crate::os::task::process::{WasiProcessCheckpoint, WasiProcessInner}; + use crate::os::task::process::{ProcessCheckpoint, WasiProcessInner}; unsafe { handle_rewind_ext::(&mut ctx, HandleRewindType::Resultless) }; @@ -23,11 +23,11 @@ pub fn maybe_snapshot_once( } if ctx.data_mut().pop_snapshot_trigger(trigger) { - let inner = ctx.data().process.inner.clone(); + let process = ctx.data().process.clone(); let res = wasi_try_ok_ok!(WasiProcessInner::checkpoint::( - inner, + process, ctx, - WasiProcessCheckpoint::Snapshot { trigger }, + ProcessCheckpoint::Snapshot { trigger }, )?); match res { MaybeCheckpointResult::Unwinding => return Ok(Err(Errno::Success)), @@ -51,14 +51,14 @@ pub fn maybe_snapshot( pub fn maybe_snapshot( mut ctx: FunctionEnvMut<'_, WasiEnv>, ) -> WasiResult> { - use crate::os::task::process::{WasiProcessCheckpoint, WasiProcessInner}; + use crate::os::task::process::{ProcessCheckpoint, WasiProcessInner}; if !ctx.data().enable_journal { return Ok(Ok(ctx)); } - let inner = ctx.data().process.inner.clone(); - let res = wasi_try_ok_ok!(WasiProcessInner::maybe_checkpoint::(inner, ctx)?); + let process = ctx.data().process.clone(); + let res = wasi_try_ok_ok!(WasiProcessInner::maybe_checkpoint::(process, ctx)?); match res { 
MaybeCheckpointResult::Unwinding => return Ok(Err(Errno::Success)), MaybeCheckpointResult::NotThisTime(c) => { @@ -102,7 +102,7 @@ pub unsafe fn restore_snapshot( stderr_fds.insert(2 as WasiFd); // Loop through all the events and process them - let cur_module_hash = Some(ctx.data().process.module_hash.as_bytes()); + let cur_module_hash = Some(ctx.data().process.module_hash().as_bytes()); let mut journal_module_hash = None; let mut rewind = None; while let Some(next) = journal.read().map_err(anyhow_err_to_runtime_err)? { diff --git a/lib/wasix/src/syscalls/mod.rs b/lib/wasix/src/syscalls/mod.rs index 610f4075419..b011daf8bfa 100644 --- a/lib/wasix/src/syscalls/mod.rs +++ b/lib/wasix/src/syscalls/mod.rs @@ -28,6 +28,7 @@ use futures::{ use tracing::instrument; pub use wasi::*; pub use wasix::*; +use wasmer::FromToNativeWasmType; pub mod legacy; @@ -305,7 +306,7 @@ where let mut env = ctx.data(); // Check if we need to exit the asynchronous loop - if let Some(exit_code) = env.should_exit() { + if let Some(exit_code) = env.process.should_terminate_with_code() { return Err(WasiError::Exit(exit_code)); } @@ -369,22 +370,24 @@ where } let env = self.ctx.data(); - if let Some(forced_exit) = env.thread.try_join() { - return Poll::Ready(Err(WasiError::Exit(forced_exit.unwrap_or_else(|err| { - tracing::debug!("exit runtime error - {}", err); - Errno::Child.into() - })))); + if let Some(code) = env.process.should_terminate_with_code() { + return Poll::Ready(Err(WasiError::Exit(code))); } if self.process_signals && env.thread.has_signals_or_subscribe(cx.waker()) { let signals = env.thread.signals().lock().unwrap(); for sig in signals.0.iter() { + // FIXME: quit and abort should invoke the handler. 
+ // FIXME: prevent duplication with code in WasiEnv::process_signals_and_exit if *sig == Signal::Sigint || *sig == Signal::Sigquit || *sig == Signal::Sigkill || *sig == Signal::Sigabrt { - let exit_code = env.thread.set_or_get_exit_code_for_signal(*sig); - return Poll::Ready(Err(WasiError::Exit(exit_code))); + let code = env + .process + .should_terminate_with_code() + .unwrap_or(ExitCode::Errno(Errno::Intr)); + return Poll::Ready(Err(WasiError::Exit(code))); } } } @@ -507,7 +510,7 @@ where if let Poll::Ready(res) = Pin::new(&mut self.pinned_work).poll(cx) { return Poll::Ready(Ok(res)); } - if let Some(exit_code) = self.env.should_exit() { + if let Some(exit_code) = self.env.process.should_terminate_with_code() { return Poll::Ready(Err(WasiError::Exit(exit_code))); } if self.env.thread.has_signals_or_subscribe(cx.waker()) { diff --git a/lib/wasix/src/syscalls/wasi/fd_read.rs b/lib/wasix/src/syscalls/wasi/fd_read.rs index 2d16eda58b4..c7469865588 100644 --- a/lib/wasix/src/syscalls/wasi/fd_read.rs +++ b/lib/wasix/src/syscalls/wasi/fd_read.rs @@ -7,7 +7,7 @@ use crate::{ fs::NotificationInner, journal::SnapshotTrigger, net::socket::TimeType, - os::task::process::{MaybeCheckpointResult, WasiProcessCheckpoint, WasiProcessInner}, + os::task::process::{MaybeCheckpointResult, ProcessCheckpoint, WasiProcessInner}, syscalls::*, }; diff --git a/lib/wasix/src/syscalls/wasix/proc_exec.rs b/lib/wasix/src/syscalls/wasix/proc_exec.rs index 5e3b1d93215..e6a05aaeedb 100644 --- a/lib/wasix/src/syscalls/wasix/proc_exec.rs +++ b/lib/wasix/src/syscalls/wasix/proc_exec.rs @@ -82,7 +82,7 @@ pub fn proc_exec( // We will need the child pid later let child_process = ctx.data().process.clone(); let child_pid = child_process.pid(); - let child_finished = child_process.finished; + let child_finished = child_process.status(); // Restore the WasiEnv to the point when we vforked vfork.env.swap_inner(ctx.data_mut()); diff --git a/lib/wasix/src/syscalls/wasix/proc_fork.rs 
b/lib/wasix/src/syscalls/wasix/proc_fork.rs index 9bcdc337312..bdd9ffe769f 100644 --- a/lib/wasix/src/syscalls/wasix/proc_fork.rs +++ b/lib/wasix/src/syscalls/wasix/proc_fork.rs @@ -57,7 +57,7 @@ pub fn proc_fork( } }; let child_pid = child_env.process.pid(); - let child_finished = child_env.process.finished.clone(); + let child_finished = child_env.process.status().clone(); // We write a zero to the PID before we capture the stack // so that this is what will be returned to the child diff --git a/lib/wasix/src/syscalls/wasix/proc_join.rs b/lib/wasix/src/syscalls/wasix/proc_join.rs index a8b29b53344..12bc401188c 100644 --- a/lib/wasix/src/syscalls/wasix/proc_join.rs +++ b/lib/wasix/src/syscalls/wasix/proc_join.rs @@ -161,10 +161,10 @@ pub(super) fn proc_join_internal( let process = inner .children .iter() - .filter(|c| c.pid == pid) + .filter(|c| c.pid() == pid) .map(Clone::clone) .next(); - inner.children.retain(|c| c.pid != pid); + inner.children.retain(|c| c.pid() != pid); process };