Skip to content

Commit 92f7a2e

Browse files
committed
Emit global uTP listener events when uTP stream is closed or reset
1 parent 357a5eb commit 92f7a2e

File tree

11 files changed

+571
-300
lines changed

11 files changed

+571
-300
lines changed

Cargo.lock

Lines changed: 48 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

newsfragments/325.added.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,6 @@
1-
Process all closed uTP streams in UtpListener and pass the payload to overlay service.
1+
- Rename `UtpSocket` to `UtpStream`.
2+
- Refactor the way we are storing the received payload (DATA packets) in the uTP stream.
3+
- Add a new AddActiveConnection UtpListener request and move the initialization of a uTP stream inside UtpListener.
4+
- Add UtpStream -> UtpListener event channel and emit event inside UtpStream when stream state changes to Closed or Reset.
5+
- Emit a global uTP listener event containing a uTP payload when a stream is closed.
6+
- Remove redundant and dead code.

trin-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,4 @@ features = ["bundled"]
6969

7070
[dev-dependencies]
7171
quickcheck = "1.0.3"
72+
ntest = "0.8.0"

trin-core/src/portalnet/discovery.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use rand::seq::SliceRandom;
1414
use serde_json::{json, Value};
1515
use std::{
1616
convert::TryFrom,
17+
fmt,
1718
net::{IpAddr, SocketAddr},
1819
sync::Arc,
1920
time::Duration,
@@ -54,6 +55,18 @@ pub struct Discovery {
5455
pub listen_socket: SocketAddr,
5556
}
5657

58+
impl fmt::Debug for Discovery {
59+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
60+
write!(
61+
f,
62+
"Discovery: ( enr: {}, started: {}, listen_socket: {} )",
63+
self.discv5.local_enr(),
64+
self.started,
65+
self.listen_socket
66+
)
67+
}
68+
}
69+
5770
impl Discovery {
5871
pub fn new(portal_config: PortalnetConfig) -> Result<Self, String> {
5972
let listen_all_ips = SocketAddr::new("0.0.0.0".parse().unwrap(), portal_config.listen_port);

trin-core/src/portalnet/overlay.rs

Lines changed: 62 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::portalnet::{
1818
use crate::{
1919
portalnet::types::content_key::RawContentKey,
2020
utp::{
21-
stream::{UtpListenerRequest, UtpSocket, BUF_SIZE},
21+
stream::{UtpListenerRequest, UtpStream, BUF_SIZE},
2222
trin_helpers::{UtpAccept, UtpMessage, UtpStreamId},
2323
},
2424
};
@@ -281,49 +281,41 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
281281
conn_id: u16,
282282
) -> Result<Content, OverlayRequestError> {
283283
// initiate the connection to the acceptor
284-
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();
285-
self.utp_listener_tx
286-
.send(UtpListenerRequest::Connect(
287-
conn_id,
288-
enr.node_id(),
289-
self.protocol.clone(),
290-
UtpStreamId::FindContentStream,
291-
tx,
284+
let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
285+
let utp_request = UtpListenerRequest::Connect(
286+
conn_id,
287+
enr,
288+
self.protocol.clone(),
289+
UtpStreamId::FindContentStream,
290+
tx,
291+
);
292+
293+
self.utp_listener_tx.send(utp_request).map_err(|err| {
294+
OverlayRequestError::UtpError(format!(
295+
"Unable to send Connect request with FindContent stream to UtpListener: {err}"
292296
))
293-
.map_err(|err| {
294-
OverlayRequestError::UtpError(format!(
295-
"Unable to send Connect request with FindContent stream to UtpListener: {err}"
296-
))
297-
})?;
297+
})?;
298298

299299
match rx.await {
300-
Ok(conn) => {
301-
match conn {
302-
Ok(mut conn) => {
303-
let mut result = Vec::new();
304-
// Loop and receive all DATA packets, similar to `read_to_end`
305-
loop {
306-
let mut buf = [0; BUF_SIZE];
307-
match conn.recv_from(&mut buf).await {
308-
Ok((0, _)) => {
309-
break;
310-
}
311-
Ok((bytes, _)) => {
312-
result.extend_from_slice(&mut buf[..bytes]);
313-
}
314-
Err(err) => {
315-
warn!("Unable to receive content via uTP: {err}");
316-
return Err(OverlayRequestError::UtpError(err.to_string()));
317-
}
318-
}
300+
Ok(mut conn) => {
301+
let mut result = Vec::new();
302+
// Loop and receive all DATA packets, similar to `read_to_end`
303+
loop {
304+
let mut buf = [0; BUF_SIZE];
305+
match conn.recv_from(&mut buf).await {
306+
Ok((0, _)) => {
307+
break;
308+
}
309+
Ok((bytes, _)) => {
310+
result.extend_from_slice(&mut buf[..bytes]);
311+
}
312+
Err(err) => {
313+
warn!("Unable to receive content via uTP: {err}");
314+
return Err(OverlayRequestError::UtpError(err.to_string()));
319315
}
320-
Ok(Content::Content(VariableList::from(result)))
321-
}
322-
Err(err) => {
323-
warn!("Unable to initiate uTP stream with remote node. Error initializing uTP socket: {err}");
324-
Err(OverlayRequestError::UtpError(err.to_string()))
325316
}
326317
}
318+
Ok(Content::Content(VariableList::from(result)))
327319
}
328320
Err(err) => {
329321
warn!("Unable to receive from uTP listener channel: {err}");
@@ -390,47 +382,43 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
390382
}
391383

392384
// initiate the connection to the acceptor
393-
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();
394-
395-
self.utp_listener_tx.send(UtpListenerRequest::Connect(
385+
let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
386+
let utp_request = UtpListenerRequest::Connect(
396387
conn_id,
397-
enr.node_id(),
388+
enr,
398389
self.protocol.clone(),
399390
UtpStreamId::OfferStream,
400391
tx,
401-
)).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?;
392+
);
402393

403-
match rx.await? {
404-
Ok(mut conn) => {
405-
// Handle STATE packet for SYN
406-
let mut buf = [0; BUF_SIZE];
407-
conn.recv(&mut buf).await?;
408-
409-
let content_items = self.provide_requested_content(&response, content_keys_offered);
410-
411-
let content_message = UtpAccept {
412-
message: content_items,
413-
};
414-
415-
tokio::spawn(async move {
416-
// send the content to the acceptor over a uTP stream
417-
if let Err(err) = conn
418-
.send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..])
419-
.await
420-
{
421-
warn!("Error sending content {err}");
422-
};
423-
// Close uTP connection
424-
if let Err(err) = conn.close().await {
425-
warn!("Unable to close uTP connection!: {err}")
426-
};
427-
});
428-
Ok(response)
429-
}
430-
Err(err) => Err(anyhow!(
431-
"Unable to initialize Offer uTP stream with remote node: {err}"
432-
)),
433-
}
394+
self.utp_listener_tx
395+
.send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?;
396+
397+
let mut conn = rx.await?;
398+
// Handle STATE packet for SYN
399+
let mut buf = [0; BUF_SIZE];
400+
conn.recv(&mut buf).await?;
401+
402+
let content_items = self.provide_requested_content(&response, content_keys_offered);
403+
404+
let content_message = UtpAccept {
405+
message: content_items,
406+
};
407+
408+
tokio::spawn(async move {
409+
// send the content to the acceptor over a uTP stream
410+
if let Err(err) = conn
411+
.send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..])
412+
.await
413+
{
414+
warn!("Error sending content {err}");
415+
};
416+
// Close uTP connection
417+
if let Err(err) = conn.close().await {
418+
warn!("Unable to close uTP connection!: {err}")
419+
};
420+
});
421+
Ok(response)
434422
}
435423

436424
/// Provide the requested content key and content value for the acceptor

0 commit comments

Comments
 (0)