From d58138aa66b0ee47a40922d6ca88e3e173ce3dca Mon Sep 17 00:00:00 2001 From: clux Date: Wed, 24 Jan 2024 10:27:33 +0000 Subject: [PATCH] comment out some stuff and relax signatures trying to focus on the streaming part first Signed-off-by: clux --- kube-client/src/client/auth/mod.rs | 8 ++--- kube-client/src/client/mod.rs | 56 +++++++++++++++++++----------- kube-client/src/client/upgrade.rs | 6 ++-- 3 files changed, 44 insertions(+), 26 deletions(-) diff --git a/kube-client/src/client/auth/mod.rs b/kube-client/src/client/auth/mod.rs index eaca2d8ea..0cf9bb446 100644 --- a/kube-client/src/client/auth/mod.rs +++ b/kube-client/src/client/auth/mod.rs @@ -19,10 +19,10 @@ use tower::{filter::AsyncPredicate, BoxError}; use crate::config::{AuthInfo, AuthProviderConfig, ExecAuthCluster, ExecConfig, ExecInteractiveMode}; -#[cfg(feature = "oauth")] mod oauth; -#[cfg(feature = "oauth")] pub use oauth::Error as OAuthError; -#[cfg(feature = "oidc")] mod oidc; -#[cfg(feature = "oidc")] pub use oidc::errors as oidc_errors; +//#[cfg(feature = "oauth")] mod oauth; +//#[cfg(feature = "oauth")] pub use oauth::Error as OAuthError; +//#[cfg(feature = "oidc")] mod oidc; +//#[cfg(feature = "oidc")] pub use oidc::errors as oidc_errors; #[cfg(target_os = "windows")] use std::os::windows::process::CommandExt; #[derive(Error, Debug)] diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index 556cfb24c..d75bd23cc 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -7,12 +7,17 @@ //! //! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically //! retrieve the resources served by the kubernetes API. +#![allow(unused_imports)] // TODO: remove use either::{Either, Left, Right}; use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt}; use http::{self, Request, Response, StatusCode}; use http_body::Body; -use http_body_util::{BodyExt, BodyStream, Full}; -use hyper::body::{Bytes, Incoming}; +use http_body_util::{ + combinators::{BoxBody, UnsyncBoxBody}, + BodyExt, BodyStream, Full, +}; +use hyper::body::Incoming; +type Bytes = Vec; // we use Vec internally everywhere - maybe try to move to Bytes.. use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1; pub use kube_core::response::Status; use serde::de::DeserializeOwned; @@ -67,7 +72,10 @@ pub use builder::{ClientBuilder, DynBody}; pub struct Client { // - `Buffer` for cheap clone // - `BoxService` for dynamic response future type - inner: Buffer, Response>, BoxError>, Request>, + inner: Buffer< + BoxService, Response>>, BoxError>, + Request, + >, default_ns: String, } @@ -99,17 +107,17 @@ impl Client { /// # Ok(()) /// # } /// ``` - pub fn new(service: S, default_namespace: T) -> Self + pub fn new(service: S, default_namespace: T) -> Self where - S: Service, Response = Response> + Send + 'static, + S: Service, Response = Response>> + Send + 'static, S::Future: Send + 'static, S::Error: Into, - B: http_body::Body + Send + 'static, - B::Error: Into, + //B: http_body::Body + Send + 'static, + //B::Error: Into, T: Into, { // Transform response body to `hyper::Body` and use type erased error to avoid type parameters. - let service = MapResponseBodyLayer::new(BodyStream::new) + let service = MapResponseBodyLayer::new(|x| BodyStream::new(x)) .layer(service) .map_err(|e| e.into()); Self { @@ -143,7 +151,10 @@ impl Client { /// Perform a raw HTTP request against the API and return the raw response back. /// This method can be used to get raw access to the API which may be used to, for example, /// create a proxy server or application-level gateway between localhost and the API server. - pub async fn send(&self, request: Request) -> Result>> { + pub async fn send( + &self, + request: Request, + ) -> Result>>> { let mut svc = self.inner.clone(); let res = svc .ready() @@ -163,12 +174,13 @@ impl Client { Ok(res) } + /* /// Make WebSocket connection. #[cfg(feature = "ws")] #[cfg_attr(docsrs, doc(cfg(feature = "ws")))] pub async fn connect( &self, - request: Request>, + request: Request, ) -> Result> { use http::header::HeaderValue; let (mut parts, body) = request.into_parts(); @@ -209,10 +221,11 @@ impl Client { )), } } + */ /// Perform a raw HTTP request against the API and deserialize the response /// as JSON to some known type. - pub async fn request(&self, request: Request>) -> Result + pub async fn request(&self, request: Request) -> Result where T: DeserializeOwned, { @@ -226,10 +239,13 @@ impl Client { /// Perform a raw HTTP request against the API and get back the response /// as a string - pub async fn request_text(&self, request: Request>) -> Result { + pub async fn request_text(&self, request: Request) -> Result { let res = self.send(request.into()).await?; let status = res.status(); // trace!("Status = {:?} for {}", status, res.url()); + // TODO: we have a BodyStream that does not implement Body + // so we cannot use collect on it? then what's the point of it!? + //..maybe BodyStream layer is a bad idea in genreal? let body_bytes = res.into_body().collect().await.unwrap(); // Infallible let text = String::from_utf8(body_bytes.to_bytes()).map_err(Error::FromUtf8)?; handle_api_errors(&text, status)?; @@ -241,7 +257,7 @@ impl Client { /// /// The response can be processed using [`AsyncReadExt`](futures::AsyncReadExt) /// and [`AsyncBufReadExt`](futures::AsyncBufReadExt). - pub async fn request_stream(&self, request: Request) -> Result { + pub async fn request_stream(&self, request: Request) -> Result { let res = self.send(request).await?; // Map the error, since we want to convert this into an `AsyncBufReader` using // `into_async_read` which specifies `std::io::Error` as the stream's error type. @@ -253,7 +269,7 @@ impl Client { /// Perform a raw HTTP request against the API and get back either an object /// deserialized as JSON or a [`Status`] Object. - pub async fn request_status(&self, request: Request>) -> Result> + pub async fn request_status(&self, request: Request) -> Result> where T: DeserializeOwned, { @@ -277,12 +293,12 @@ impl Client { /// Perform a raw request and get back a stream of [`WatchEvent`] objects pub async fn request_events( &self, - request: Request>, + request: Request, ) -> Result>>> where T: Clone + DeserializeOwned, { - let res = self.send(request.map(Body::from)).await?; + let res = self.send(request).await?; // trace!("Streaming from {} -> {}", res.url(), res.status().as_str()); tracing::trace!("headers: {:?}", res.headers()); @@ -357,7 +373,7 @@ impl Client { self.request( Request::builder() .uri("/version") - .body(vec![]) + .body(Bytes::new()) .map_err(Error::HttpError)?, ) .await @@ -368,7 +384,7 @@ impl Client { self.request( Request::builder() .uri("/apis") - .body(vec![]) + .body(Bytes::new()) .map_err(Error::HttpError)?, ) .await @@ -397,7 +413,7 @@ impl Client { self.request( Request::builder() .uri(url) - .body(vec![]) + .body(Bytes::new()) .map_err(Error::HttpError)?, ) .await @@ -408,7 +424,7 @@ impl Client { self.request( Request::builder() .uri("/api") - .body(vec![]) + .body(Bytes::new()) .map_err(Error::HttpError)?, ) .await diff --git a/kube-client/src/client/upgrade.rs b/kube-client/src/client/upgrade.rs index e8fe67c5c..4ae62ef28 100644 --- a/kube-client/src/client/upgrade.rs +++ b/kube-client/src/client/upgrade.rs @@ -1,5 +1,7 @@ +#![allow(unused_imports)] // TODO: remove use http::{self, Response, StatusCode}; -use hyper::Body; +use http_body_util::{combinators::BoxBody, BodyStream}; +type VerifyBody = BoxBody, tower::BoxError>; use thiserror::Error; use tokio_tungstenite::tungstenite as ws; @@ -41,7 +43,7 @@ pub enum UpgradeConnectionError { // Verify upgrade response according to RFC6455. // Based on `tungstenite` and added subprotocol verification. -pub fn verify_response(res: &Response, key: &str) -> Result<(), UpgradeConnectionError> { +pub fn verify_response(res: &Response, key: &str) -> Result<(), UpgradeConnectionError> { if res.status() != StatusCode::SWITCHING_PROTOCOLS { return Err(UpgradeConnectionError::ProtocolSwitch(res.status())); }