From 67a70c5d11cc1e2a0a4c3e76e97161176547c426 Mon Sep 17 00:00:00 2001 From: Hanna Kruppe Date: Tue, 13 May 2025 01:02:27 +0200 Subject: [PATCH] refactor: limit dependency futures-util to tests and client-legacy Enable non-trivial applications based on hyper + tokio without pulling in this heavyweight dependency. Keep MSRV 1.63 by using `ready!` from `futures_core` and polyfilling `poll_fn`. Don't remove futures-util from the `client-legacy` code, because this would require re-implementing several non-trivial combinators. --- Cargo.toml | 8 +++--- src/client/legacy/client.rs | 5 ++-- src/client/legacy/connect/dns.rs | 2 +- src/client/legacy/connect/http.rs | 3 ++- src/client/legacy/connect/proxy/tunnel.rs | 4 +-- src/client/legacy/pool.rs | 2 +- src/common/future.rs | 30 +++++++++++++++++++++++ src/common/mod.rs | 2 ++ src/rt/io.rs | 9 ++++--- src/server/conn/auto/mod.rs | 2 +- src/service/oneshot.rs | 2 +- 11 files changed, 53 insertions(+), 16 deletions(-) create mode 100644 src/common/future.rs diff --git a/Cargo.toml b/Cargo.toml index bc639c06..747fbedf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,8 @@ rustdoc-args = ["--cfg", "docsrs"] base64 = { version = "0.22", optional = true } bytes = "1.7.1" futures-channel = { version = "0.3", optional = true } -futures-util = { version = "0.3.16", default-features = false } +futures-core = { version = "0.3" } +futures-util = { version = "0.3.16", default-features = false, optional = true } http = "1.0" http-body = "1.0.0" hyper = "1.6.0" @@ -37,6 +38,7 @@ tower-service = { version = "0.3", optional = true } [dev-dependencies] hyper = { version = "1.4.0", features = ["full"] } bytes = "1" +futures-util = { version = "0.3.16", default-features = false, features = ["alloc"] } http-body-util = "0.1.0" tokio = { version = "1", features = ["macros", "test-util", "signal"] } tokio-test = "0.4" @@ -69,13 +71,13 @@ full = [ ] client = ["hyper/client", "dep:tracing", "dep:futures-channel", "dep:tower-service"] -client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc"] +client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"] client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"] client-proxy-system = ["dep:system-configuration", "dep:windows-registry"] server = ["hyper/server"] server-auto = ["server", "http1", "http2"] -server-graceful = ["server", "tokio/sync", "futures-util/alloc"] +server-graceful = ["server", "tokio/sync"] service = ["dep:tower-service"] diff --git a/src/client/legacy/client.rs b/src/client/legacy/client.rs index 0c6960e3..67cd2221 100644 --- a/src/client/legacy/client.rs +++ b/src/client/legacy/client.rs @@ -25,6 +25,7 @@ use super::connect::HttpConnector; use super::connect::{Alpn, Connect, Connected, Connection}; use super::pool::{self, Ver}; +use crate::common::future::poll_fn; use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper}; type BoxSendFuture = Pin + Send>>; @@ -360,7 +361,7 @@ where } else if !res.body().is_end_stream() { //let (delayed_tx, delayed_rx) = oneshot::channel::<()>(); //res.body_mut().delayed_eof(delayed_rx); - let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| { + let on_idle = poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| { // At this point, `pooled` is dropped, and had a chance // to insert into the pool (if conn was idle) //drop(delayed_tx); @@ -370,7 +371,7 @@ where } else { // There's no body to delay, but the connection isn't // ready yet. Only re-insert when it's ready - let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ()); + let on_idle = poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ()); self.exec.execute(on_idle); } diff --git a/src/client/legacy/connect/dns.rs b/src/client/legacy/connect/dns.rs index fecb9963..7d88f768 100644 --- a/src/client/legacy/connect/dns.rs +++ b/src/client/legacy/connect/dns.rs @@ -291,7 +291,7 @@ pub(super) async fn resolve(resolver: &mut R, name: Name) -> Result; fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - futures_util::ready!(self.resolver.poll_ready(cx)).map_err(ConnectError::dns)?; + ready!(self.resolver.poll_ready(cx)).map_err(ConnectError::dns)?; Poll::Ready(Ok(())) } diff --git a/src/client/legacy/connect/proxy/tunnel.rs b/src/client/legacy/connect/proxy/tunnel.rs index 4f8c5153..ad948596 100644 --- a/src/client/legacy/connect/proxy/tunnel.rs +++ b/src/client/legacy/connect/proxy/tunnel.rs @@ -4,6 +4,7 @@ use std::marker::{PhantomData, Unpin}; use std::pin::Pin; use std::task::{self, Poll}; +use futures_core::ready; use http::{HeaderMap, HeaderValue, Uri}; use hyper::rt::{Read, Write}; use pin_project_lite::pin_project; @@ -127,8 +128,7 @@ where type Future = Tunneling; fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - futures_util::ready!(self.inner.poll_ready(cx)) - .map_err(|e| TunnelError::ConnectFailed(e.into()))?; + ready!(self.inner.poll_ready(cx)).map_err(|e| TunnelError::ConnectFailed(e.into()))?; Poll::Ready(Ok(())) } diff --git a/src/client/legacy/pool.rs b/src/client/legacy/pool.rs index c57b7ff9..d7f5d20c 100644 --- a/src/client/legacy/pool.rs +++ b/src/client/legacy/pool.rs @@ -14,7 +14,7 @@ use std::task::{self, Poll}; use std::time::{Duration, Instant}; use futures_channel::oneshot; -use futures_util::ready; +use futures_core::ready; use tracing::{debug, trace}; use hyper::rt::Sleep; diff --git a/src/common/future.rs b/src/common/future.rs new file mode 100644 index 00000000..47897f24 --- /dev/null +++ b/src/common/future.rs @@ -0,0 +1,30 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +// TODO: replace with `std::future::poll_fn` once MSRV >= 1.64 +pub(crate) fn poll_fn(f: F) -> PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + PollFn { f } +} + +pub(crate) struct PollFn { + f: F, +} + +impl Unpin for PollFn {} + +impl Future for PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (self.f)(cx) + } +} diff --git a/src/common/mod.rs b/src/common/mod.rs index 63b82885..b45cd0b2 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -15,3 +15,5 @@ pub(crate) use exec::Exec; pub(crate) use lazy::{lazy, Started as Lazy}; #[cfg(feature = "client")] pub(crate) use sync::SyncWrapper; + +pub(crate) mod future; diff --git a/src/rt/io.rs b/src/rt/io.rs index 0ce3ea9d..888756f6 100644 --- a/src/rt/io.rs +++ b/src/rt/io.rs @@ -2,15 +2,16 @@ use std::marker::Unpin; use std::pin::Pin; use std::task::Poll; -use futures_util::future; -use futures_util::ready; +use futures_core::ready; use hyper::rt::{Read, ReadBuf, Write}; +use crate::common::future::poll_fn; + pub(crate) async fn read(io: &mut T, buf: &mut [u8]) -> Result where T: Read + Unpin, { - future::poll_fn(move |cx| { + poll_fn(move |cx| { let mut buf = ReadBuf::new(buf); ready!(Pin::new(&mut *io).poll_read(cx, buf.unfilled()))?; Poll::Ready(Ok(buf.filled().len())) @@ -23,7 +24,7 @@ where T: Write + Unpin, { let mut n = 0; - future::poll_fn(move |cx| { + poll_fn(move |cx| { while n < buf.len() { n += ready!(Pin::new(&mut *io).poll_write(cx, &buf[n..])?); } diff --git a/src/server/conn/auto/mod.rs b/src/server/conn/auto/mod.rs index 023e9f1f..b2fc6556 100644 --- a/src/server/conn/auto/mod.rs +++ b/src/server/conn/auto/mod.rs @@ -2,7 +2,6 @@ pub mod upgrade; -use futures_util::ready; use hyper::service::HttpService; use std::future::Future; use std::marker::PhantomPinned; @@ -12,6 +11,7 @@ use std::task::{Context, Poll}; use std::{error::Error as StdError, io, time::Duration}; use bytes::Bytes; +use futures_core::ready; use http::{Request, Response}; use http_body::Body; use hyper::{ diff --git a/src/service/oneshot.rs b/src/service/oneshot.rs index 0c520772..4584304d 100644 --- a/src/service/oneshot.rs +++ b/src/service/oneshot.rs @@ -1,4 +1,4 @@ -use futures_util::ready; +use futures_core::ready; use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin;