Skip to content

Commit

Permalink
Pass proto-layer connection events internally
Browse files Browse the repository at this point in the history
Groundwork for replacing generic message-passing with lighter
event-representation-aware communication strategies.
  • Loading branch information
Ralith committed Dec 17, 2023
1 parent 10e0616 commit 9f6ece6
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 78 deletions.
36 changes: 16 additions & 20 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
frame::{Close, Datagram, FrameStruct},
packet::{Header, LongType, Packet, PartialDecode, SpaceId},
range_set::ArrayRangeSet,
shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent},
shared::{ConnectionEvent, ConnectionId, EcnCodepoint, EndpointEvent},
token::ResetToken,
transport_parameters::TransportParameters,
ConnectionHandle, Dir, EndpointConfig, Frame, Side, StreamId, Transmit, TransportError,
Expand Down Expand Up @@ -128,6 +128,7 @@ pub struct Connection {
server_config: Option<Arc<ServerConfig>>,
config: Arc<TransportConfig>,
rng: StdRng,
connection_events: mpsc::Receiver<ConnectionEvent>,
crypto: Box<dyn crypto::Session>,
/// The CID we initially chose, for use during the handshake
handshake_cid: ConnectionId,
Expand Down Expand Up @@ -251,6 +252,7 @@ impl Connection {
allow_mtud: bool,
rng_seed: [u8; 32],
endpoint_events: EndpointEvents,
connection_events: mpsc::Receiver<ConnectionEvent>,
) -> Self {
let side = if server_config.is_some() {
Side::Server
Expand All @@ -271,6 +273,7 @@ impl Connection {
let mut this = Self {
endpoint_config,
server_config,
connection_events,
crypto,
handshake_cid: loc_cid,
rem_handshake_cid: rem_cid,
Expand Down Expand Up @@ -949,14 +952,20 @@ impl Connection {
SendableFrames::empty()
}

/// Process `ConnectionEvent`s generated by the associated `Endpoint`
/// Process events from the associated [`Endpoint`](crate::Endpoint)
///
/// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
/// (including application `Event`s, endpoint events, and outgoing datagrams) that should be
/// checked through the relevant methods.
pub fn handle_event(&mut self, event: ConnectionEvent, now: Instant) {
use self::ConnectionEventInner::*;
match event.0 {
pub fn handle_events(&mut self, now: Instant) {
while let Ok(event) = self.connection_events.try_recv() {
self.handle_event(event, now);
}
}

fn handle_event(&mut self, event: ConnectionEvent, now: Instant) {
use self::ConnectionEvent::*;
match event {
Datagram {
now,
remote,
Expand Down Expand Up @@ -3274,22 +3283,9 @@ impl Connection {

/// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
let (first_decode, remaining) = match &event.0 {
ConnectionEventInner::Datagram {
first_decode,
remaining,
..
} => (first_decode, remaining),
_ => return None,
};

if remaining.is_some() {
panic!("Packets should never be coalesced in tests");
}

pub(crate) fn decode_packet(&self, packet: PartialDecode) -> Option<Vec<u8>> {
let decrypted_header = packet_crypto::unprotect_header(
first_decode.clone(),
packet.clone(),
&self.spaces,
self.zero_rtt_crypto.as_ref(),
self.peer_params.stateless_reset_token,
Expand Down
71 changes: 47 additions & 24 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use crate::{
crypto::{self, Keys, UnsupportedVersion},
frame,
packet::{Header, Packet, PacketDecodeError, PacketNumber, PartialDecode},
shared::{
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent, IssuedCid,
},
shared::{ConnectionEvent, ConnectionId, EcnCodepoint, EndpointEvent, IssuedCid},
transport_parameters::TransportParameters,
ResetToken, RetryToken, Side, Transmit, TransportConfig, TransportError, INITIAL_MTU,
MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE,
Expand Down Expand Up @@ -81,25 +79,23 @@ impl Endpoint {

/// Process events from [`Connection`]s that have returned `true` from [`Connection::poll_endpoint_events`]
///
/// May return a `ConnectionEvent` for any `Connection`. Call until `None` is returned.
pub fn handle_events(&mut self) -> Option<(ConnectionHandle, ConnectionEvent)> {
/// May return the [`ConnectionHandle`] of a [`Connection`] for which
/// [`Connection::handle_events`] must be called. Call until `None` is returned.
pub fn handle_events(&mut self) -> Option<ConnectionHandle> {
while let Ok((ch, event)) = self.event_recv.try_recv() {
if let Some(response) = self.handle_event(ch, event) {
return Some((ch, response));
if self.handle_event(ch, event) {
return Some(ch);
}
}
None
}

fn handle_event(
&mut self,
ch: ConnectionHandle,
event: EndpointEvent,
) -> Option<ConnectionEvent> {
fn handle_event(&mut self, ch: ConnectionHandle, event: EndpointEvent) -> bool {
use EndpointEvent::*;
match event {
NeedIdentifiers(n) => {
return Some(self.send_new_identifiers(ch, n));
self.send_new_identifiers(ch, n);
return true;
}
ResetToken(remote, token) => {
if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
Expand All @@ -114,7 +110,8 @@ impl Endpoint {
trace!("peer retired CID {}: {}", seq, cid);
self.index.retire(&cid);
if allow_more_cids {
return Some(self.send_new_identifiers(ch, 1));
self.send_new_identifiers(ch, 1);
return true;
}
}
}
Expand All @@ -129,7 +126,27 @@ impl Endpoint {
}
}
}
None
false
}

#[cfg(test)]
pub(crate) fn decode_packet(
&self,
datagram: BytesMut,
) -> Result<PartialDecode, PacketDecodeError> {
PartialDecode::new(
datagram,
self.local_cid_generator.cid_len(),
&self.config.supported_versions,
self.config.grease_quic_bit,
)
.map(|(packet, rest)| {
assert!(
rest.is_none(),
"capturing decoded coalesced packets in tests is unimplemented"
);
packet
})
}

/// Process an incoming UDP datagram
Expand Down Expand Up @@ -196,16 +213,16 @@ impl Endpoint {

let addresses = FourTuple { remote, local_ip };
if let Some(ch) = self.index.get(&addresses, &first_decode) {
return Some(DatagramEvent::ConnectionEvent(
ch,
ConnectionEvent(ConnectionEventInner::Datagram {
_ = self.connections[ch.0]
.events
.send(ConnectionEvent::Datagram {
now,
remote: addresses.remote,
ecn,
first_decode,
remaining,
}),
));
});
return Some(DatagramEvent::ConnectionEvent(ch));
}

//
Expand Down Expand Up @@ -375,7 +392,7 @@ impl Endpoint {
Ok((ch, conn))
}

fn send_new_identifiers(&mut self, ch: ConnectionHandle, num: u64) -> ConnectionEvent {
fn send_new_identifiers(&mut self, ch: ConnectionHandle, num: u64) {
let mut ids = vec![];
for _ in 0..num {
let id = self.new_cid(ch);
Expand All @@ -389,7 +406,9 @@ impl Endpoint {
reset_token: ResetToken::new(&*self.config.reset_key, &id),
});
}
ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids))
_ = self.connections[ch]
.events
.send(ConnectionEvent::NewIdentifiers(ids));
}

/// Generate a connection ID for `ch`
Expand Down Expand Up @@ -603,6 +622,7 @@ impl Endpoint {
) -> Connection {
let mut rng_seed = [0; 32];
self.rng.fill_bytes(&mut rng_seed);
let (send, recv) = mpsc::channel();
let conn = Connection::new(
self.config.clone(),
server_config,
Expand All @@ -619,6 +639,7 @@ impl Endpoint {
self.allow_mtud,
rng_seed,
EndpointEvents::new(ch, self.event_send.clone()),
recv,
);

let id = self.connections.insert(ConnectionMeta {
Expand All @@ -627,6 +648,7 @@ impl Endpoint {
loc_cids: iter::once((0, loc_cid)).collect(),
addresses,
reset_token: None,
events: send,
});
debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");

Expand Down Expand Up @@ -836,6 +858,7 @@ pub(crate) struct ConnectionMeta {
/// Reset token provided by the peer for the CID we're currently sending to, and the address
/// being sent to
reset_token: Option<(SocketAddr, ResetToken)>,
events: mpsc::Sender<ConnectionEvent>,
}

/// Internal identifier for a `Connection` currently associated with an endpoint
Expand Down Expand Up @@ -864,8 +887,8 @@ impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
/// Event resulting from processing a single datagram
#[allow(clippy::large_enum_variant)] // Not passed around extensively
pub enum DatagramEvent {
/// The datagram is redirected to its `Connection`
ConnectionEvent(ConnectionHandle, ConnectionEvent),
/// [`Connection::handle_events`] must be called on the associated [`Connection`]
ConnectionEvent(ConnectionHandle),
/// The datagram has resulted in starting a new `Connection`
NewConnection(ConnectionHandle, Connection),
/// Response generated directly by the endpoint
Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ mod endpoint;
pub use crate::endpoint::{ConnectError, ConnectionHandle, DatagramEvent, Endpoint};

mod shared;
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint};
pub use crate::shared::{ConnectionId, EcnCodepoint};

mod transport_error;
pub use crate::transport_error::{Code as TransportErrorCode, Error as TransportError};
Expand Down
6 changes: 1 addition & 5 deletions quinn-proto/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@ use bytes::{Buf, BufMut, BytesMut};

use crate::{coding::BufExt, packet::PartialDecode, ResetToken, MAX_CID_SIZE};

/// Events sent from an Endpoint to a Connection
#[derive(Debug)]
pub struct ConnectionEvent(pub(crate) ConnectionEventInner);

#[derive(Debug)]
pub(crate) enum ConnectionEventInner {
pub(crate) enum ConnectionEvent {
/// A datagram has been received for the Connection
Datagram {
now: Instant,
Expand Down
6 changes: 2 additions & 4 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fn version_negotiate_client() {
.unwrap();
let now = Instant::now();
let mut buf = BytesMut::with_capacity(client.config().get_max_udp_payload_size() as usize);
let opt_event = client.handle(
client.handle(
now,
server_addr,
None,
Expand All @@ -90,9 +90,7 @@ fn version_negotiate_client() {
.into(),
&mut buf,
);
if let Some(DatagramEvent::ConnectionEvent(_, event)) = opt_event {
client_ch.handle_event(event, now);
}
client_ch.handle_events(now);
assert_matches!(
client_ch.poll(),
Some(Event::ConnectionLost {
Expand Down
34 changes: 17 additions & 17 deletions quinn-proto/src/tests/util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
cmp,
collections::{HashMap, VecDeque},
collections::{HashMap, HashSet, VecDeque},
env,
io::{self, Write},
mem,
Expand Down Expand Up @@ -289,7 +289,7 @@ pub(super) struct TestEndpoint {
pub(super) inbound: VecDeque<(Instant, Option<EcnCodepoint>, BytesMut)>,
accepted: Option<ConnectionHandle>,
pub(super) connections: HashMap<ConnectionHandle, Connection>,
conn_events: HashMap<ConnectionHandle, VecDeque<ConnectionEvent>>,
conn_events: HashSet<ConnectionHandle>,
pub(super) captured_packets: Vec<Vec<u8>>,
pub(super) capture_inbound_packets: bool,
}
Expand All @@ -315,7 +315,7 @@ impl TestEndpoint {
inbound: VecDeque::new(),
accepted: None,
connections: HashMap::default(),
conn_events: HashMap::default(),
conn_events: HashSet::default(),
captured_packets: Vec::new(),
capture_inbound_packets: false,
}
Expand All @@ -335,22 +335,24 @@ impl TestEndpoint {

while self.inbound.front().map_or(false, |x| x.0 <= now) {
let (recv_time, ecn, packet) = self.inbound.pop_front().unwrap();
if let Some(event) = self
.endpoint
.handle(recv_time, remote, None, ecn, packet, &mut buf)
if let Some(event) =
self.endpoint
.handle(recv_time, remote, None, ecn, packet.clone(), &mut buf)
{
match event {
DatagramEvent::NewConnection(ch, conn) => {
self.connections.insert(ch, conn);
self.accepted = Some(ch);
}
DatagramEvent::ConnectionEvent(ch, event) => {
DatagramEvent::ConnectionEvent(ch) => {
if self.capture_inbound_packets {
let packet = self.connections[&ch].decode_packet(&event);
let packet = self
.decode_packet(packet)
.ok()
.and_then(|x| self.connections[&ch].decode_packet(x));
self.captured_packets.extend(packet);
}

self.conn_events.entry(ch).or_default().push_back(event);
self.conn_events.insert(ch);
}
DatagramEvent::Response(transmit) => {
let size = transmit.size;
Expand All @@ -363,16 +365,14 @@ impl TestEndpoint {

loop {
let mut endpoint_events = false;
for (_, conn) in self.connections.iter_mut() {
for (ch, conn) in self.connections.iter_mut() {
if self.timeout.map_or(false, |x| x <= now) {
self.timeout = None;
conn.handle_timeout(now);
}

for (_, mut events) in self.conn_events.drain() {
for event in events.drain(..) {
conn.handle_event(event, now);
}
if self.conn_events.remove(ch) {
conn.handle_events(now);
}

endpoint_events |= conn.poll_endpoint_events();
Expand All @@ -388,9 +388,9 @@ impl TestEndpoint {
break;
}

while let Some((ch, event)) = self.handle_events() {
while let Some(ch) = self.handle_events() {
if let Some(conn) = self.connections.get_mut(&ch) {
conn.handle_event(event, now);
conn.handle_events(now);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,8 +933,8 @@ impl State {
Poll::Ready(Some(ConnectionEvent::Ping)) => {
self.inner.ping();
}
Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
self.inner.handle_event(event, now);
Poll::Ready(Some(ConnectionEvent::Proto)) => {
self.inner.handle_events(now);
}
Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
self.close(error_code, reason, shared);
Expand Down
Loading

0 comments on commit 9f6ece6

Please sign in to comment.