diff --git a/lib/src/protocol/mux/converter.rs b/lib/src/protocol/mux/converter.rs index 27b081342..feb1a81df 100644 --- a/lib/src/protocol/mux/converter.rs +++ b/lib/src/protocol/mux/converter.rs @@ -150,7 +150,7 @@ impl<'a, T: AsBuffer> BlockConverter for H2BlockConverter<'a> { .unwrap(); kawa.push_out(Store::from_slice(&header)); kawa.push_out(data); - kawa.push_delimiter(); + // kawa.push_delimiter(); return can_continue; } Block::Flags(Flags { @@ -189,7 +189,7 @@ impl<'a, T: AsBuffer> BlockConverter for H2BlockConverter<'a> { kawa.push_out(Store::from_slice(&header)); } if end_header || end_stream { - kawa.push_delimiter() + // kawa.push_delimiter() } } } diff --git a/lib/src/protocol/mux/h1.rs b/lib/src/protocol/mux/h1.rs index c0a93840c..d0f14bea4 100644 --- a/lib/src/protocol/mux/h1.rs +++ b/lib/src/protocol/mux/h1.rs @@ -1,3 +1,5 @@ +use std::time::Instant; + use sozu_command::ready::Ready; use crate::{ @@ -42,6 +44,9 @@ impl ConnectionH1 { println_!("======= MUX H1 READABLE {:?}", self.position); self.timeout_container.reset(); let stream = &mut context.streams[self.stream]; + if stream.metrics.start.is_none() { + stream.metrics.start = Some(Instant::now()); + } let parts = stream.split(&self.position); let kawa = parts.rbuffer; let (size, status) = self.socket.socket_read(kawa.storage.space()); @@ -144,11 +149,11 @@ impl ConnectionH1 { let kawa = &mut stream.back; match kawa.detached.status_line { kawa::StatusLine::Response { code: 101, .. } => { - println!("============== HANDLE UPGRADE!"); + debug!("============== HANDLE UPGRADE!"); return MuxResult::Upgrade; } kawa::StatusLine::Response { code: 100, .. } => { - println!("============== HANDLE CONTINUE!"); + debug!("============== HANDLE CONTINUE!"); // after a 100 continue, we expect the client to continue with its request self.timeout_container.reset(); self.readiness.interest.insert(Ready::READABLE); @@ -156,7 +161,7 @@ impl ConnectionH1 { return MuxResult::Continue; } kawa::StatusLine::Response { code: 103, .. } => { - println!("============== HANDLE EARLY HINT!"); + debug!("============== HANDLE EARLY HINT!"); if let StreamState::Linked(token) = stream.state { // after a 103 early hints, we expect the backend to send its response endpoint @@ -181,9 +186,7 @@ impl ConnectionH1 { let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked); if stream.context.keep_alive_frontend { self.timeout_container.reset(); - // println!("{old_state:?} {:?}", self.readiness); if let StreamState::Linked(token) = old_state { - // println!("{:?}", endpoint.readiness(token)); endpoint.end_stream(token, self.stream, context); } self.readiness.interest.insert(Ready::READABLE); @@ -285,7 +288,7 @@ impl ConnectionH1 { } (false, false) => { // we do not have an answer, but the request is untouched so we can retry - println!("H1 RECONNECT"); + debug!("H1 RECONNECT"); stream.state = StreamState::Link; } (false, true) => unreachable!(), diff --git a/lib/src/protocol/mux/h2.rs b/lib/src/protocol/mux/h2.rs index 2987af681..4ca6c2947 100644 --- a/lib/src/protocol/mux/h2.rs +++ b/lib/src/protocol/mux/h2.rs @@ -92,7 +92,7 @@ impl Prioriser { weight, } => { if stream_dependency.stream_id == stream_id { - println_!("STREAM CAN'T DEPEND ON ITSELF"); + error!("STREAM CAN'T DEPEND ON ITSELF"); true } else { false @@ -148,6 +148,10 @@ pub enum H2StreamId { } impl ConnectionH2 { + fn expect_header(&mut self) { + self.state = H2State::Header; + self.expect_read = Some((H2StreamId::Zero, 9)); + } pub fn readable(&mut self, context: &mut Context, endpoint: E) -> MuxResult where E: Endpoint, @@ -221,8 +225,7 @@ impl ConnectionH2 { let i = kawa.storage.data(); println_!("DISCARDING: {i:?}"); kawa.storage.clear(); - self.state = H2State::Header; - self.expect_read = Some((H2StreamId::Zero, 9)); + self.expect_header(); } (H2State::ClientPreface, Position::Server) => { let i = kawa.storage.data(); @@ -267,8 +270,8 @@ impl ConnectionH2 { let kawa = &mut self.zero; match serializer::gen_settings(kawa.storage.space(), &self.local_settings) { Ok((_, size)) => kawa.storage.fill(size), - Err(e) => { - println!("could not serialize SettingsFrame: {e:?}"); + Err(error) => { + error!("Could not serialize SettingsFrame: {:?}", error); return self.force_disconnect(); } }; @@ -309,7 +312,8 @@ impl ConnectionH2 { } else if let Some(global_stream_id) = self.streams.get(&stream_id) { let allowed_on_half_closed = header.frame_type == FrameType::WindowUpdate - || header.frame_type == FrameType::Priority; + || header.frame_type == FrameType::Priority + || header.frame_type == FrameType::RstStream; let stream = &context.streams[*global_stream_id]; println_!( "REQUESTING EXISTING STREAM {stream_id}: {}/{:?}", @@ -319,6 +323,10 @@ impl ConnectionH2 { if !allowed_on_half_closed && (stream.received_end_of_stream || !stream.state.is_open()) { + error!( + "CANNOT RECEIVE {:?} ON THIS STREAM {:?}", + header.frame_type, stream.state + ); return self.goaway(H2Error::StreamClosed); } if header.frame_type == FrameType::Data { @@ -332,19 +340,43 @@ impl ConnectionH2 { && stream_id % 2 == 1 && stream_id >= self.last_stream_id { - if context.streams.len() + if context.active_len() >= self.local_settings.settings_max_concurrent_streams as usize { + error!( + "MAX CONCURRENT STREAMS: {} {} {}", + self.local_settings.settings_max_concurrent_streams, + context.active_len(), + context.streams.len() + ); return self.goaway(H2Error::RefusedStream); } match self.create_stream(stream_id, context) { Some(_) => {} - None => return self.goaway(H2Error::InternalError), + None => { + error!("COULD NOT CREATE NEW STREAM"); + return self.goaway(H2Error::InternalError); + } } } else if header.frame_type != FrameType::Priority { - println_!( - "ONLY HEADERS AND PRIORITY CAN BE RECEIVED ON IDLE/CLOSED STREAMS" + error!( + "CANNOT RECEIVE {:?} FRAME ON IDLE/CLOSED STREAMS", + header.frame_type ); + if header.frame_type == FrameType::Data + && header.payload_len == 0 + && header.flags == 1 + { + error!( + "SKIPPED DATA: {} {} {} {}", + stream_id, + self.last_stream_id, + header.flags, + header.payload_len + ); + self.expect_header(); + return MuxResult::Continue; + } return self.goaway(H2Error::ProtocolError); } H2StreamId::Zero @@ -362,6 +394,7 @@ impl ConnectionH2 { } Err(error) => { let error = error_nom_to_h2(error); + error!("COULD NOT PARSE FRAME HEADER"); return self.goaway(error); } }; @@ -390,9 +423,13 @@ impl ConnectionH2 { } Err(error) => { let error = error_nom_to_h2(error); + error!("COULD NOT PARSE CONTINUATION HEADER"); return self.goaway(error); } - _ => return self.goaway(H2Error::ProtocolError), + other => { + error!("UNEXPECTED {:?} WHILE PARSING CONTINUATION HEADER", other); + return self.goaway(H2Error::ProtocolError); + } }; } (H2State::Frame(header), _) => { @@ -402,6 +439,7 @@ impl ConnectionH2 { Ok((_, frame)) => frame, Err(error) => { let error = error_nom_to_h2(error); + error!("COULD NOT PARSE FRAME BODY"); return self.goaway(error); } }; @@ -412,8 +450,7 @@ impl ConnectionH2 { kawa.storage.end = kawa.storage.head; } } - self.state = H2State::Header; - self.expect_read = Some((H2StreamId::Zero, 9)); + self.expect_header(); return self.handle_frame(frame, context, endpoint); } (H2State::ContinuationFrame(headers), _) => { @@ -421,8 +458,7 @@ impl ConnectionH2 { let i = kawa.storage.data(); println_!(" data: {i:?}"); let headers = headers.clone(); - self.state = H2State::Header; - self.expect_read = Some((H2StreamId::Zero, 9)); + self.expect_header(); return self.handle_frame(Frame::Headers(headers), context, endpoint); } } @@ -478,8 +514,8 @@ impl ConnectionH2 { kawa.storage.fill(pri.len()); match serializer::gen_settings(kawa.storage.space(), &self.local_settings) { Ok((_, size)) => kawa.storage.fill(size), - Err(e) => { - println!("could not serialize SettingsFrame: {e:?}"); + Err(error) => { + error!("Could not serialize SettingsFrame: {:?}", error); return self.force_disconnect(); } }; @@ -491,14 +527,13 @@ impl ConnectionH2 { (H2State::ClientSettings, Position::Client(..)) => { println_!("Sent preface and settings"); self.state = H2State::ServerSettings; - self.readiness.interest.remove(Ready::WRITABLE); self.expect_read = Some((H2StreamId::Zero, 9)); + self.readiness.interest.remove(Ready::WRITABLE); MuxResult::Continue } (H2State::ServerSettings, Position::Server) => { - self.state = H2State::Header; + self.expect_header(); self.readiness.interest.remove(Ready::WRITABLE); - self.expect_read = Some((H2StreamId::Zero, 9)); MuxResult::Continue } // Proxying states @@ -649,6 +684,7 @@ impl ConnectionH2 { self.expect_read = None; let kawa = &mut self.zero; kawa.storage.clear(); + error!("//////////////GOAWAY: {:?}", error); match serializer::gen_goaway(kawa.storage.space(), self.last_stream_id, error) { Ok((_, size)) => { @@ -658,8 +694,8 @@ impl ConnectionH2 { self.readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR; MuxResult::Continue } - Err(e) => { - println!("could not serialize GoAwayFrame: {e:?}"); + Err(error) => { + error!("Could not serialize GoAwayFrame: {:?}", error); self.force_disconnect() } } @@ -704,10 +740,8 @@ impl ConnectionH2 { match frame { Frame::Data(data) => { let mut slice = data.payload; - let global_stream_id = match self.streams.get(&data.stream_id) { - Some(global_stream_id) => *global_stream_id, - None => panic!("stream error"), - }; + // can this fail? + let global_stream_id = *self.streams.get(&data.stream_id).unwrap(); let stream = &mut context.streams[global_stream_id]; let parts = stream.split(&self.position); let kawa = parts.rbuffer; @@ -739,8 +773,7 @@ impl ConnectionH2 { } Frame::Headers(headers) => { if !headers.end_headers { - // self.zero.storage.head = self.zero.storage.end; - println!("FRAGMENT: {:?}", self.zero.storage.data()); + debug!("FRAGMENT: {:?}", self.zero.storage.data()); self.state = H2State::ContinuationHeader(headers); return MuxResult::Continue; } @@ -781,6 +814,7 @@ impl ConnectionH2 { kawa.storage.clear(); if let Err((error, global)) = status { if global { + error!("GOT GLOBAL ERROR WHILE PROCESSING HEADERS"); return self.goaway(error); } else { return self.reset_stream(global_stream_id, context, endpoint, error); @@ -805,11 +839,12 @@ impl ConnectionH2 { if self.local_settings.settings_enable_push { todo!("forward the push") } else { + error!("DID NOT ALLOW PUSH"); return self.goaway(H2Error::ProtocolError); } } Position::Server => { - println_!("A client should not push promises"); + error!("INVALID PUSH FROM CLIENT"); return self.goaway(H2Error::ProtocolError); } }, @@ -826,6 +861,7 @@ impl ConnectionH2 { H2Error::ProtocolError, ); } else { + error!("INVALID PRIORITY RECEIVED ON INVALID STREAM"); return self.goaway(H2Error::ProtocolError); } } @@ -876,9 +912,10 @@ impl ConnectionH2 { 6 => { self.peer_settings.settings_max_header_list_size = v }, 8 => { self.peer_settings.settings_enable_connect_protocol = v == 1; is_error |= v > 1 }, 9 => { self.peer_settings.settings_no_rfc7540_priorities = v == 1; is_error |= v > 1 }, - other => println!("unknown setting_id: {other}, we MUST ignore this"), + other => warn!("Unknown setting_id: {}, we MUST ignore this", other), }; if is_error { + error!("INVALID SETTING"); return self.goaway(H2Error::ProtocolError); } } @@ -901,8 +938,8 @@ impl ConnectionH2 { let kawa = &mut self.zero; match serializer::gen_ping_acknolegment(kawa.storage.space(), &ping.payload) { Ok((_, size)) => kawa.storage.fill(size), - Err(e) => { - println!("could not serialize PingFrame: {e:?}"); + Err(error) => { + error!("Could not serialize PingFrame: {:?}", error); return self.force_disconnect(); } }; @@ -930,6 +967,7 @@ impl ConnectionH2 { } self.window = window; } else { + error!("INVALID WINDOW INCREMENT"); return self.goaway(H2Error::FlowControlError); } } else { @@ -968,21 +1006,15 @@ impl ConnectionH2 { return true; } let delta = value as i32 - self.peer_settings.settings_initial_window_size as i32; - println!( - "INITIAL_WINDOW_SIZE: {} -> {} => {}", - self.peer_settings.settings_initial_window_size, value, delta - ); let mut open_window = false; for (i, stream) in context.streams.iter_mut().enumerate() { - println!( - " - stream_{i}: {} -> {}", - stream.window, - stream.window + delta - ); open_window |= stream.window <= 0 && stream.window + delta > 0; stream.window += delta; } - println_!("UPDATE INIT WINDOW: {open_window} {:?}", self.readiness); + println_!( + "UPDATE INIT WINDOW: {delta} {open_window} {:?}", + self.readiness + ); if open_window { self.readiness.interest.insert(Ready::WRITABLE); } @@ -1090,7 +1122,7 @@ impl ConnectionH2 { } (false, false) => { // we do not have an answer, but the request is untouched so we can retry - println!("H2 RECONNECT"); + debug!("H2 RECONNECT"); stream.state = StreamState::Link } } diff --git a/lib/src/protocol/mux/mod.rs b/lib/src/protocol/mux/mod.rs index 90f3f9d44..6dce6aad1 100644 --- a/lib/src/protocol/mux/mod.rs +++ b/lib/src/protocol/mux/mod.rs @@ -52,9 +52,9 @@ pub use crate::protocol::mux::{ #[macro_export] macro_rules! println_ { ($($t:expr),*) => { - print!("{}:{} ", file!(), line!()); - println!($($t),*) - // $(let _ = &$t;)* + // print!("{}:{} ", file!(), line!()); + // println!($($t),*) + $(let _ = &$t;)* }; } fn debug_kawa(_kawa: &GenericHttpStream) { @@ -772,6 +772,13 @@ impl Context { } } + pub fn active_len(&self) -> usize { + self.streams + .iter() + .filter(|s| !matches!(s.state, StreamState::Recycle)) + .count() + } + pub fn create_stream(&mut self, request_id: Ulid, window: u32) -> Option { let listener = self.listener.borrow(); let http_context = HttpContext::new( @@ -786,12 +793,15 @@ impl Context { println_!("Reuse stream: {stream_id}"); stream.state = StreamState::Idle; stream.attempts = 0; + stream.received_end_of_stream = false; stream.window = window as i32; stream.context = http_context; stream.back.clear(); stream.back.storage.clear(); stream.front.clear(); stream.front.storage.clear(); + stream.metrics.reset(); + stream.metrics.start = Some(Instant::now()); return Some(stream_id); } } @@ -921,8 +931,8 @@ impl Router { let token = if let Some(token) = reuse_token { println_!("reused backend: {:#?}", self.backends.get(&token).unwrap()); - stream.metrics.backend_start(); - stream.metrics.backend_connected(); + // stream.metrics.backend_start(); + // stream.metrics.backend_connected(); token } else { let (mut socket, backend) = self.backend_from_request( @@ -997,7 +1007,7 @@ impl Router { Err(cluster_error) => { // we are past kawa parsing if it succeeded this can't fail // if the request was malformed it was caught by kawa and we sent a 400 - panic!("{cluster_error}"); + unreachable!("{cluster_error}"); } }; @@ -1006,7 +1016,7 @@ impl Router { let route = match route_result { Ok(route) => route, Err(frontend_error) => { - println!("{}", frontend_error); + println_!("{}", frontend_error); // self.set_answer(DefaultAnswerStatus::Answer404, None); return Err(RetrieveClusterError::RetrieveFrontend(frontend_error)); } @@ -1015,7 +1025,7 @@ impl Router { let cluster_id = match route { Route::Cluster(id) => id, Route::Deny => { - println!("Route::Deny"); + println_!("Route::Deny"); // self.set_answer(DefaultAnswerStatus::Answer401, None); return Err(RetrieveClusterError::UnauthorizedRoute); } @@ -1040,7 +1050,7 @@ impl Router { proxy, ) .map_err(|backend_error| { - println!("{backend_error}"); + println_!("{backend_error}"); // self.set_answer(DefaultAnswerStatus::Answer503, None); BackendConnectionError::Backend(backend_error) })?; @@ -1137,7 +1147,6 @@ impl {readiness:?}"); let dead = readiness.filter_interest().is_hup() || readiness.filter_interest().is_error(); if dead { @@ -1294,8 +1303,6 @@ impl(&mut self, proxy: Rc>, _metrics: &mut SessionMetrics) { + println!("MUX CLOSE"); println_!("FRONTEND: {:#?}", self.frontend); println_!("BACKENDS: {:#?}", self.router.backends); @@ -1546,8 +1556,6 @@ impl unreachable!(), } } + return; let s = match &mut self.frontend { Connection::H1(c) => &mut c.socket, Connection::H2(c) => &mut c.socket, diff --git a/lib/src/protocol/mux/parser.rs b/lib/src/protocol/mux/parser.rs index 95ca0a5db..d138be2e5 100644 --- a/lib/src/protocol/mux/parser.rs +++ b/lib/src/protocol/mux/parser.rs @@ -207,7 +207,7 @@ pub fn frame_header(input: &[u8], max_frame_size: u32) -> IResult<&[u8], FrameHe FrameType::WindowUpdate => true, }; if !valid_stream_id { - println!("invalid stream_id: {stream_id}"); + error!("invalid stream_id: {}", stream_id); return Err(Err::Failure(ParserError::new_h2(i, H2Error::ProtocolError))); } @@ -223,7 +223,7 @@ pub fn frame_header(input: &[u8], max_frame_size: u32) -> IResult<&[u8], FrameHe } fn convert_frame_type(t: u8) -> Option { - info!("got frame type: {}", t); + debug!("got frame type: {}", t); match t { 0 => Some(FrameType::Data), 1 => Some(FrameType::Headers), @@ -353,7 +353,6 @@ pub fn data_frame<'a>( header: &FrameHeader, ) -> IResult<&'a [u8], Frame, ParserError<'a>> { let (remaining, i) = take(header.payload_len)(input)?; - println!("{i:?}"); let (i, pad_length) = if header.flags & 0x8 != 0 { let (i, pad_length) = be_u8(i)?; diff --git a/lib/src/protocol/mux/pkawa.rs b/lib/src/protocol/mux/pkawa.rs index f94cf21e8..6cbf9dc3b 100644 --- a/lib/src/protocol/mux/pkawa.rs +++ b/lib/src/protocol/mux/pkawa.rs @@ -33,6 +33,21 @@ where return handle_trailer(kawa, input, end_stream, decoder); } kawa.push_block(Block::StatusLine); + // kawa.detached.status_line = match kawa.kind { + // Kind::Request => StatusLine::Request { + // version: Version::V20, + // method: Store::Static(b"GET"), + // uri: Store::Static(b"/"), + // authority: Store::Static(b"lolcatho.st:8443"), + // path: Store::Static(b"/"), + // }, + // Kind::Response => StatusLine::Response { + // version: Version::V20, + // code: 200, + // status: Store::Static(b"200"), + // reason: Store::Static(b"FromH2"), + // }, + // }; kawa.detached.status_line = match kawa.kind { Kind::Request => { let mut method = Store::Empty; @@ -87,12 +102,15 @@ where invalid_headers = true; } } else if compare_no_case(&k, b"priority") { - todo!("decode priority"); + // todo!("decode priority"); + warn!("DECODE PRIORITY: {}", unsafe { + std::str::from_utf8_unchecked(v.as_ref()) + }); prioriser.push_priority( stream_id, PriorityPart::Rfc9218 { - urgency: todo!(), - incremental: todo!(), + urgency: 0, + incremental: false, }, ); } @@ -105,7 +123,7 @@ where } }); if let Err(error) = decode_status { - println!("INVALID FRAGMENT: {error:?}"); + error!("INVALID FRAGMENT: {:?}", error); return Err((H2Error::CompressionError, true)); } if invalid_headers @@ -114,7 +132,7 @@ where || path.len() == 0 || scheme.len() == 0 { - println!("INVALID HEADERS"); + error!("INVALID HEADERS"); return Err((H2Error::ProtocolError, false)); } // uri is only used by H1 statusline, in most cases it only consists of the path @@ -177,11 +195,11 @@ where } }); if let Err(error) = decode_status { - println!("INVALID FRAGMENT: {error:?}"); + error!("INVALID FRAGMENT: {:?}", error); return Err((H2Error::CompressionError, true)); } if invalid_headers || status.len() == 0 { - println!("INVALID HEADERS"); + error!("INVALID HEADERS"); return Err((H2Error::ProtocolError, false)); } StatusLine::Response { @@ -195,7 +213,7 @@ where // everything has been parsed kawa.storage.head = kawa.storage.end; - println!( + debug!( "index: {}/{}/{}", kawa.storage.start, kawa.storage.head, kawa.storage.end ); diff --git a/lib/src/socket.rs b/lib/src/socket.rs index f19cbf9fb..44d3f614d 100644 --- a/lib/src/socket.rs +++ b/lib/src/socket.rs @@ -193,6 +193,31 @@ impl SocketHandler for FrontRustls { incr!("rustls.read.infinite_loop.error"); } + while !self.session.wants_read() { + match self.session.reader().read(&mut buf[size..]) { + Ok(0) => break, + Ok(sz) => { + size += sz; + } + Err(e) => match e.kind() { + ErrorKind::WouldBlock => { + break; + } + ErrorKind::ConnectionReset + | ErrorKind::ConnectionAborted + | ErrorKind::BrokenPipe => { + is_closed = true; + break; + } + _ => { + error!("could not read data from TLS stream: {:?}", e); + is_error = true; + break; + } + }, + } + } + if size == buf.len() { break; } @@ -233,31 +258,6 @@ impl SocketHandler for FrontRustls { is_error = true; break; } - - while !self.session.wants_read() { - match self.session.reader().read(&mut buf[size..]) { - Ok(0) => break, - Ok(sz) => { - size += sz; - } - Err(e) => match e.kind() { - ErrorKind::WouldBlock => { - break; - } - ErrorKind::ConnectionReset - | ErrorKind::ConnectionAborted - | ErrorKind::BrokenPipe => { - is_closed = true; - break; - } - _ => { - error!("could not read data from TLS stream: {:?}", e); - is_error = true; - break; - } - }, - } - } } if is_error {