From 7197b668002b78e2944f88b59f15e78e19dc4017 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 22 Aug 2024 20:00:26 +0200 Subject: [PATCH] end-to-end test finished --- tests/end-to-end.rs | 100 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 86 insertions(+), 14 deletions(-) diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index a83f612..afe6344 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -1,16 +1,22 @@ -use std::{sync::mpsc, thread, time::Duration}; +use std::{ + fs::OpenOptions, + io::Write, + sync::{atomic::AtomicBool, mpsc, Arc}, + thread, + time::Duration, +}; use cfdp::{ dest::DestinationHandler, filestore::NativeFilestore, - request::StaticPutRequestCacher, + request::{PutRequestOwned, StaticPutRequestCacher}, source::SourceHandler, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, RemoteEntityConfig, StdCheckTimerCreator, TransactionId, UserFaultHookProvider, }; use spacepackets::{ - cfdp::{ChecksumType, ConditionCode}, + cfdp::{ChecksumType, ConditionCode, TransmissionMode}, seq_count::SeqCountProviderSyncU16, util::UnsignedByteFieldU16, }; @@ -18,6 +24,8 @@ use spacepackets::{ const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1); const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2); +const FILE_DATA: &str = "Hello World!"; + #[derive(Default)] pub struct ExampleFaultHandler {} @@ -63,18 +71,22 @@ impl UserFaultHookProvider for ExampleFaultHandler { pub struct ExampleCfdpUser { entity_type: EntityType, + completion_signal: Arc, } impl ExampleCfdpUser { - pub fn new(entity_type: EntityType) -> Self { - Self { entity_type } + pub fn new(entity_type: EntityType, completion_signal: Arc) -> Self { + Self { + entity_type, + completion_signal, + } } } impl CfdpUser for ExampleCfdpUser { fn transaction_indication(&mut self, id: &crate::TransactionId) { println!( - "{:?} entity: transaction indication for {:?}", + "{:?} entity: Transaction indication for {:?}", self.entity_type, id ); } @@ -91,11 +103,13 @@ impl CfdpUser for ExampleCfdpUser { "{:?} entity: Transaction finished: {:?}", self.entity_type, finished_params ); + self.completion_signal + .store(true, std::sync::atomic::Ordering::Relaxed); } fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) { println!( - "{:?} entity: Metadata {:?} received", + "{:?} entity: Metadata received: {:?}", self.entity_type, md_recvd_params ); } @@ -141,7 +155,29 @@ impl CfdpUser for ExampleCfdpUser { } } -fn main() { +#[test] +fn end_to_end_test() { + // Simplified event handling using atomic signals. + let stop_signal_source = Arc::new(AtomicBool::new(false)); + let stop_signal_dest = stop_signal_source.clone(); + let stop_signal_ctrl = stop_signal_source.clone(); + + let completion_signal_source = Arc::new(AtomicBool::new(false)); + let completion_signal_source_main = completion_signal_source.clone(); + + let completion_signal_dest = Arc::new(AtomicBool::new(false)); + let completion_signal_dest_main = completion_signal_dest.clone(); + + let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path(); + let mut file = OpenOptions::new() + .write(true) + .open(&srcfile) + .expect("opening file failed"); + file.write_all(FILE_DATA.as_bytes()) + .expect("writing file content failed"); + let destdir = tempfile::tempdir().expect("creating temp directory failed"); + let destfile = destdir.path().join("test.txt"); + let local_cfg_source = LocalEntityConfig::new( LOCAL_ID.into(), IndicationConfig::default(), @@ -168,7 +204,7 @@ fn main() { remote_cfg_of_dest, seq_count_provider, ); - let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending); + let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending, completion_signal_source); let local_cfg_dest = LocalEntityConfig::new( REMOTE_ID.into(), @@ -191,9 +227,23 @@ fn main() { remote_cfg_of_source, StdCheckTimerCreator::default(), ); - let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving); + let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest); + + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + srcfile.to_str().expect("invaid path string"), + destfile.to_str().expect("invaid path string"), + Some(TransmissionMode::Unacknowledged), + Some(true), + ) + .expect("put request creation failed"); + + let start = std::time::Instant::now(); let jh_source = thread::spawn(move || { + source_handler + .put_request(&put_request) + .expect("put request failed"); loop { let mut next_delay = None; let mut undelayed_call_count = 0; @@ -209,12 +259,12 @@ fn main() { match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) { Ok(sent_packets) => { if sent_packets == 0 { - next_delay = Some(Duration::from_millis(200)); + next_delay = Some(Duration::from_millis(50)); } } Err(e) => { println!("Source handler error: {}", e); - next_delay = Some(Duration::from_millis(200)); + next_delay = Some(Duration::from_millis(50)); } } if let Some(delay) = next_delay { @@ -222,6 +272,9 @@ fn main() { } else { undelayed_call_count += 1; } + if stop_signal_source.load(std::sync::atomic::Ordering::Relaxed) { + break; + } // Safety feature against configuration errors. if undelayed_call_count >= 200 { println!("Source handler state machine possible in permanent loop"); @@ -246,12 +299,12 @@ fn main() { match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) { Ok(sent_packets) => { if sent_packets == 0 { - next_delay = Some(Duration::from_millis(200)); + next_delay = Some(Duration::from_millis(50)); } } Err(e) => { println!("Source handler error: {}", e); - next_delay = Some(Duration::from_millis(200)); + next_delay = Some(Duration::from_millis(50)); } } if let Some(delay) = next_delay { @@ -259,6 +312,9 @@ fn main() { } else { undelayed_call_count += 1; } + if stop_signal_dest.load(std::sync::atomic::Ordering::Relaxed) { + break; + } // Safety feature against configuration errors. if undelayed_call_count >= 200 { println!("Source handler state machine possible in permanent loop"); @@ -267,6 +323,22 @@ fn main() { } }); + loop { + if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed) + && completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed) + { + let file = std::fs::read_to_string(destfile).expect("reading file failed"); + assert_eq!(file, FILE_DATA); + // Stop the threads gracefully. + stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed); + break; + } + if std::time::Instant::now() - start > Duration::from_secs(2) { + panic!("file transfer not finished in 2 seconds"); + } + std::thread::sleep(Duration::from_millis(50)); + } + jh_source.join().unwrap(); jh_dest.join().unwrap(); }