diff --git a/src/dest.rs b/src/dest.rs index cbd27a8..8c4db36 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -1,14 +1,14 @@ -use crate::{user::TransactionFinishedParams, GenericSendError}; +use crate::{user::TransactionFinishedParams, DummyPduProvider, GenericSendError, PduProvider}; use core::str::{from_utf8, Utf8Error}; use std::path::{Path, PathBuf}; use super::{ filestore::{FilestoreError, NativeFilestore, VirtualFilestore}, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams}, - CheckTimerProviderCreator, CountdownProvider, EntityType, LocalEntityConfig, PacketInfo, - PacketTarget, PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider, State, - StdCheckTimer, StdCheckTimerCreator, StdRemoteEntityConfigProvider, TimerContext, - TransactionId, UserFaultHookProvider, + CheckTimerProviderCreator, CountdownProvider, EntityType, LocalEntityConfig, PacketTarget, + PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider, State, StdCheckTimer, + StdCheckTimerCreator, StdRemoteEntityConfigProvider, TimerContext, TransactionId, + UserFaultHookProvider, }; use smallvec::SmallVec; use spacepackets::{ @@ -290,6 +290,13 @@ impl< } } + pub fn state_machine_no_packet( + &mut self, + cfdp_user: &mut impl CfdpUser, + ) -> Result { + self.state_machine(cfdp_user, None::<&DummyPduProvider>) + } + /// This is the core function to drive the destination handler. It is also used to insert /// packets into the destination handler. /// @@ -301,7 +308,7 @@ impl< pub fn state_machine( &mut self, cfdp_user: &mut impl CfdpUser, - packet_to_insert: Option<&PacketInfo>, + packet_to_insert: Option<&impl PduProvider>, ) -> Result { if let Some(packet) = packet_to_insert { self.insert_packet(cfdp_user, packet)?; @@ -335,28 +342,28 @@ impl< fn insert_packet( &mut self, cfdp_user: &mut impl CfdpUser, - packet_info: &PacketInfo, + packet_to_insert: &impl PduProvider, ) -> Result<(), DestError> { - if packet_info.target() != PacketTarget::DestEntity { + if packet_to_insert.packet_target()? != PacketTarget::DestEntity { // Unwrap is okay here, a PacketInfo for a file data PDU should always have the // destination as the target. return Err(DestError::CantProcessPacketType { - pdu_type: packet_info.pdu_type(), - directive_type: packet_info.pdu_directive(), + pdu_type: packet_to_insert.pdu_type(), + directive_type: packet_to_insert.file_directive_type(), }); } - match packet_info.pdu_type { + match packet_to_insert.pdu_type() { PduType::FileDirective => { - if packet_info.pdu_directive.is_none() { + if packet_to_insert.file_directive_type().is_none() { return Err(DestError::DirectiveFieldEmpty); } self.handle_file_directive( cfdp_user, - packet_info.pdu_directive.unwrap(), - packet_info.raw_packet, + packet_to_insert.file_directive_type().unwrap(), + packet_to_insert.pdu(), ) } - PduType::FileData => self.handle_file_data(cfdp_user, packet_info.raw_packet), + PduType::FileData => self.handle_file_data(cfdp_user, packet_to_insert.pdu()), } } @@ -863,7 +870,7 @@ mod tests { basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler, LOCAL_ID, }, - CheckTimerProviderCreator, CountdownProvider, FaultHandler, IndicationConfig, + CheckTimerProviderCreator, CountdownProvider, FaultHandler, IndicationConfig, PacketInfo, StdRemoteEntityConfigProvider, CRC_32, }; @@ -1297,7 +1304,7 @@ mod tests { testbench.set_check_timer_expired(); testbench .handler - .state_machine(&mut test_user, None) + .state_machine_no_packet(&mut test_user) .expect("fsm failure"); let fault_handler = testbench.handler.local_cfg.fault_handler.user_hook.borrow(); @@ -1339,7 +1346,7 @@ mod tests { testbench.set_check_timer_expired(); testbench .handler - .state_machine(&mut test_user, None) + .state_machine_no_packet(&mut test_user) .expect("fsm error"); testbench.state_check( State::Busy, @@ -1348,7 +1355,7 @@ mod tests { testbench.set_check_timer_expired(); testbench .handler - .state_machine(&mut test_user, None) + .state_machine_no_packet(&mut test_user) .expect("fsm error"); testbench.state_check(State::Idle, TransactionStep::Idle); diff --git a/src/lib.rs b/src/lib.rs index 08892ef..9d62270 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,8 @@ use crc::{Crc, CRC_32_CKSUM}; #[cfg(feature = "std")] use hashbrown::HashMap; +#[cfg(feature = "alloc")] +pub use alloc_mod::*; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use spacepackets::{ @@ -515,6 +517,8 @@ pub enum GenericSendError { RxDisconnected, #[error("queue is full, fill count {0:?}")] QueueFull(Option), + #[error("other send error")] + Other, } #[cfg(feature = "std")] @@ -529,16 +533,10 @@ pub trait PduSendProvider { #[cfg(feature = "std")] pub mod std_mod { - use std::{sync::mpsc, vec::Vec}; + use std::sync::mpsc; use super::*; - pub struct PduWithInfo { - pub pdu_type: PduType, - pub file_directive_type: Option, - pub pdu: Vec, - } - impl PduSendProvider for mpsc::Sender { fn send_pdu( &self, @@ -546,11 +544,11 @@ pub mod std_mod { file_directive_type: Option, raw_pdu: &[u8], ) -> Result<(), GenericSendError> { - self.send(PduWithInfo { + self.send(PduWithInfo::new( pdu_type, file_directive_type, - pdu: raw_pdu.to_vec(), - }) + raw_pdu.to_vec(), + )) .map_err(|_| GenericSendError::RxDisconnected)?; Ok(()) } @@ -684,41 +682,56 @@ pub enum PacketTarget { DestEntity, } +pub trait PduProvider { + fn pdu_type(&self) -> PduType; + fn file_directive_type(&self) -> Option; + fn pdu(&self) -> &[u8]; + fn packet_target(&self) -> Result; +} + +pub struct DummyPduProvider { + phantom: core::marker::PhantomData<()>, +} + +impl PduProvider for DummyPduProvider { + fn pdu_type(&self) -> PduType { + PduType::FileData + } + + fn file_directive_type(&self) -> Option { + None + } + + fn pdu(&self) -> &[u8] { + &[] + } + + fn packet_target(&self) -> Result { + Ok(PacketTarget::SourceEntity) + } +} + /// This is a helper struct which contains base information about a particular PDU packet. /// This is also necessary information for CFDP packet routing. For example, some packet types /// like file data PDUs can only be used by CFDP source entities. pub struct PacketInfo<'raw_packet> { pdu_type: PduType, - pdu_directive: Option, - target: PacketTarget, + file_directive_type: Option, + //target: PacketTarget, raw_packet: &'raw_packet [u8], } -impl<'raw> PacketInfo<'raw> { - pub fn new(raw_packet: &'raw [u8]) -> Result { - let (pdu_header, header_len) = PduHeader::from_bytes(raw_packet)?; - if pdu_header.pdu_type() == PduType::FileData { - return Ok(Self { - pdu_type: pdu_header.pdu_type(), - pdu_directive: None, - target: PacketTarget::DestEntity, - raw_packet, - }); - } - if pdu_header.pdu_datafield_len() < 1 { - return Err(PduError::FormatError); - } - // Route depending on PDU type and directive type if applicable. Retrieve directive type - // from the raw stream for better performance (with sanity and directive code check). - // The routing is based on section 4.5 of the CFDP standard which specifies the PDU forwarding - // procedure. - let directive = FileDirectiveType::try_from(raw_packet[header_len]).map_err(|_| { - PduError::InvalidDirectiveType { - found: raw_packet[header_len], - expected: None, - } - })?; - let packet_target = match directive { +pub fn determine_packet_target( + file_directive_type: Option, + raw_pdu: &[u8], +) -> Result { + if file_directive_type.is_none() { + return Ok(PacketTarget::DestEntity); + } + let (_, header_len) = PduHeader::from_bytes(raw_pdu)?; + let file_directive_type = file_directive_type.unwrap(); + let packet_target = + match file_directive_type { // Section c) of 4.5.3: These PDUs should always be targeted towards the file sender a.k.a. // the source handler FileDirectiveType::NakPdu @@ -733,9 +746,9 @@ impl<'raw> PacketInfo<'raw> { // extract the PDU type from the raw stream. If it is an EOF PDU, this packet is passed to // the source handler, for a Finished PDU, it is passed to the destination handler. FileDirectiveType::AckPdu => { - let acked_directive = FileDirectiveType::try_from(raw_packet[header_len + 1]) + let acked_directive = FileDirectiveType::try_from(raw_pdu[header_len + 1]) .map_err(|_| PduError::InvalidDirectiveType { - found: raw_packet[header_len], + found: raw_pdu[header_len], expected: None, })?; if acked_directive == FileDirectiveType::EofPdu { @@ -745,35 +758,115 @@ impl<'raw> PacketInfo<'raw> { } else { // TODO: Maybe a better error? This might be confusing.. return Err(PduError::InvalidDirectiveType { - found: raw_packet[header_len + 1], + found: raw_pdu[header_len + 1], expected: None, }); } } }; + Ok(packet_target) +} + +impl<'raw> PacketInfo<'raw> { + pub fn new(raw_packet: &'raw [u8]) -> Result { + let (pdu_header, header_len) = PduHeader::from_bytes(raw_packet)?; + if pdu_header.pdu_type() == PduType::FileData { + return Ok(Self { + pdu_type: pdu_header.pdu_type(), + file_directive_type: None, + raw_packet, + }); + } + if pdu_header.pdu_datafield_len() < 1 { + return Err(PduError::FormatError); + } + // Route depending on PDU type and directive type if applicable. Retrieve directive type + // from the raw stream for better performance (with sanity and directive code check). + // The routing is based on section 4.5 of the CFDP standard which specifies the PDU forwarding + // procedure. + let directive = FileDirectiveType::try_from(raw_packet[header_len]).map_err(|_| { + PduError::InvalidDirectiveType { + found: raw_packet[header_len], + expected: None, + } + })?; Ok(Self { pdu_type: pdu_header.pdu_type(), - pdu_directive: Some(directive), - target: packet_target, + file_directive_type: Some(directive), raw_packet, }) } - pub fn pdu_type(&self) -> PduType { - self.pdu_type + pub fn raw_packet(&self) -> &[u8] { + self.raw_packet } +} - pub fn pdu_directive(&self) -> Option { - self.pdu_directive +impl PduProvider for PacketInfo<'_> { + fn pdu_type(&self) -> PduType { + self.pdu_type } - pub fn target(&self) -> PacketTarget { - self.target + fn file_directive_type(&self) -> Option { + self.file_directive_type } - pub fn raw_packet(&self) -> &[u8] { + fn pdu(&self) -> &[u8] { self.raw_packet } + + fn packet_target(&self) -> Result { + determine_packet_target(self.file_directive_type, self.raw_packet) + } +} + +#[cfg(feature = "alloc")] +pub mod alloc_mod { + use spacepackets::cfdp::{ + pdu::{FileDirectiveType, PduError}, + PduType, + }; + + use crate::{determine_packet_target, PacketTarget, PduProvider}; + + pub struct PduWithInfo { + pub pdu_type: PduType, + pub file_directive_type: Option, + pub pdu: alloc::vec::Vec, + //packet_target: PacketTarget, + } + + impl PduWithInfo { + pub fn new( + pdu_type: PduType, + file_directive_type: Option, + pdu: alloc::vec::Vec, + ) -> Self { + Self { + pdu_type, + file_directive_type, + pdu, + } + } + } + + impl PduProvider for PduWithInfo { + fn pdu_type(&self) -> PduType { + self.pdu_type + } + + fn file_directive_type(&self) -> Option { + self.file_directive_type + } + + fn pdu(&self) -> &[u8] { + &self.pdu + } + + fn packet_target(&self) -> Result { + determine_packet_target(self.file_directive_type, &self.pdu) + } + } } #[cfg(test)] @@ -1079,12 +1172,15 @@ pub(crate) mod tests { let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); assert_eq!(packet_info.pdu_type(), PduType::FileDirective); - assert!(packet_info.pdu_directive().is_some()); + assert!(packet_info.file_directive_type().is_some()); assert_eq!( - packet_info.pdu_directive().unwrap(), + packet_info.file_directive_type().unwrap(), FileDirectiveType::MetadataPdu ); - assert_eq!(packet_info.target(), PacketTarget::DestEntity); + assert_eq!( + packet_info.packet_target().unwrap(), + PacketTarget::DestEntity + ); } #[test] @@ -1097,8 +1193,11 @@ pub(crate) mod tests { .expect("writing file data PDU failed"); let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); assert_eq!(packet_info.pdu_type(), PduType::FileData); - assert!(packet_info.pdu_directive().is_none()); - assert_eq!(packet_info.target(), PacketTarget::DestEntity); + assert!(packet_info.file_directive_type().is_none()); + assert_eq!( + packet_info.packet_target().unwrap(), + PacketTarget::DestEntity + ); } #[test] @@ -1111,9 +1210,9 @@ pub(crate) mod tests { .expect("writing file data PDU failed"); let packet_info = PacketInfo::new(&buf).expect("creating packet info failed"); assert_eq!(packet_info.pdu_type(), PduType::FileDirective); - assert!(packet_info.pdu_directive().is_some()); + assert!(packet_info.file_directive_type().is_some()); assert_eq!( - packet_info.pdu_directive().unwrap(), + packet_info.file_directive_type().unwrap(), FileDirectiveType::EofPdu ); } diff --git a/src/source.rs b/src/source.rs index 264f064..efce388 100644 --- a/src/source.rs +++ b/src/source.rs @@ -22,13 +22,13 @@ use spacepackets::{ use spacepackets::seq_count::SequenceCountProvider; -use crate::GenericSendError; +use crate::{DummyPduProvider, GenericSendError, PduProvider}; use super::{ filestore::{FilestoreError, VirtualFilestore}, request::{ReadablePutRequest, StaticPutRequestCacher}, user::{CfdpUser, TransactionFinishedParams}, - LocalEntityConfig, PacketInfo, PacketTarget, PduSendProvider, RemoteEntityConfig, + LocalEntityConfig, PacketTarget, PduSendProvider, RemoteEntityConfig, RemoteEntityConfigProvider, State, TransactionId, UserFaultHookProvider, }; @@ -192,6 +192,13 @@ impl< } } + pub fn state_machine_no_packet( + &mut self, + cfdp_user: &mut impl CfdpUser, + ) -> Result { + self.state_machine(cfdp_user, None::<&DummyPduProvider>) + } + /// This is the core function to drive the source handler. It is also used to insert /// packets into the source handler. /// @@ -203,7 +210,7 @@ impl< pub fn state_machine( &mut self, cfdp_user: &mut impl CfdpUser, - packet_to_insert: Option<&PacketInfo>, + packet_to_insert: Option<&impl PduProvider>, ) -> Result { if let Some(packet) = packet_to_insert { self.insert_packet(cfdp_user, packet)?; @@ -224,17 +231,17 @@ impl< fn insert_packet( &mut self, _cfdp_user: &mut impl CfdpUser, - packet_info: &PacketInfo, + packet_to_insert: &impl PduProvider, ) -> Result<(), SourceError> { - if packet_info.target() != PacketTarget::SourceEntity { + if packet_to_insert.packet_target()? != PacketTarget::SourceEntity { // Unwrap is okay here, a PacketInfo for a file data PDU should always have the // destination as the target. return Err(SourceError::CantProcessPacketType { - pdu_type: packet_info.pdu_type(), - directive_type: packet_info.pdu_directive(), + pdu_type: packet_to_insert.pdu_type(), + directive_type: packet_to_insert.file_directive_type(), }); } - if packet_info.pdu_type() == PduType::FileData { + if packet_to_insert.pdu_type() == PduType::FileData { // The [PacketInfo] API should ensure that file data PDUs can not be passed // into a source entity, so this should never happen. return Err(SourceError::UnexpectedPdu { @@ -244,11 +251,11 @@ impl< } // Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is // always a valid value. - match packet_info - .pdu_directive() + match packet_to_insert + .file_directive_type() .expect("PDU directive type unexpectedly not set") { - FileDirectiveType::FinishedPdu => self.handle_finished_pdu(packet_info)?, + FileDirectiveType::FinishedPdu => self.handle_finished_pdu(packet_to_insert)?, FileDirectiveType::NakPdu => self.handle_nak_pdu(), FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(), FileDirectiveType::AckPdu => return Err(SourceError::NotImplemented), @@ -256,8 +263,8 @@ impl< | FileDirectiveType::PromptPdu | FileDirectiveType::MetadataPdu => { return Err(SourceError::CantProcessPacketType { - pdu_type: packet_info.pdu_type(), - directive_type: packet_info.pdu_directive(), + pdu_type: packet_to_insert.pdu_type(), + directive_type: packet_to_insert.file_directive_type(), }); } } @@ -722,7 +729,7 @@ impl< Ok(()) } - fn handle_finished_pdu(&mut self, packet_info: &PacketInfo) -> Result<(), SourceError> { + fn handle_finished_pdu(&mut self, pdu_provider: &impl PduProvider) -> Result<(), SourceError> { // Ignore this packet when we are idle. if self.state_helper.state == State::Idle { return Ok(()); @@ -733,7 +740,7 @@ impl< directive_type: Some(FileDirectiveType::FinishedPdu), }); } - let finished_pdu = FinishedPduReader::new(packet_info.raw_packet())?; + let finished_pdu = FinishedPduReader::new(pdu_provider.pdu())?; // Unwrapping should be fine here, the transfer state is valid when we are not in IDLE // mode. self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams { @@ -809,7 +816,7 @@ mod tests { filestore::NativeFilestore, request::PutRequestOwned, tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler}, - FaultHandler, IndicationConfig, StdRemoteEntityConfigProvider, CRC_32, + FaultHandler, IndicationConfig, PacketInfo, StdRemoteEntityConfigProvider, CRC_32, }; use spacepackets::seq_count::SeqCountProviderSimple; @@ -975,7 +982,7 @@ mod tests { assert_eq!(tb.handler.step(), TransactionStep::Idle); let sent_packets = tb .handler - .state_machine(cfdp_user, None) + .state_machine_no_packet(cfdp_user) .expect("source handler FSM failure"); assert_eq!(sent_packets, 2); assert!(!tb.pdu_queue_empty()); @@ -1111,7 +1118,7 @@ mod tests { let mut fd_pdus = 0; for segment in chunks { check_next_file_pdu(tb, current_offset, segment); - tb.handler.state_machine(cfdp_user, None).unwrap(); + tb.handler.state_machine_no_packet(cfdp_user).unwrap(); fd_pdus += 1; current_offset += segment.len() as u64; } diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index e6a67d8..7b06fc5 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -1,9 +1,12 @@ -use std::sync::mpsc; +use std::{sync::mpsc, thread, time::Duration}; use cfdp::{ - filestore::NativeFilestore, request::StaticPutRequestCacher, source::SourceHandler, - FaultHandler, IndicationConfig, LocalEntityConfig, PduWithInfo, RemoteEntityConfig, - StdRemoteEntityConfigProvider, TransactionId, UserFaultHookProvider, + filestore::NativeFilestore, + request::StaticPutRequestCacher, + source::SourceHandler, + user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, + EntityType, FaultHandler, IndicationConfig, LocalEntityConfig, PacketInfo, PduWithInfo, + RemoteEntityConfig, StdRemoteEntityConfigProvider, TransactionId, UserFaultHookProvider, VecRemoteEntityConfigProvider, }; use spacepackets::{ @@ -26,6 +29,10 @@ impl UserFaultHookProvider for ExampleFaultHandler { cond: ConditionCode, progress: u64, ) { + panic!( + "unexpected suspension of transaction {:?}, condition code {:?}, progress {}", + transaction_id, cond, progress + ); } fn notice_of_cancellation_cb( @@ -34,11 +41,105 @@ impl UserFaultHookProvider for ExampleFaultHandler { cond: ConditionCode, progress: u64, ) { + panic!( + "unexpected cancellation of transaction {:?}, condition code {:?}, progress {}", + transaction_id, cond, progress + ); } - fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {} + fn abandoned_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) { + panic!( + "unexpected abandonment of transaction {:?}, condition code {:?}, progress {}", + transaction_id, cond, progress + ); + } - fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {} + fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) { + panic!( + "ignoring unexpected error in transaction {:?}, condition code {:?}, progress {}", + transaction_id, cond, progress + ); + } +} + +pub struct ExampleCfdpUser { + entity_type: EntityType, +} + +impl ExampleCfdpUser { + pub fn new(entity_type: EntityType) -> Self { + Self { entity_type } + } +} + +impl CfdpUser for ExampleCfdpUser { + fn transaction_indication(&mut self, id: &crate::TransactionId) { + println!( + "{:?} entity: transaction indication for {:?}", + self.entity_type, id + ); + } + + fn eof_sent_indication(&mut self, id: &crate::TransactionId) { + println!( + "{:?} entity: EOF sent for transaction {:?}", + self.entity_type, id + ); + } + + fn transaction_finished_indication(&mut self, finished_params: &TransactionFinishedParams) { + println!( + "{:?} entity: Transaction finished: {:?}", + self.entity_type, finished_params + ); + } + + fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) { + println!( + "{:?} entity: Metadata {:?} received", + self.entity_type, md_recvd_params + ); + } + + fn file_segment_recvd_indication(&mut self, segment_recvd_params: &FileSegmentRecvdParams) { + println!( + "{:?} entity: File segment {:?} received", + self.entity_type, segment_recvd_params + ); + } + + fn report_indication(&mut self, _id: &crate::TransactionId) {} + + fn suspended_indication(&mut self, _id: &crate::TransactionId, _condition_code: ConditionCode) { + panic!("unexpected suspended indication"); + } + + fn resumed_indication(&mut self, _id: &crate::TransactionId, _progresss: u64) {} + + fn fault_indication( + &mut self, + _id: &crate::TransactionId, + _condition_code: ConditionCode, + _progress: u64, + ) { + panic!("unexpected fault indication"); + } + + fn abandoned_indication( + &mut self, + _id: &crate::TransactionId, + _condition_code: ConditionCode, + _progress: u64, + ) { + panic!("unexpected abandoned indication"); + } + + fn eof_recvd_indication(&mut self, id: &crate::TransactionId) { + println!( + "{:?} entity: EOF received for transaction {:?}", + self.entity_type, id + ); + } } fn main() { @@ -48,6 +149,7 @@ fn main() { ExampleFaultHandler::default(), ); let (source_tx, source_rx) = mpsc::channel::(); + let (dest_tx, dest_rx) = mpsc::channel::(); let put_request_cacher = StaticPutRequestCacher::new(2048); let remote_cfg = RemoteEntityConfig::new_with_default_values( REMOTE_ID.into(), @@ -58,7 +160,7 @@ fn main() { ChecksumType::Crc32, ); let seq_count_provider = SeqCountProviderSyncU16::default(); - let source_handler = SourceHandler::new( + let mut source_handler = SourceHandler::new( local_cfg, source_tx, NativeFilestore::default(), @@ -67,4 +169,43 @@ fn main() { remote_cfg, seq_count_provider, ); + let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending); + + thread::spawn(move || { + loop { + let mut next_delay = None; + let mut undelayed_call_count = 0; + let packet_info = match dest_rx.try_recv() { + Ok(pdu_with_info) => Some(pdu_with_info), + Err(e) => match e { + mpsc::TryRecvError::Empty => None, + mpsc::TryRecvError::Disconnected => { + panic!("unexpected disconnect from destination channel sender"); + } + }, + }; + match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) { + Ok(sent_packets) => { + if sent_packets == 0 { + next_delay = Some(Duration::from_millis(200)); + } + } + Err(e) => { + println!("Source handler error: {}", e); + next_delay = Some(Duration::from_millis(200)); + } + } + if let Some(delay) = next_delay { + thread::sleep(delay); + } else { + undelayed_call_count += 1; + } + // Safety feature against configuration errors. + if undelayed_call_count >= 200 { + println!("Source handler state machine possible in permanent loop"); + thread::sleep(Duration::from_millis(100)); + } + } + //source_handler.(source_rx); + }); }