Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: upgrade to http 1.x #686

Merged
merged 5 commits into from
Apr 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
789 changes: 381 additions & 408 deletions Cargo.lock

Large diffs are not rendered by default.

26 changes: 15 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -23,13 +23,12 @@ panic = "abort"
ansi_term = "0.12"
anyhow = "1"
async-recursion = "1.0.5"
axum = { version = "0.6", features = ["ws"] }
axum-server = "0.5.1"
base64 = "0.21"
axum = { version = "0.7", features = ["ws"] }
axum-server = "0.6"
base64 = "0.22"
bytes = "1"
cargo-lock = "9"
cargo_metadata = "0.18.1"
crates_io_api = { version = "0.9", default-features = false }
clap = { version = "4", features = ["derive", "env"] }
console = "0.15"
directories = "5"
@@ -40,7 +39,7 @@ futures-util = { version = "0.3", default-features = false, features = ["sink"]
htmlescape = "0.3.1"
humantime = "2"
humantime-serde = "1"
hyper = "0.14"
hyper = "1.1"
local-ip-address = "0.6.1"
lol_html = "1.2.1"
mime_guess = "2.0.4"
@@ -53,7 +52,7 @@ open = "5"
oxipng = "9"
parking_lot = "0.12"
remove_dir_all = "0.8"
reqwest = { version = "0.11", default-features = false, features = ["stream", "trust-dns"] }
reqwest = { version = "0.12", default-features = false, features = ["stream", "trust-dns"] }
sha2 = "0.10"
seahash = { version = "4", features = ["use_std"] }
semver = "1"
@@ -65,16 +64,19 @@ time = { version = "0.3", features = ["serde-well-known"] }
thiserror = "1"
tokio = { version = "1", default-features = false, features = ["full"] }
tokio-stream = { version = "0.1", default-features = false, features = ["fs", "sync"] }
tokio-tungstenite = "0.20"
tokio-tungstenite = "0.21"
toml = "0.8"
tower-http = { version = "0.4", features = ["fs", "trace", "set-header"] }
tower-http = { version = "0.5.1", features = ["fs", "trace", "set-header"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
which = "6"
zip = "0.6"

# pin lightningcss, used by trunk, also pulled in by minify-html
lightningcss = "=1.0.0-alpha.54"
lightningcss = "=1.0.0-alpha.55"

# required for the update check
crates_io_api = { version = "0.11", default-features = false, optional = true }

[dev-dependencies]
tempfile = "3"
@@ -87,7 +89,7 @@ default = ["update_check", "rustls"]
rustls = [
"axum-server/tls-rustls",
"crates_io_api/rustls",
"reqwest/rustls",
"reqwest/rustls-tls",
"reqwest/rustls-tls-native-roots",
"tokio-tungstenite/rustls",
"tokio-tungstenite/rustls-tls-native-roots",
@@ -101,4 +103,6 @@ native-tls = [
]

# enable the update check on startup
update_check = []
update_check = [
"crates_io_api"
]
52 changes: 32 additions & 20 deletions src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
use std::sync::Arc;

use crate::serve::{ServerError, ServerResult};
use anyhow::Context;
use axum::body::Body;
use axum::extract::ws::{Message as MsgAxm, WebSocket, WebSocketUpgrade};
use axum::extract::State;
use axum::http::{Request, Response, Uri};
use axum::routing::{any, get, Router};
use axum::RequestExt;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use hyper::header::HOST;
use hyper::HeaderMap;
use axum::{
body::Body,
extract::{
ws::{Message as MsgAxm, WebSocket, WebSocketUpgrade},
Request, State,
},
http::{Response, Uri},
routing::{any, get, Router},
RequestExt,
};
use bytes::BytesMut;
use futures_util::{sink::SinkExt, stream::StreamExt, TryStreamExt};
use hyper::{header::HOST, HeaderMap};
use reqwest::header::HeaderValue;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::Message as MsgTng;
use std::sync::Arc;
use tokio_tungstenite::{
connect_async,
tungstenite::{protocol::CloseFrame, Message as MsgTng},
};
use tower_http::trace::TraceLayer;

use crate::serve::ServerResult;

/// The `X-Forwarded-Host`` (XFH) header is a de-facto standard header for
/// identifying the original host requested by the client in the Host HTTP
/// request header.
@@ -129,15 +131,25 @@ impl ProxyHandlerHttp {
#[tracing::instrument(level = "debug", skip(state, req))]
async fn proxy_http_request(
State(state): State<Arc<Self>>,
req: Request<Body>,
req: Request,
) -> ServerResult<Response<Body>> {
// Construct the outbound URI & build a new request to be sent to the proxy backend.
let outbound_uri = make_outbound_uri(&state.backend, req.uri())?;
let mut outbound_req = state
.client
.request(req.method().clone(), outbound_uri.to_string())
.headers(req.headers().clone())
.body(req.into_body())
.body(reqwest::Body::from(
// It would be better to use a stream for this. However, right now,
// .into_data_stream() returns a stream which is not Send+Sync, so we can't pass it
// on to reqwest::Body::wrap_stream(..).
req.into_body()
.into_data_stream()
.try_collect::<BytesMut>()
.await
.map_err(|err| ServerError(err.into()))?
.freeze(),
))
.build()
.context("error building outbound request to proxy backend")?;

@@ -160,7 +172,7 @@ impl ProxyHandlerHttp {
}

Ok(res
.body(Body::wrap_stream(backend_res.bytes_stream()))
.body(Body::from_stream(backend_res.bytes_stream()))
.context("error building proxy response")?)
}
}
15 changes: 8 additions & 7 deletions src/serve/mod.rs
Original file line number Diff line number Diff line change
@@ -6,11 +6,11 @@ use crate::tls::TlsConfig;
use crate::watch::WatchSystem;
use crate::ws;
use anyhow::{Context, Result};
use axum::body::{self, Body, Bytes};
use axum::body::{Body, Bytes};
use axum::extract;
use axum::extract::ws::WebSocketUpgrade;
use axum::http::header::{HeaderName, CONTENT_LENGTH, CONTENT_TYPE, HOST};
use axum::http::{HeaderValue, Request, StatusCode};
use axum::http::{HeaderValue, StatusCode};
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use axum::routing::{get, get_service, Router};
@@ -392,12 +392,13 @@ fn router(state: Arc<State>, cfg: Arc<RtcServe>) -> Result<Router> {
Ok(builder.build())
}

async fn html_address_middleware<B: std::fmt::Debug>(
async fn html_address_middleware(
extract::State(state): extract::State<Arc<State>>,
request: Request<B>,
next: Next<B>,
request: extract::Request,
next: Next,
) -> Response {
let host = request.headers().get(HOST).cloned();

let response = next.run(request).await;

// if it's not a success, we don't modify it
@@ -419,7 +420,7 @@ async fn html_address_middleware<B: std::fmt::Debug>(
let (parts, body) = response.into_parts();

// turn the body into bytes
match hyper::body::to_bytes(body).await {
match axum::body::to_bytes(body, 100 * 1024 * 1024).await {
Err(err) => {
tracing::debug!("Unable to intercept: {err}");
(parts, Bytes::default()).into_response()
@@ -472,7 +473,7 @@ impl From<anyhow::Error> for ServerError {
impl IntoResponse for ServerError {
fn into_response(self) -> Response {
tracing::error!(error = ?self.0, "error handling request");
let mut res = Response::new(body::boxed(Body::empty()));
let mut res = Response::new(Body::empty());
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
res
}