Skip to content

Commit

Permalink
refactor: remove BodyDataStream (#1455)
Browse files Browse the repository at this point in the history
Signed-off-by: tottoto <[email protected]>
  • Loading branch information
tottoto authored Apr 1, 2024
1 parent 156a6e6 commit 05d115b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 47 deletions.
3 changes: 1 addition & 2 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ kubelet-debug = ["ws", "kube-core/kubelet-debug"]
oauth = ["client", "tame-oauth"]
oidc = ["client", "form_urlencoded"]
gzip = ["client", "tower-http/decompression-gzip"]
client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"]
client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"]
jsonpatch = ["kube-core/jsonpatch"]
admission = ["kube-core/admission"]
config = ["__non_core", "pem", "home"]
Expand Down Expand Up @@ -69,7 +69,6 @@ tower = { workspace = true, features = ["buffer", "filter", "util"], optional =
tower-http = { workspace = true, features = ["auth", "map-response-body", "trace"], optional = true }
hyper-timeout = { workspace = true, optional = true }
tame-oauth = { workspace = true, features = ["gcp"], optional = true }
pin-project = { workspace = true, optional = true }
rand = { workspace = true, optional = true }
secrecy = { workspace = true, features = ["alloc", "serde"] }
tracing = { workspace = true, features = ["log"], optional = true }
Expand Down
49 changes: 6 additions & 43 deletions kube-client/src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use std::{
};

use bytes::Bytes;
use futures::stream::Stream;
use futures::{stream::Stream, TryStreamExt};
use http_body::{Body as HttpBody, Frame, SizeHint};
use http_body_util::{combinators::UnsyncBoxBody, BodyExt};
use pin_project::pin_project;
use http_body_util::{combinators::UnsyncBoxBody, BodyExt, BodyStream};

/// A request body.
pub struct Body {
Expand Down Expand Up @@ -45,11 +44,10 @@ impl Body {
Body::new(Kind::Wrap(body.map_err(Into::into).boxed_unsync()))
}

pub(crate) fn into_stream(self) -> BodyDataStream<Self>
where
Self: Sized,
{
BodyDataStream::new(self)
pub(crate) fn into_data_stream(
self,
) -> impl Stream<Item = Result<<Self as HttpBody>::Data, <Self as HttpBody>::Error>> {
Box::pin(BodyStream::new(self).try_filter_map(|frame| async { Ok(frame.into_data().ok()) }))
}
}

Expand Down Expand Up @@ -108,38 +106,3 @@ impl HttpBody for Body {
}
}
}

// Wrap `http_body::Body` to implement `Stream`.
#[pin_project]
pub struct BodyDataStream<B> {
#[pin]
body: B,
}

impl<B> BodyDataStream<B> {
pub(crate) fn new(body: B) -> Self {
Self { body }
}
}

impl<B> Stream for BodyDataStream<B>
where
B: HttpBody<Data = Bytes>,
{
type Item = Result<B::Data, B::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().body.poll_frame(cx)) {
Some(Ok(frame)) => {
let Ok(bytes) = frame.into_data() else {
continue;
};
Poll::Ready(Some(Ok(bytes)))
}
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
};
}
}
}
4 changes: 2 additions & 2 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl Client {
let res = handle_api_errors(res).await?;
// Map the error, since we want to convert this into an `AsyncBufReader` using
// `into_async_read` which specifies `std::io::Error` as the stream's error type.
let body = res.into_body().into_stream().map_err(std::io::Error::other);
let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
Ok(body.into_async_read())
}

Expand Down Expand Up @@ -309,7 +309,7 @@ impl Client {
tracing::trace!("headers: {:?}", res.headers());

let frames = FramedRead::new(
StreamReader::new(res.into_body().into_stream().map_err(|e| {
StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
// Unexpected EOF from chunked decoder.
// Tends to happen when watching for 300+s. This will be ignored.
if e.to_string().contains("unexpected EOF during chunk") {
Expand Down

0 comments on commit 05d115b

Please sign in to comment.