diff --git a/examples/custom_client_tls.rs b/examples/custom_client_tls.rs index 659772ebf..5b271e04f 100644 --- a/examples/custom_client_tls.rs +++ b/examples/custom_client_tls.rs @@ -20,12 +20,14 @@ async fn main() -> anyhow::Result<()> { let https = config.openssl_https_connector()?; let service = ServiceBuilder::new() .layer(config.base_uri_layer()) + .option_layer(config.auth_layer()?) .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https)); Client::new(service, config.default_namespace) } else { let https = config.rustls_https_connector()?; let service = ServiceBuilder::new() .layer(config.base_uri_layer()) + .option_layer(config.auth_layer()?) .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https)); Client::new(service, config.default_namespace) }; diff --git a/examples/custom_client_trace.rs b/examples/custom_client_trace.rs index 573e6750e..866a5dbd3 100644 --- a/examples/custom_client_trace.rs +++ b/examples/custom_client_trace.rs @@ -26,6 +26,7 @@ async fn main() -> anyhow::Result<()> { .layer(tower::limit::ConcurrencyLimitLayer::new(4)) // Add `DecompressionLayer` to make request headers interesting. .layer(DecompressionLayer::new()) + .option_layer(config.auth_layer()?) .layer( // Attribute names follow [Semantic Conventions]. // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-client diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index b8cf5e0fc..3007cbdd6 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -20,7 +20,7 @@ kubelet-debug = ["ws", "kube-core/kubelet-debug"] oauth = ["client", "tame-oauth"] oidc = ["client", "form_urlencoded"] gzip = ["client", "tower-http/decompression-gzip"] -client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"] +client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"] jsonpatch = ["kube-core/jsonpatch"] admission = ["kube-core/admission"] config = ["__non_core", "pem", "home"] @@ -69,7 +69,6 @@ tower = { workspace = true, features = ["buffer", "filter", "util"], optional = tower-http = { workspace = true, features = ["auth", "map-response-body", "trace"], optional = true } hyper-timeout = { workspace = true, optional = true } tame-oauth = { workspace = true, features = ["gcp"], optional = true } -pin-project = { workspace = true, optional = true } rand = { workspace = true, optional = true } secrecy = { workspace = true, features = ["alloc", "serde"] } tracing = { workspace = true, features = ["log"], optional = true } diff --git a/kube-client/src/client/body.rs b/kube-client/src/client/body.rs index c3cdb2dc3..521912258 100644 --- a/kube-client/src/client/body.rs +++ b/kube-client/src/client/body.rs @@ -6,10 +6,9 @@ use std::{ }; use bytes::Bytes; -use futures::stream::Stream; -use http_body::{Body as HttpBody, Frame}; -use http_body_util::{combinators::UnsyncBoxBody, BodyExt}; -use pin_project::pin_project; +use futures::{stream::Stream, TryStreamExt}; +use http_body::{Body as HttpBody, Frame, SizeHint}; +use http_body_util::{combinators::UnsyncBoxBody, BodyExt, BodyStream}; /// A request body. pub struct Body { @@ -49,6 +48,12 @@ impl Body { pub async fn collect_bytes(self) -> Result { Ok(::collect(self).await?.to_bytes()) } + + pub(crate) fn into_data_stream( + self, + ) -> impl Stream::Data, ::Error>> { + Box::pin(BodyStream::new(self).try_filter_map(|frame| async { Ok(frame.into_data().ok()) })) + } } impl From for Body { @@ -89,50 +94,20 @@ impl HttpBody for Body { ), } } -} - -// Wrap `http_body::Body` to implement `Stream`. -#[pin_project] -pub struct BodyDataStream { - #[pin] - body: B, -} - -impl BodyDataStream { - pub(crate) fn new(body: B) -> Self { - Self { body } - } -} -impl Stream for BodyDataStream -where - B: HttpBody, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - return match ready!(self.as_mut().project().body.poll_frame(cx)) { - Some(Ok(frame)) => { - let Ok(bytes) = frame.into_data() else { - continue; - }; - Poll::Ready(Some(Ok(bytes))) - } - Some(Err(err)) => Poll::Ready(Some(Err(err))), - None => Poll::Ready(None), - }; + fn size_hint(&self) -> SizeHint { + match &self.kind { + Kind::Once(Some(bytes)) => SizeHint::with_exact(bytes.len() as u64), + Kind::Once(None) => SizeHint::with_exact(0), + Kind::Wrap(body) => body.size_hint(), } } -} -pub trait IntoBodyDataStream: HttpBody { - fn into_stream(self) -> BodyDataStream - where - Self: Sized, - { - BodyDataStream::new(self) + fn is_end_stream(&self) -> bool { + match &self.kind { + Kind::Once(Some(bytes)) => bytes.is_empty(), + Kind::Once(None) => true, + Kind::Wrap(body) => body.is_end_stream(), + } } } - -impl IntoBodyDataStream for T where T: HttpBody {} diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index d17da9679..300a3bb81 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -31,8 +31,6 @@ use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result}; mod auth; mod body; mod builder; -// Add `into_stream()` to `http::Body` -use body::IntoBodyDataStream as _; #[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))] #[cfg(feature = "unstable-client")] mod client_ext; @@ -271,10 +269,7 @@ impl Client { let res = handle_api_errors(res).await?; // Map the error, since we want to convert this into an `AsyncBufReader` using // `into_async_read` which specifies `std::io::Error` as the stream's error type. - let body = BodyExt::map_err(res.into_body(), |e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - }) - .into_stream(); + let body = res.into_body().into_data_stream().map_err(std::io::Error::other); Ok(body.into_async_read()) } @@ -314,17 +309,14 @@ impl Client { tracing::trace!("headers: {:?}", res.headers()); let frames = FramedRead::new( - StreamReader::new( - BodyExt::map_err(res.into_body(), |e| { - // Unexpected EOF from chunked decoder. - // Tends to happen when watching for 300+s. This will be ignored. - if e.to_string().contains("unexpected EOF during chunk") { - return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e); - } - std::io::Error::new(std::io::ErrorKind::Other, e) - }) - .into_stream(), - ), + StreamReader::new(res.into_body().into_data_stream().map_err(|e| { + // Unexpected EOF from chunked decoder. + // Tends to happen when watching for 300+s. This will be ignored. + if e.to_string().contains("unexpected EOF during chunk") { + return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e); + } + std::io::Error::other(e) + })), LinesCodec::new(), );