Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: minimal implementation of UDP CL without any of the extensions #64

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Plus:
* [Minimal TCP Convergence Layer](https://tools.ietf.org/html/draft-ietf-dtn-mtcpcl-01)
* A simple [HTTP Convergence Layer](doc/http-cl.md)
* A [HTTP pull-based Convergence Layer](doc/http-pull-cl.md)
* A minimal [UDP Convergence Layer](https://www.ietf.org/archive/id/draft-sipos-dtn-udpcl-01.html) (currently, without the extensions)
* An IP neighborhood discovery service
* Convenient command line tools to interact with the daemon
* A simple web interface for status information about `dtnd`
Expand Down
2 changes: 2 additions & 0 deletions core/dtn7/src/cla/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod http;
pub mod httppull;
pub mod mtcp;
pub mod tcp;
pub mod udp;

use self::http::HttpConvergenceLayer;
use anyhow::Result;
Expand All @@ -24,6 +25,7 @@ use std::{
};
use tcp::TcpConvergenceLayer;
use tokio::sync::{mpsc, oneshot};
use udp::UdpConvergenceLayer;

// generate various helpers
// - enum CLAsAvailable for verification and loading from str
Expand Down
170 changes: 170 additions & 0 deletions core/dtn7/src/cla/udp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use crate::cla::{ConvergenceLayerAgent, TransferResult};
use async_trait::async_trait;
use bp7::{Bundle, ByteBuffer};
use core::convert::TryFrom;
use dtn7_codegen::cla;
use log::{debug, error, info};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::time::Instant;
use tokio::io;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;

use super::HelpStr;

async fn udp_listener(addr: String, port: u16) -> Result<(), io::Error> {
let addr: SocketAddrV4 = format!("{}:{}", addr, port).parse().unwrap();
let listener = UdpSocket::bind(&addr)
.await
.expect("failed to bind udp port");
debug!("spawning UDP listener on port {}", port);
loop {
let mut buf = [0; 65535];
let (amt, src) = listener.recv_from(&mut buf).await?;
let buf = &buf[..amt];
if let Ok(bndl) = Bundle::try_from(buf.to_vec()) {
info!("Received bundle: {} from {}", bndl.id(), src);
{
tokio::spawn(async move {
if let Err(err) = crate::core::processing::receive(bndl).await {
error!("Failed to process bundle: {}", err);
}
});
}
} else {
crate::STATS.lock().broken += 1;
info!("Error decoding bundle from {}", src);
}
}
}

pub async fn udp_send_bundles(addr: SocketAddr, bundles: Vec<ByteBuffer>) -> TransferResult {
let now = Instant::now();
let num_bundles = bundles.len();
let total_bytes: usize = bundles.iter().map(|b| b.len()).sum();

let sock = UdpSocket::bind("0.0.0.0:0").await;
if sock.is_err() {
error!("Error binding UDP socket for sending");
return TransferResult::Failure;
}
let sock = sock.unwrap();
if sock.connect(addr).await.is_err() {
error!("Error connecting UDP socket for sending");
return TransferResult::Failure;
}

for b in bundles {
if b.len() > 65535 {
error!("Bundle too large for UDP transmission");
return TransferResult::Failure;
}
// send b via udp socket to addr
if sock.send(&b).await.is_err() {
error!("Error sending bundle via UDP");
return TransferResult::Failure;
}
}

debug!(
"Transmission time: {:?} for {} bundles in {} bytes to {}",
now.elapsed(),
num_bundles,
total_bytes,
addr
);

TransferResult::Successful
}

#[cla(udp)]
#[derive(Debug, Clone)]
pub struct UdpConvergenceLayer {
local_addr: String,
local_port: u16,
tx: mpsc::Sender<super::ClaCmd>,
}

impl UdpConvergenceLayer {
pub fn new(local_settings: Option<&HashMap<String, String>>) -> UdpConvergenceLayer {
let addr: String = local_settings
.and_then(|settings| settings.get("bind"))
.map(|s| s.to_string())
.unwrap_or_else(|| "0.0.0.0".to_string());
let port = local_settings
.and_then(|settings| settings.get("port"))
.and_then(|port_str| port_str.parse::<u16>().ok())
.unwrap_or(4556);
let (tx, mut rx) = mpsc::channel(100);
tokio::spawn(async move {
while let Some(cmd) = rx.recv().await {
match cmd {
super::ClaCmd::Transfer(remote, data, reply) => {
debug!(
"UdpConvergenceLayer: received transfer command for {}",
remote
);
if !data.is_empty() {
let peeraddr: SocketAddr = remote.parse().unwrap();
debug!("forwarding to {:?}", peeraddr);
tokio::spawn(async move {
reply
.send(udp_send_bundles(peeraddr, vec![data]).await)
.unwrap();
});
} else {
debug!("Nothing to forward.");
reply.send(TransferResult::Successful).unwrap();
}
}
super::ClaCmd::Shutdown => {
debug!("UdpConvergenceLayer: received shutdown command");
break;
}
}
}
});
UdpConvergenceLayer {
local_addr: addr,
local_port: port,
tx,
}
}

pub async fn spawn_listener(&self) -> std::io::Result<()> {
// TODO: bubble up errors from run
tokio::spawn(udp_listener(self.local_addr.clone(), self.local_port)); /*.await.unwrap()*/
Ok(())
}
}

#[async_trait]
impl ConvergenceLayerAgent for UdpConvergenceLayer {
async fn setup(&mut self) {
self.spawn_listener()
.await
.expect("error setting up udp listener");
}
fn port(&self) -> u16 {
self.local_port
}
fn name(&self) -> &str {
"udp"
}
fn channel(&self) -> tokio::sync::mpsc::Sender<super::ClaCmd> {
self.tx.clone()
}
}

impl HelpStr for UdpConvergenceLayer {
fn local_help_str() -> &'static str {
"port=4556:bind=0.0.0.0"
}
}
impl std::fmt::Display for UdpConvergenceLayer {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "udp:{}:{}", self.local_addr, self.local_port)
}
}
18 changes: 12 additions & 6 deletions tests/cla_chain_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ PORT_NODE1=$(get_current_port)
PORT_NODE2=$((PORT_NODE1 + 1))
PORT_NODE3=$((PORT_NODE1 + 2))
PORT_NODE4=$((PORT_NODE1 + 3))
PORT_NODE5=$((PORT_NODE1 + 4))

#DB1="-W /tmp/node1 -D sled"
#DB1="-W /tmp/node1 -D sneakers"
Expand All @@ -30,15 +31,20 @@ start_dtnd -d -j5s -i0 -C http -C tcp:port=4224 -e incoming \
-s http://127.0.0.1:$PORT_NODE2/node2 \
-s tcp://127.0.0.1:4225/node4 $STATUS_REPORTS

start_dtnd -d -j5s -i0 -C tcp:port=4225 -e incoming -r epidemic \
-n node4 -s tcp://127.0.0.1:4224/node3 $DB4 $STATUS_REPORTS
start_dtnd -d -j5s -i0 -C tcp:port=4225 -C udp:port=4556 -e incoming -r epidemic \
-n node4 -s tcp://127.0.0.1:4224/node3 $DB4 $STATUS_REPORTS \
-s udp://127.0.0.1:4557/node5 $STATUS_REPORTS

start_dtnd -d -j5s -i0 -C udp:port=4557 -e incoming \
-r epidemic -n node5 \
-s udp://127.0.0.1:4556/node4 $STATUS_REPORTS

sleep 1

echo

echo "Sending 'test' to node 4"
echo test | $BINS/dtnsend -r dtn://node4/incoming -p $PORT_NODE1
echo "Sending 'test' to node 5"
echo test | $BINS/dtnsend -r dtn://node5/incoming -p $PORT_NODE1

sleep 5

Expand All @@ -59,8 +65,8 @@ else
echo "Incorrect number of bundles in store!"
fi
echo
echo -n "Receiving on node 4: "
$BINS/dtnrecv -v -e incoming -p $PORT_NODE4
echo -n "Receiving on node 5: "
$BINS/dtnrecv -v -e incoming -p $PORT_NODE5
RC=$?
echo "RET: $RC"
echo
Expand Down
Loading