Skip to content

Commit

Permalink
Use hyper 1 (#11)
Browse files Browse the repository at this point in the history
* address a couple of todos

* draft: use hyper 1

* draft

* draft: try using u8

* draft: hyper 1 works

* refactor: improve code quality

* chore: unnecessary format

* chore: ad test

* refactor: factor out some methods

* refactor: rearrange variables

* feat: use a broadcast queue for streaming monitoring data

* fix: stop monitoring thread if channel is closed

* refactor: code golfing in main
  • Loading branch information
gabotechs authored Dec 12, 2023
1 parent 6688e5e commit 4847400
Show file tree
Hide file tree
Showing 14 changed files with 977 additions and 595 deletions.
376 changes: 333 additions & 43 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ edition = "2021"

[dependencies]
openssl = { version = "0.10", features = ["vendored"] } # NOTE: neeeded for cross compilations
hyper = { version = "^0.14.26", features = ["full"] }
hyper-tls = "^0.5.0"
tokio = { version = "^1.28.1", features = ["full"] }
hyper = { version = "^1.0.1", features = ["full"] }
http-body-util = "0.1.0"
hyper-util = { version = "0.1.1", features = ["full"]}
hyper-tls = "0.6.0"
tokio = { version = "^1.34.0", features = ["full"] }
url = "^2.3.1"
anyhow = "^1.0.71"
serde = { version = "^1.0.163", features = ["derive"] }
Expand All @@ -23,7 +25,9 @@ time = { version = "^0.3.6", features = ["formatting", "macros", "parsing"] }
percent-encoding = "^2.2.0"
tracing = "^0.1.37"
async-trait = "^0.1.68"
futures-util = "0.3.29"
bytes = "1.5.0"

[dev-dependencies]
lazy_static = "^1.4.0"
reqwest = { version = "^0.11.18", features = ["json"] }
reqwest = { version = "^0.11.22", features = ["json"] }
32 changes: 16 additions & 16 deletions server/src/_test_tools.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
use hyper::{Body, Request};

#[cfg(test)]
pub(crate) mod tests {
use hyper::Request;
use std::collections::HashMap;
use std::convert::Infallible;
use std::error::Error;
use std::str::FromStr;

use anyhow::{anyhow, Context};
use async_trait::async_trait;
use http_body_util::Full;
use hyper::header::HeaderName;
use hyper::{HeaderMap, Response, StatusCode, Uri};
use serde::de::DeserializeOwned;
use time::{OffsetDateTime, PrimitiveDateTime};
use url::Url;

use crate::signing::{ElementsToSign, UrlSigner};
use crate::signing::{ElementsToSign, SignedBody, UrlSigner};
use crate::sw_body::{empty, sw_body_from_string, SwBody};
use crate::{GetSecretResponse, SecretGetter, SecretGetterResult};

use super::*;

#[derive(Clone, Debug)]
pub(crate) struct ReqBuilder {
method: String,
Expand Down Expand Up @@ -75,10 +74,10 @@ pub(crate) mod tests {
}
}

pub(crate) fn build(&self) -> anyhow::Result<Request<Body>> {
let body = match &self.body {
Some(b) => Body::from(b.to_string()),
None => Body::empty(),
pub(crate) fn build(self) -> anyhow::Result<Request<SwBody>> {
let body = match self.body.clone() {
Some(b) => sw_body_from_string(b),
None => empty(),
};

let mut builder = Request::builder();
Expand All @@ -101,7 +100,10 @@ pub(crate) mod tests {
datetime: PrimitiveDateTime::new(now.date(), now.time()),
method: self.method.clone(),
headers: self.build_headers()?,
body: self.body.clone(),
body: match self.body.clone() {
None => SignedBody::None,
Some(value) => SignedBody::Some(value),
},
};

let signer = UrlSigner::new(id, secret);
Expand Down Expand Up @@ -156,18 +158,16 @@ pub(crate) mod tests {

#[async_trait]
impl SecretGetter for InMemorySecretGetter {
type Error = Infallible;

async fn get_secret(&self, id: &str) -> Result<GetSecretResponse, Self::Error> {
async fn get_secret(&self, id: &str) -> Result<GetSecretResponse, Box<dyn Error>> {
let secret = match self.0.get(id).cloned() {
Some(a) => a,
None => {
return Ok(GetSecretResponse::EarlyResponse(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::empty())
.body(Full::default())
.unwrap(),
))
));
}
};

Expand Down
51 changes: 0 additions & 51 deletions server/src/body.rs

This file was deleted.

81 changes: 30 additions & 51 deletions server/src/gateway_callbacks.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
use async_trait::async_trait;
use hyper::{Body, Request, Response};
use std::fmt::{Display, Formatter};

use async_trait::async_trait;
use bytes::Bytes;
use http_body_util::Full;
use hyper::http::{request, response};
use hyper::Response;
use url::Url;

pub enum CallbackResult {
EarlyResponse(Response<Body>),
EarlyResponse(Response<Full<Bytes>>),
Empty,
}

#[async_trait]
pub trait OnRequest: Sync + Send {
async fn call(&self, id: &str, req: &Request<Body>) -> CallbackResult;
async fn call(&self, id: &str, req: &request::Parts) -> CallbackResult;
}

#[async_trait]
pub trait OnSuccess: Sync + Send {
async fn call(&self, id: &str, res: &Response<Body>) -> CallbackResult;
async fn call(&self, id: &str, res: &response::Parts) -> CallbackResult;
}

#[derive(Debug, Clone)]
Expand All @@ -41,30 +45,27 @@ impl Display for BytesTransferredKind {
pub struct BytesTransferredInfo {
pub id: String,
pub proxy_url: Url,
pub bytes: usize,
pub kind: BytesTransferredKind,
}

#[async_trait]
pub trait OnBytesTransferred: Sync + Send {
async fn call(&self, bytes: usize, info: BytesTransferredInfo);
}

#[cfg(test)]
mod tests {
use crate::_test_tools::tests::{InMemorySecretGetter, ReqBuilder};
use crate::body::body_to_string;
use crate::gateway_callbacks::{CallbackResult, OnRequest, OnSuccess};
use crate::{
BytesTransferredInfo, HeaderMap, OnBytesTransferred, SecretGetterResult, SignwayServer,
};
use async_trait::async_trait;
use hyper::body::HttpBody;
use hyper::{Body, Request, Response, StatusCode};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::SeqCst;

fn server() -> SignwayServer<InMemorySecretGetter> {
use async_trait::async_trait;
use hyper::http::{request, response};
use hyper::{Request, StatusCode};

use crate::_test_tools::tests::{InMemorySecretGetter, ReqBuilder};
use crate::gateway_callbacks::{CallbackResult, OnRequest, OnSuccess};
use crate::sw_body::SwBody;
use crate::{HeaderMap, SecretGetterResult, SignwayServer};

fn server() -> SignwayServer {
SignwayServer::from_env(InMemorySecretGetter(HashMap::from([(
"foo".to_string(),
SecretGetterResult {
Expand All @@ -74,7 +75,7 @@ mod tests {
)])))
}

fn req() -> Request<Body> {
fn req() -> Request<SwBody> {
ReqBuilder::default()
.query("page", "1")
.header("Content-Length", "3")
Expand All @@ -90,35 +91,30 @@ mod tests {

#[async_trait]
impl<'a> OnRequest for SizeCollector<'a> {
async fn call(&self, _id: &str, req: &Request<Body>) -> CallbackResult {
self.0.fetch_add(req.size_hint().exact().unwrap(), SeqCst);
async fn call(&self, _id: &str, req: &request::Parts) -> CallbackResult {
let size: &str = req.headers.get("content-length").unwrap().to_str().unwrap();
self.0.fetch_add(u64::from_str(size).unwrap(), SeqCst);
CallbackResult::Empty
}
}

#[async_trait]
impl<'a> OnSuccess for SizeCollector<'a> {
async fn call(&self, _id: &str, res: &Response<Body>) -> CallbackResult {
self.0.fetch_add(res.size_hint().exact().unwrap(), SeqCst);
async fn call(&self, _id: &str, res: &response::Parts) -> CallbackResult {
let size: &str = res.headers.get("content-length").unwrap().to_str().unwrap();
self.0.fetch_add(u64::from_str(size).unwrap(), SeqCst);
CallbackResult::Empty
}
}

#[async_trait]
impl<'a> OnBytesTransferred for SizeCollector<'a> {
async fn call(&self, bytes: usize, _info: BytesTransferredInfo) {
self.0.fetch_add(bytes as u64, SeqCst);
}
}

#[tokio::test]
async fn test_on_request() {
static COUNTER: AtomicU64 = AtomicU64::new(0);
let size_collector = SizeCollector(&COUNTER);

let response = server()
.on_request(size_collector)
.route_gateway(req())
.handler(req())
.await
.unwrap();

Expand All @@ -133,28 +129,11 @@ mod tests {

let response = server()
.on_success(size_collector)
.route_gateway(req())
.handler(req())
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert_eq!(COUNTER.load(SeqCst), 396);
}

#[tokio::test]
async fn test_on_bytes_transferred() {
static COUNTER: AtomicU64 = AtomicU64::new(0);
let size_collector = SizeCollector(&COUNTER);

let response = server()
.on_bytes_transferred(size_collector)
.route_gateway(req())
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert_eq!(COUNTER.load(SeqCst), 3);
body_to_string(response.into_body(), 396).await.unwrap();
assert_eq!(COUNTER.load(SeqCst), 399);
}
}
5 changes: 4 additions & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub use http_body_util;
pub use hyper;
pub use hyper_util;

pub use gateway_callbacks::*;
pub use secret_getter::*;
Expand All @@ -7,9 +9,10 @@ pub use server::*;
#[cfg(test)]
mod _test_tools;

mod body;
mod gateway_callbacks;
mod route_gateway;
mod secret_getter;
mod server;
mod signing;
mod sw_body;
mod monitoring;
Loading

0 comments on commit 4847400

Please sign in to comment.