Skip to content

Commit

Permalink
now only put request must be injected
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Aug 22, 2024
1 parent 563e1a4 commit 7a509c7
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 15 deletions.
5 changes: 1 addition & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,9 +689,7 @@ pub trait PduProvider {
fn packet_target(&self) -> Result<PacketTarget, PduError>;
}

pub struct DummyPduProvider {
phantom: core::marker::PhantomData<()>,
}
pub struct DummyPduProvider(());

impl PduProvider for DummyPduProvider {
fn pdu_type(&self) -> PduType {
Expand Down Expand Up @@ -833,7 +831,6 @@ pub mod alloc_mod {
pub pdu_type: PduType,
pub file_directive_type: Option<FileDirectiveType>,
pub pdu: alloc::vec::Vec<u8>,
//packet_target: PacketTarget,
}

impl PduWithInfo {
Expand Down
83 changes: 72 additions & 11 deletions tests/end-to-end.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use std::{sync::mpsc, thread, time::Duration};

use cfdp::{
dest::DestinationHandler,
filestore::NativeFilestore,
request::StaticPutRequestCacher,
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
EntityType, FaultHandler, IndicationConfig, LocalEntityConfig, PacketInfo, PduWithInfo,
RemoteEntityConfig, StdRemoteEntityConfigProvider, TransactionId, UserFaultHookProvider,
VecRemoteEntityConfigProvider,
EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, RemoteEntityConfig,
StdCheckTimerCreator, TransactionId, UserFaultHookProvider,
};
use spacepackets::{
cfdp::{ChecksumType, ConditionCode},
ecss::CrcType,
seq_count::{SeqCountProviderSimple, SeqCountProviderSyncU16},
seq_count::SeqCountProviderSyncU16,
util::UnsignedByteFieldU16,
};

Expand Down Expand Up @@ -143,15 +142,15 @@ impl CfdpUser for ExampleCfdpUser {
}

fn main() {
let local_cfg = LocalEntityConfig::new(
let local_cfg_source = LocalEntityConfig::new(
LOCAL_ID.into(),
IndicationConfig::default(),
ExampleFaultHandler::default(),
);
let (source_tx, source_rx) = mpsc::channel::<PduWithInfo>();
let (dest_tx, dest_rx) = mpsc::channel::<PduWithInfo>();
let put_request_cacher = StaticPutRequestCacher::new(2048);
let remote_cfg = RemoteEntityConfig::new_with_default_values(
let remote_cfg_of_dest = RemoteEntityConfig::new_with_default_values(
REMOTE_ID.into(),
1024,
true,
Expand All @@ -161,17 +160,40 @@ fn main() {
);
let seq_count_provider = SeqCountProviderSyncU16::default();
let mut source_handler = SourceHandler::new(
local_cfg,
local_cfg_source,
source_tx,
NativeFilestore::default(),
put_request_cacher,
2048,
remote_cfg,
remote_cfg_of_dest,
seq_count_provider,
);
let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending);

thread::spawn(move || {
let local_cfg_dest = LocalEntityConfig::new(
REMOTE_ID.into(),
IndicationConfig::default(),
ExampleFaultHandler::default(),
);
let remote_cfg_of_source = RemoteEntityConfig::new_with_default_values(
LOCAL_ID.into(),
1024,
true,
false,
spacepackets::cfdp::TransmissionMode::Unacknowledged,
ChecksumType::Crc32,
);
let mut dest_handler = DestinationHandler::new(
local_cfg_dest,
1024,
dest_tx,
NativeFilestore::default(),
remote_cfg_of_source,
StdCheckTimerCreator::default(),
);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving);

let jh_source = thread::spawn(move || {
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
Expand Down Expand Up @@ -206,6 +228,45 @@ fn main() {
thread::sleep(Duration::from_millis(100));
}
}
//source_handler.(source_rx);
});

let jh_dest = thread::spawn(move || {
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
let packet_info = match source_rx.try_recv() {
Ok(pdu_with_info) => Some(pdu_with_info),
Err(e) => match e {
mpsc::TryRecvError::Empty => None,
mpsc::TryRecvError::Disconnected => {
panic!("unexpected disconnect from destination channel sender");
}
},
};
match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(200));
}
}
Err(e) => {
println!("Source handler error: {}", e);
next_delay = Some(Duration::from_millis(200));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
println!("Source handler state machine possible in permanent loop");
thread::sleep(Duration::from_millis(100));
}
}
});

jh_source.join().unwrap();
jh_dest.join().unwrap();
}

0 comments on commit 7a509c7

Please sign in to comment.