From adcae3cd5504cd6624e81fbfb9695a258dbfa55d Mon Sep 17 00:00:00 2001 From: Alison Davis Date: Tue, 2 Jul 2024 08:42:34 -0500 Subject: [PATCH 1/4] Support shared futures on no_std --- futures-util/Cargo.toml | 1 + futures-util/src/future/future/mod.rs | 6 +++--- futures-util/src/future/future/shared.rs | 21 +++++++++++++-------- futures-util/src/future/mod.rs | 2 +- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 68dbd86b7..39dbee990 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -43,6 +43,7 @@ futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" pin-project-lite = "0.2.6" +spin = "0.9.8" [dev-dependencies] futures = { path = "../futures", features = ["async-await", "thread-pool"] } diff --git a/futures-util/src/future/future/mod.rs b/futures-util/src/future/future/mod.rs index cf6bd8680..ba3c7d631 100644 --- a/futures-util/src/future/future/mod.rs +++ b/futures-util/src/future/future/mod.rs @@ -107,9 +107,9 @@ mod remote_handle; #[cfg(feature = "std")] pub use self::remote_handle::{Remote, RemoteHandle}; -#[cfg(feature = "std")] +#[cfg(feature = "alloc")] mod shared; -#[cfg(feature = "std")] +#[cfg(feature = "alloc")] pub use self::shared::{Shared, WeakShared}; impl FutureExt for T where T: Future {} @@ -474,7 +474,7 @@ pub trait FutureExt: Future { /// join_handle.join().unwrap(); /// # }); /// ``` - #[cfg(feature = "std")] + #[cfg(feature = "alloc")] fn shared(self) -> Shared where Self: Sized, diff --git a/futures-util/src/future/future/shared.rs b/futures-util/src/future/future/shared.rs index b4d9bff89..32f46e719 100644 --- a/futures-util/src/future/future/shared.rs +++ b/futures-util/src/future/future/shared.rs @@ -1,15 +1,20 @@ use crate::task::{waker_ref, ArcWake}; +use alloc::sync::{Arc, Weak}; +use core::cell::UnsafeCell; +use core::fmt; +use core::hash::Hasher; +use core::pin::Pin; +use core::ptr; +use core::sync::atomic::AtomicUsize; +use core::sync::atomic::Ordering::{Acquire, SeqCst}; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll, Waker}; use slab::Slab; -use std::cell::UnsafeCell; -use std::fmt; -use std::hash::Hasher; -use std::pin::Pin; -use std::ptr; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::{Acquire, SeqCst}; -use std::sync::{Arc, Mutex, Weak}; + +#[cfg(feature = "std")] +type Mutex = std::sync::Mutex; +#[cfg(not(feature = "std"))] +type Mutex = spin::Mutex; /// Future for the [`shared`](super::FutureExt::shared) method. #[must_use = "futures do nothing unless you `.await` or poll them"] diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 1280ce986..b717fc178 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -35,7 +35,7 @@ pub use self::future::CatchUnwind; #[cfg(feature = "std")] pub use self::future::{Remote, RemoteHandle}; -#[cfg(feature = "std")] +#[cfg(feature = "alloc")] pub use self::future::{Shared, WeakShared}; mod try_future; From af51ae2411059eeab533b4938b2dbf0e97251a1d Mon Sep 17 00:00:00 2001 From: Alison Davis Date: Tue, 2 Jul 2024 18:21:48 -0500 Subject: [PATCH 2/4] Fixes --- futures-util/Cargo.toml | 8 +++++--- futures-util/src/future/future/mod.rs | 6 +++--- futures-util/src/future/future/shared.rs | 16 ++++++++++++++++ futures-util/src/future/mod.rs | 2 +- 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 39dbee990..f4d5466f1 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -12,8 +12,8 @@ Common utilities and extension traits for the futures-rs library. [features] default = ["std", "async-await", "async-await-macro"] -std = ["alloc", "futures-core/std", "futures-task/std", "slab"] -alloc = ["futures-core/alloc", "futures-task/alloc"] +std = ["alloc", "futures-core/std", "futures-task/std", "slab/std"] +alloc = ["futures-core/alloc", "futures-task/alloc", "slab"] async-await = [] async-await-macro = ["async-await", "futures-macro"] compat = ["std", "futures_01"] @@ -37,12 +37,14 @@ futures-channel = { path = "../futures-channel", version = "=0.4.0-alpha.0", def futures-io = { path = "../futures-io", version = "0.3.30", default-features = false, features = ["std"], optional = true } futures-sink = { path = "../futures-sink", version = "=0.4.0-alpha.0", default-features = false, optional = true } futures-macro = { path = "../futures-macro", version = "=0.4.0-alpha.0", default-features = false, optional = true } -slab = { version = "0.4.2", optional = true } +slab = { version = "0.4.2", default-features = false, optional = true } memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" pin-project-lite = "0.2.6" + +[target.'cfg(target_has_atomic = "8")'.dependencies] spin = "0.9.8" [dev-dependencies] diff --git a/futures-util/src/future/future/mod.rs b/futures-util/src/future/future/mod.rs index ba3c7d631..ee757eb0a 100644 --- a/futures-util/src/future/future/mod.rs +++ b/futures-util/src/future/future/mod.rs @@ -107,9 +107,9 @@ mod remote_handle; #[cfg(feature = "std")] pub use self::remote_handle::{Remote, RemoteHandle}; -#[cfg(feature = "alloc")] +#[cfg(all(feature = "alloc", target_has_atomic = "8"))] mod shared; -#[cfg(feature = "alloc")] +#[cfg(all(feature = "alloc", target_has_atomic = "8"))] pub use self::shared::{Shared, WeakShared}; impl FutureExt for T where T: Future {} @@ -474,7 +474,7 @@ pub trait FutureExt: Future { /// join_handle.join().unwrap(); /// # }); /// ``` - #[cfg(feature = "alloc")] + #[cfg(all(feature = "alloc", target_has_atomic = "8"))] fn shared(self) -> Shared where Self: Sized, diff --git a/futures-util/src/future/future/shared.rs b/futures-util/src/future/future/shared.rs index 32f46e719..f26b20d92 100644 --- a/futures-util/src/future/future/shared.rs +++ b/futures-util/src/future/future/shared.rs @@ -209,7 +209,10 @@ where { /// Registers the current task to receive a wakeup when we are awoken. fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) { + #[cfg(feature = "std")] let mut wakers_guard = self.notifier.wakers.lock().unwrap(); + #[cfg(not(feature = "std"))] + let mut wakers_guard = self.notifier.wakers.lock(); let wakers_mut = wakers_guard.as_mut(); @@ -350,7 +353,11 @@ where inner.notifier.state.store(COMPLETE, SeqCst); // Wake all tasks and drop the slab + #[cfg(feature = "std")] let mut wakers_guard = inner.notifier.wakers.lock().unwrap(); + #[cfg(not(feature = "std"))] + let mut wakers_guard = inner.notifier.wakers.lock(); + let mut wakers = wakers_guard.take().unwrap(); for waker in wakers.drain().flatten() { waker.wake(); @@ -380,11 +387,16 @@ where fn drop(&mut self) { if self.waker_key != NULL_WAKER_KEY { if let Some(ref inner) = self.inner { + #[cfg(feature = "std")] if let Ok(mut wakers) = inner.notifier.wakers.lock() { if let Some(wakers) = wakers.as_mut() { wakers.remove(self.waker_key); } } + #[cfg(not(feature = "std"))] + if let Some(wakers) = inner.notifier.wakers.lock().as_mut() { + wakers.remove(self.waker_key); + } } } } @@ -392,7 +404,11 @@ where impl ArcWake for Notifier { fn wake_by_ref(arc_self: &Arc) { + #[cfg(feature = "std")] let wakers = &mut *arc_self.wakers.lock().unwrap(); + #[cfg(not(feature = "std"))] + let wakers = &mut *arc_self.wakers.lock(); + if let Some(wakers) = wakers.as_mut() { for (_key, opt_waker) in wakers { if let Some(waker) = opt_waker.take() { diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index b717fc178..e3ca85f6f 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -35,7 +35,7 @@ pub use self::future::CatchUnwind; #[cfg(feature = "std")] pub use self::future::{Remote, RemoteHandle}; -#[cfg(feature = "alloc")] +#[cfg(all(feature = "alloc", target_has_atomic = "8"))] pub use self::future::{Shared, WeakShared}; mod try_future; From 7b85ccb250d94abfd3c035ba1e285cfc5a6035f3 Mon Sep 17 00:00:00 2001 From: Alison Davis Date: Sun, 22 Dec 2024 16:41:42 -0600 Subject: [PATCH 3/4] Feature gate spinlock in shared future --- futures-util/Cargo.toml | 5 ++--- futures-util/src/future/future/mod.rs | 8 ++++---- futures-util/src/future/mod.rs | 2 +- futures/Cargo.toml | 1 + 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index f4d5466f1..2ecd6f54b 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -22,6 +22,7 @@ sink = ["futures-sink"] io = ["std", "futures-io", "memchr"] channel = ["std", "futures-channel"] portable-atomic = ["futures-core/portable-atomic"] +spin = ["dep:spin"] # Unstable features # These features are outside of the normal semver guarantees and require the @@ -43,9 +44,7 @@ futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" pin-project-lite = "0.2.6" - -[target.'cfg(target_has_atomic = "8")'.dependencies] -spin = "0.9.8" +spin = { version = "0.9.8", optional = true } [dev-dependencies] futures = { path = "../futures", features = ["async-await", "thread-pool"] } diff --git a/futures-util/src/future/future/mod.rs b/futures-util/src/future/future/mod.rs index ee757eb0a..e0f408739 100644 --- a/futures-util/src/future/future/mod.rs +++ b/futures-util/src/future/future/mod.rs @@ -107,9 +107,9 @@ mod remote_handle; #[cfg(feature = "std")] pub use self::remote_handle::{Remote, RemoteHandle}; -#[cfg(all(feature = "alloc", target_has_atomic = "8"))] +#[cfg(any(feature = "std", feature = "spin"))] mod shared; -#[cfg(all(feature = "alloc", target_has_atomic = "8"))] +#[cfg(any(feature = "std", feature = "spin"))] pub use self::shared::{Shared, WeakShared}; impl FutureExt for T where T: Future {} @@ -440,7 +440,7 @@ pub trait FutureExt: Future { /// into a cloneable future. It enables a future to be polled by multiple /// threads. /// - /// This method is only available when the `std` feature of this + /// This method is only available when the `std` or 'spin' feature of this /// library is activated, and it is activated by default. /// /// # Examples @@ -474,7 +474,7 @@ pub trait FutureExt: Future { /// join_handle.join().unwrap(); /// # }); /// ``` - #[cfg(all(feature = "alloc", target_has_atomic = "8"))] + #[cfg(any(feature = "std", feature = "spin"))] fn shared(self) -> Shared where Self: Sized, diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index e3ca85f6f..ce7182905 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -35,7 +35,7 @@ pub use self::future::CatchUnwind; #[cfg(feature = "std")] pub use self::future::{Remote, RemoteHandle}; -#[cfg(all(feature = "alloc", target_has_atomic = "8"))] +#[cfg(any(feature = "std", feature = "spin"))] pub use self::future::{Shared, WeakShared}; mod try_future; diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 84edf9000..c56228866 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -40,6 +40,7 @@ compat = ["std", "futures-util/compat"] io-compat = ["compat", "futures-util/io-compat"] executor = ["std", "futures-executor/std"] thread-pool = ["executor", "futures-executor/thread-pool"] +spin = ["futures-util/spin"] # Unstable features # These features are outside of the normal semver guarantees and require the From bcca58fa25ea19da9306824affed55e82fd22081 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Mon, 13 Jan 2025 01:24:48 +0900 Subject: [PATCH 4/4] Update futures-util/Cargo.toml --- futures-util/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 2ecd6f54b..55d827e13 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -22,7 +22,6 @@ sink = ["futures-sink"] io = ["std", "futures-io", "memchr"] channel = ["std", "futures-channel"] portable-atomic = ["futures-core/portable-atomic"] -spin = ["dep:spin"] # Unstable features # These features are outside of the normal semver guarantees and require the