diff --git a/Cargo.toml b/Cargo.toml index 3387ff2..4d51bc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,11 +21,13 @@ tls = [ "tokio-rustls" ] [dependencies] async-trait = "0.1" bytes = "1.0" -cookie = "0.17" +cookie = "0.18" futures = "0.3" futures-util = "0.3" headers = "0.3" -hyper = { version="0.14", features=["server", "http1", "http2", "stream"] } +http-body-util = "0.1" +hyper = { version="1.0", features=["server", "http1", "http2"] } +hyper-util = { version = "0.1", features=["tokio", "server-auto"] } lazy_static = "1.4" mime = "0.3" mime_guess = "2.0" diff --git a/src/error.rs b/src/error.rs index d76c825..1786ef4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,6 +12,10 @@ pub enum Error { FormDecodeError(#[from] serde_urlencoded::de::Error), #[error("serde_urlencoded decode error")] FormEncodeError(#[from] serde_urlencoded::ser::Error), + #[error("infallible")] + Infallible(#[from] std::convert::Infallible), + #[error("mime error")] + Mime(#[from] mime::FromStrError), #[error("lieweb error")] Message(String), #[error("invalid request header {name:?}")] diff --git a/src/extracts.rs b/src/extracts.rs index b2797a5..75265c3 100644 --- a/src/extracts.rs +++ b/src/extracts.rs @@ -4,9 +4,9 @@ use std::{ ops::{Deref, DerefMut}, }; -use bytes::{Buf, Bytes, BytesMut}; -use headers::HeaderMapExt; -use hyper::{body::HttpBody, Body, StatusCode}; +use bytes::Bytes; +use http_body_util::BodyExt; +use hyper::StatusCode; use mime::Mime; use serde::de::DeserializeOwned; @@ -365,7 +365,7 @@ impl FromRequest for BytesBody { } #[crate::async_trait] -impl FromRequest for Body { +impl FromRequest for hyper::body::Incoming { type Rejection = BodyBeenTaken; async fn from_request(req: &mut RequestParts) -> Result { @@ -383,27 +383,26 @@ impl FromRequest for Body { fn get_content_type(req: &mut RequestParts) -> mime::Mime { req.headers() - .typed_get::() - .map(Into::into) + .get(hyper::header::CONTENT_TYPE) + .and_then(|v| { + String::from_utf8_lossy(v.as_bytes()) + .parse::() + .ok() + }) .unwrap_or(mime::APPLICATION_OCTET_STREAM) } async fn read_body(req: &mut RequestParts) -> Result { - let mut body = req + let body = req .body_mut() .take() .ok_or(ReadBodyRejection::BodyBeenTaken(BodyBeenTaken))?; - let mut bufs = BytesMut::new(); + let body = BodyExt::collect(body) + .await + .map_err(ReadBodyRejection::ReadFailed)?; - while let Some(buf) = body.data().await { - let buf = buf.map_err(ReadBodyRejection::ReadFailed)?; - if buf.has_remaining() { - bufs.extend(buf); - } - } - - Ok(bufs.freeze()) + Ok(body.to_bytes()) } mod params_de { diff --git a/src/request.rs b/src/request.rs index ed71b78..78d0d19 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,13 +1,14 @@ use std::net::SocketAddr; -use bytes::{Buf, Bytes, BytesMut}; -use headers::{Header, HeaderMapExt, HeaderName, HeaderValue}; -use hyper::body::HttpBody; +use bytes::Bytes; +use cookie::Cookie; +use http_body_util::BodyExt; use hyper::http; +use hyper::http::{HeaderName, HeaderValue}; use pathrouter::Params; use serde::de::DeserializeOwned; -pub type Request = hyper::Request; +pub type Request = hyper::Request; use crate::error::{invalid_header, invalid_param, missing_cookie, missing_header, missing_param}; use crate::response::IntoResponse; @@ -20,7 +21,7 @@ pub trait FromRequest: Sized { async fn from_request(req: &mut RequestParts) -> Result; } -pub type RequestParts = hyper::Request>; +pub type RequestParts = hyper::Request>; #[crate::async_trait] pub trait LieRequest { @@ -34,7 +35,7 @@ pub trait LieRequest { fn get_header(&self, header: K) -> Result<&HeaderValue, Error> where HeaderName: From; - fn get_typed_header(&self) -> Result; + // fn get_typed_header(&self) -> Result; async fn read_body(&mut self) -> Result; async fn read_form(&mut self) -> Result; @@ -85,32 +86,26 @@ impl LieRequest for Request { Ok(value) } - fn get_typed_header(&self) -> Result { - self.headers() - .typed_get::() - .ok_or_else(|| invalid_header(T::name().as_str())) - } + // fn get_typed_header(&self) -> Result { + // self.headers() + // .typed_get::() + // .ok_or_else(|| invalid_header(T::name().as_str())) + // } fn get_cookie(&self, name: &str) -> Result { - let cookie: headers::Cookie = self.get_typed_header()?; - - cookie - .get(name) - .ok_or_else(|| missing_cookie(name)) - .map(|s| s.to_string()) + let cookie = self.get_header(hyper::header::COOKIE)?; + let cookie = String::from_utf8_lossy(cookie.as_bytes()); + + Cookie::split_parse(cookie) + .flatten() + .find(|item| item.name() == name) + .map(|item| item.to_string()) + .ok_or(missing_cookie(name)) } async fn read_body(&mut self) -> Result { - let mut bufs = BytesMut::new(); - - while let Some(buf) = self.body_mut().data().await { - let buf = buf?; - if buf.has_remaining() { - bufs.extend(buf); - } - } - - Ok(bufs.freeze()) + let body = BodyExt::collect(self.body_mut()).await?; + Ok(body.to_bytes()) } async fn read_form(&mut self) -> Result { diff --git a/src/response.rs b/src/response.rs index 7e7e3c0..8b8a7c2 100644 --- a/src/response.rs +++ b/src/response.rs @@ -3,6 +3,10 @@ use std::{borrow::Cow, convert::Infallible}; use bytes::Bytes; +use futures_util::StreamExt; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Empty, Full}; +use hyper::body::Frame; use hyper::http::{ self, header::{HeaderMap, HeaderName, HeaderValue}, @@ -10,8 +14,9 @@ use hyper::http::{ }; use crate::ty::{BytesBody, Form, Html, Json, StreamBody}; +use crate::Error; -pub type Response = http::Response; +pub type Response = http::Response>; pub trait IntoResponse { fn into_response(self) -> Response; @@ -35,11 +40,11 @@ pub struct LieResponse { } impl LieResponse { - pub fn new(status: StatusCode, body: impl Into) -> Self { + pub fn new(status: StatusCode, body: impl Into) -> Self { LieResponse { inner: http::Response::builder() .status(status) - .body(body.into()) + .body(Full::new(body.into()).map_err(Into::into).boxed()) .unwrap(), } } @@ -49,11 +54,7 @@ impl LieResponse { resp.set_status(status) } - pub fn with_html(body: T) -> Self - where - hyper::Body: From, - T: Send, - { + pub fn with_html(body: impl Into) -> Self { Html::new(body).into() } @@ -82,9 +83,9 @@ impl LieResponse { pub fn with_stream(s: S, content_type: mime::Mime) -> Self where - S: futures::Stream> + Send + 'static, + S: futures::Stream> + Send + Sync + 'static, B: Into + 'static, - E: std::error::Error + Send + Sync + 'static, + E: Into + Send + Sync + 'static, { StreamBody::new(s, content_type).into() } @@ -184,22 +185,22 @@ impl LieResponse { self.append_header(http::header::SET_COOKIE, cookie.to_string()) } - pub async fn body_bytes(&mut self) -> Result, crate::Error> { - use bytes::Buf; - use bytes::BytesMut; - use hyper::body::HttpBody; + // pub async fn body_bytes(&mut self) -> Result, crate::Error> { + // use bytes::Buf; + // use bytes::BytesMut; + // use hyper::body::HttpBody; - let mut bufs = BytesMut::new(); + // let mut bufs = BytesMut::new(); - while let Some(buf) = self.inner.body_mut().data().await { - let buf = buf?; - if buf.has_remaining() { - bufs.extend(buf); - } - } + // while let Some(buf) = self.inner.body_mut().data().await { + // let buf = buf?; + // if buf.has_remaining() { + // bufs.extend(buf); + // } + // } - Ok(bufs.freeze().to_vec()) - } + // Ok(bufs.freeze().to_vec()) + // } } impl From for LieResponse { @@ -234,7 +235,7 @@ impl IntoResponse for StatusCode { hyper::header::CONTENT_TYPE, mime::TEXT_PLAIN_UTF_8.to_string(), ) - .body(hyper::Body::from(format!("{}", self))) + .body(Empty::new().map_err(Into::into).boxed()) .unwrap() } } @@ -246,7 +247,11 @@ impl From<&'static [u8]> for LieResponse { hyper::header::CONTENT_TYPE, mime::APPLICATION_OCTET_STREAM.to_string(), ) - .body(hyper::Body::from(val)) + .body( + Full::new(Bytes::from_static(val)) + .map_err(Into::into) + .boxed(), + ) .unwrap() .into() } @@ -259,7 +264,11 @@ impl IntoResponse for &'static [u8] { hyper::header::CONTENT_TYPE, mime::APPLICATION_OCTET_STREAM.to_string(), ) - .body(hyper::Body::from(self)) + .body( + Full::new(Bytes::from_static(self)) + .map_err(Into::into) + .boxed(), + ) .unwrap() } } @@ -271,7 +280,7 @@ impl From> for LieResponse { hyper::header::CONTENT_TYPE, mime::APPLICATION_OCTET_STREAM.to_string(), ) - .body(hyper::Body::from(val)) + .body(Full::new(Bytes::from(val)).map_err(Into::into).boxed()) .unwrap() .into() } @@ -284,7 +293,7 @@ impl IntoResponse for Vec { hyper::header::CONTENT_TYPE, mime::APPLICATION_OCTET_STREAM.to_string(), ) - .body(hyper::Body::from(self)) + .body(Full::new(Bytes::from(self)).map_err(Into::into).boxed()) .unwrap() } } @@ -296,7 +305,11 @@ impl From<&'static str> for LieResponse { hyper::header::CONTENT_TYPE, mime::TEXT_PLAIN_UTF_8.to_string(), ) - .body(hyper::Body::from(val)) + .body( + Full::new(Bytes::from_static(val.as_bytes())) + .map_err(Into::into) + .boxed(), + ) .unwrap() .into() } @@ -309,7 +322,11 @@ impl IntoResponse for &'static str { hyper::header::CONTENT_TYPE, mime::TEXT_PLAIN_UTF_8.to_string(), ) - .body(hyper::Body::from(self)) + .body( + Full::new(Bytes::from_static(self.as_bytes())) + .map_err(Into::into) + .boxed(), + ) .unwrap() } } @@ -321,7 +338,7 @@ impl From for LieResponse { hyper::header::CONTENT_TYPE, mime::TEXT_PLAIN_UTF_8.to_string(), ) - .body(hyper::Body::from(val)) + .body(Full::new(Bytes::from(val)).map_err(Into::into).boxed()) .unwrap() .into() } @@ -334,7 +351,7 @@ impl IntoResponse for String { hyper::header::CONTENT_TYPE, mime::TEXT_PLAIN_UTF_8.to_string(), ) - .body(hyper::Body::from(self)) + .body(Full::new(Bytes::from(self)).map_err(Into::into).boxed()) .unwrap() } } @@ -365,7 +382,7 @@ impl IntoResponse for (StatusCode, &'static str) { hyper::header::CONTENT_TYPE, mime::TEXT_PLAIN_UTF_8.to_string(), ) - .body(hyper::Body::from(self.1)) + .body(Full::new(Bytes::from(self.1)).map_err(Into::into).boxed()) .unwrap() } } @@ -376,7 +393,11 @@ impl IntoResponse for crate::Error { http::Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(hyper::Body::from("Internal Server Error")) + .body( + Full::new(Bytes::from("Internal Server Error")) + .map_err(Into::into) + .boxed(), + ) .unwrap() } } @@ -387,7 +408,11 @@ impl From for LieResponse { http::Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(hyper::Body::from("Internal Server Error")) + .body( + Full::new(Bytes::from("Internal Server Error")) + .map_err(Into::into) + .boxed(), + ) .unwrap() .into() } @@ -432,7 +457,7 @@ where hyper::header::CONTENT_TYPE, mime::APPLICATION_WWW_FORM_URLENCODED.to_string(), ) - .body(hyper::Body::from(b)) + .body(Full::new(Bytes::from(b)).map_err(Into::into).boxed()) .unwrap(), ) }) @@ -451,7 +476,7 @@ impl From for LieResponse { hyper::header::CONTENT_TYPE, mime::TEXT_HTML_UTF_8.to_string(), ) - .body(val.body) + .body(val.body.map_err(Into::into).boxed()) .unwrap() .into() } @@ -470,7 +495,7 @@ where hyper::header::CONTENT_TYPE, mime::APPLICATION_JSON.to_string(), ) - .body(hyper::Body::from(b)) + .body(Full::new(Bytes::from(b)).map_err(Into::into).boxed()) .unwrap(), ) }) @@ -488,7 +513,7 @@ impl From for LieResponse { http::Response::builder() .header(hyper::header::CONTENT_TYPE, content_type.to_string()) - .body(hyper::Body::from(body)) + .body(Full::new(body).map_err(Into::into).boxed()) .unwrap() .into() } @@ -496,17 +521,20 @@ impl From for LieResponse { impl From> for LieResponse where - S: futures::Stream> + Send + 'static, + S: futures::Stream> + Send + Sync + 'static, B: Into + 'static, - E: std::error::Error + Send + Sync + 'static, + E: Into + Send + Sync + 'static, { fn from(body: StreamBody) -> LieResponse { let StreamBody { s, content_type } = body; - http::Response::builder() + let body = s.map(|b| b.map(|b| Frame::data(b.into())).map_err(Into::into)); + + let resp = http::Response::builder() .header(hyper::header::CONTENT_TYPE, content_type.to_string()) - .body(hyper::Body::wrap_stream(s)) - .unwrap() - .into() + .body(BodyExt::boxed(http_body_util::StreamBody::new(body))) + .unwrap(); + + resp.into() } } diff --git a/src/server.rs b/src/server.rs index a73aff8..ffe416b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,8 +3,8 @@ use std::path::Path; use std::sync::Arc; use hyper::http; -use hyper::server::conn::Http; use hyper::service::service_fn; +use hyper_util::rt::{TokioExecutor, TokioIo}; use lazy_static::lazy_static; use tokio::net::{TcpListener, ToSocketAddrs}; @@ -99,18 +99,16 @@ impl App { let router = Arc::new(router); - let server = Http::new(); - let listener = TcpListener::bind(addr).await.unwrap(); while let Ok((socket, remote_addr)) = listener.accept().await { - let server = server.clone(); + let server = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); let router = router.clone(); tokio::task::spawn(async move { let router = router.clone(); - let ret = server.serve_connection( - socket, + let ret = server.serve_connection_with_upgrades( + TokioIo::new(socket), service_fn(|mut req| { let router = router.clone(); RequestCtx::init(&mut req, Some(remote_addr)); @@ -198,77 +196,3 @@ pub fn server_id() -> &'static str { &SERVER_ID } -#[cfg(test)] -mod test { - use bytes::Buf; - use hyper::body::HttpBody; - - use crate::{App, Request, Response, Router}; - - fn app() -> App { - let mut app = App::new(); - - app.get("/", || async move { "/" }); - app.post("/post", || async move { "/post" }); - - app - } - - fn request(method: &str, uri: &str) -> Request { - hyper::Request::builder() - .uri(uri) - .method(method) - .body(crate::hyper::Body::empty()) - .unwrap() - } - - async fn body_bytes(resp: &mut Response) -> Vec { - let mut bufs = bytes::BytesMut::new(); - - while let Some(buf) = resp.body_mut().data().await { - let buf = buf.unwrap(); - if buf.has_remaining() { - bufs.extend(buf); - } - } - - bufs.freeze().to_vec() - } - - #[tokio::test] - async fn basic() { - let mut resp = app().respond(request("GET", "/")).await; - assert_eq!(body_bytes(&mut resp).await, b"/".to_vec()) - } - - #[tokio::test] - async fn basic_post() { - let mut resp = app().respond(request("POST", "/post")).await; - assert_eq!(body_bytes(&mut resp).await, b"/post".to_vec()) - } - - #[tokio::test] - async fn tree() { - let mut app = app(); - - app.get("/aa", || async move { "aa" }); - - let mut router_c = Router::new(); - router_c.get("/c", || async move { "a-b-c" }); - - let mut router_b = Router::new(); - router_b.merge("/b/", router_c).unwrap(); - - app.merge("/a/", router_b).unwrap(); - - // let mut resp = app.respond(request("GET", "/aa")).await; - // let body = body_bytes(&mut resp).await; - // assert_eq!(body, b"aa".to_vec()); - - let mut resp = app.respond(request("GET", "/a/b/c")).await; - assert_eq!(resp.status(), 200); - - let body = body_bytes(&mut resp).await; - assert_eq!(body, b"a-b-c".to_vec()); - } -} diff --git a/src/ty.rs b/src/ty.rs index 20b3e0e..a051f9a 100644 --- a/src/ty.rs +++ b/src/ty.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use hyper::Body; +use http_body_util::Full; pub struct Form { pub(crate) value: T, @@ -20,12 +20,12 @@ impl Form { } pub struct Html { - pub(crate) body: Body, + pub(crate) body: Full, } impl Html { - pub fn new(body: impl Into) -> Self { - Html { body: body.into() } + pub fn new(body: impl Into) -> Self { + Html { body: Full::new(body.into()) } } } @@ -54,9 +54,9 @@ pub struct StreamBody { impl StreamBody where - S: futures::Stream> + Send + 'static, + S: futures::Stream> + Send + Sync + 'static, B: Into + 'static, - E: std::error::Error + Send + Sync + 'static, + E: Into + Send + Sync + 'static, { pub fn new(s: S, content_type: mime::Mime) -> Self { StreamBody { s, content_type }