Skip to content

Commit

Permalink
comment out some stuff and relax signatures
Browse files Browse the repository at this point in the history
trying to focus on the streaming part first

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux committed Jan 24, 2024
1 parent ea86d81 commit d58138a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 26 deletions.
8 changes: 4 additions & 4 deletions kube-client/src/client/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use tower::{filter::AsyncPredicate, BoxError};

use crate::config::{AuthInfo, AuthProviderConfig, ExecAuthCluster, ExecConfig, ExecInteractiveMode};

#[cfg(feature = "oauth")] mod oauth;
#[cfg(feature = "oauth")] pub use oauth::Error as OAuthError;
#[cfg(feature = "oidc")] mod oidc;
#[cfg(feature = "oidc")] pub use oidc::errors as oidc_errors;
//#[cfg(feature = "oauth")] mod oauth;
//#[cfg(feature = "oauth")] pub use oauth::Error as OAuthError;
//#[cfg(feature = "oidc")] mod oidc;
//#[cfg(feature = "oidc")] pub use oidc::errors as oidc_errors;
#[cfg(target_os = "windows")] use std::os::windows::process::CommandExt;

#[derive(Error, Debug)]
Expand Down
56 changes: 36 additions & 20 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@
//!
//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
//! retrieve the resources served by the kubernetes API.
#![allow(unused_imports)] // TODO: remove
use either::{Either, Left, Right};
use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response, StatusCode};
use http_body::Body;
use http_body_util::{BodyExt, BodyStream, Full};
use hyper::body::{Bytes, Incoming};
use http_body_util::{
combinators::{BoxBody, UnsyncBoxBody},
BodyExt, BodyStream, Full,
};
use hyper::body::Incoming;
type Bytes = Vec<u8>; // we use Vec<u8> internally everywhere - maybe try to move to Bytes..
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
pub use kube_core::response::Status;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -67,7 +72,10 @@ pub use builder::{ClientBuilder, DynBody};
pub struct Client {
// - `Buffer` for cheap clone
// - `BoxService` for dynamic response future type
inner: Buffer<BoxService<Request<Bytes>, Response<BodyStream<Bytes>>, BoxError>, Request<Bytes>>,
inner: Buffer<
BoxService<Request<Bytes>, Response<BodyStream<BoxBody<Bytes, BoxError>>>, BoxError>,
Request<Bytes>,
>,
default_ns: String,
}

Expand Down Expand Up @@ -99,17 +107,17 @@ impl Client {
/// # Ok(())
/// # }
/// ```
pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
pub fn new<S, /*B,*/ T>(service: S, default_namespace: T) -> Self
where
S: Service<Request<Bytes>, Response = Response<B>> + Send + 'static,
S: Service<Request<Bytes>, Response = Response<BoxBody<Bytes, BoxError>>> + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
//B: http_body::Body<Data = Bytes> + Send + 'static,
//B::Error: Into<BoxError>,
T: Into<String>,
{
// Transform response body to `hyper::Body` and use type erased error to avoid type parameters.
let service = MapResponseBodyLayer::new(BodyStream::new)
let service = MapResponseBodyLayer::new(|x| BodyStream::new(x))
.layer(service)
.map_err(|e| e.into());
Self {
Expand Down Expand Up @@ -143,7 +151,10 @@ impl Client {
/// Perform a raw HTTP request against the API and return the raw response back.
/// This method can be used to get raw access to the API which may be used to, for example,
/// create a proxy server or application-level gateway between localhost and the API server.
pub async fn send(&self, request: Request<Bytes>) -> Result<Response<BodyStream<Bytes>>> {
pub async fn send(
&self,
request: Request<Bytes>,
) -> Result<Response<BodyStream<BoxBody<Bytes, BoxError>>>> {
let mut svc = self.inner.clone();
let res = svc
.ready()
Expand All @@ -163,12 +174,13 @@ impl Client {
Ok(res)
}

/*
/// Make WebSocket connection.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub async fn connect(
&self,
request: Request<Vec<u8>>,
request: Request<Bytes>,
) -> Result<WebSocketStream<hyper::upgrade::Upgraded>> {
use http::header::HeaderValue;
let (mut parts, body) = request.into_parts();
Expand Down Expand Up @@ -209,10 +221,11 @@ impl Client {
)),
}
}
*/

/// Perform a raw HTTP request against the API and deserialize the response
/// as JSON to some known type.
pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
pub async fn request<T>(&self, request: Request<Bytes>) -> Result<T>
where
T: DeserializeOwned,
{
Expand All @@ -226,10 +239,13 @@ impl Client {

/// Perform a raw HTTP request against the API and get back the response
/// as a string
pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
pub async fn request_text(&self, request: Request<Bytes>) -> Result<String> {
let res = self.send(request.into()).await?;
let status = res.status();
// trace!("Status = {:?} for {}", status, res.url());
// TODO: we have a BodyStream that does not implement Body
// so we cannot use collect on it? then what's the point of it!?
//..maybe BodyStream layer is a bad idea in genreal?
let body_bytes = res.into_body().collect().await.unwrap(); // Infallible
let text = String::from_utf8(body_bytes.to_bytes()).map_err(Error::FromUtf8)?;
handle_api_errors(&text, status)?;
Expand All @@ -241,7 +257,7 @@ impl Client {
///
/// The response can be processed using [`AsyncReadExt`](futures::AsyncReadExt)
/// and [`AsyncBufReadExt`](futures::AsyncBufReadExt).
pub async fn request_stream(&self, request: Request<Incoming>) -> Result<impl AsyncBufRead> {
pub async fn request_stream(&self, request: Request<Bytes>) -> Result<impl AsyncBufRead> {
let res = self.send(request).await?;
// Map the error, since we want to convert this into an `AsyncBufReader` using
// `into_async_read` which specifies `std::io::Error` as the stream's error type.
Expand All @@ -253,7 +269,7 @@ impl Client {

/// Perform a raw HTTP request against the API and get back either an object
/// deserialized as JSON or a [`Status`] Object.
pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
pub async fn request_status<T>(&self, request: Request<Bytes>) -> Result<Either<T, Status>>
where
T: DeserializeOwned,
{
Expand All @@ -277,12 +293,12 @@ impl Client {
/// Perform a raw request and get back a stream of [`WatchEvent`] objects
pub async fn request_events<T>(
&self,
request: Request<Vec<u8>>,
request: Request<Bytes>,
) -> Result<impl TryStream<Item = Result<WatchEvent<T>>>>
where
T: Clone + DeserializeOwned,
{
let res = self.send(request.map(Body::from)).await?;
let res = self.send(request).await?;
// trace!("Streaming from {} -> {}", res.url(), res.status().as_str());
tracing::trace!("headers: {:?}", res.headers());

Expand Down Expand Up @@ -357,7 +373,7 @@ impl Client {
self.request(
Request::builder()
.uri("/version")
.body(vec![])
.body(Bytes::new())
.map_err(Error::HttpError)?,
)
.await
Expand All @@ -368,7 +384,7 @@ impl Client {
self.request(
Request::builder()
.uri("/apis")
.body(vec![])
.body(Bytes::new())
.map_err(Error::HttpError)?,
)
.await
Expand Down Expand Up @@ -397,7 +413,7 @@ impl Client {
self.request(
Request::builder()
.uri(url)
.body(vec![])
.body(Bytes::new())
.map_err(Error::HttpError)?,
)
.await
Expand All @@ -408,7 +424,7 @@ impl Client {
self.request(
Request::builder()
.uri("/api")
.body(vec![])
.body(Bytes::new())
.map_err(Error::HttpError)?,
)
.await
Expand Down
6 changes: 4 additions & 2 deletions kube-client/src/client/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(unused_imports)] // TODO: remove
use http::{self, Response, StatusCode};
use hyper::Body;
use http_body_util::{combinators::BoxBody, BodyStream};
type VerifyBody = BoxBody<Vec<u8>, tower::BoxError>;
use thiserror::Error;
use tokio_tungstenite::tungstenite as ws;

Expand Down Expand Up @@ -41,7 +43,7 @@ pub enum UpgradeConnectionError {

// Verify upgrade response according to RFC6455.
// Based on `tungstenite` and added subprotocol verification.
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeConnectionError> {
pub fn verify_response(res: &Response<VerifyBody>, key: &str) -> Result<(), UpgradeConnectionError> {
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
}
Expand Down

0 comments on commit d58138a

Please sign in to comment.