Skip to content

Commit

Permalink
Add opaque wrapper around Full body, use wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Jul 11, 2024
1 parent 19ddbd8 commit 25f9213
Showing 1 changed file with 38 additions and 13 deletions.
51 changes: 38 additions & 13 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,51 @@ pub mod hyper {

use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
use http::HeaderValue;
use http_body_util::BodyExt;
use hyper::body::Body;
use http_body_util::{BodyExt, Full};
use hyper::body::{Body as HttpBody, Frame};
use hyper_util::client::legacy::{connect::Connect, Client};
use std::error::Error;
use std::convert::Infallible;
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
use tokio::time;

pub struct Body(Full<Bytes>);

impl HttpBody for Body {
type Data = Bytes;
type Error = Infallible;

#[inline]
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) };
inner_body.poll_frame(cx)
}

Check warning on line 131 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L125-L131

Added lines #L125 - L131 were not covered by tests

#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}

Check warning on line 136 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L134-L136

Added lines #L134 - L136 were not covered by tests

#[inline]
fn size_hint(&self) -> hyper::body::SizeHint {
self.0.size_hint()
}

Check warning on line 141 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L139-L141

Added lines #L139 - L141 were not covered by tests
}

#[derive(Debug, Clone)]
pub struct HyperClient<C, B> {
inner: Client<C, B>,
pub struct HyperClient<C> {
inner: Client<C, Body>,
timeout: Duration,
authorization: Option<HeaderValue>,
}

impl<C, B> HyperClient<C, B> {
pub fn new_with_timeout(inner: Client<C, B>, timeout: Duration) -> Self {
impl<C> HyperClient<C> {
pub fn new_with_timeout(inner: Client<C, Body>, timeout: Duration) -> Self {

Check warning on line 152 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L152

Added line #L152 was not covered by tests
Self {
inner,
timeout,
Expand All @@ -130,7 +158,7 @@ pub mod hyper {
}

pub fn new_with_timeout_and_authorization_header(
inner: Client<C, B>,
inner: Client<C, Body>,

Check warning on line 161 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L161

Added line #L161 was not covered by tests
timeout: Duration,
authorization: HeaderValue,
) -> Self {
Expand All @@ -143,16 +171,13 @@ pub mod hyper {
}

#[async_trait]
impl<C, B> HttpClient for HyperClient<C, B>
impl<C> HttpClient for HyperClient<C>
where
C: Connect + Send + Sync + Clone + Debug + 'static,
B: From<Vec<u8>> + Body + Send + Sync + Debug + Unpin + 'static,
<B as Body>::Data: Send,
<B as Body>::Error: Into<Box<dyn Error + Send + Sync>>,
{
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, B::from(body));
let mut request = Request::from_parts(parts, Body(Full::from(body)));

Check warning on line 180 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L180

Added line #L180 was not covered by tests
if let Some(ref authorization) = self.authorization {
request
.headers_mut()
Expand Down

0 comments on commit 25f9213

Please sign in to comment.