diff --git a/bin/src/cli.rs b/bin/src/cli.rs index ea4e8a96e..74355d9bc 100644 --- a/bin/src/cli.rs +++ b/bin/src/cli.rs @@ -180,7 +180,10 @@ pub enum SubCmd { #[clap(subcommand)] cmd: ConfigCmd, }, - #[clap(name = "events", about = "receive sozu events about the status of backends")] + #[clap( + name = "events", + about = "receive sozu events about the status of backends" + )] Events, } @@ -296,6 +299,11 @@ pub enum ClusterCmd { help = "Configures the load balancing policy. Possible values are 'roundrobin', 'random' or 'leastconnections'" )] load_balancing_policy: LoadBalancingAlgorithms, + #[clap( + long = "http2", + help = "the backends of this cluster use http2 prio-knowledge" + )] + h2: bool, }, } @@ -422,8 +430,6 @@ pub enum HttpFrontendCmd { method: Option, #[clap(long = "tags", help = "Specify tag (key-value pair) to apply on front-end (example: 'key=value, other-key=other-value')", value_parser = parse_tags)] tags: Option>, - #[clap(help = "the frontend uses http2 with prio-knowledge")] - h2: Option, }, #[clap(name = "remove")] Remove { diff --git a/bin/src/ctl/request_builder.rs b/bin/src/ctl/request_builder.rs index 0f477cb49..2d8acea14 100644 --- a/bin/src/ctl/request_builder.rs +++ b/bin/src/ctl/request_builder.rs @@ -150,6 +150,7 @@ impl CommandManager { send_proxy, expect_proxy, load_balancing_policy, + h2, } => { let proxy_protocol = match (send_proxy, expect_proxy) { (true, true) => Some(ProxyProtocolConfig::RelayHeader), @@ -164,7 +165,9 @@ impl CommandManager { https_redirect, proxy_protocol: proxy_protocol.map(|pp| pp as i32), load_balancing: load_balancing_policy as i32, - ..Default::default() + http2: h2, + load_metric: None, + answer_503: None, }) .into(), ) @@ -238,7 +241,6 @@ impl CommandManager { method, cluster_id: route, tags, - h2, } => self.send_request( RequestType::AddHttpFrontend(RequestHttpFrontend { cluster_id: route.into(), @@ -287,7 +289,6 @@ impl CommandManager { method, cluster_id: route, tags, - h2, } => self.send_request( RequestType::AddHttpsFrontend(RequestHttpFrontend { cluster_id: route.into(), diff --git a/lib/src/http.rs b/lib/src/http.rs index 63fea8fb4..c4e7c4903 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -218,7 +218,10 @@ impl HttpSession { Some(session_address), public_address, ); - context.create_stream(expect.request_id, 1 << 16)?; + if context.create_stream(expect.request_id, 1 << 16).is_none() { + error!("HTTP expect upgrade failed: could not create stream"); + return None; + } let mut mux = Mux { configured_frontend_timeout: self.configured_frontend_timeout, frontend_token: self.frontend_token, @@ -232,7 +235,13 @@ impl HttpSession { gauge_add!("protocol.http", 1); Some(HttpStateMachine::Mux(mux)) } - _ => None, + _ => { + warn!( + "HTTP expect upgrade failed: bad header {:?}", + expect.addresses + ); + None + } } } diff --git a/lib/src/https.rs b/lib/src/https.rs index f8a134b8b..49a8980da 100644 --- a/lib/src/https.rs +++ b/lib/src/https.rs @@ -248,7 +248,10 @@ impl HttpsSession { if !expect.container_frontend_timeout.cancel() { error!("failed to cancel request timeout on expect upgrade phase for 'expect proxy protocol with AF_UNSPEC address'"); } - + warn!( + "HTTP expect upgrade failed: bad header {:?}", + expect.addresses + ); None } @@ -299,14 +302,18 @@ impl HttpsSession { ); let mut frontend = match alpn { AlpnProtocol::Http11 => { + incr!("http.alpn.http11"); context.create_stream(handshake.request_id, 1 << 16)?; mux::Connection::new_h1_server(front_stream, handshake.container_frontend_timeout) } - AlpnProtocol::H2 => mux::Connection::new_h2_server( - front_stream, - self.pool.clone(), - handshake.container_frontend_timeout, - )?, + AlpnProtocol::H2 => { + incr!("http.alpn.h2"); + mux::Connection::new_h2_server( + front_stream, + self.pool.clone(), + handshake.container_frontend_timeout, + )? + } }; frontend.readiness_mut().event = handshake.frontend_readiness.event; diff --git a/lib/src/protocol/mux/h1.rs b/lib/src/protocol/mux/h1.rs index d0f14bea4..ec2b54502 100644 --- a/lib/src/protocol/mux/h1.rs +++ b/lib/src/protocol/mux/h1.rs @@ -90,18 +90,41 @@ impl ConnectionH1 { self.readiness.interest.remove(Ready::READABLE); } if kawa.is_main_phase() { - if let StreamState::Linked(token) = stream.state { - endpoint - .readiness_mut(token) - .interest - .insert(Ready::WRITABLE) - } if !was_main_phase && self.position.is_server() { + if parts.context.method.is_none() + || parts.context.authority.is_none() + || parts.context.path.is_none() + { + if let kawa::StatusLine::Request { + version: kawa::Version::V10, + .. + } = kawa.detached.status_line + { + error!( + "Unexpected malformed request: HTTP/1.0 from {:?} with {:?} {:?} {:?}", + parts.context.session_address, + parts.context.method, + parts.context.authority, + parts.context.path + ); + } else { + error!("Unexpected malformed request"); + kawa::debug_kawa(kawa); + } + set_default_answer(stream, &mut self.readiness, 400); + return MuxResult::Continue; + } self.requests += 1; println_!("REQUESTS: {}", self.requests); gauge_add!("http.active_requests", 1); stream.state = StreamState::Link } + if let StreamState::Linked(token) = stream.state { + endpoint + .readiness_mut(token) + .interest + .insert(Ready::WRITABLE) + } }; MuxResult::Continue } @@ -150,6 +173,11 @@ impl ConnectionH1 { match kawa.detached.status_line { kawa::StatusLine::Response { code: 101, .. } => { debug!("============== HANDLE UPGRADE!"); + stream.generate_access_log( + false, + Some(String::from("H1::Upgrade")), + context.listener.clone(), + ); return MuxResult::Upgrade; } kawa::StatusLine::Response { code: 100, .. } => { @@ -158,6 +186,11 @@ impl ConnectionH1 { self.timeout_container.reset(); self.readiness.interest.insert(Ready::READABLE); kawa.clear(); + stream.generate_access_log( + false, + Some(String::from("H1::Continue")), + context.listener.clone(), + ); return MuxResult::Continue; } kawa::StatusLine::Response { code: 103, .. } => { @@ -169,14 +202,24 @@ impl ConnectionH1 { .interest .insert(Ready::READABLE); kawa.clear(); + stream.generate_access_log( + false, + Some(String::from("H1::EarlyHint+Error")), + context.listener.clone(), + ); return MuxResult::Continue; } else { + stream.generate_access_log( + false, + Some(String::from("H1::EarlyHint")), + context.listener.clone(), + ); return MuxResult::CloseSession; } } _ => {} } - // ACCESS LOG + incr!("http.e2e.http11"); stream.generate_access_log( false, Some(String::from("H1")), diff --git a/lib/src/protocol/mux/h2.rs b/lib/src/protocol/mux/h2.rs index 4ca6c2947..90f6dd19c 100644 --- a/lib/src/protocol/mux/h2.rs +++ b/lib/src/protocol/mux/h2.rs @@ -26,12 +26,12 @@ fn error_nom_to_h2(error: nom::Err) -> H2Error { nom::Err::Error(parser::ParserError { kind: parser::ParserErrorKind::H2(e), .. - }) => return e, + }) => e, nom::Err::Failure(parser::ParserError { kind: parser::ParserErrorKind::H2(e), .. - }) => return e, - _ => return H2Error::ProtocolError, + }) => e, + _ => H2Error::ProtocolError, } } @@ -78,12 +78,10 @@ impl Default for H2Settings { } } +#[derive(Default)] pub struct Prioriser {} impl Prioriser { - pub fn new() -> Self { - Self {} - } pub fn push_priority(&mut self, stream_id: StreamId, priority: parser::PriorityPart) -> bool { println_!("PRIORITY REQUEST FOR {stream_id}: {priority:?}"); match priority { @@ -162,7 +160,7 @@ impl ConnectionH2 { let (stream_id, kawa) = if let Some((stream_id, amount)) = self.expect_read { let kawa = match stream_id { H2StreamId::Zero => &mut self.zero, - H2StreamId::Other(stream_id, global_stream_id) => { + H2StreamId::Other(_, global_stream_id) => { context.streams[global_stream_id] .split(&self.position) .rbuffer @@ -186,23 +184,21 @@ impl ConnectionH2 { } if update_readiness_after_read(size, status, &mut self.readiness) { return MuxResult::Continue; + } else if size == amount { + self.expect_read = None; } else { - if size == amount { - self.expect_read = None; - } else { - self.expect_read = Some((stream_id, amount - size)); - match (&self.state, &self.position) { - (H2State::ClientPreface, Position::Server) => { - let i = kawa.storage.data(); - if !b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".starts_with(i) { - println_!("EARLY INVALID PREFACE: {i:?}"); - return self.force_disconnect(); - } + self.expect_read = Some((stream_id, amount - size)); + match (&self.state, &self.position) { + (H2State::ClientPreface, Position::Server) => { + let i = kawa.storage.data(); + if !b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".starts_with(i) { + println_!("EARLY INVALID PREFACE: {i:?}"); + return self.force_disconnect(); } - _ => {} } - return MuxResult::Continue; + _ => {} } + return MuxResult::Continue; } } else { self.expect_read = None; @@ -359,24 +355,24 @@ impl ConnectionH2 { } } } else if header.frame_type != FrameType::Priority { - error!( - "CANNOT RECEIVE {:?} FRAME ON IDLE/CLOSED STREAMS", - header.frame_type - ); if header.frame_type == FrameType::Data && header.payload_len == 0 && header.flags == 1 { - error!( - "SKIPPED DATA: {} {} {} {}", - stream_id, - self.last_stream_id, - header.flags, - header.payload_len - ); + // error!( + // "SKIPPED DATA: {} {} {} {}", + // stream_id, + // self.last_stream_id, + // header.flags, + // header.payload_len + // ); self.expect_header(); return MuxResult::Continue; } + error!( + "CANNOT RECEIVE {:?} FRAME ON IDLE/CLOSED STREAMS", + header.frame_type + ); return self.goaway(H2Error::ProtocolError); } H2StreamId::Zero @@ -456,7 +452,7 @@ impl ConnectionH2 { (H2State::ContinuationFrame(headers), _) => { kawa.storage.head = kawa.storage.end; let i = kawa.storage.data(); - println_!(" data: {i:?}"); + println_!(" data: {:?}", i); let headers = headers.clone(); self.expect_header(); return self.handle_frame(Frame::Headers(headers), context, endpoint); @@ -581,7 +577,7 @@ impl ConnectionH2 { Position::Server => { // mark stream as reusable println_!("Recycle stream: {global_stream_id}"); - // ACCESS LOG + incr!("http.e2e.h2"); stream.generate_access_log( false, Some(String::from("H2::SplitFrame")), @@ -648,9 +644,12 @@ impl ConnectionH2 { match self.position { Position::Client(..) => {} Position::Server => { + // Handle 1xx, this code should probably be merged with the h2 SplitFrame case and h1 nominal case + // to avoid code duplication + // mark stream as reusable println_!("Recycle1 stream: {global_stream_id}"); - // ACCESS LOG + incr!("http.e2e.h2"); stream.generate_access_log( false, Some(String::from("H2::WholeFrame")), @@ -834,7 +833,7 @@ impl ConnectionH2 { stream.state = StreamState::Link; } } - Frame::PushPromise(push_promise) => match self.position { + Frame::PushPromise(_push_promise) => match self.position { Position::Client(..) => { if self.local_settings.settings_enable_push { todo!("forward the push") @@ -884,7 +883,6 @@ impl ConnectionH2 { // This is a special case, normally, all stream are terminated by the server // when the last byte of the response is written. Here, the reset is requested // on the server endpoint and immediately terminates, shortcutting the other path - // ACCESS LOG stream.generate_access_log( true, Some(String::from("H2::ResetFrame")), @@ -903,7 +901,7 @@ impl ConnectionH2 { let v = setting.value; let mut is_error = false; #[rustfmt::skip] - let _ = match setting.identifier { + match setting.identifier { 1 => { self.peer_settings.settings_header_table_size = v }, 2 => { self.peer_settings.settings_enable_push = v == 1; is_error |= v > 1 }, 3 => { self.peer_settings.settings_max_concurrent_streams = v }, @@ -970,27 +968,23 @@ impl ConnectionH2 { error!("INVALID WINDOW INCREMENT"); return self.goaway(H2Error::FlowControlError); } - } else { - if let Some(global_stream_id) = self.streams.get(&stream_id) { - let stream = &mut context.streams[*global_stream_id]; - if let Some(window) = stream.window.checked_add(increment) { - if stream.window <= 0 && window > 0 { - self.readiness.interest.insert(Ready::WRITABLE); - } - stream.window = window; - } else { - return self.reset_stream( - *global_stream_id, - context, - endpoint, - H2Error::FlowControlError, - ); + } else if let Some(global_stream_id) = self.streams.get(&stream_id) { + let stream = &mut context.streams[*global_stream_id]; + if let Some(window) = stream.window.checked_add(increment) { + if stream.window <= 0 && window > 0 { + self.readiness.interest.insert(Ready::WRITABLE); } + stream.window = window; } else { - println_!( - "Ignoring window update on closed stream {stream_id}: {increment}" + return self.reset_stream( + *global_stream_id, + context, + endpoint, + H2Error::FlowControlError, ); } + } else { + println_!("Ignoring window update on closed stream {stream_id}: {increment}"); }; } Frame::Continuation(_) => unreachable!(), @@ -1007,7 +1001,7 @@ impl ConnectionH2 { } let delta = value as i32 - self.peer_settings.settings_initial_window_size as i32; let mut open_window = false; - for (i, stream) in context.streams.iter_mut().enumerate() { + for stream in context.streams.iter_mut() { open_window |= stream.window <= 0 && stream.window + delta > 0; stream.window += delta; } @@ -1080,7 +1074,7 @@ impl ConnectionH2 { L: ListenerHandler + L7ListenerHandler, { let stream_context = &mut context.streams[stream].context; - println_!("end H2 stream {stream}: {stream_context:#?}"); + println_!("end H2 stream {}: {:#?}", stream, stream_context); match self.position { Position::Client(..) => { for (stream_id, global_stream_id) in &self.streams { diff --git a/lib/src/protocol/mux/mod.rs b/lib/src/protocol/mux/mod.rs index 6dce6aad1..560ccfb57 100644 --- a/lib/src/protocol/mux/mod.rs +++ b/lib/src/protocol/mux/mod.rs @@ -120,17 +120,39 @@ pub fn terminate_default_answer(kawa: &mut kawa::Kawa, clo /// Replace the content of the kawa message with a default Sozu answer for a given status code fn set_default_answer(stream: &mut Stream, readiness: &mut Readiness, code: u16) { + let context = &mut stream.context; let kawa = &mut stream.back; kawa.clear(); kawa.storage.clear(); + let key = match code { + 301 => "http.auto.301", + 400 => "http.auto.400", + 401 => "http.auto.401", + 404 => "http.auto.404", + 408 => "http.auto.408", + 413 => "http.auto.413", + 502 => "http.auto.502", + 503 => "http.auto.503", + 504 => "http.auto.504", + 507 => "http.auto.507", + _ => "http.auto.unknown", + }; + if context.cluster_id.is_some() { + incr!(key); + } + incr!( + key, + context.cluster_id.as_deref(), + context.backend_id.as_deref() + ); if code == 301 { - let host = stream.context.authority.as_deref().unwrap(); - let uri = stream.context.path.as_deref().unwrap(); + let host = context.authority.as_deref().unwrap(); + let uri = context.path.as_deref().unwrap(); fill_default_301_answer(kawa, host, uri); } else { fill_default_answer(kawa, code); } - stream.context.status = Some(code); + context.status = Some(code); stream.state = StreamState::Unlinked; readiness.interest.insert(Ready::WRITABLE); } @@ -172,6 +194,7 @@ impl Debug for Position { } } +#[allow(dead_code)] impl Position { fn is_server(&self) -> bool { match self { @@ -289,7 +312,7 @@ impl Connection { local_settings: H2Settings::default(), peer_settings: H2Settings::default(), position: Position::Server, - prioriser: Prioriser::new(), + prioriser: Prioriser::default(), readiness: Readiness { interest: Ready::READABLE | Ready::HUP | Ready::ERROR, event: Ready::EMPTY, @@ -325,7 +348,7 @@ impl Connection { backend, BackendStatus::Connecting(Instant::now()), ), - prioriser: Prioriser::new(), + prioriser: Prioriser::default(), readiness: Readiness { interest: Ready::WRITABLE | Ready::HUP | Ready::ERROR, event: Ready::EMPTY, @@ -439,13 +462,10 @@ impl Connection { where L: ListenerHandler + L7ListenerHandler, { - match self.position() { - Position::Client(_, backend, BackendStatus::Connected) => { - let mut backend_borrow = backend.borrow_mut(); - backend_borrow.active_requests = backend_borrow.active_requests.saturating_sub(1); - println_!("--------------- CONNECTION END STREAM: {backend_borrow:#?}"); - } - _ => {} + if let Position::Client(_, backend, BackendStatus::Connected) = self.position() { + let mut backend_borrow = backend.borrow_mut(); + backend_borrow.active_requests = backend_borrow.active_requests.saturating_sub(1); + println_!("--------------- CONNECTION END STREAM: {backend_borrow:#?}"); } match self { Connection::H1(c) => c.end_stream(stream, context), @@ -457,13 +477,10 @@ impl Connection { where L: ListenerHandler + L7ListenerHandler, { - match self.position() { - Position::Client(_, backend, BackendStatus::Connected) => { - let mut backend_borrow = backend.borrow_mut(); - backend_borrow.active_requests += 1; - println_!("--------------- CONNECTION START STREAM: {backend_borrow:#?}"); - } - _ => {} + if let Position::Client(_, backend, BackendStatus::Connected) = self.position() { + let mut backend_borrow = backend.borrow_mut(); + backend_borrow.active_requests += 1; + println_!("--------------- CONNECTION START STREAM: {backend_borrow:#?}"); } match self { Connection::H1(c) => c.start_stream(stream, context), @@ -485,7 +502,7 @@ impl<'a, Front: SocketHandler> Endpoint for EndpointServer<'a, Front> { self.0.readiness_mut() } - fn end_stream(&mut self, token: Token, stream: GlobalStreamId, context: &mut Context) + fn end_stream(&mut self, _token: Token, stream: GlobalStreamId, context: &mut Context) where L: ListenerHandler + L7ListenerHandler, { @@ -702,23 +719,46 @@ impl Stream { ) where L: ListenerHandler + L7ListenerHandler, { + let context = &self.context; gauge_add!("http.active_requests", -1); - let protocol = match self.context.protocol { + let protocol = match context.protocol { Protocol::HTTP => "http", Protocol::HTTPS => "https", _ => unreachable!(), }; + // Save the HTTP status code of the backend response + let key = if let Some(status) = context.status { + match status { + 100..=199 => "http.status.1xx", + 200..=299 => "http.status.2xx", + 300..=399 => "http.status.3xx", + 400..=499 => "http.status.4xx", + 500..=599 => "http.status.5xx", + _ => "http.status.other", + } + } else { + "http.status.none" + }; + if context.cluster_id.is_some() { + incr!(key); + } + incr!( + key, + context.cluster_id.as_deref(), + context.backend_id.as_deref() + ); + let endpoint = EndpointRecord::Http { - method: self.context.method.as_deref(), - authority: self.context.authority.as_deref(), - path: self.context.path.as_deref(), - reason: self.context.reason.as_deref(), - status: self.context.status, + method: context.method.as_deref(), + authority: context.authority.as_deref(), + path: context.path.as_deref(), + reason: context.reason.as_deref(), + status: context.status, }; let listener = listener.borrow(); - let tags = self.context.authority.as_deref().and_then(|host| { + let tags = context.authority.as_deref().and_then(|host| { let hostname = match host.split_once(':') { None => host, Some((hostname, _)) => hostname, @@ -730,9 +770,9 @@ impl Stream { error, on_failure: { incr!("unsent-access-logs") }, message: message.as_deref(), - context: self.context.log_context(), - session_address: self.context.session_address, - backend_address: self.context.backend_address, + context: context.log_context(), + session_address: context.session_address, + backend_address: context.backend_address, protocol, endpoint, tags, @@ -743,7 +783,7 @@ impl Stream { request_time: self.metrics.request_time(), bytes_in: self.metrics.bin, bytes_out: self.metrics.bout, - user_agent: self.context.user_agent.as_deref(), + user_agent: context.user_agent.as_deref(), }; } } @@ -1007,6 +1047,10 @@ impl Router { Err(cluster_error) => { // we are past kawa parsing if it succeeded this can't fail // if the request was malformed it was caught by kawa and we sent a 400 + error!( + "Malformed request in connect (should be caught at parsing) {:?}", + context + ); unreachable!("{cluster_error}"); } }; @@ -1117,7 +1161,7 @@ impl>, proxy: Rc>, - metrics: &mut SessionMetrics, + _metrics: &mut SessionMetrics, ) -> SessionResult { let mut counter = 0; let max_loop_iterations = 100000; @@ -1127,7 +1171,7 @@ impl(&mut self, proxy: Rc>, _metrics: &mut SessionMetrics) { - println!("MUX CLOSE"); + debug!("MUX CLOSE"); println_!("FRONTEND: {:#?}", self.frontend); println_!("BACKENDS: {:#?}", self.router.backends); @@ -1583,7 +1627,7 @@ impl unreachable!(), } } - return; + /* let s = match &mut self.frontend { Connection::H1(c) => &mut c.socket, Connection::H2(c) => &mut c.socket, @@ -1594,16 +1638,17 @@ impl SessionIsToBeClosed { diff --git a/lib/src/protocol/mux/parser.rs b/lib/src/protocol/mux/parser.rs index d138be2e5..10a2256fa 100644 --- a/lib/src/protocol/mux/parser.rs +++ b/lib/src/protocol/mux/parser.rs @@ -139,7 +139,7 @@ impl<'a> ParseError<&'a [u8]> for ParserError<'a> { } } - fn append(input: &'a [u8], kind: ErrorKind, other: Self) -> Self { + fn append(input: &'a [u8], kind: ErrorKind, _other: Self) -> Self { ParserError { input, kind: ParserErrorKind::Nom(kind), diff --git a/lib/src/protocol/mux/serializer.rs b/lib/src/protocol/mux/serializer.rs index 18c53720d..28e0643a0 100644 --- a/lib/src/protocol/mux/serializer.rs +++ b/lib/src/protocol/mux/serializer.rs @@ -15,9 +15,9 @@ pub const H2_PRI: &str = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; pub const SETTINGS_ACKNOWLEDGEMENT: [u8; 9] = [0, 0, 0, 4, 1, 0, 0, 0, 0]; pub const PING_ACKNOWLEDGEMENT_HEADER: [u8; 9] = [0, 0, 8, 6, 1, 0, 0, 0, 0]; -pub fn gen_frame_header<'a, 'b>( +pub fn gen_frame_header<'a>( buf: &'a mut [u8], - frame: &'b FrameHeader, + frame: &FrameHeader, ) -> Result<(&'a mut [u8], usize), GenError> { let serializer = tuple(( be_u24(frame.payload_len), @@ -100,11 +100,11 @@ pub fn gen_settings<'a>( }) } -pub fn gen_rst_stream<'a>( - buf: &'a mut [u8], +pub fn gen_rst_stream( + buf: &mut [u8], stream_id: u32, error_code: H2Error, -) -> Result<(&'a mut [u8], usize), GenError> { +) -> Result<(&mut [u8], usize), GenError> { gen_frame_header( buf, &FrameHeader { @@ -119,11 +119,11 @@ pub fn gen_rst_stream<'a>( }) } -pub fn gen_goaway<'a>( - buf: &'a mut [u8], +pub fn gen_goaway( + buf: &mut [u8], last_stream_id: u32, error_code: H2Error, -) -> Result<(&'a mut [u8], usize), GenError> { +) -> Result<(&mut [u8], usize), GenError> { gen_frame_header( buf, &FrameHeader {