Skip to content

Commit

Permalink
refactor: Body::wrap_stream -> Body::from_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
omjadas committed Apr 29, 2024
1 parent de4bebe commit 1279433
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 18 deletions.
4 changes: 2 additions & 2 deletions benches/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ fn raw_body() -> Body {

fn gzip_body() -> Body {
let encoder = GzipEncoder::new(&BODY[..]);
Body::wrap_stream(ReaderStream::new(encoder))
Body::from_stream(ReaderStream::new(encoder))
}

fn gzip_brotli_body() -> Body {
let encoder = GzipEncoder::new(&BODY[..]);
let encoder = BrotliEncoder::new(BufReader::new(encoder));
Body::wrap_stream(ReaderStream::new(encoder))
Body::from_stream(ReaderStream::new(encoder))
}

fn raw_request() -> Request<Body> {
Expand Down
23 changes: 13 additions & 10 deletions src/body.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::Error;
use futures::{Stream, StreamExt};
use futures::{Stream, TryStream, TryStreamExt};
use http_body_util::{combinators::BoxBody, Collected, Empty, Full, StreamBody};
use hyper::body::{Body as HttpBody, Bytes, Frame, Incoming, SizeHint};
use std::pin::Pin;

#[derive(Debug)]
enum Internal {
BoxBody(BoxBody<Bytes, crate::Error>),
BoxBody(BoxBody<Bytes, Error>),
Collected(Collected<Bytes>),
Empty(Empty<Bytes>),
Full(Full<Bytes>),
Expand All @@ -21,23 +21,26 @@ pub struct Body {
}

impl Body {
pub fn wrap_stream<S, O, E>(stream: S) -> Self
pub fn from_stream<S>(stream: S) -> Self
where
S: Stream<Item = Result<O, E>> + Send + Sync + 'static,
O: Into<Bytes>,
E: Into<Error>,
S: TryStream + Send + Sync + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<Error>,
{
Self {
inner: Internal::BoxBody(BoxBody::new(StreamBody::new(
stream.map(|res| res.map(Into::into).map(Frame::data).map_err(Into::into)),
stream
.map_ok(Into::into)
.map_ok(Frame::data)
.map_err(Into::into),
))),
}
}
}

impl HttpBody for Body {
type Data = Bytes;
type Error = crate::Error;
type Error = Error;

fn poll_frame(
mut self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -76,8 +79,8 @@ impl HttpBody for Body {
}
}

impl From<BoxBody<Bytes, crate::Error>> for Body {
fn from(value: BoxBody<Bytes, crate::Error>) -> Self {
impl From<BoxBody<Bytes, Error>> for Body {
fn from(value: BoxBody<Bytes, Error>) -> Self {
Self {
inner: Internal::BoxBody(value),
}
Expand Down
10 changes: 5 additions & 5 deletions src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl From<Decoder<Body>> for Body {
fn from(decoder: Decoder<Body>) -> Body {
match decoder {
Decoder::Body(body) => body,
Decoder::Decoder(decoder) => Body::wrap_stream(ReaderStream::new(decoder)),
Decoder::Decoder(decoder) => Body::from_stream(ReaderStream::new(decoder)),
}
}
}
Expand Down Expand Up @@ -307,7 +307,7 @@ mod tests {
async fn single_encoding() {
let content = b"hello, world";
let encoder = GzipEncoder::new(&content[..]);
let body = Body::wrap_stream(ReaderStream::new(encoder));
let body = Body::from_stream(ReaderStream::new(encoder));

assert_eq!(
&to_bytes(decode_body(vec![&b"gzip"[..]], body).unwrap()).await[..],
Expand All @@ -320,7 +320,7 @@ mod tests {
let content = b"hello, world";
let encoder = GzipEncoder::new(&content[..]);
let encoder = BrotliEncoder::new(BufReader::new(encoder));
let body = Body::wrap_stream(ReaderStream::new(encoder));
let body = Body::from_stream(ReaderStream::new(encoder));

assert_eq!(
&to_bytes(decode_body(vec![&b"br"[..], &b"gzip"[..]], body).unwrap()).await[..],
Expand All @@ -347,7 +347,7 @@ mod tests {
let req = Request::builder()
.header(CONTENT_LENGTH, 123)
.header(CONTENT_ENCODING, "gzip")
.body(Body::wrap_stream(ReaderStream::new(encoder)))
.body(Body::from_stream(ReaderStream::new(encoder)))
.unwrap();

let req = decode_request(req).unwrap();
Expand All @@ -369,7 +369,7 @@ mod tests {
let res = Response::builder()
.header(CONTENT_LENGTH, 123)
.header(CONTENT_ENCODING, "gzip")
.body(Body::wrap_stream(ReaderStream::new(encoder)))
.body(Body::from_stream(ReaderStream::new(encoder)))
.unwrap();

let res = decode_response(res).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn test_server(req: Request<Incoming>) -> Result<Response<Body>, Infallibl
(&Method::GET, "/hello/gzip") => Ok(Response::builder()
.header(CONTENT_ENCODING, "gzip")
.status(StatusCode::OK)
.body(Body::wrap_stream(ReaderStream::new(GzipEncoder::new(
.body(Body::from_stream(ReaderStream::new(GzipEncoder::new(
HELLO_WORLD.as_bytes(),
))))
.unwrap()),
Expand Down

0 comments on commit 1279433

Please sign in to comment.