Skip to content

Commit

Permalink
Remove singleton endpoint bundles from store after dispatch (#59)
Browse files Browse the repository at this point in the history
* AA removes bundles from store

* adds integration test

* delete on WS push as well

* augments local_ping_echo.sh with test for correct number of bundles in store
  • Loading branch information
teschmitt authored Feb 27, 2024
1 parent 10846a0 commit 1c829a2
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 6 deletions.
9 changes: 8 additions & 1 deletion core/dtn7/src/core/application_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::fmt::Debug;
use tokio::sync::mpsc::Sender;

use crate::dtnd::ws::BundleDelivery;
use crate::store_remove_if_singleton_bundle;
//use crate::dtnd::ws::WsAASession;

#[enum_dispatch]
Expand Down Expand Up @@ -45,14 +46,20 @@ impl ApplicationAgent for SimpleApplicationAgent {

if addr.try_send(BundleDelivery(bundle.clone())).is_err() {
self.bundles.push_back(bundle.clone());
} else {
store_remove_if_singleton_bundle(bundle);
}
} else {
// save in temp buffer for delivery
self.bundles.push_back(bundle.clone());
}
}
fn pop(&mut self) -> Option<Bundle> {
self.bundles.pop_front()
let bundle = self.bundles.pop_front();
if let Some(bndl) = bundle.as_ref() {
store_remove_if_singleton_bundle(bndl);
};
bundle
}

fn set_delivery_addr(&mut self, addr: Sender<BundleDelivery>) {
Expand Down
10 changes: 5 additions & 5 deletions core/dtn7/src/core/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,11 +472,6 @@ pub async fn local_delivery(mut bp: BundlePack) -> Result<()> {
}
bp.add_constraint(Constraint::LocalEndpoint);
bp.sync()?;
if let Some(aa) = (*DTNCORE.lock()).get_endpoint_mut(&bp.destination) {
info!("Delivering {}", bp.id());
aa.push(&bndl);
STATS.lock().delivered += 1;
}
if is_local_node_id(&bp.destination) {
if bndl
.primary
Expand All @@ -497,6 +492,11 @@ pub async fn local_delivery(mut bp: BundlePack) -> Result<()> {
bp.add_constraint(Constraint::ForwardPending);
}
bp.sync()?;
if let Some(aa) = (*DTNCORE.lock()).get_endpoint_mut(&bp.destination) {
info!("Delivering {}", bp.id());
aa.push(&bndl);
STATS.lock().delivered += 1;
}
Ok(())
}
pub fn contraindicated(mut bp: BundlePack) -> Result<()> {
Expand Down
10 changes: 10 additions & 0 deletions core/dtn7/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ pub fn store_remove(bid: &str) -> Result<()> {
Ok(())
}

/// Removes a bundle from the store if its destination is a singleton endpoint
fn store_remove_if_singleton_bundle(bundle: &Bundle) {
if !bundle.primary.destination.is_non_singleton() {
debug!("Removing bundle with singleton destination from store");
if let Err(e) = store_remove(&bundle.id()) {
error!("Error while removing bundle from store: {:?}", e);
}
}
}

pub fn store_update_metadata(bp: &BundlePack) -> Result<()> {
(*STORE.lock()).update_metadata(bp)
}
Expand Down
1 change: 1 addition & 0 deletions run_all_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ cargo test $TARGET_OPT &&
filter_output ./tests/local_nodes_dtn_httppull.sh &&
filter_output ./tests/local_nodes_http_dtn.sh &&
filter_output ./tests/store_delete.sh &&
filter_output ./tests/store_delete_singleton.sh &&
filter_output ./tests/lifetime.sh &&
filter_output ./tests/cla_chain_test.sh &&
filter_output ./tests/ecla_test.sh &&
Expand Down
16 changes: 16 additions & 0 deletions tests/local_ping_echo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,22 @@ $BINS/examples/dtnping -d 'dtn://node1/echo' -c 6 -t 500ms
RC=$?
echo "RET: $RC"

NUM_BUNDLES=$($BINS/dtnquery bundles | grep "dtn://" | wc -l | awk '{print $1}')
NUM_DELETED=$($BINS/dtnquery store | grep -o "Deleted" | wc -l | awk '{print $1}')

EXPECTED_BUNDLES=0
EXPECTED_DELETED=12

echo "Bundles in store on node 1: : $NUM_BUNDLES / $EXPECTED_BUNDLES"
echo "Bundles marked as Deleted in store on node 1: : $NUM_DELETED / $EXPECTED_DELETED"
if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ] && [ "$NUM_DELETED" = "$EXPECTED_DELETED" ]; then
echo "Correct number of bundles in store!"
else
echo "Incorrect number of bundles in store!"
RC=1
fi


wait_for_key $1

#kill $PID_ECHO1
Expand Down
114 changes: 114 additions & 0 deletions tests/store_delete_singleton.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#!/bin/bash

. $(dirname $(perl -MCwd -e 'print Cwd::abs_path shift' "$0"))/libshelltests.sh

prepare_test

#DB1="-W /tmp/node1 -D sled"
#DB1="-W /tmp/node1 -D sneakers"
PORT_NODE1=$(get_current_port)
start_dtnd -d -j5s -i0 -C tcp:port=2342 -e incoming -e ~group -r epidemic -n node1 $DB1

sleep 0.5

echo

echo "Sending 'test' to node 1"
BID_SINGLE=$(echo test | $BINS/dtnsend -r dtn://node1/incoming -p $PORT_NODE1 | grep "dtn://" | awk '{print $2}')
BID_GRP=$(echo test | $BINS/dtnsend -r dtn://node1/~group -p $PORT_NODE1 | grep "dtn://" | awk '{print $2}')

sleep 0.5

echo
echo -n "Bundles in store on node 1: "
NUM_BUNDLES=$($BINS/dtnquery store | grep "dtn://" | wc -l | awk '{print $1}')
echo -n $NUM_BUNDLES

EXPECTED_BUNDLES=2

echo " / $EXPECTED_BUNDLES"
if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ]; then
echo "Correct number of bundles in store!"
else
echo "Incorrect number of bundles in store!"
wait_for_key $1
cleanup
exit 1
fi
echo

# Receive bundle IDs - should not change number of bundles
echo "Receiving bundles $BID_SINGLE and $BID_GRP on node 1: "
$BINS/dtnrecv -v -b $BID_SINGLE -p $PORT_NODE1 || exit $?
$BINS/dtnrecv -v -b $BID_GRP -p $PORT_NODE1 || exit $?
echo -n "Bundles in store on node 1: "
NUM_BUNDLES=$($BINS/dtnquery bundles | grep "dtn://" | wc -l | awk '{print $1}')
echo -n $NUM_BUNDLES

EXPECTED_BUNDLES=2

echo " / $EXPECTED_BUNDLES"
if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ]; then
echo "Correct number of bundles in store!"
else
echo "Incorrect number of bundles in store!"
wait_for_key $1
cleanup
exit 1
fi


# Receive singleton endpoint - should decrease number of bundles to 1
echo "Receiving from endpoint 'incoming' on node 1: "
$BINS/dtnrecv -v -e incoming -p $PORT_NODE1
RC=$?
echo
echo "RET: $RC"
echo
echo
echo -n "Bundles in store on node 1: "
NUM_BUNDLES=$($BINS/dtnquery bundles | grep "dtn://" | wc -l | awk '{print $1}')
echo -n $NUM_BUNDLES

EXPECTED_BUNDLES=1

echo " / $EXPECTED_BUNDLES"
if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ]; then
echo "Correct number of bundles in store!"
else
echo "Incorrect number of bundles in store!"
wait_for_key $1
cleanup
exit 1
fi

# Receive group endpoint - should not change number of bundles
echo "Receiving from endpoint '~group' on node 1: "
$BINS/dtnrecv -v -e ~group -p $PORT_NODE1
RC=$?
echo
echo "RET: $RC"
echo
echo
echo -n "Bundles in store on node 1: "
NUM_BUNDLES=$($BINS/dtnquery bundles | grep "dtn://" | wc -l | awk '{print $1}')
echo -n $NUM_BUNDLES

EXPECTED_BUNDLES=1

echo " / $EXPECTED_BUNDLES"
if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ]; then
echo "Correct number of bundles in store!"
else
echo "Incorrect number of bundles in store!"
fi

wait_for_key $1

cleanup

if [ "$NUM_BUNDLES" = "$EXPECTED_BUNDLES" ]; then
exit $RC
else
exit 1
fi

0 comments on commit 1c829a2

Please sign in to comment.