From 05d115be41ae67a5097f08b52b5f9e5fd730bd42 Mon Sep 17 00:00:00 2001 From: tottoto Date: Mon, 1 Apr 2024 23:32:10 +0900 Subject: [PATCH] refactor: remove BodyDataStream (#1455) Signed-off-by: tottoto --- kube-client/Cargo.toml | 3 +-- kube-client/src/client/body.rs | 49 +++++----------------------------- kube-client/src/client/mod.rs | 4 +-- 3 files changed, 9 insertions(+), 47 deletions(-) diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index b8cf5e0fc..3007cbdd6 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -20,7 +20,7 @@ kubelet-debug = ["ws", "kube-core/kubelet-debug"] oauth = ["client", "tame-oauth"] oidc = ["client", "form_urlencoded"] gzip = ["client", "tower-http/decompression-gzip"] -client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"] +client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"] jsonpatch = ["kube-core/jsonpatch"] admission = ["kube-core/admission"] config = ["__non_core", "pem", "home"] @@ -69,7 +69,6 @@ tower = { workspace = true, features = ["buffer", "filter", "util"], optional = tower-http = { workspace = true, features = ["auth", "map-response-body", "trace"], optional = true } hyper-timeout = { workspace = true, optional = true } tame-oauth = { workspace = true, features = ["gcp"], optional = true } -pin-project = { workspace = true, optional = true } rand = { workspace = true, optional = true } secrecy = { workspace = true, features = ["alloc", "serde"] } tracing = { workspace = true, features = ["log"], optional = true } diff --git a/kube-client/src/client/body.rs b/kube-client/src/client/body.rs index bc81121d5..1a0475f33 100644 --- a/kube-client/src/client/body.rs +++ b/kube-client/src/client/body.rs @@ -6,10 +6,9 @@ use std::{ }; use bytes::Bytes; -use futures::stream::Stream; +use futures::{stream::Stream, TryStreamExt}; use http_body::{Body as HttpBody, Frame, SizeHint}; -use http_body_util::{combinators::UnsyncBoxBody, BodyExt}; -use pin_project::pin_project; +use http_body_util::{combinators::UnsyncBoxBody, BodyExt, BodyStream}; /// A request body. pub struct Body { @@ -45,11 +44,10 @@ impl Body { Body::new(Kind::Wrap(body.map_err(Into::into).boxed_unsync())) } - pub(crate) fn into_stream(self) -> BodyDataStream - where - Self: Sized, - { - BodyDataStream::new(self) + pub(crate) fn into_data_stream( + self, + ) -> impl Stream::Data, ::Error>> { + Box::pin(BodyStream::new(self).try_filter_map(|frame| async { Ok(frame.into_data().ok()) })) } } @@ -108,38 +106,3 @@ impl HttpBody for Body { } } } - -// Wrap `http_body::Body` to implement `Stream`. -#[pin_project] -pub struct BodyDataStream { - #[pin] - body: B, -} - -impl BodyDataStream { - pub(crate) fn new(body: B) -> Self { - Self { body } - } -} - -impl Stream for BodyDataStream -where - B: HttpBody, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - return match ready!(self.as_mut().project().body.poll_frame(cx)) { - Some(Ok(frame)) => { - let Ok(bytes) = frame.into_data() else { - continue; - }; - Poll::Ready(Some(Ok(bytes))) - } - Some(Err(err)) => Poll::Ready(Some(Err(err))), - None => Poll::Ready(None), - }; - } - } -} diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index b284816c8..300a3bb81 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -269,7 +269,7 @@ impl Client { let res = handle_api_errors(res).await?; // Map the error, since we want to convert this into an `AsyncBufReader` using // `into_async_read` which specifies `std::io::Error` as the stream's error type. - let body = res.into_body().into_stream().map_err(std::io::Error::other); + let body = res.into_body().into_data_stream().map_err(std::io::Error::other); Ok(body.into_async_read()) } @@ -309,7 +309,7 @@ impl Client { tracing::trace!("headers: {:?}", res.headers()); let frames = FramedRead::new( - StreamReader::new(res.into_body().into_stream().map_err(|e| { + StreamReader::new(res.into_body().into_data_stream().map_err(|e| { // Unexpected EOF from chunked decoder. // Tends to happen when watching for 300+s. This will be ignored. if e.to_string().contains("unexpected EOF during chunk") {