Skip to content

Commit

Permalink
end-to-end test finished
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Aug 22, 2024
1 parent 7a509c7 commit 7197b66
Showing 1 changed file with 86 additions and 14 deletions.
100 changes: 86 additions & 14 deletions tests/end-to-end.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
use std::{sync::mpsc, thread, time::Duration};
use std::{
fs::OpenOptions,
io::Write,
sync::{atomic::AtomicBool, mpsc, Arc},
thread,
time::Duration,
};

use cfdp::{
dest::DestinationHandler,
filestore::NativeFilestore,
request::StaticPutRequestCacher,
request::{PutRequestOwned, StaticPutRequestCacher},
source::SourceHandler,
user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams},
EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, RemoteEntityConfig,
StdCheckTimerCreator, TransactionId, UserFaultHookProvider,
};
use spacepackets::{
cfdp::{ChecksumType, ConditionCode},
cfdp::{ChecksumType, ConditionCode, TransmissionMode},
seq_count::SeqCountProviderSyncU16,
util::UnsignedByteFieldU16,
};

const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);

const FILE_DATA: &str = "Hello World!";

#[derive(Default)]
pub struct ExampleFaultHandler {}

Expand Down Expand Up @@ -63,18 +71,22 @@ impl UserFaultHookProvider for ExampleFaultHandler {

pub struct ExampleCfdpUser {
entity_type: EntityType,
completion_signal: Arc<AtomicBool>,
}

impl ExampleCfdpUser {
pub fn new(entity_type: EntityType) -> Self {
Self { entity_type }
pub fn new(entity_type: EntityType, completion_signal: Arc<AtomicBool>) -> Self {
Self {
entity_type,
completion_signal,
}
}
}

impl CfdpUser for ExampleCfdpUser {
fn transaction_indication(&mut self, id: &crate::TransactionId) {
println!(
"{:?} entity: transaction indication for {:?}",
"{:?} entity: Transaction indication for {:?}",
self.entity_type, id
);
}
Expand All @@ -91,11 +103,13 @@ impl CfdpUser for ExampleCfdpUser {
"{:?} entity: Transaction finished: {:?}",
self.entity_type, finished_params
);
self.completion_signal
.store(true, std::sync::atomic::Ordering::Relaxed);
}

fn metadata_recvd_indication(&mut self, md_recvd_params: &MetadataReceivedParams) {
println!(
"{:?} entity: Metadata {:?} received",
"{:?} entity: Metadata received: {:?}",
self.entity_type, md_recvd_params
);
}
Expand Down Expand Up @@ -141,7 +155,29 @@ impl CfdpUser for ExampleCfdpUser {
}
}

fn main() {
#[test]
fn end_to_end_test() {
// Simplified event handling using atomic signals.
let stop_signal_source = Arc::new(AtomicBool::new(false));
let stop_signal_dest = stop_signal_source.clone();
let stop_signal_ctrl = stop_signal_source.clone();

let completion_signal_source = Arc::new(AtomicBool::new(false));
let completion_signal_source_main = completion_signal_source.clone();

let completion_signal_dest = Arc::new(AtomicBool::new(false));
let completion_signal_dest_main = completion_signal_dest.clone();

let srcfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
let mut file = OpenOptions::new()
.write(true)
.open(&srcfile)
.expect("opening file failed");
file.write_all(FILE_DATA.as_bytes())
.expect("writing file content failed");
let destdir = tempfile::tempdir().expect("creating temp directory failed");
let destfile = destdir.path().join("test.txt");

let local_cfg_source = LocalEntityConfig::new(
LOCAL_ID.into(),
IndicationConfig::default(),
Expand All @@ -168,7 +204,7 @@ fn main() {
remote_cfg_of_dest,
seq_count_provider,
);
let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending);
let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending, completion_signal_source);

let local_cfg_dest = LocalEntityConfig::new(
REMOTE_ID.into(),
Expand All @@ -191,9 +227,23 @@ fn main() {
remote_cfg_of_source,
StdCheckTimerCreator::default(),
);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving);
let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving, completion_signal_dest);

let put_request = PutRequestOwned::new_regular_request(
REMOTE_ID.into(),
srcfile.to_str().expect("invaid path string"),
destfile.to_str().expect("invaid path string"),
Some(TransmissionMode::Unacknowledged),
Some(true),
)
.expect("put request creation failed");

let start = std::time::Instant::now();

let jh_source = thread::spawn(move || {
source_handler
.put_request(&put_request)
.expect("put request failed");
loop {
let mut next_delay = None;
let mut undelayed_call_count = 0;
Expand All @@ -209,19 +259,22 @@ fn main() {
match source_handler.state_machine(&mut cfdp_user_source, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(200));
next_delay = Some(Duration::from_millis(50));
}
}
Err(e) => {
println!("Source handler error: {}", e);
next_delay = Some(Duration::from_millis(200));
next_delay = Some(Duration::from_millis(50));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
if stop_signal_source.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
println!("Source handler state machine possible in permanent loop");
Expand All @@ -246,19 +299,22 @@ fn main() {
match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) {
Ok(sent_packets) => {
if sent_packets == 0 {
next_delay = Some(Duration::from_millis(200));
next_delay = Some(Duration::from_millis(50));
}
}
Err(e) => {
println!("Source handler error: {}", e);
next_delay = Some(Duration::from_millis(200));
next_delay = Some(Duration::from_millis(50));
}
}
if let Some(delay) = next_delay {
thread::sleep(delay);
} else {
undelayed_call_count += 1;
}
if stop_signal_dest.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// Safety feature against configuration errors.
if undelayed_call_count >= 200 {
println!("Source handler state machine possible in permanent loop");
Expand All @@ -267,6 +323,22 @@ fn main() {
}
});

loop {
if completion_signal_source_main.load(std::sync::atomic::Ordering::Relaxed)
&& completion_signal_dest_main.load(std::sync::atomic::Ordering::Relaxed)
{
let file = std::fs::read_to_string(destfile).expect("reading file failed");
assert_eq!(file, FILE_DATA);
// Stop the threads gracefully.
stop_signal_ctrl.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
if std::time::Instant::now() - start > Duration::from_secs(2) {
panic!("file transfer not finished in 2 seconds");
}
std::thread::sleep(Duration::from_millis(50));
}

jh_source.join().unwrap();
jh_dest.join().unwrap();
}

0 comments on commit 7197b66

Please sign in to comment.