Skip to content

Commit

Permalink
Various enhancements for test and release
Browse files Browse the repository at this point in the history
- Add logs and access logs for informationals
- Add metrics
- Fix panic on malformed and Http/1.0 requests
- Some clippy fixes

Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum committed Oct 25, 2024
1 parent 6507da3 commit f33993b
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 129 deletions.
12 changes: 9 additions & 3 deletions bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ pub enum SubCmd {
#[clap(subcommand)]
cmd: ConfigCmd,
},
#[clap(name = "events", about = "receive sozu events about the status of backends")]
#[clap(
name = "events",
about = "receive sozu events about the status of backends"
)]
Events,
}

Expand Down Expand Up @@ -296,6 +299,11 @@ pub enum ClusterCmd {
help = "Configures the load balancing policy. Possible values are 'roundrobin', 'random' or 'leastconnections'"
)]
load_balancing_policy: LoadBalancingAlgorithms,
#[clap(
long = "http2",
help = "the backends of this cluster use http2 prio-knowledge"
)]
h2: bool,
},
}

Expand Down Expand Up @@ -422,8 +430,6 @@ pub enum HttpFrontendCmd {
method: Option<String>,
#[clap(long = "tags", help = "Specify tag (key-value pair) to apply on front-end (example: 'key=value, other-key=other-value')", value_parser = parse_tags)]
tags: Option<BTreeMap<String, String>>,
#[clap(help = "the frontend uses http2 with prio-knowledge")]
h2: Option<bool>,
},
#[clap(name = "remove")]
Remove {
Expand Down
7 changes: 4 additions & 3 deletions bin/src/ctl/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl CommandManager {
send_proxy,
expect_proxy,
load_balancing_policy,
h2,
} => {
let proxy_protocol = match (send_proxy, expect_proxy) {
(true, true) => Some(ProxyProtocolConfig::RelayHeader),
Expand All @@ -164,7 +165,9 @@ impl CommandManager {
https_redirect,
proxy_protocol: proxy_protocol.map(|pp| pp as i32),
load_balancing: load_balancing_policy as i32,
..Default::default()
http2: h2,
load_metric: None,
answer_503: None,
})
.into(),
)
Expand Down Expand Up @@ -238,7 +241,6 @@ impl CommandManager {
method,
cluster_id: route,
tags,
h2,
} => self.send_request(
RequestType::AddHttpFrontend(RequestHttpFrontend {
cluster_id: route.into(),
Expand Down Expand Up @@ -287,7 +289,6 @@ impl CommandManager {
method,
cluster_id: route,
tags,
h2,
} => self.send_request(
RequestType::AddHttpsFrontend(RequestHttpFrontend {
cluster_id: route.into(),
Expand Down
13 changes: 11 additions & 2 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ impl HttpSession {
Some(session_address),
public_address,
);
context.create_stream(expect.request_id, 1 << 16)?;
if context.create_stream(expect.request_id, 1 << 16).is_none() {
error!("HTTP expect upgrade failed: could not create stream");
return None;
}
let mut mux = Mux {
configured_frontend_timeout: self.configured_frontend_timeout,
frontend_token: self.frontend_token,
Expand All @@ -232,7 +235,13 @@ impl HttpSession {
gauge_add!("protocol.http", 1);
Some(HttpStateMachine::Mux(mux))
}
_ => None,
_ => {
warn!(
"HTTP expect upgrade failed: bad header {:?}",
expect.addresses
);
None
}
}
}

Expand Down
19 changes: 13 additions & 6 deletions lib/src/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,10 @@ impl HttpsSession {
if !expect.container_frontend_timeout.cancel() {
error!("failed to cancel request timeout on expect upgrade phase for 'expect proxy protocol with AF_UNSPEC address'");
}

warn!(
"HTTP expect upgrade failed: bad header {:?}",
expect.addresses
);
None
}

Expand Down Expand Up @@ -299,14 +302,18 @@ impl HttpsSession {
);
let mut frontend = match alpn {
AlpnProtocol::Http11 => {
incr!("http.alpn.http11");
context.create_stream(handshake.request_id, 1 << 16)?;
mux::Connection::new_h1_server(front_stream, handshake.container_frontend_timeout)
}
AlpnProtocol::H2 => mux::Connection::new_h2_server(
front_stream,
self.pool.clone(),
handshake.container_frontend_timeout,
)?,
AlpnProtocol::H2 => {
incr!("http.alpn.h2");
mux::Connection::new_h2_server(
front_stream,
self.pool.clone(),
handshake.container_frontend_timeout,
)?
}
};
frontend.readiness_mut().event = handshake.frontend_readiness.event;

Expand Down
57 changes: 50 additions & 7 deletions lib/src/protocol/mux/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,41 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
self.readiness.interest.remove(Ready::READABLE);
}
if kawa.is_main_phase() {
if let StreamState::Linked(token) = stream.state {
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
if !was_main_phase && self.position.is_server() {
if parts.context.method.is_none()
|| parts.context.authority.is_none()
|| parts.context.path.is_none()
{
if let kawa::StatusLine::Request {
version: kawa::Version::V10,
..
} = kawa.detached.status_line
{
error!(
"Unexpected malformed request: HTTP/1.0 from {:?} with {:?} {:?} {:?}",
parts.context.session_address,
parts.context.method,
parts.context.authority,
parts.context.path
);
} else {
error!("Unexpected malformed request");
kawa::debug_kawa(kawa);
}
set_default_answer(stream, &mut self.readiness, 400);
return MuxResult::Continue;
}
self.requests += 1;
println_!("REQUESTS: {}", self.requests);
gauge_add!("http.active_requests", 1);
stream.state = StreamState::Link
}
if let StreamState::Linked(token) = stream.state {
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
};
MuxResult::Continue
}
Expand Down Expand Up @@ -150,6 +173,11 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
match kawa.detached.status_line {
kawa::StatusLine::Response { code: 101, .. } => {
debug!("============== HANDLE UPGRADE!");
stream.generate_access_log(
false,
Some(String::from("H1::Upgrade")),
context.listener.clone(),
);
return MuxResult::Upgrade;
}
kawa::StatusLine::Response { code: 100, .. } => {
Expand All @@ -158,6 +186,11 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
self.timeout_container.reset();
self.readiness.interest.insert(Ready::READABLE);
kawa.clear();
stream.generate_access_log(
false,
Some(String::from("H1::Continue")),
context.listener.clone(),
);
return MuxResult::Continue;
}
kawa::StatusLine::Response { code: 103, .. } => {
Expand All @@ -169,14 +202,24 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
.interest
.insert(Ready::READABLE);
kawa.clear();
stream.generate_access_log(
false,
Some(String::from("H1::EarlyHint+Error")),
context.listener.clone(),
);
return MuxResult::Continue;
} else {
stream.generate_access_log(
false,
Some(String::from("H1::EarlyHint")),
context.listener.clone(),
);
return MuxResult::CloseSession;
}
}
_ => {}
}
// ACCESS LOG
incr!("http.e2e.http11");
stream.generate_access_log(
false,
Some(String::from("H1")),
Expand Down
Loading

0 comments on commit f33993b

Please sign in to comment.