diff --git a/Cargo.lock b/Cargo.lock index f85932189cc9..481e0db077ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3700,6 +3700,7 @@ dependencies = [ "log", "once_cell", "rustix", + "smallvec", "system-interface", "tempfile", "test-log", diff --git a/crates/test-programs/src/bin/preview2_pollable_correct.rs b/crates/test-programs/src/bin/preview2_pollable_correct.rs new file mode 100644 index 000000000000..3ecc8a8e7fef --- /dev/null +++ b/crates/test-programs/src/bin/preview2_pollable_correct.rs @@ -0,0 +1,12 @@ +use test_programs::wasi::cli::stdin; + +fn main() { + let stdin = stdin::get_stdin(); + let p1 = stdin.subscribe(); + let p2 = stdin.subscribe(); + + // Should work: + // - Exactly the same pollable passed in multiple times. + // - Distinct pollables for the same backing resource (stdin in this case). + test_programs::wasi::io::poll::poll(&[&p1, &p2, &p1, &p2]); +} diff --git a/crates/test-programs/src/bin/preview2_pollable_traps.rs b/crates/test-programs/src/bin/preview2_pollable_traps.rs new file mode 100644 index 000000000000..55e61a5c6cc1 --- /dev/null +++ b/crates/test-programs/src/bin/preview2_pollable_traps.rs @@ -0,0 +1,4 @@ +fn main() { + // Polling an empty list should trap: + test_programs::wasi::io::poll::poll(&[]); +} diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 351f8f59d775..e254359393cd 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -644,7 +644,7 @@ impl crate::bindings::http::types::HostFutureTrailers for T { &mut self, index: Resource, ) -> wasmtime::Result> { - wasmtime_wasi::preview2::subscribe(self.table(), index) + wasmtime_wasi::preview2::subscribe(self.table(), &index) } fn get( @@ -852,7 +852,7 @@ impl crate::bindings::http::types::HostFutureIncomingResponse f &mut self, id: Resource, ) -> wasmtime::Result> { - wasmtime_wasi::preview2::subscribe(self.table(), id) + wasmtime_wasi::preview2::subscribe(self.table(), &id) } } diff --git a/crates/wasi-http/wit/deps/io/poll.wit b/crates/wasi-http/wit/deps/io/poll.wit index ddc67f8b7ace..8139064c39c3 100644 --- a/crates/wasi-http/wit/deps/io/poll.wit +++ b/crates/wasi-http/wit/deps/io/poll.wit @@ -27,8 +27,9 @@ interface poll { /// The result `list` contains one or more indices of handles in the /// argument list that is ready for I/O. /// - /// If the list contains more elements than can be indexed with a `u32` - /// value, this function traps. + /// This function traps if either: + /// - the list is empty, or: + /// - the list contains more elements than can be indexed with a `u32` value. /// /// A timeout can be implemented by adding a pollable from the /// wasi-clocks API to the list. diff --git a/crates/wasi/Cargo.toml b/crates/wasi/Cargo.toml index 28559f932eea..7e09e63974dc 100644 --- a/crates/wasi/Cargo.toml +++ b/crates/wasi/Cargo.toml @@ -39,6 +39,7 @@ io-lifetimes = { workspace = true, optional = true } fs-set-times = { workspace = true, optional = true } bitflags = { workspace = true, optional = true } async-trait = { workspace = true, optional = true } +smallvec = { workspace = true } system-interface = { workspace = true, optional = true} futures = { workspace = true, optional = true } diff --git a/crates/wasi/src/preview2/host/clocks.rs b/crates/wasi/src/preview2/host/clocks.rs index 42b6cb7b9236..de0c1b709096 100644 --- a/crates/wasi/src/preview2/host/clocks.rs +++ b/crates/wasi/src/preview2/host/clocks.rs @@ -1,10 +1,8 @@ -#![allow(unused_variables)] - use crate::preview2::bindings::{ clocks::monotonic_clock::{self, Duration as WasiDuration, Instant}, clocks::wall_clock::{self, Datetime}, }; -use crate::preview2::poll::{subscribe, Subscribe}; +use crate::preview2::poll; use crate::preview2::{Pollable, WasiView}; use cap_std::time::SystemTime; use std::time::Duration; @@ -42,23 +40,16 @@ impl wall_clock::Host for T { } } -fn subscribe_to_duration( - table: &mut wasmtime::component::ResourceTable, - duration: tokio::time::Duration, -) -> anyhow::Result> { - let sleep = if duration.is_zero() { - table.push(Deadline::Past)? +fn make_pollable(duration: tokio::time::Duration) -> Pollable { + if duration.is_zero() { + poll::ready() } else if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) { - // NB: this resource created here is not actually exposed to wasm, it's - // only an internal implementation detail used to match the signature - // expected by `subscribe`. - table.push(Deadline::Instant(deadline))? + poll::once(async move { tokio::time::sleep_until(deadline).await }) } else { // If the user specifies a time so far in the future we can't // represent it, wait forever rather than trap. - table.push(Deadline::Never)? - }; - subscribe(table, sleep) + poll::pending() + } } impl monotonic_clock::Host for T { @@ -77,27 +68,12 @@ impl monotonic_clock::Host for T { } else { Duration::from_nanos(0) }; - subscribe_to_duration(&mut self.table(), duration) + let pollable = make_pollable(duration); + Ok(self.table().push(pollable)?) } fn subscribe_duration(&mut self, duration: WasiDuration) -> anyhow::Result> { - subscribe_to_duration(&mut self.table(), Duration::from_nanos(duration)) - } -} - -enum Deadline { - Past, - Instant(tokio::time::Instant), - Never, -} - -#[async_trait::async_trait] -impl Subscribe for Deadline { - async fn ready(&mut self) { - match self { - Deadline::Past => {} - Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await, - Deadline::Never => std::future::pending().await, - } + let pollable = make_pollable(Duration::from_nanos(duration)); + Ok(self.table().push(pollable)?) } } diff --git a/crates/wasi/src/preview2/host/io.rs b/crates/wasi/src/preview2/host/io.rs index b82ccc8efe43..12c6ada66269 100644 --- a/crates/wasi/src/preview2/host/io.rs +++ b/crates/wasi/src/preview2/host/io.rs @@ -49,7 +49,7 @@ impl streams::HostOutputStream for T { } fn subscribe(&mut self, stream: Resource) -> anyhow::Result> { - subscribe(self.table(), stream) + subscribe(self.table(), &stream) } async fn blocking_write_and_flush( @@ -220,7 +220,7 @@ impl streams::HostInputStream for T { } fn subscribe(&mut self, stream: Resource) -> anyhow::Result> { - crate::preview2::poll::subscribe(self.table(), stream) + crate::preview2::poll::subscribe(self.table(), &stream) } } @@ -230,9 +230,8 @@ pub mod sync { self as async_streams, Host as AsyncHost, HostInputStream as AsyncHostInputStream, HostOutputStream as AsyncHostOutputStream, }, - bindings::sync_io::io::poll::Pollable, bindings::sync_io::io::streams::{self, InputStream, OutputStream}, - in_tokio, StreamError, StreamResult, WasiView, + in_tokio, Pollable, StreamError, StreamResult, WasiView, }; use wasmtime::component::Resource; diff --git a/crates/wasi/src/preview2/host/tcp.rs b/crates/wasi/src/preview2/host/tcp.rs index 52e2b86b612f..9ed5b7c70601 100644 --- a/crates/wasi/src/preview2/host/tcp.rs +++ b/crates/wasi/src/preview2/host/tcp.rs @@ -560,7 +560,7 @@ impl crate::preview2::host::tcp::tcp::HostTcpSocket for T { } fn subscribe(&mut self, this: Resource) -> anyhow::Result> { - crate::preview2::poll::subscribe(self.table(), this) + crate::preview2::poll::subscribe(self.table(), &this) } fn shutdown( diff --git a/crates/wasi/src/preview2/host/udp.rs b/crates/wasi/src/preview2/host/udp.rs index a33340017195..81d9ee97c7d7 100644 --- a/crates/wasi/src/preview2/host/udp.rs +++ b/crates/wasi/src/preview2/host/udp.rs @@ -98,12 +98,14 @@ impl udp::HostUdpSocket for T { )> { let table = self.table(); - let has_active_streams = table - .iter_children(&this)? - .any(|c| c.is::() || c.is::()); + for child_result in table.iter_children(&this)? { + let Ok(child) = child_result else { + return Err(SocketError::trap(anyhow!("UDP stream taken."))); + }; - if has_active_streams { - return Err(SocketError::trap(anyhow!("UDP streams not dropped yet"))); + if child.is::() || child.is::() { + return Err(SocketError::trap(anyhow!("UDP streams not dropped yet"))); + } } let socket = table.get_mut(&this)?; @@ -283,7 +285,7 @@ impl udp::HostUdpSocket for T { } fn subscribe(&mut self, this: Resource) -> anyhow::Result> { - crate::preview2::poll::subscribe(self.table(), this) + crate::preview2::poll::subscribe(self.table(), &this) } fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { @@ -363,7 +365,7 @@ impl udp::HostIncomingDatagramStream for T { &mut self, this: Resource, ) -> anyhow::Result> { - crate::preview2::poll::subscribe(self.table(), this) + crate::preview2::poll::subscribe(self.table(), &this) } fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { @@ -497,7 +499,7 @@ impl udp::HostOutgoingDatagramStream for T { &mut self, this: Resource, ) -> anyhow::Result> { - crate::preview2::poll::subscribe(self.table(), this) + crate::preview2::poll::subscribe(self.table(), &this) } fn drop(&mut self, this: Resource) -> Result<(), anyhow::Error> { diff --git a/crates/wasi/src/preview2/ip_name_lookup.rs b/crates/wasi/src/preview2/ip_name_lookup.rs index dd9243399956..9bd3459912a9 100644 --- a/crates/wasi/src/preview2/ip_name_lookup.rs +++ b/crates/wasi/src/preview2/ip_name_lookup.rs @@ -69,7 +69,7 @@ impl HostResolveAddressStream for T { &mut self, resource: Resource, ) -> Result> { - subscribe(self.table(), resource) + subscribe(self.table(), &resource) } fn drop(&mut self, resource: Resource) -> Result<()> { diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index 16087504364c..dea0a366319e 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -28,7 +28,7 @@ mod host; mod ip_name_lookup; mod network; pub mod pipe; -mod poll; +pub mod poll; #[cfg(feature = "preview1-on-preview2")] pub mod preview0; #[cfg(feature = "preview1-on-preview2")] @@ -45,7 +45,7 @@ pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiView}; pub use self::error::{I32Exit, TrappableError}; pub use self::filesystem::{DirPerms, FilePerms, FsError, FsResult}; pub use self::network::{Network, SocketError, SocketResult}; -pub use self::poll::{subscribe, ClosureFuture, MakeFuture, Pollable, PollableFuture, Subscribe}; +pub use self::poll::{subscribe, Pollable, Subscribe}; pub use self::random::{thread_rng, Deterministic}; pub use self::stdio::{ stderr, stdin, stdout, IsATTY, Stderr, Stdin, StdinStream, Stdout, StdoutStream, @@ -214,7 +214,7 @@ impl From> for AbortOnDropJoinHandle { impl Future for AbortOnDropJoinHandle { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.as_mut().0).poll(cx) { + match Future::poll(Pin::new(&mut self.as_mut().0), cx) { Poll::Pending => Poll::Pending, Poll::Ready(r) => Poll::Ready(r.expect("child task panicked")), } diff --git a/crates/wasi/src/preview2/poll.rs b/crates/wasi/src/preview2/poll.rs index 64be2420f0b4..fbea298334d8 100644 --- a/crates/wasi/src/preview2/poll.rs +++ b/crates/wasi/src/preview2/poll.rs @@ -1,161 +1,368 @@ use crate::preview2::{bindings::io::poll, WasiView}; -use anyhow::Result; -use std::any::Any; -use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use anyhow::{anyhow, Result}; +use futures::Future; +use smallvec::{smallvec, SmallVec}; +use std::{ + any::Any, + collections::{hash_map::Entry, HashMap}, + pin::Pin, + task::{Context, Poll}, +}; use wasmtime::component::{Resource, ResourceTable}; -pub type PollableFuture<'a> = Pin + Send + 'a>>; -pub type MakeFuture = for<'a> fn(&'a mut dyn Any) -> PollableFuture<'a>; -pub type ClosureFuture = Box PollableFuture<'static> + Send + 'static>; +/// For all intents and purposes this is just a regular [`Future`], except that +/// the `poll` method has access to the current [`WasiView`]. +trait WasiFuture { + /// See [Future::Output] + type Output; -/// A host representation of the `wasi:io/poll.pollable` resource. -/// -/// A pollable is not the same thing as a Rust Future: the same pollable may be used to -/// repeatedly check for readiness of a given condition, e.g. if a stream is readable -/// or writable. So, rather than containing a Future, which can only become Ready once, a -/// Pollable contains a way to create a Future in each call to `poll`. -pub struct Pollable { - index: u32, - make_future: MakeFuture, - remove_index_on_delete: Option Result<()>>, + /// See [Future::poll] + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + view: &mut dyn WasiView, + ) -> Poll; +} + +impl WasiFuture for F { + type Output = F::Output; + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + _view: &mut dyn WasiView, + ) -> Poll { + Future::poll(self, cx) + } } +/// Internal helper trait to unify Subsribe resources and standalone Pollables. +trait PollableInternal: Send + 'static { + fn ready<'a>(&'a mut self) -> Pin + Send + 'a>>; + + fn is_ready(&mut self, view: &mut dyn WasiView) -> bool { + let mut future = self.ready(); + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + future.as_mut().poll(&mut cx, view).is_ready() + } +} + +/// A host implementation of the `wasi:io/poll.pollable` contract. +/// +/// A pollable is not the same thing as a Rust Future: in WASI, the same pollable +/// may be used to repeatedly check for readiness of a given condition, e.g. if +/// a stream is readable or writable. So, rather than containing a Future, which +/// can only become Ready once, Subscribe contains a way to create a Future in +/// each call to `poll`. #[async_trait::async_trait] pub trait Subscribe: Send + 'static { + /// Wait for the pollable to be ready. + /// + /// This can be called repeatedly as the readiness state of a pollable is + /// able to change many times during its lifetime. + /// + /// # Cancel safety + /// The implementation must make sure to only await futures that are + /// cancel-safe, as the returned future will most likely be canceled, even + /// during normal operation. async fn ready(&mut self); + + /// Check to see if the pollable is currently ready. + fn is_ready(&mut self) -> bool { + let mut future = self.ready(); + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + future.as_mut().poll(&mut cx).is_ready() + } } -/// Creates a `pollable` resource which is susbcribed to the provided -/// `resource`. -/// -/// If `resource` is an owned resource then it will be deleted when the returned -/// resource is deleted. Otherwise the returned resource is considered a "child" -/// of the given `resource` which means that the given resource cannot be -/// deleted while the `pollable` is still alive. -pub fn subscribe(table: &mut ResourceTable, resource: Resource) -> Result> +impl PollableInternal for T { + fn ready<'a>(&'a mut self) -> Pin + Send + 'a>> { + Box::pin(Subscribe::ready(self)) + } + + fn is_ready(&mut self, _view: &mut dyn WasiView) -> bool { + Subscribe::is_ready(self) + } +} + +/// Create a pollable that is always ready. +pub fn ready() -> Pollable { + poll_ready_fn(|_, _| Poll::Ready(())) +} + +/// Create a pollable that is never ready. +pub fn pending() -> Pollable { + poll_ready_fn(|_, _| Poll::Pending) +} + +/// Create an ad-hoc Pollable implementation from a closure. The closure will be +/// called repeatedly, even after it has already returned [Poll::Ready] before. +pub fn poll_ready_fn(poll_ready_fn: F) -> Pollable where - T: Subscribe, + F: FnMut(&mut Context<'_>, &mut dyn WasiView) -> Poll<()> + Send + 'static, { - fn make_future<'a, T>(stream: &'a mut dyn Any) -> PollableFuture<'a> + struct PollReadyFn { + poll_ready_fn: R, + } + impl PollableInternal for PollReadyFn where - T: Subscribe, + F: FnMut(&mut Context<'_>, &mut dyn WasiView) -> Poll<()> + Send + 'static, { - stream.downcast_mut::().unwrap().ready() + fn ready<'a>(&'a mut self) -> Pin + Send + 'a>> { + Box::pin(PollReadyFnFuture { pollable: self }) + } + } + + struct PollReadyFnFuture<'a, F> { + pollable: &'a mut PollReadyFn, } - let pollable = Pollable { - index: resource.rep(), - remove_index_on_delete: if resource.owned() { - Some(|table, idx| { - let resource = Resource::::new_own(idx); - table.delete(resource)?; - Ok(()) - }) - } else { - None - }, - make_future: make_future::, - }; + impl WasiFuture for PollReadyFnFuture<'_, F> + where + F: FnMut(&mut Context<'_>, &mut dyn WasiView) -> Poll<()> + Send + 'static, + { + type Output = (); + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + view: &mut dyn WasiView, + ) -> Poll<()> { + (self.pollable.poll_ready_fn)(cx, view) + } + } - Ok(table.push_child(pollable, &resource)?) + Pollable(PollableInner::Own(Box::new(PollReadyFn { poll_ready_fn }))) +} + +/// Create a pollable that initially starts out as pending and transitions to +/// ready once the future resolves. After that the pollable will always be ready. +pub fn once(future: F) -> Pollable +where + F: Future + Send + 'static, +{ + enum Once { + Pending(Pin>), + Ready, + } + impl PollableInternal for Once + where + F: Future + Send + 'static, + { + fn ready<'a>(&'a mut self) -> Pin + Send + 'a>> { + Box::pin(OnceFuture { pollable: self }) + } + } + + struct OnceFuture<'a, F> { + pollable: &'a mut Once, + } + + impl WasiFuture for OnceFuture<'_, F> + where + F: Future + Send + 'static, + { + type Output = (); + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + _view: &mut dyn WasiView, + ) -> Poll<()> { + let Once::Pending(future) = &mut self.pollable else { + return Poll::Ready(()); + }; + + let Poll::Ready(()) = future.as_mut().poll(cx) else { + return Poll::Pending; + }; + + *self.pollable = Once::Ready; + Poll::Ready(()) + } + } + + Pollable(PollableInner::Own(Box::new(Once::Pending(Box::pin( + future, + ))))) +} + +/// Creates a new handle which is subscribed to the pollable `parent`. +/// The handle will be added as a child of `parent`. +pub fn subscribe( + table: &mut ResourceTable, + parent: &Resource, +) -> Result> { + let pollable = Pollable(PollableInner::Child { + parent_key: parent.rep(), + as_pollable: |target| target.downcast_mut::().unwrap(), + }); + + Ok(table.push_child(pollable, &parent)?) +} + +type AsPollableFn = for<'a> fn(&'a mut dyn Any) -> &'a mut dyn PollableInternal; +type TargetKey = u32; + +/// A host representation of the `wasi:io/poll.pollable` resource. +pub struct Pollable(PollableInner); + +enum PollableInner { + Own(Box), + Child { + parent_key: TargetKey, + as_pollable: AsPollableFn, + }, +} + +/// Using the term "target" to mean: where the actual PollableInternal implementation lives. +/// Sometimes this is the Pollable itself, sometimes this is a parent. +struct TargetInfo { + key: TargetKey, + as_pollable: AsPollableFn, +} +impl TargetInfo { + fn gather(table: &ResourceTable, handle: &Resource) -> Result { + match &table.get(&handle)?.0 { + PollableInner::Own(_) => Ok(Self { + key: handle.rep(), + as_pollable: |target| match &mut target.downcast_mut::().unwrap().0 { + PollableInner::Own(p) => p.as_mut(), + PollableInner::Child { .. } => unreachable!(), + }, + }), + PollableInner::Child { + parent_key, + as_pollable, + } => Ok(Self { + key: *parent_key, + as_pollable: *as_pollable, + }), + } + } + + fn lease(self, table: &mut ResourceTable) -> Result { + Ok(TargetLease { + data: table.take_any(self.key)?, + info: self, + }) + } +} + +struct TargetLease { + info: TargetInfo, + data: Box, +} +impl TargetLease { + fn take(table: &mut ResourceTable, handle: &Resource) -> Result { + Ok(TargetInfo::gather(table, handle)?.lease(table)?) + } + + fn restore(self, table: &mut ResourceTable) -> Result<()> { + table.restore_any(self.info.key, self.data)?; + Ok(()) + } + + fn as_pollable(&mut self) -> &mut dyn PollableInternal { + (self.info.as_pollable)(self.data.as_mut()) + } } #[async_trait::async_trait] impl poll::Host for T { async fn poll(&mut self, pollables: Vec>) -> Result> { + if pollables.is_empty() { + return Err(anyhow!("empty poll list")); + } + type ReadylistIndex = u32; + struct PollEntry { + lease: TargetLease, + indexes: SmallVec<[ReadylistIndex; 1]>, + } let table = self.table(); - let mut table_futures: HashMap)> = HashMap::new(); - - for (ix, p) in pollables.iter().enumerate() { - let ix: u32 = ix.try_into()?; + let mut entries: HashMap = HashMap::with_capacity(pollables.len()); + for (input_index, pollable) in pollables.into_iter().enumerate() { + let input_index = ReadylistIndex::try_from(input_index).expect("poll list too big"); - let pollable = table.get(p)?; - let (_, list) = table_futures - .entry(pollable.index) - .or_insert((pollable.make_future, Vec::new())); - list.push(ix); + let info = TargetInfo::gather(table, &pollable)?; + match entries.entry(info.key) { + Entry::Vacant(v) => { + v.insert(PollEntry { + lease: info.lease(table)?, + indexes: smallvec![input_index], + }); + } + Entry::Occupied(mut o) => { + o.get_mut().indexes.push(input_index); + } + } } - let mut futures: Vec<(PollableFuture<'_>, Vec)> = Vec::new(); - for (entry, (make_future, readylist_indices)) in table.iter_entries(table_futures) { - let entry = entry?; - futures.push((make_future(entry), readylist_indices)); - } + let self_ref = &mut self; + let mut futures: Vec<_> = entries + .values_mut() + .map(|e| (e.lease.as_pollable().ready(), &e.indexes)) + .collect(); - struct PollList<'a> { - futures: Vec<(PollableFuture<'a>, Vec)>, - } - impl<'a> Future for PollList<'a> { - type Output = Vec; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut any_ready = false; - let mut results = Vec::new(); - for (fut, readylist_indicies) in self.futures.iter_mut() { - match fut.as_mut().poll(cx) { - Poll::Ready(()) => { - results.extend_from_slice(readylist_indicies); - any_ready = true; - } - Poll::Pending => {} - } - } - if any_ready { - Poll::Ready(results) - } else { - Poll::Pending + let results = futures::future::poll_fn(move |cx| { + let mut results = Vec::new(); + + for (future, indexes) in futures.iter_mut() { + match future.as_mut().poll(cx, *self_ref) { + Poll::Ready(()) => results.extend_from_slice(indexes.as_slice()), + Poll::Pending => {} } } + if results.is_empty() { + Poll::Pending + } else { + Poll::Ready(results) + } + }) + .await; + + let table = self.table(); + for entry in entries.into_values() { + entry.lease.restore(table)?; } - Ok(PollList { futures }.await) + Ok(results) } } #[async_trait::async_trait] impl crate::preview2::bindings::io::poll::HostPollable for T { - async fn block(&mut self, pollable: Resource) -> Result<()> { - let table = self.table(); - let pollable = table.get(&pollable)?; - let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?); - ready.await; + async fn block(&mut self, handle: Resource) -> Result<()> { + let mut lease = TargetLease::take(self.table(), &handle)?; + { + let mut future = lease.as_pollable().ready(); + let self_ref = &mut self; + futures::future::poll_fn(move |cx| future.as_mut().poll(cx, *self_ref)).await; + } + lease.restore(self.table())?; Ok(()) } - async fn ready(&mut self, pollable: Resource) -> Result { - let table = self.table(); - let pollable = table.get(&pollable)?; - let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?); - futures::pin_mut!(ready); - Ok(matches!( - futures::future::poll_immediate(ready).await, - Some(()) - )) - } - fn drop(&mut self, pollable: Resource) -> Result<()> { - let pollable = self.table().delete(pollable)?; - if let Some(delete) = pollable.remove_index_on_delete { - delete(self.table(), pollable.index)?; - } + async fn ready(&mut self, handle: Resource) -> Result { + let mut lease = TargetLease::take(self.table(), &handle)?; + let is_ready = lease.as_pollable().is_ready(self); + lease.restore(self.table())?; + Ok(is_ready) + } + fn drop(&mut self, handle: Resource) -> Result<()> { + self.table().delete(handle)?; Ok(()) } } -pub mod sync { - use crate::preview2::{ - bindings::io::poll as async_poll, - bindings::sync_io::io::poll::{self, Pollable}, - in_tokio, WasiView, - }; +pub(crate) mod sync { + use crate::preview2::{bindings::io::poll as async_poll, in_tokio, Pollable, WasiView}; use anyhow::Result; use wasmtime::component::Resource; - impl poll::Host for T { + impl crate::preview2::bindings::sync_io::io::poll::Host for T { fn poll(&mut self, pollables: Vec>) -> Result> { in_tokio(async { async_poll::Host::poll(self, pollables).await }) } diff --git a/crates/wasi/tests/all/async_.rs b/crates/wasi/tests/all/async_.rs index 1aaabe0d5ddf..063a23b03748 100644 --- a/crates/wasi/tests/all/async_.rs +++ b/crates/wasi/tests/all/async_.rs @@ -359,6 +359,22 @@ async fn preview2_stream_pollable_traps() { ) } #[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn preview2_pollable_correct() { + run(PREVIEW2_POLLABLE_CORRECT_COMPONENT, false) + .await + .unwrap() +} +#[test_log::test(tokio::test(flavor = "multi_thread"))] +async fn preview2_pollable_traps() { + let e = run(PREVIEW2_POLLABLE_TRAPS_COMPONENT, false) + .await + .unwrap_err(); + assert_eq!( + format!("{}", e.source().expect("trap source")), + "empty poll list" + ) +} +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn preview2_adapter_badfd() { run(PREVIEW2_ADAPTER_BADFD_COMPONENT, false).await.unwrap() } diff --git a/crates/wasi/tests/all/sync.rs b/crates/wasi/tests/all/sync.rs index c12454643721..d0446c444509 100644 --- a/crates/wasi/tests/all/sync.rs +++ b/crates/wasi/tests/all/sync.rs @@ -298,6 +298,18 @@ fn preview2_stream_pollable_traps() { ) } #[test_log::test] +fn preview2_pollable_correct() { + run(PREVIEW2_POLLABLE_CORRECT_COMPONENT, false).unwrap() +} +#[test_log::test] +fn preview2_pollable_traps() { + let e = run(PREVIEW2_POLLABLE_TRAPS_COMPONENT, false).unwrap_err(); + assert_eq!( + format!("{}", e.source().expect("trap source")), + "empty poll list" + ) +} +#[test_log::test] fn preview2_adapter_badfd() { run(PREVIEW2_ADAPTER_BADFD_COMPONENT, false).unwrap() } diff --git a/crates/wasi/wit/deps/io/poll.wit b/crates/wasi/wit/deps/io/poll.wit index ddc67f8b7ace..8139064c39c3 100644 --- a/crates/wasi/wit/deps/io/poll.wit +++ b/crates/wasi/wit/deps/io/poll.wit @@ -27,8 +27,9 @@ interface poll { /// The result `list` contains one or more indices of handles in the /// argument list that is ready for I/O. /// - /// If the list contains more elements than can be indexed with a `u32` - /// value, this function traps. + /// This function traps if either: + /// - the list is empty, or: + /// - the list contains more elements than can be indexed with a `u32` value. /// /// A timeout can be implemented by adding a pollable from the /// wasi-clocks API to the list. diff --git a/crates/wasmtime/src/runtime/component/resource_table.rs b/crates/wasmtime/src/runtime/component/resource_table.rs index 4b57ce792679..cf321958fa8b 100644 --- a/crates/wasmtime/src/runtime/component/resource_table.rs +++ b/crates/wasmtime/src/runtime/component/resource_table.rs @@ -1,6 +1,6 @@ use super::Resource; -use std::any::Any; -use std::collections::{BTreeSet, HashMap}; +use std::any::{Any, TypeId}; +use std::collections::BTreeSet; #[derive(Debug)] /// Errors returned by operations on `ResourceTable` @@ -14,6 +14,11 @@ pub enum ResourceTableError { /// Resource cannot be deleted because child resources exist in the table. Consult wit docs for /// the particular resource to see which methods may return child resources. HasChildren, + /// Resource has been temporarily taken from the table. + Taken, + /// Resource is not taken. This can happen when attempting to restore a resource + /// that has already been restored, or was never taken in the first place. + NotTaken, } impl std::fmt::Display for ResourceTableError { @@ -23,6 +28,8 @@ impl std::fmt::Display for ResourceTableError { Self::NotPresent => write!(f, "resource not present"), Self::WrongType => write!(f, "resource is of another type"), Self::HasChildren => write!(f, "resource has children"), + Self::Taken => write!(f, "resource is taken"), + Self::NotTaken => write!(f, "resource is not taken"), } } } @@ -57,6 +64,32 @@ impl Entry { } } +#[derive(Debug)] +enum Slot { + /// The resource is present in the table, ready for use. + Present(Box), + /// The resource is temporarily taken out for external mutation. + /// To ensure we're getting back the same type of resource as the one we've + /// handed out, we remember the TypeId of the data and validate it on restore. + Taken(TypeId), +} + +impl Slot { + fn get(&self) -> Result<&(dyn Any + Send + 'static), ResourceTableError> { + match self { + Slot::Present(data) => Ok(data.as_ref()), + Slot::Taken(_) => Err(ResourceTableError::Taken), + } + } + + fn get_mut(&mut self) -> Result<&mut (dyn Any + Send + 'static), ResourceTableError> { + match self { + Slot::Present(data) => Ok(data.as_mut()), + Slot::Taken(_) => Err(ResourceTableError::Taken), + } + } +} + /// This structure tracks parent and child relationships for a given table entry. /// /// Parents and children are referred to by table index. We maintain the @@ -68,8 +101,8 @@ impl Entry { /// * an entry with children may not be deleted. #[derive(Debug)] struct TableEntry { - /// The entry in the table, as a boxed dynamically-typed object - entry: Box, + /// The entry in the table. + slot: Slot, /// The index of the parent of this entry, if it has one. parent: Option, /// The indicies of any children of this entry. @@ -79,7 +112,7 @@ struct TableEntry { impl TableEntry { fn new(entry: Box, parent: Option) -> Self { Self { - entry, + slot: Slot::Present(entry), parent, children: BTreeSet::new(), } @@ -231,7 +264,7 @@ impl ResourceTable { fn get_(&self, key: u32) -> Result<&dyn Any, ResourceTableError> { let r = self.occupied(key)?; - Ok(&*r.entry) + Ok(r.slot.get()?) } /// Get an mutable reference to a resource of a given type at a given @@ -248,7 +281,7 @@ impl ResourceTable { /// Returns the raw `Any` at the `key` index provided. pub fn get_any_mut(&mut self, key: u32) -> Result<&mut dyn Any, ResourceTableError> { let r = self.occupied_mut(key)?; - Ok(&mut *r.entry) + Ok(r.slot.get_mut()?) } /// Same as `delete`, but typed @@ -257,17 +290,19 @@ impl ResourceTable { T: Any, { debug_assert!(resource.owned()); - let entry = self.delete_entry(resource.rep())?; - match entry.entry.downcast() { + let data = self.delete_entry(resource.rep())?; + match data.downcast() { Ok(t) => Ok(*t), Err(_e) => Err(ResourceTableError::WrongType), } } - fn delete_entry(&mut self, key: u32) -> Result { - if !self.occupied(key)?.children.is_empty() { + fn delete_entry(&mut self, key: u32) -> Result, ResourceTableError> { + let entry = self.occupied_mut(key)?; + if !entry.children.is_empty() { return Err(ResourceTableError::HasChildren); } + let data = self.take_any(key)?; let e = self.free_entry(key as usize); if let Some(parent) = e.parent { // Remove deleted resource from parent's child list. @@ -277,40 +312,112 @@ impl ResourceTable { .expect("missing parent") .remove_child(key); } - Ok(e) + Ok(data) } - /// Zip the values of the map with mutable references to table entries corresponding to each - /// key. As the keys in the [HashMap] are unique, this iterator can give mutable references - /// with the same lifetime as the mutable reference to the [ResourceTable]. - pub fn iter_entries<'a, T>( - &'a mut self, - map: HashMap, - ) -> impl Iterator, T)> { - map.into_iter().map(move |(k, v)| { - let item = self - .occupied_mut(k) - .map(|e| Box::as_mut(&mut e.entry)) - // Safety: extending the lifetime of the mutable reference. - .map(|item| unsafe { &mut *(item as *mut dyn Any) }); - (item, v) - }) - } - - /// Iterate over all children belonging to the provided parent + /// Iterate over all children belonging to the provided parent. + /// This returns an iterator of results, because some children may be taken. pub fn iter_children( &self, parent: &Resource, - ) -> Result, ResourceTableError> + ) -> Result< + impl Iterator>, + ResourceTableError, + > where T: 'static, { let parent_entry = self.occupied(parent.rep())?; Ok(parent_entry.children.iter().map(|child_index| { let child = self.occupied(*child_index).expect("missing child"); - child.entry.as_ref() + child.slot.get() })) } + + /// Temporarily take the resource out of the table. + /// + /// This is an advanced operation to allow mutating resources independent of + /// the table's mutable reference lifetime. For simple access to the resource, + /// try [ResourceTable::get_mut] instead. + /// + /// Unlike deleting the resource and pushing it back in, this method retains + /// the resource's index in the table and the parent/children relationships. + /// + /// While a resource is taken out, any attempt to access that resource's + /// index through the table returns [ResourceTableError::Taken]. It's the + /// caller's responsibility to put the resource back in using [ResourceTable::restore]. + pub fn take(&mut self, resource: &Resource) -> Result, ResourceTableError> + where + T: Any + Send + 'static, + { + match self.take_any(resource.rep())?.downcast() { + Ok(data) => Ok(data), + Err(data) => { + self.restore_any(resource.rep(), data) + .expect("resource was just taken"); + Err(ResourceTableError::WrongType) + } + } + } + + /// Put the resource back into the table. + pub fn restore( + &mut self, + resource: &Resource, + data: Box, + ) -> Result<(), ResourceTableError> + where + T: Any + Send + 'static, + { + self.restore_any(resource.rep(), data) + } + + /// Temporarily take the resource out of the table. + /// + /// This is an advanced operation to allow mutating resources independent of + /// the table's mutable reference lifetime. For simple access to the resource, + /// try [ResourceTable::get_any_mut] instead. + /// + /// Unlike deleting the resource and pushing it back in, this method retains + /// the resource's index in the table and the parent/children relationships. + /// + /// While a resource is taken out, any attempt to access that resource's + /// index through the table returns [ResourceTableError::Taken]. It's the + /// caller's responsibility to put the resource back in using [ResourceTable::restore_any]. + pub fn take_any( + &mut self, + key: u32, + ) -> Result, ResourceTableError> { + let entry = self.occupied_mut(key)?; + + let replacement: Slot = match &entry.slot { + Slot::Present(data) => Slot::Taken(data.as_ref().type_id()), + Slot::Taken(_) => return Err(ResourceTableError::Taken), + }; + match std::mem::replace(&mut entry.slot, replacement) { + Slot::Present(data) => Ok(data), + Slot::Taken(_) => unreachable!(), + } + } + + /// Put the resource back into the table. + pub fn restore_any( + &mut self, + key: u32, + data: Box, + ) -> Result<(), ResourceTableError> { + let entry = self.occupied_mut(key)?; + + let replacement = match &entry.slot { + Slot::Taken(id) if *id == data.as_ref().type_id() => Slot::Present(data), + Slot::Taken(_) => return Err(ResourceTableError::WrongType), + Slot::Present(_) => return Err(ResourceTableError::NotTaken), + }; + match std::mem::replace(&mut entry.slot, replacement) { + Slot::Taken(_) => Ok(()), + Slot::Present(_) => unreachable!(), + } + } } impl Default for ResourceTable { @@ -320,7 +427,7 @@ impl Default for ResourceTable { } #[test] -pub fn test_free_list() { +fn test_free_list() { let mut table = ResourceTable::new(); let x = table.push(()).unwrap(); @@ -348,3 +455,24 @@ pub fn test_free_list() { let x = table.push(()).unwrap(); assert_eq!(x.rep(), 2); } + +#[test] +fn test_take_restore() { + let mut table = ResourceTable::new(); + let a_u32: Resource = table.push(42).unwrap(); + let a_f64: Resource = Resource::new_borrow(a_u32.rep()); + + table.take(&a_u32).unwrap(); + + assert!(matches!(table.get(&a_f64), Err(ResourceTableError::Taken))); + + assert!(matches!( + table.restore(&a_f64, Box::new(42f64)), + Err(ResourceTableError::WrongType) + )); + table.restore(&a_u32, Box::new(42u32)).unwrap(); + assert!(matches!( + table.restore(&a_u32, Box::new(42u32)), + Err(ResourceTableError::NotTaken) + )); +}