Skip to content

Commit

Permalink
fix: static routing now removes bundles if forwarded without errors f…
Browse files Browse the repository at this point in the history
…or singleton endpoints
  • Loading branch information
gh0st42 committed Feb 27, 2024
1 parent 84dcc22 commit fe52c8e
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 12 deletions.
8 changes: 8 additions & 0 deletions core/dtn7/src/core/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,14 @@ pub async fn forward(mut bp: BundlePack) -> Result<()> {
start_time.elapsed()
);
bundle_sent.store(true, Ordering::Relaxed);
if let Err(err) = routing_notify(RoutingNotifcation::SendingSucceeded(
bpid,
n.next_hop.node().unwrap(),
))
.await
{
error!("Error while sending succeeded notification: {}", err);
}
}
});
wg.push(task_handle);
Expand Down
2 changes: 2 additions & 0 deletions core/dtn7/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ lazy_static! {
pub static ref CLAS: Mutex<Vec<CLAEnum>> = Mutex::new(Vec::new());
}

pub type BundleID = String;

pub fn cla_add(cla: CLAEnum) {
(*CLAS.lock()).push(cla);
}
Expand Down
17 changes: 14 additions & 3 deletions core/dtn7/src/routing/erouting/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{peers_get_for_node, BundlePack, DtnPeer, PeerAddress, RoutingNotifcation};
use crate::{peers_get_for_node, BundleID, BundlePack, DtnPeer, PeerAddress, RoutingNotifcation};
use bp7::{Bundle, EndpointID};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
Expand Down Expand Up @@ -32,6 +32,8 @@ pub enum Packet {
Timeout(Timeout),
/// Packet that signals that the sending failed.
SendingFailed(SendingFailed),
/// Packet that signals that the sending succeeded.
SendingSucceeded(SendingSucceeded),
/// Packet that signals that a bundle is incoming.
IncomingBundle(IncomingBundle),
/// Packet that signals that a bundle is incoming without a previous node.
Expand All @@ -54,6 +56,9 @@ impl From<RoutingNotifcation> for Packet {
RoutingNotifcation::SendingFailed(bid, cla_sender) => {
Packet::SendingFailed(SendingFailed { bid, cla_sender })
}
RoutingNotifcation::SendingSucceeded(bid, cla_sender) => {
Packet::SendingSucceeded(SendingSucceeded { bid, cla_sender })
}
RoutingNotifcation::IncomingBundle(bndl) => {
Packet::IncomingBundle(IncomingBundle { bndl })
}
Expand Down Expand Up @@ -111,7 +116,13 @@ pub struct Timeout {

#[derive(Serialize, Deserialize, Clone)]
pub struct SendingFailed {
pub bid: String,
pub bid: BundleID,
pub cla_sender: String,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct SendingSucceeded {
pub bid: BundleID,
pub cla_sender: String,
}

Expand All @@ -122,7 +133,7 @@ pub struct IncomingBundle {

#[derive(Serialize, Deserialize, Clone)]
pub struct IncomingBundleWithoutPreviousNode {
pub bid: String,
pub bid: BundleID,
pub node_name: String,
}

Expand Down
7 changes: 5 additions & 2 deletions core/dtn7/src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod static_routing;

use crate::cla::ClaSenderTask;
use crate::core::bundlepack::BundlePack;
use crate::BundleID;
use async_trait::async_trait;
use bp7::Bundle;
use bp7::EndpointID;
Expand All @@ -23,10 +24,12 @@ use std::fmt::Debug;
use std::fmt::Display;
use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
pub enum RoutingNotifcation {
SendingFailed(String, String),
SendingFailed(BundleID, String),
SendingSucceeded(BundleID, String),
IncomingBundle(Bundle),
IncomingBundleWithoutPreviousNode(String, String),
IncomingBundleWithoutPreviousNode(BundleID, String),
EncounteredPeer(EndpointID),
DroppedPeer(EndpointID),
}
Expand Down
22 changes: 18 additions & 4 deletions core/dtn7/src/routing/static_routing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Display;

use crate::{CONFIG, PEERS};
use crate::{RoutingNotifcation, CONFIG, PEERS};

use super::{RoutingAgent, RoutingCmd};
use async_trait::async_trait;
Expand Down Expand Up @@ -88,6 +88,19 @@ fn parse_route_from_str(s: &str) -> Option<StaticRouteEntry> {
})
}

async fn handle_routing_notification(notification: RoutingNotifcation) {
debug!("Received notification: {:?}", notification);
match notification {
RoutingNotifcation::SendingFailed(bid, cla_sender) => {
debug!("Sending failed for bundle {} on CLA {}", bid, cla_sender);
}
RoutingNotifcation::SendingSucceeded(bid, cla_sender) => {
debug!("Sending succeeded for bundle {} on CLA {}", bid, cla_sender);
}
_ => { /* ignore */ }
}
}

async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
let mut route_entries = vec![];
let settings = CONFIG.lock().routing_settings.clone();
Expand Down Expand Up @@ -126,8 +139,7 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
if p.eid.to_string() == route.via {
if let Some(cla) = p.first_cla() {
clas.push(cla);
delete_afterwards =
p.node_name() == bp.destination.node().unwrap();
delete_afterwards = !bp.destination.is_non_singleton();
break 'route_loop;
}
}
Expand Down Expand Up @@ -172,7 +184,9 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
.fold(String::new(), |acc, r| acc + &format!("{}\n", r));
tx.send(routes_as_str).unwrap();
}
super::RoutingCmd::Notify(_) => {}
super::RoutingCmd::Notify(notification) => {
handle_routing_notification(notification).await;
}
}
}
}
4 changes: 1 addition & 3 deletions tests/routing_static.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@

prepare_test

STATUS_REPORTS="-g"
#STATUS_REPORTS="-g"

PORT_NODE1=$(get_current_port)
start_dtnd -d -j0 -i0 -C mtcp:port=2342 -e 42 -r static -n 1 -s mtcp://127.0.0.1:4223/2 $STATUS_REPORTS -R static.routes=tests/routes_1.csv

PORT_NODE2=$(get_current_port)
#DB2="-W /tmp/node2 -D sled"
#DB2="-W /tmp/node2 -D sneakers"
start_dtnd -d -j0 -i0 -C mtcp:port=4223 -e 42 -r static \
-n 2 \
-s mtcp://127.0.0.1:2342/1 \
Expand Down

0 comments on commit fe52c8e

Please sign in to comment.