From 9fc5e24ddd9d683b75628bb45b3e474781b49505 Mon Sep 17 00:00:00 2001 From: Orion Kindel Date: Mon, 8 May 2023 19:11:31 -0500 Subject: [PATCH] wip --- toad/src/step/block.rs | 66 ++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/toad/src/step/block.rs b/toad/src/step/block.rs index a85f2e6..a6a0c45 100644 --- a/toad/src/step/block.rs +++ b/toad/src/step/block.rs @@ -86,16 +86,8 @@ impl Conversation Pieces: Default { /// Create a new [`Conversation`] tracking a Blocked response to a sent request (param `msg`) - pub fn expect_response(expires_at: Instant, msg: Message

) -> Self { - Self { original: Some(msg), - biggest_number_seen: None, - pcs: Default::default(), - expires_at } - } - - /// Create a new [`Conversation`] tracking a received Blocked request - pub fn request(expires_at: Instant) -> Self { - Self { original: None, + pub fn new(expires_at: Instant, original: Option>) -> Self { + Self { original, biggest_number_seen: None, pcs: Default::default(), expires_at } @@ -280,21 +272,13 @@ impl Block, addr: SocketAddr, req: &Message

) { - let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis()); - self.get_or_create_endpoint(addr, |convs| { - convs.insert((req.token, Role::Response, Direction::Inbound), - Conversation::expect_response(exp, req.clone())) - .unwrap(); - }); - } - - fn insert_request(&self, + fn insert(&self, snap: &Snapshot

, + original: Option<&Message

>, (addr, token, role, dir): (SocketAddr, Token, Role, Direction)) { let exp = snap.time + Milliseconds(snap.config.exchange_lifetime_millis()); self.get_or_create_endpoint(addr, |convs| { - convs.insert((token, role, dir), Conversation::request(exp)) + convs.insert((token, role, dir), Conversation::new(exp, original.cloned())) .unwrap(); }); } @@ -391,7 +375,7 @@ impl Step

}, | Some(block) => { if !has_prev_pieces && block.num() == 0 && block.more() { - self.insert_request(snap, k); + self.insert(snap, None, k); self.map_mut(k, |conv| { conv.have(snap.time, 0, req.clone().map(|r| r.into()).unwrap()) }); @@ -467,7 +451,7 @@ impl Step

}, | Some(block) => { if !has_prev_pieces { - // TODO: warn + log!(Block::poll_resp, effects, log::Level::Warn, "Response received for token {:?} but we've never seen a request using that token. Ignoring this response despite it having {:?}", rep.data().msg().token, block); Some(Ok(rep)) } else { self.map_mut(k, |conv| { @@ -496,16 +480,48 @@ impl Step

} } + fn before_message_sent(&self, + snap: &platform::Snapshot

, + effs: &mut P::Effects, + msg: &mut Addrd>) + -> Result<(), Self::Error> { + self.prune(effs, snap.time); + self.inner.before_message_sent(snap, effs, msg)?; + + let block_size: usize = 1024; + + let original_payload = msg.data().payload().0; + + // TODO: block if 1024 is too big and we got REQUEST_ENTITY_TOO_LARGE + if msg.data().block1().is_none() && original_payload.len() > block_size { + let k = (msg.addr(), msg.data().token, Role::Request, Direction::Outbound); + self.insert(snap, Some(msg.data()), k); + self.map_mut(k, |conv| { + let len = original_payload.len() as f32; + let block_count = (len / block_size as f32).ceil() as u32; + for n in 0..block_count { + let mut msg_block = msg.clone(); + msg_block.as_mut().set_block1(1024, n, n == block_count - 1).ok(); + let mut p = P::MessagePayload::default(); + p.append_copy(original_payload[n * 1024..((n + 1) * 1024)]); + msg_block.as_mut().payload = Payload(p); + conv.have(snap.time, n, msg_block.unwrap()); + } + }).unwrap(); + } + Ok(()) + } + fn on_message_sent(&self, snap: &platform::Snapshot

, effs: &mut P::Effects, msg: &Addrd>) -> Result<(), Self::Error> { - self.prune(effs, snap.time); self.inner.on_message_sent(snap, effs, msg)?; if msg.data().code.kind() == CodeKind::Request { - self.expect_response(snap, msg.addr(), msg.data()); + self.insert(snap, Some(msg.data()), (msg.addr(), msg.data().token, Role::Response, Direction::Inbound)); } else if msg.data().code.kind() == CodeKind::Response { + // TODO: block outbound responses } Ok(())