Skip to content

Commit

Permalink
Merge branch 'main' into expose-body-collect
Browse files Browse the repository at this point in the history
Signed-off-by: Eirik A <[email protected]>
  • Loading branch information
clux authored Apr 1, 2024
2 parents 21044ef + 05d115b commit 57493c2
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 64 deletions.
2 changes: 2 additions & 0 deletions examples/custom_client_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ async fn main() -> anyhow::Result<()> {
let https = config.openssl_https_connector()?;
let service = ServiceBuilder::new()
.layer(config.base_uri_layer())
.option_layer(config.auth_layer()?)
.service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
Client::new(service, config.default_namespace)
} else {
let https = config.rustls_https_connector()?;
let service = ServiceBuilder::new()
.layer(config.base_uri_layer())
.option_layer(config.auth_layer()?)
.service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
Client::new(service, config.default_namespace)
};
Expand Down
1 change: 1 addition & 0 deletions examples/custom_client_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async fn main() -> anyhow::Result<()> {
.layer(tower::limit::ConcurrencyLimitLayer::new(4))
// Add `DecompressionLayer` to make request headers interesting.
.layer(DecompressionLayer::new())
.option_layer(config.auth_layer()?)
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-client
Expand Down
3 changes: 1 addition & 2 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ kubelet-debug = ["ws", "kube-core/kubelet-debug"]
oauth = ["client", "tame-oauth"]
oidc = ["client", "form_urlencoded"]
gzip = ["client", "tower-http/decompression-gzip"]
client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"]
client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"]
jsonpatch = ["kube-core/jsonpatch"]
admission = ["kube-core/admission"]
config = ["__non_core", "pem", "home"]
Expand Down Expand Up @@ -69,7 +69,6 @@ tower = { workspace = true, features = ["buffer", "filter", "util"], optional =
tower-http = { workspace = true, features = ["auth", "map-response-body", "trace"], optional = true }
hyper-timeout = { workspace = true, optional = true }
tame-oauth = { workspace = true, features = ["gcp"], optional = true }
pin-project = { workspace = true, optional = true }
rand = { workspace = true, optional = true }
secrecy = { workspace = true, features = ["alloc", "serde"] }
tracing = { workspace = true, features = ["log"], optional = true }
Expand Down
65 changes: 20 additions & 45 deletions kube-client/src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use std::{
};

use bytes::Bytes;
use futures::stream::Stream;
use http_body::{Body as HttpBody, Frame};
use http_body_util::{combinators::UnsyncBoxBody, BodyExt};
use pin_project::pin_project;
use futures::{stream::Stream, TryStreamExt};
use http_body::{Body as HttpBody, Frame, SizeHint};
use http_body_util::{combinators::UnsyncBoxBody, BodyExt, BodyStream};

/// A request body.
pub struct Body {
Expand Down Expand Up @@ -49,6 +48,12 @@ impl Body {
pub async fn collect_bytes(self) -> Result<Bytes, crate::Error> {

Check warning on line 48 in kube-client/src/client/body.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/body.rs#L48

Added line #L48 was not covered by tests
Ok(<Self as BodyExt>::collect(self).await?.to_bytes())
}

pub(crate) fn into_data_stream(
self,
) -> impl Stream<Item = Result<<Self as HttpBody>::Data, <Self as HttpBody>::Error>> {
Box::pin(BodyStream::new(self).try_filter_map(|frame| async { Ok(frame.into_data().ok()) }))
}
}

impl From<Bytes> for Body {
Expand Down Expand Up @@ -89,50 +94,20 @@ impl HttpBody for Body {
),
}
}
}

// Wrap `http_body::Body` to implement `Stream`.
#[pin_project]
pub struct BodyDataStream<B> {
#[pin]
body: B,
}

impl<B> BodyDataStream<B> {
pub(crate) fn new(body: B) -> Self {
Self { body }
}
}

impl<B> Stream for BodyDataStream<B>
where
B: HttpBody<Data = Bytes>,
{
type Item = Result<B::Data, B::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().body.poll_frame(cx)) {
Some(Ok(frame)) => {
let Ok(bytes) = frame.into_data() else {
continue;
};
Poll::Ready(Some(Ok(bytes)))
}
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
};
fn size_hint(&self) -> SizeHint {
match &self.kind {
Kind::Once(Some(bytes)) => SizeHint::with_exact(bytes.len() as u64),
Kind::Once(None) => SizeHint::with_exact(0),
Kind::Wrap(body) => body.size_hint(),
}
}
}

pub trait IntoBodyDataStream: HttpBody {
fn into_stream(self) -> BodyDataStream<Self>
where
Self: Sized,
{
BodyDataStream::new(self)
fn is_end_stream(&self) -> bool {
match &self.kind {
Kind::Once(Some(bytes)) => bytes.is_empty(),
Kind::Once(None) => true,
Kind::Wrap(body) => body.is_end_stream(),
}
}
}

impl<T> IntoBodyDataStream for T where T: HttpBody {}
26 changes: 9 additions & 17 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result};
mod auth;
mod body;
mod builder;
// Add `into_stream()` to `http::Body`
use body::IntoBodyDataStream as _;
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
#[cfg(feature = "unstable-client")]
mod client_ext;
Expand Down Expand Up @@ -271,10 +269,7 @@ impl Client {
let res = handle_api_errors(res).await?;
// Map the error, since we want to convert this into an `AsyncBufReader` using
// `into_async_read` which specifies `std::io::Error` as the stream's error type.
let body = BodyExt::map_err(res.into_body(), |e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})
.into_stream();
let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
Ok(body.into_async_read())
}

Expand Down Expand Up @@ -314,17 +309,14 @@ impl Client {
tracing::trace!("headers: {:?}", res.headers());

let frames = FramedRead::new(
StreamReader::new(
BodyExt::map_err(res.into_body(), |e| {
// Unexpected EOF from chunked decoder.
// Tends to happen when watching for 300+s. This will be ignored.
if e.to_string().contains("unexpected EOF during chunk") {
return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
}
std::io::Error::new(std::io::ErrorKind::Other, e)
})
.into_stream(),
),
StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
// Unexpected EOF from chunked decoder.
// Tends to happen when watching for 300+s. This will be ignored.
if e.to_string().contains("unexpected EOF during chunk") {
return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
}
std::io::Error::other(e)
})),
LinesCodec::new(),
);

Expand Down

0 comments on commit 57493c2

Please sign in to comment.