diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..fd958937 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,11 @@ +root = true + +[*] +end_of_line = lf +indent_style = space +insert_final_newline = true +trim_trailing_whitespace = true + +[*.toml] +indent_size = tab +tab_width = 2 diff --git a/Cargo.toml b/Cargo.toml index 14d75d3f..3f9397c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,9 +25,10 @@ readme = "README.md" ahash = "0.8" bytesize = { package = "foyer-bytesize", version = "2" } clap = { version = "4", features = ["derive"] } -crossbeam = "0.8" equivalent = "1" fastrace = "0.7" +futures-core = { version = "0.3" } +futures-util = { version = "0.3", default-features = false, features = ["std"] } hashbrown = "0.15" itertools = "0.14" parking_lot = { version = "0.12" } diff --git a/foyer-bench/Cargo.toml b/foyer-bench/Cargo.toml index 7fe9a01d..99e771ea 100644 --- a/foyer-bench/Cargo.toml +++ b/foyer-bench/Cargo.toml @@ -20,7 +20,7 @@ console-subscriber = { version = "0.4", optional = true } fastrace = { workspace = true, optional = true } fastrace-jaeger = { version = "0.7", optional = true } foyer = { workspace = true } -futures = "0.3" +futures-util = { workspace = true } hdrhistogram = "7" http-body-util = "0.1" humantime = "2" diff --git a/foyer-bench/src/main.rs b/foyer-bench/src/main.rs index 381aef33..932d564f 100644 --- a/foyer-bench/src/main.rs +++ b/foyer-bench/src/main.rs @@ -42,7 +42,7 @@ use foyer::{ HybridCacheBuilder, InvalidRatioPicker, LargeEngineOptions, LfuConfig, LruConfig, RateLimitPicker, RecoverMode, RuntimeOptions, S3FifoConfig, SmallEngineOptions, TokioRuntimeOptions, TracingOptions, }; -use futures::future::join_all; +use futures_util::future::join_all; use itertools::Itertools; use mixtrics::registry::prometheus::PrometheusMetricsRegistry; use prometheus::Registry; diff --git a/foyer-common/Cargo.toml b/foyer-common/Cargo.toml index 0ebea304..6acd369e 100644 --- a/foyer-common/Cargo.toml +++ b/foyer-common/Cargo.toml @@ -17,7 +17,6 @@ ahash = { workspace = true } bytes = "1" cfg-if = "1" fastrace = { workspace = true } -futures = "0.3" itertools = { workspace = true } mixtrics = { workspace = true } parking_lot = { workspace = true } @@ -26,7 +25,7 @@ serde = { workspace = true } tokio = { workspace = true } [dev-dependencies] -futures = "0.3" +futures-util = { workspace = true } mixtrics = { workspace = true, features = ["test-utils"] } rand = "0.8.5" diff --git a/foyer-common/src/countdown.rs b/foyer-common/src/countdown.rs index d6738153..89da861c 100644 --- a/foyer-common/src/countdown.rs +++ b/foyer-common/src/countdown.rs @@ -56,7 +56,7 @@ impl Countdown { mod tests { use std::time::Duration; - use futures::future::join_all; + use futures_util::future::join_all; use super::*; @@ -66,7 +66,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(10)).await; cd.countdown() })) - .await; + .await; assert_eq!(counter, res.into_iter().filter(|b| !b).count()); } diff --git a/foyer-common/src/future.rs b/foyer-common/src/future.rs index d8fb59ca..903601a6 100644 --- a/foyer-common/src/future.rs +++ b/foyer-common/src/future.rs @@ -93,10 +93,9 @@ where #[cfg(test)] mod tests { + use std::future::poll_fn; use std::pin::pin; - use futures::future::poll_fn; - use super::*; #[tokio::test] diff --git a/foyer-common/src/lib.rs b/foyer-common/src/lib.rs index b069f285..1cf39f81 100644 --- a/foyer-common/src/lib.rs +++ b/foyer-common/src/lib.rs @@ -14,7 +14,7 @@ //! Shared components and utils for foyer. -/// Allow enable debug assertions in release profile with feature "strict_assertion". +/// Allow to enable debug assertions in release profile with feature "strict_assertion". pub mod assert; /// The util that convert the blocking call to async call. pub mod asyncify; @@ -48,5 +48,3 @@ pub mod runtime; pub mod scope; /// Tracing related components. pub mod tracing; -/// An async wait group implementation. -pub mod wait_group; diff --git a/foyer-common/src/tracing.rs b/foyer-common/src/tracing.rs index 12f1ddbf..0fef8648 100644 --- a/foyer-common/src/tracing.rs +++ b/foyer-common/src/tracing.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use fastrace::prelude::*; +use pin_project::pin_project; +use serde::{Deserialize, Serialize}; +use std::future::Future; use std::{ ops::Deref, pin::Pin, @@ -20,11 +24,6 @@ use std::{ time::Duration, }; -use fastrace::prelude::*; -use futures::{ready, Future}; -use pin_project::pin_project; -use serde::{Deserialize, Serialize}; - /// Configurations for tracing. #[derive(Debug, Default)] pub struct TracingConfig { @@ -201,7 +200,10 @@ where let this = self.project(); let _guard = this.root.as_ref().map(|s| s.set_local_parent()); - let res = ready!(this.inner.poll(cx)); + let res = match this.inner.poll(cx) { + Poll::Ready(res) => res, + Poll::Pending => return Poll::Pending, + }; let mut root = this.root.take().unwrap(); diff --git a/foyer-common/src/wait_group.rs b/foyer-common/src/wait_group.rs deleted file mode 100644 index c4906946..00000000 --- a/foyer-common/src/wait_group.rs +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright 2025 foyer Project Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{ - pin::Pin, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - task::{Context, Poll}, -}; - -use futures::{task::AtomicWaker, Future}; - -#[derive(Debug, Default)] -struct WaitGroupInner { - counter: AtomicUsize, - waker: AtomicWaker, -} - -/// A [`WaitGroup`] waits for all acquired [`WaitGroupGuard`] to drop. -#[derive(Debug, Default)] -pub struct WaitGroup { - inner: Arc, -} - -impl WaitGroup { - /// Acquire a [`WaitGroupGuard`] for the [`WaitGroup`] to wait for. - pub fn acquire(&self) -> WaitGroupGuard { - self.inner.counter.fetch_add(1, Ordering::SeqCst); - WaitGroupGuard { - inner: self.inner.clone(), - } - } - - /// Consume the [`WaitGroup`] and generate a [`WaitGroupFuture`]. - pub fn wait(self) -> WaitGroupFuture { - WaitGroupFuture { - inner: self.inner, - initialized: false, - } - } -} - -/// A [`WaitGroupGuard`] is generated by [`WaitGroup::acquire`]. -#[derive(Debug)] -pub struct WaitGroupGuard { - inner: Arc, -} - -impl Drop for WaitGroupGuard { - fn drop(&mut self) { - if self.inner.counter.fetch_sub(1, Ordering::SeqCst) - 1 == 0 { - // Wake up the future if this is the last count. - // - // - If the waker is not set yet, this is a no-op. The counter might be increased again later. - // - If the waker is already set, the counter will be no longer increased, so this is the actual last count. - self.inner.waker.wake(); - } - } -} - -/// A [`WaitGroupFuture`] is generated by [`WaitGroup::wait`]. -/// -/// A [`WaitGroupFuture`] will not be ready until all related [`WaitGroupGuard`]s are dropped. -#[must_use] -#[derive(Debug)] -pub struct WaitGroupFuture { - inner: Arc, - initialized: bool, -} - -impl Future for WaitGroupFuture { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if !self.initialized { - self.initialized = true; - self.inner.waker.register(cx.waker()); - } - - if self.inner.counter.load(Ordering::SeqCst) == 0 { - Poll::Ready(()) - } else { - Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use tokio::time::sleep; - - use super::*; - - #[tokio::test] - async fn test_wait_group_empty() { - let wg = WaitGroup::default(); - wg.wait().await; - } - - #[tokio::test] - async fn test_wait_group_basic() { - let v = Arc::new(AtomicUsize::new(0)); - let wg = WaitGroup::default(); - - let g = wg.acquire(); - let vv = v.clone(); - tokio::spawn(async move { - sleep(Duration::from_millis(100)).await; - vv.fetch_add(1, Ordering::SeqCst); - drop(g); - }); - - sleep(Duration::from_millis(10)).await; - wg.wait().await; - assert_eq!(v.load(Ordering::SeqCst), 1); - } - - #[tokio::test] - async fn test_wait_group_dip_rise() { - let v = Arc::new(AtomicUsize::new(0)); - let wg = WaitGroup::default(); - - let g1 = wg.acquire(); - let vv = v.clone(); - tokio::spawn(async move { - sleep(Duration::from_millis(10)).await; - vv.fetch_add(1, Ordering::SeqCst); - drop(g1); - }); - - let g2 = wg.acquire(); - let vv = v.clone(); - tokio::spawn(async move { - sleep(Duration::from_millis(100)).await; - vv.fetch_add(1, Ordering::SeqCst); - drop(g2); - }); - - sleep(Duration::from_millis(50)).await; - wg.wait().await; - assert_eq!(v.load(Ordering::SeqCst), 2); - } -} diff --git a/foyer-memory/Cargo.toml b/foyer-memory/Cargo.toml index a4a9d286..a00f2b09 100644 --- a/foyer-memory/Cargo.toml +++ b/foyer-memory/Cargo.toml @@ -20,7 +20,6 @@ cmsketch = "0.2.1" equivalent = { workspace = true } fastrace = { workspace = true } foyer-common = { workspace = true } -futures = "0.3" hashbrown = { workspace = true } intrusive-collections = { package = "foyer-intrusive-collections", version = "0.10.0-dev" } itertools = { workspace = true } @@ -34,6 +33,7 @@ tracing = { workspace = true } [dev-dependencies] csv = "1.3.0" +futures-util = { workspace = true } moka = { version = "0.12", features = ["sync"] } rand = { version = "0.8", features = ["small_rng"] } test-log = { workspace = true } diff --git a/foyer-memory/src/cache.rs b/foyer-memory/src/cache.rs index c3512a40..d19ec787 100644 --- a/foyer-memory/src/cache.rs +++ b/foyer-memory/src/cache.rs @@ -891,7 +891,7 @@ where mod tests { use std::{ops::Range, time::Duration}; - use futures::future::join_all; + use futures_util::future::join_all; use itertools::Itertools; use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng}; diff --git a/foyer-storage/Cargo.toml b/foyer-storage/Cargo.toml index ffc3798d..c56cbbc3 100644 --- a/foyer-storage/Cargo.toml +++ b/foyer-storage/Cargo.toml @@ -24,14 +24,14 @@ auto_enums = { version = "0.8", features = ["futures03"] } bincode = "1" bytes = "1" clap = { workspace = true } -either = "1" equivalent = { workspace = true } fastrace = { workspace = true } flume = "0.11" foyer-common = { workspace = true } foyer-memory = { workspace = true } fs4 = { version = "0.12", default-features = false } -futures = "0.3" +futures-core = { workspace = true } +futures-util = { workspace = true } itertools = { workspace = true } libc = "0.2" lz4 = "1.24" diff --git a/foyer-storage/src/device/direct_fs.rs b/foyer-storage/src/device/direct_fs.rs index df22bb77..fa44af1f 100644 --- a/foyer-storage/src/device/direct_fs.rs +++ b/foyer-storage/src/device/direct_fs.rs @@ -20,7 +20,7 @@ use std::{ use foyer_common::{asyncify::asyncify_with_runtime, bits}; use fs4::free_space; -use futures::future::try_join_all; +use futures_util::future::try_join_all; use itertools::Itertools; use serde::{Deserialize, Serialize}; diff --git a/foyer-storage/src/engine.rs b/foyer-storage/src/engine.rs index f3df0875..cb62ba4d 100644 --- a/foyer-storage/src/engine.rs +++ b/foyer-storage/src/engine.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt::Debug, marker::PhantomData, sync::Arc}; - use auto_enums::auto_enum; use foyer_common::code::{StorageKey, StorageValue}; use foyer_memory::Piece; -use futures::Future; +use std::future::Future; +use std::{fmt::Debug, marker::PhantomData, sync::Arc}; use crate::{ error::Result, diff --git a/foyer-storage/src/large/flusher.rs b/foyer-storage/src/large/flusher.rs index a0478aa2..d19b2f17 100644 --- a/foyer-storage/src/large/flusher.rs +++ b/foyer-storage/src/large/flusher.rs @@ -26,7 +26,7 @@ use foyer_common::{ metrics::Metrics, }; use foyer_memory::Piece; -use futures::future::{try_join, try_join_all}; +use futures_util::future::{try_join, try_join_all}; use tokio::sync::{oneshot, OwnedSemaphorePermit, Semaphore}; use super::{ diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index 6f49601a..5aa1f9a2 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -31,7 +31,7 @@ use foyer_common::{ metrics::Metrics, }; use foyer_memory::Piece; -use futures::future::{join_all, try_join_all}; +use futures_util::future::{join_all, try_join_all}; use tokio::sync::Semaphore; use super::{ diff --git a/foyer-storage/src/large/reclaimer.rs b/foyer-storage/src/large/reclaimer.rs index 47be097d..d8c4355c 100644 --- a/foyer-storage/src/large/reclaimer.rs +++ b/foyer-storage/src/large/reclaimer.rs @@ -18,7 +18,7 @@ use foyer_common::{ code::{StorageKey, StorageValue}, metrics::Metrics, }; -use futures::future::join_all; +use futures_util::future::join_all; use itertools::Itertools; use tokio::sync::{mpsc, oneshot, Semaphore, SemaphorePermit}; diff --git a/foyer-storage/src/large/recover.rs b/foyer-storage/src/large/recover.rs index a9050d67..99512ea5 100644 --- a/foyer-storage/src/large/recover.rs +++ b/foyer-storage/src/large/recover.rs @@ -24,7 +24,7 @@ use foyer_common::{ code::{StorageKey, StorageValue}, metrics::Metrics, }; -use futures::future::try_join_all; +use futures_util::future::try_join_all; use itertools::Itertools; use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; diff --git a/foyer-storage/src/large/tombstone.rs b/foyer-storage/src/large/tombstone.rs index b716ed63..14a6a9b2 100644 --- a/foyer-storage/src/large/tombstone.rs +++ b/foyer-storage/src/large/tombstone.rs @@ -20,7 +20,7 @@ use std::{ use array_util::SliceExt; use bytes::{Buf, BufMut}; use foyer_common::{bits, metrics::Metrics, strict_assert_eq}; -use futures::future::try_join_all; +use futures_util::future::try_join_all; use tokio::sync::Mutex; use crate::{ diff --git a/foyer-storage/src/region.rs b/foyer-storage/src/region.rs index fc267672..51ffa4f9 100644 --- a/foyer-storage/src/region.rs +++ b/foyer-storage/src/region.rs @@ -26,10 +26,9 @@ use std::{ use async_channel::{Receiver, Sender}; use foyer_common::{countdown::Countdown, metrics::Metrics}; -use futures::{ - future::{BoxFuture, Shared}, - FutureExt, -}; +use futures_core::future::BoxFuture; +use futures_util::future::Shared; +use futures_util::FutureExt; use itertools::Itertools; use parking_lot::Mutex; use pin_project::pin_project; diff --git a/foyer-storage/src/small/flusher.rs b/foyer-storage/src/small/flusher.rs index b349ef89..43479252 100644 --- a/foyer-storage/src/small/flusher.rs +++ b/foyer-storage/src/small/flusher.rs @@ -23,7 +23,7 @@ use foyer_common::{ metrics::Metrics, }; use foyer_memory::Piece; -use futures::future::try_join_all; +use futures_util::future::try_join_all; use tokio::sync::{oneshot, OwnedSemaphorePermit, Semaphore}; use super::{ diff --git a/foyer-storage/src/small/generic.rs b/foyer-storage/src/small/generic.rs index 958e8dce..7d12a6de 100644 --- a/foyer-storage/src/small/generic.rs +++ b/foyer-storage/src/small/generic.rs @@ -21,10 +21,10 @@ use std::{ Arc, }, }; - +use std::future::Future; use foyer_common::code::{StorageKey, StorageValue}; use foyer_memory::Piece; -use futures::{future::join_all, Future}; +use futures_util::future::join_all; use itertools::Itertools; use crate::{ diff --git a/foyer-storage/src/storage/either.rs b/foyer-storage/src/storage/either.rs index 1e447eb8..69b516cc 100644 --- a/foyer-storage/src/storage/either.rs +++ b/foyer-storage/src/storage/either.rs @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt::Debug, sync::Arc}; - use auto_enums::auto_enum; use foyer_common::code::{StorageKey, StorageValue}; use foyer_memory::Piece; -use futures::{ +use futures_util::{ future::{join, ready, select, try_join, Either as EitherFuture}, - pin_mut, Future, FutureExt, + pin_mut, FutureExt, }; use serde::{Deserialize, Serialize}; +use std::future::Future; +use std::{fmt::Debug, sync::Arc}; use crate::{error::Result, storage::Storage, DeviceStats}; @@ -30,11 +30,11 @@ use crate::{error::Result, storage::Storage, DeviceStats}; pub enum Order { /// Use the left engine first. /// - /// If the op does returns the expected result, use then right engine then. + /// If the op does return the expected result, use then right engine then. LeftFirst, /// Use the right engine first. /// - /// If the op does returns the expected result, use then left engine then. + /// If the op does return the expected result, use then left engine then. RightFirst, /// Use the left engine and the right engine in parallel. /// @@ -234,7 +234,7 @@ where Ok(()) } - fn stats(&self) -> std::sync::Arc { + fn stats(&self) -> Arc { // The two engines share the same device, so it is okay to use either device stats of those. self.left.stats() } diff --git a/foyer-storage/src/storage/noop.rs b/foyer-storage/src/storage/noop.rs index 50627049..6aa0a866 100644 --- a/foyer-storage/src/storage/noop.rs +++ b/foyer-storage/src/storage/noop.rs @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc}; - use foyer_common::code::{StorageKey, StorageValue}; use foyer_memory::Piece; -use futures::future::ready; +use std::future::ready; +use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc}; use crate::{device::monitor::DeviceStats, error::Result, storage::Storage}; diff --git a/foyer/Cargo.toml b/foyer/Cargo.toml index 8e716d92..c96d3fe4 100644 --- a/foyer/Cargo.toml +++ b/foyer/Cargo.toml @@ -24,7 +24,6 @@ fastrace = { workspace = true } foyer-common = { workspace = true } foyer-memory = { workspace = true } foyer-storage = { workspace = true } -futures = "0.3" mixtrics = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index e173da34..42ce8d86 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -34,7 +34,6 @@ use foyer_common::{ }; use foyer_memory::{Cache, CacheEntry, CacheHint, Fetch, FetchMark, FetchState, Piece, Pipe}; use foyer_storage::{DeviceStats, Store}; -use futures::FutureExt; use tokio::sync::oneshot; use super::writer::HybridCacheStorageWriter; @@ -494,18 +493,15 @@ where metrics.hybrid_miss.increase(1); metrics.hybrid_miss_duration.record(now.elapsed().as_secs_f64()); - runtime - .user() - .spawn( - future - .map(|res| Diversion { - target: res, - store: Some(FetchMark), - }) - .in_span(Span::enter_with_local_parent("foyer::hybrid::fetch::fn")), - ) - .await - .unwrap() + let fut = async move { + Diversion { + target: future.await, + store: Some(FetchMark), + } + } + .in_span(Span::enter_with_local_parent("foyer::hybrid::fetch::fn")); + + runtime.user().spawn(fut).await.unwrap() } }, self.storage().runtime().read(),