Skip to content

Commit

Permalink
Make gRPC work through Mux!
Browse files Browse the repository at this point in the history
- Fix readiness (set opposite endpoint as writable, after a read)
- Fix timeouts (with force_disconnect)
- Fix flags and phase handling for h2 headers
- Add support for h2 trailers in pkawa

Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum committed Sep 29, 2023
1 parent 2366951 commit b135cac
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 71 deletions.
2 changes: 1 addition & 1 deletion e2e/src/http_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pub fn http_request<S1: Into<String>, S2: Into<String>, S3: Into<String>, S4: In
)
}

use std::io::Write;
use kawa;
use std::io::Write;

/// the default kawa answer for the error code provided, converted to HTTP/1.1
pub fn default_answer(code: u16) -> String {
Expand Down
22 changes: 8 additions & 14 deletions lib/src/protocol/mux/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,21 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
self.readiness.interest.remove(Ready::READABLE);
}
if kawa.is_main_phase() {
if let StreamState::Linked(token) = stream.state {
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
match self.position {
Position::Client(_) => {
let StreamState::Linked(token) = stream.state else { unreachable!() };
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
Position::Server => {
if let StreamState::Linked(token) = stream.state {
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
if was_initial {
self.requests += 1;
println_!("REQUESTS: {}", self.requests);
stream.state = StreamState::Link
}
}
Position::Client(_) => {}
}
};
MuxResult::Continue
Expand Down Expand Up @@ -177,7 +171,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
MuxResult::Continue
}

fn force_disconnect(&mut self) -> MuxResult {
pub fn force_disconnect(&mut self) -> MuxResult {
match self.position {
Position::Client(_) => {
self.position = Position::Client(BackendStatus::Disconnecting);
Expand Down
46 changes: 21 additions & 25 deletions lib/src/protocol/mux/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,9 +535,6 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
let kawa = stream.rbuffer(&self.position);
slice.start += kawa.storage.head as u32;
kawa.storage.head += slice.len();
let buffer = kawa.storage.buffer();
let payload = slice.data(buffer);
println_!("{:?}", unsafe { from_utf8_unchecked(payload) });
kawa.push_block(kawa::Block::Chunk(kawa::Chunk {
data: kawa::Store::Slice(slice),
}));
Expand All @@ -550,16 +547,12 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
}));
kawa.parsing_phase = kawa::ParsingPhase::Terminated;
}
match self.position {
Position::Client(_) => {
let StreamState::Linked(token) = stream.state else { unreachable!() };
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
Position::Server => {}
};
if let StreamState::Linked(token) = stream.state {
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
}
Frame::Headers(headers) => {
if !headers.end_headers {
Expand All @@ -572,6 +565,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
let buffer = headers.header_block_fragment.data(kawa.storage.buffer());
let stream = &mut context.streams[global_stream_id];
let parts = &mut stream.split(&self.position);
let was_initial = parts.rbuffer.is_initial();
pkawa::handle_header(
parts.rbuffer,
buffer,
Expand All @@ -580,16 +574,18 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
parts.context,
);
debug_kawa(parts.rbuffer);
match self.position {
Position::Client(_) => {
let StreamState::Linked(token) = stream.state else { unreachable!() };
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
Position::Server => stream.state = StreamState::Link,
};
if let StreamState::Linked(token) = stream.state {
endpoint
.readiness_mut(token)
.interest
.insert(Ready::WRITABLE)
}
if was_initial {
match self.position {
Position::Server => stream.state = StreamState::Link,
Position::Client(_) => {}
};
}
}
Frame::PushPromise(push_promise) => match self.position {
Position::Client(_) => {
Expand Down Expand Up @@ -673,12 +669,12 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
}
}
}
Frame::Continuation(_) => todo!(),
Frame::Continuation(_) => unreachable!(),
}
MuxResult::Continue
}

fn force_disconnect(&mut self) -> MuxResult {
pub fn force_disconnect(&mut self) -> MuxResult {
self.state = H2State::Error;
match self.position {
Position::Client(_) => {
Expand Down
64 changes: 49 additions & 15 deletions lib/src/protocol/mux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl<Front: SocketHandler> Connection<Front> {
interest: Ready::WRITABLE | Ready::READABLE | Ready::HUP | Ready::ERROR,
event: Ready::EMPTY,
},
stream: 0,
stream: usize::MAX - 1,
requests: 0,
timeout_container,
})
Expand Down Expand Up @@ -311,6 +311,12 @@ impl<Front: SocketHandler> Connection<Front> {
Connection::H2(c) => c.socket.socket_ref(),
}
}
fn force_disconnect(&mut self) -> MuxResult {
match self {
Connection::H1(c) => c.force_disconnect(),
Connection::H2(c) => c.force_disconnect(),
}
}
fn readable<E>(&mut self, context: &mut Context, endpoint: E) -> MuxResult
where
E: Endpoint,
Expand Down Expand Up @@ -1073,8 +1079,10 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
Connection::H1(_) => false,
Connection::H2(_) => true,
};
let mut is_to_be_closed = true;
let mut should_close = true;
let mut should_write = false;
if self.frontend_token == token {
println_!("MuxState::timeout_frontend({:#?})", self.frontend);
self.frontend.timeout_container().triggered();
let front_readiness = self.frontend.readiness_mut();
for stream in &mut self.context.streams {
Expand All @@ -1085,27 +1093,27 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
// in most cases it was just reserved, so we can just ignore them.
if !front_is_h2 {
set_default_answer(stream, front_readiness, 408);
is_to_be_closed = false;
should_write = true;
}
}
StreamState::Link => {
// This is an unusual case, as we have both a complete request and no
// available backend yet. For now, we answer with 503
set_default_answer(stream, front_readiness, 503);
is_to_be_closed = false;
should_write = true;
}
StreamState::Linked(_) => {
// A stream Linked to a backend is waiting for the response, not the answer.
// For streaming or malformed requests, it is possible that the request is not
// terminated at this point. For now, we do nothing and
is_to_be_closed = false;
should_close = false;
}
StreamState::Unlinked => {
// A stream Unlinked already has a response and its backend closed.
// In case it hasn't finished proxying we wait. Otherwise it is a stream
// kept alive for a new request, which can be killed.
if !stream.back.is_completed() {
is_to_be_closed = false;
should_close = false;
}
}
StreamState::Recycle => {
Expand All @@ -1115,6 +1123,7 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
}
}
} else if let Some(backend) = self.router.backends.get_mut(&token) {
println_!("MuxState::timeout_backend({:#?})", backend);
backend.timeout_container().triggered();
let front_readiness = self.frontend.readiness_mut();
for stream_id in 0..self.context.streams.len() {
Expand All @@ -1123,24 +1132,40 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
if token == back_token {
// This stream is linked to the backend that timedout.
if stream.back.is_terminated() || stream.back.is_error() {
println!(
"Stream terminated or in error, do nothing, just wait a bit more"
);
// Nothing to do, simply wait for the remaining bytes to be proxied
if !stream.back.is_completed() {
is_to_be_closed = false;
should_close = false;
}
} else if stream.back.is_initial() {
// The response has not started yet
println!("Stream still waiting for response, send 504");
set_default_answer(stream, front_readiness, 504);
is_to_be_closed = false;
should_write = true;
} else {
println!("Stream waiting for end of response, forcefully terminate it");
forcefully_terminate_answer(stream, front_readiness);
is_to_be_closed = false;
should_write = true;
}
backend.end_stream(0, &mut self.context);
backend.end_stream(stream_id, &mut self.context);
backend.force_disconnect();
}
}
}
}
if is_to_be_closed {
if should_write {
return match self
.frontend
.writable(&mut self.context, EndpointClient(&mut self.router))
{
MuxResult::Continue => StateResult::Continue,
MuxResult::Upgrade => StateResult::Upgrade,
MuxResult::CloseSession => StateResult::CloseSession,
};
}
if should_close {
StateResult::CloseSession
} else {
StateResult::Continue
Expand All @@ -1160,11 +1185,19 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
"\
{} Session(Mux)
\tFrontend:
\t\ttoken: {:?}\treadiness: {:?}",
\t\ttoken: {:?}\treadiness: {:?}
\tBackend(s):",
context,
self.frontend_token,
self.frontend.readiness()
);
for (backend_token, backend) in &self.router.backends {
error!(
"\t\ttoken: {:?}\treadiness: {:?}",
backend_token,
backend.readiness()
)
}
}

fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
Expand All @@ -1182,9 +1215,10 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
let out = kawa.as_io_slice();
let mut writer = std::io::BufWriter::new(Vec::new());
let amount = writer.write_vectored(&out).unwrap();
println_!("amount: {amount}\n{}", unsafe {
std::str::from_utf8_unchecked(writer.buffer())
});
println_!(
"amount: {amount}\n{}",
String::from_utf8_lossy(writer.buffer())
);
}
}
}
Expand Down
58 changes: 42 additions & 16 deletions lib/src/protocol/mux/pkawa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub fn handle_header<C>(
) where
C: ParserCallbacks<Checkout>,
{
if !kawa.is_initial() {
return handle_trailer(kawa, input, end_stream, decoder);
}
kawa.push_block(Block::StatusLine);
kawa.detached.status_line = match kawa.kind {
Kind::Request => {
Expand Down Expand Up @@ -126,6 +129,7 @@ pub fn handle_header<C>(

if end_stream {
if let BodySize::Empty = kawa.body_size {
kawa.body_size = BodySize::Length(0);
kawa.push_block(Block::Header(Pair {
key: Store::Static(b"Content-Length"),
val: Store::Static(b"0"),
Expand All @@ -134,22 +138,12 @@ pub fn handle_header<C>(
}

kawa.push_block(Block::Flags(Flags {
end_body: false,
end_body: end_stream,
end_chunk: false,
end_header: true,
end_stream: false,
end_stream,
}));

if end_stream {
kawa.push_block(Block::Flags(Flags {
end_body: true,
end_chunk: false,
end_header: false,
end_stream: true,
}));
kawa.body_size = BodySize::Length(0);
}

if kawa.parsing_phase == ParsingPhase::Terminated {
return;
}
Expand All @@ -158,9 +152,41 @@ pub fn handle_header<C>(
BodySize::Chunked => ParsingPhase::Chunks { first: true },
BodySize::Length(0) => ParsingPhase::Terminated,
BodySize::Length(_) => ParsingPhase::Body,
BodySize::Empty => {
println!("HTTP is just the worst...");
ParsingPhase::Body
}
BodySize::Empty => ParsingPhase::Chunks { first: true },
};
}

pub fn handle_trailer(
kawa: &mut GenericHttpStream,
input: &[u8],
end_stream: bool,
decoder: &mut hpack::Decoder,
) {
decoder
.decode_with_cb(input, |k, v| {
let start = kawa.storage.end as u32;
kawa.storage.write_all(&k).unwrap();
kawa.storage.write_all(&v).unwrap();
let len_key = k.len() as u32;
let len_val = v.len() as u32;
let key = Store::Slice(Slice {
start,
len: len_key,
});
let val = Store::Slice(Slice {
start: start + len_key,
len: len_val,
});
kawa.push_block(Block::Header(Pair { key, val }));
})
.unwrap();

assert!(end_stream);
kawa.push_block(Block::Flags(Flags {
end_body: end_stream,
end_chunk: false,
end_header: true,
end_stream,
}));
kawa.parsing_phase = ParsingPhase::Terminated;
}

0 comments on commit b135cac

Please sign in to comment.