diff --git a/core/dtn7/src/core/application_agent.rs b/core/dtn7/src/core/application_agent.rs index 6cb40d12..d9a74ee4 100644 --- a/core/dtn7/src/core/application_agent.rs +++ b/core/dtn7/src/core/application_agent.rs @@ -6,6 +6,7 @@ use std::fmt::Debug; use tokio::sync::mpsc::Sender; use crate::dtnd::ws::BundleDelivery; +use crate::store_remove_if_singleton_bundle; //use crate::dtnd::ws::WsAASession; #[enum_dispatch] @@ -45,6 +46,8 @@ impl ApplicationAgent for SimpleApplicationAgent { if addr.try_send(BundleDelivery(bundle.clone())).is_err() { self.bundles.push_back(bundle.clone()); + } else { + store_remove_if_singleton_bundle(bundle); } } else { // save in temp buffer for delivery @@ -52,7 +55,11 @@ impl ApplicationAgent for SimpleApplicationAgent { } } fn pop(&mut self) -> Option { - self.bundles.pop_front() + let bundle = self.bundles.pop_front(); + if let Some(bndl) = bundle.as_ref() { + store_remove_if_singleton_bundle(bndl); + }; + bundle } fn set_delivery_addr(&mut self, addr: Sender) { diff --git a/core/dtn7/src/core/processing.rs b/core/dtn7/src/core/processing.rs index 74a7f89b..551ce20c 100644 --- a/core/dtn7/src/core/processing.rs +++ b/core/dtn7/src/core/processing.rs @@ -472,11 +472,6 @@ pub async fn local_delivery(mut bp: BundlePack) -> Result<()> { } bp.add_constraint(Constraint::LocalEndpoint); bp.sync()?; - if let Some(aa) = (*DTNCORE.lock()).get_endpoint_mut(&bp.destination) { - info!("Delivering {}", bp.id()); - aa.push(&bndl); - STATS.lock().delivered += 1; - } if is_local_node_id(&bp.destination) { if bndl .primary @@ -497,6 +492,11 @@ pub async fn local_delivery(mut bp: BundlePack) -> Result<()> { bp.add_constraint(Constraint::ForwardPending); } bp.sync()?; + if let Some(aa) = (*DTNCORE.lock()).get_endpoint_mut(&bp.destination) { + info!("Delivering {}", bp.id()); + aa.push(&bndl); + STATS.lock().delivered += 1; + } Ok(()) } pub fn contraindicated(mut bp: BundlePack) -> Result<()> { diff --git a/core/dtn7/src/lib.rs b/core/dtn7/src/lib.rs index ff1794d5..6898b6a3 100644 --- a/core/dtn7/src/lib.rs +++ b/core/dtn7/src/lib.rs @@ -190,6 +190,16 @@ pub fn store_remove(bid: &str) -> Result<()> { Ok(()) } +/// Removes a bundle from the store if its destination is a singleton endpoint +fn store_remove_if_singleton_bundle(bundle: &Bundle) { + if !bundle.primary.destination.is_non_singleton() { + debug!("Removing bundle with singleton destination from store"); + if let Err(e) = store_remove(&bundle.id()) { + error!("Error while removing bundle from store: {:?}", e); + } + } +} + pub fn store_update_metadata(bp: &BundlePack) -> Result<()> { (*STORE.lock()).update_metadata(bp) } diff --git a/run_all_tests.sh b/run_all_tests.sh index d3f395a1..8b20cb32 100755 --- a/run_all_tests.sh +++ b/run_all_tests.sh @@ -43,6 +43,7 @@ cargo test $TARGET_OPT && filter_output ./tests/local_nodes_dtn_httppull.sh && filter_output ./tests/local_nodes_http_dtn.sh && filter_output ./tests/store_delete.sh && + filter_output ./tests/store_delete_singleton.sh && filter_output ./tests/lifetime.sh && filter_output ./tests/cla_chain_test.sh && filter_output ./tests/ecla_test.sh && diff --git a/tests/local_ping_echo.sh b/tests/local_ping_echo.sh index f0cbebc6..383153a2 100755 --- a/tests/local_ping_echo.sh +++ b/tests/local_ping_echo.sh @@ -21,6 +21,22 @@ $BINS/examples/dtnping -d 'dtn://node1/echo' -c 6 -t 500ms RC=$? echo "RET: $RC" +NUM_BUNDLES=$($BINS/dtnquery bundles | grep "dtn://" | wc -l | awk '{print $1}') +NUM_DELETED=$($BINS/dtnquery store | grep -o "Deleted" | wc -l | awk '{print $1}') + +EXPECTED_BUNDLES=0 +EXPECTED_DELETED=12 + +echo "Bundles in store on node 1: : $NUM_BUNDLES / $EXPECTED_BUNDLES" +echo "Bundles marked as Deleted in store on node 1: : $NUM_DELETED / $EXPECTED_DELETED" +if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ] && [ "$NUM_DELETED" = "$EXPECTED_DELETED" ]; then + echo "Correct number of bundles in store!" +else + echo "Incorrect number of bundles in store!" + RC=1 +fi + + wait_for_key $1 #kill $PID_ECHO1 diff --git a/tests/store_delete_singleton.sh b/tests/store_delete_singleton.sh new file mode 100755 index 00000000..c76e2fb9 --- /dev/null +++ b/tests/store_delete_singleton.sh @@ -0,0 +1,114 @@ +#!/bin/bash + +. $(dirname $(perl -MCwd -e 'print Cwd::abs_path shift' "$0"))/libshelltests.sh + +prepare_test + +#DB1="-W /tmp/node1 -D sled" +#DB1="-W /tmp/node1 -D sneakers" +PORT_NODE1=$(get_current_port) +start_dtnd -d -j5s -i0 -C tcp:port=2342 -e incoming -e ~group -r epidemic -n node1 $DB1 + +sleep 0.5 + +echo + +echo "Sending 'test' to node 1" +BID_SINGLE=$(echo test | $BINS/dtnsend -r dtn://node1/incoming -p $PORT_NODE1 | grep "dtn://" | awk '{print $2}') +BID_GRP=$(echo test | $BINS/dtnsend -r dtn://node1/~group -p $PORT_NODE1 | grep "dtn://" | awk '{print $2}') + +sleep 0.5 + +echo +echo -n "Bundles in store on node 1: " +NUM_BUNDLES=$($BINS/dtnquery store | grep "dtn://" | wc -l | awk '{print $1}') +echo -n $NUM_BUNDLES + +EXPECTED_BUNDLES=2 + +echo " / $EXPECTED_BUNDLES" +if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ]; then + echo "Correct number of bundles in store!" +else + echo "Incorrect number of bundles in store!" + wait_for_key $1 + cleanup + exit 1 +fi +echo + +# Receive bundle IDs - should not change number of bundles +echo "Receiving bundles $BID_SINGLE and $BID_GRP on node 1: " +$BINS/dtnrecv -v -b $BID_SINGLE -p $PORT_NODE1 || exit $? +$BINS/dtnrecv -v -b $BID_GRP -p $PORT_NODE1 || exit $? +echo -n "Bundles in store on node 1: " +NUM_BUNDLES=$($BINS/dtnquery bundles | grep "dtn://" | wc -l | awk '{print $1}') +echo -n $NUM_BUNDLES + +EXPECTED_BUNDLES=2 + +echo " / $EXPECTED_BUNDLES" +if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ]; then + echo "Correct number of bundles in store!" +else + echo "Incorrect number of bundles in store!" + wait_for_key $1 + cleanup + exit 1 +fi + + +# Receive singleton endpoint - should decrease number of bundles to 1 +echo "Receiving from endpoint 'incoming' on node 1: " +$BINS/dtnrecv -v -e incoming -p $PORT_NODE1 +RC=$? +echo +echo "RET: $RC" +echo +echo +echo -n "Bundles in store on node 1: " +NUM_BUNDLES=$($BINS/dtnquery bundles | grep "dtn://" | wc -l | awk '{print $1}') +echo -n $NUM_BUNDLES + +EXPECTED_BUNDLES=1 + +echo " / $EXPECTED_BUNDLES" +if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ]; then + echo "Correct number of bundles in store!" +else + echo "Incorrect number of bundles in store!" + wait_for_key $1 + cleanup + exit 1 +fi + +# Receive group endpoint - should not change number of bundles +echo "Receiving from endpoint '~group' on node 1: " +$BINS/dtnrecv -v -e ~group -p $PORT_NODE1 +RC=$? +echo +echo "RET: $RC" +echo +echo +echo -n "Bundles in store on node 1: " +NUM_BUNDLES=$($BINS/dtnquery bundles | grep "dtn://" | wc -l | awk '{print $1}') +echo -n $NUM_BUNDLES + +EXPECTED_BUNDLES=1 + +echo " / $EXPECTED_BUNDLES" +if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ]; then + echo "Correct number of bundles in store!" +else + echo "Incorrect number of bundles in store!" +fi + +wait_for_key $1 + +cleanup + +if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ]; then + exit $RC +else + exit 1 +fi