From cd03e5d18a0c8a2a734770aecb6f8b564683c2ad Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 9 Sep 2024 21:19:52 +0200 Subject: [PATCH] dest handler cancel handling --- src/dest.rs | 43 +++++++-- src/source.rs | 256 +++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 238 insertions(+), 61 deletions(-) diff --git a/src/dest.rs b/src/dest.rs index b6fbc13..af64f47 100644 --- a/src/dest.rs +++ b/src/dest.rs @@ -92,9 +92,11 @@ struct TransferState { transaction_id: Option, metadata_params: MetadataGenericParams, progress: u64, + file_size_eof: u64, metadata_only: bool, condition_code: ConditionCode, delivery_code: DeliveryCode, + fault_location_finished: Option, file_status: FileStatus, completion_disposition: CompletionDisposition, checksum: u32, @@ -108,9 +110,11 @@ impl Default for TransferState { transaction_id: None, metadata_params: Default::default(), progress: Default::default(), + file_size_eof: Default::default(), metadata_only: false, condition_code: ConditionCode::NoError, delivery_code: DeliveryCode::Incomplete, + fault_location_finished: None, file_status: FileStatus::Unreported, completion_disposition: CompletionDisposition::Completed, checksum: 0, @@ -241,9 +245,14 @@ pub enum DestError { /// allocation is prohibited. Furthermore, it uses the [VirtualFilestore] abstraction to allow /// usage on systems without a [std] filesystem. /// -/// This handler does not support concurrency out of the box. Instead, if concurrent handling -/// is required, it is recommended to create a new handler and run all active handlers inside a -/// thread pool, or move the newly created handler to a new thread. +/// This handler is able to deal with file copy operations to directories, similarly to how the +/// UNIX tool `cp` works. If the destination path is a directory instead of a regular full path, +/// the source path base file name will be appended to the destination path to form the resulting +/// new full path. +/// +// This handler also does not support concurrency out of the box but is flexible enough to be used +/// in different concurrent contexts. For example, you can dynamically create new handlers and +/// run them inside a thread pool, or move the newly created handler to a new thread.""" pub struct DestinationHandler< PduSender: PduSendProvider, UserFaultHook: UserFaultHookProvider, @@ -358,6 +367,10 @@ impl< } } + pub fn cancel_request(&mut self, transaction_id: &TransactionId) { + // TODO: Implement. + } + /// Returns [None] if the state machine is IDLE, and the transmission mode of the current /// request otherwise. pub fn transmission_mode(&self) -> Option { @@ -534,7 +547,14 @@ impl< let regular_transfer_finish = if eof_pdu.condition_code() == ConditionCode::NoError { self.handle_no_error_eof_pdu(&eof_pdu)? } else { - return Err(DestError::NotImplemented); + // This is an EOF (Cancel), perform Cancel Response Procedures according to chapter + // 4.6.6 of the standard. + self.trigger_notice_of_completion_cancelled(eof_pdu.condition_code()); + self.tparams.tstate.progress = eof_pdu.file_size(); + self.tparams.tstate.delivery_code = DeliveryCode::Incomplete; + self.tparams.tstate.fault_location_finished = + Some(EntityIdTlv::new(self.tparams.remote_cfg.unwrap().entity_id)); + true }; if regular_transfer_finish { self.file_transfer_complete_transition(); @@ -542,6 +562,11 @@ impl< Ok(()) } + fn trigger_notice_of_completion_cancelled(&mut self, cond_code: ConditionCode) { + self.tparams.tstate.completion_disposition = CompletionDisposition::Cancelled; + self.tparams.tstate.condition_code = cond_code; + } + /// Returns whether the transfer can be completed regularly. fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result { // CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size @@ -886,14 +911,13 @@ impl< { FinishedPduCreator::new_default(pdu_header, tstate.delivery_code, tstate.file_status) } else { - // TODO: Are there cases where this ID is actually the source entity ID? - let entity_id = EntityIdTlv::new(self.local_cfg.id); - FinishedPduCreator::new_with_error( + FinishedPduCreator::new_generic( pdu_header, tstate.condition_code, tstate.delivery_code, tstate.file_status, - entity_id, + &[], + tstate.fault_location_finished, ) }; finished_pdu.write_to_bytes(&mut self.packet_buf)?; @@ -1690,4 +1714,7 @@ mod tests { .expect("EOF no error insertion failed"); tb.check_completion_indication_success(&mut test_user); } + + #[test] + fn test_tranfer_cancellation() {} } diff --git a/src/source.rs b/src/source.rs index dea3df4..9306a39 100644 --- a/src/source.rs +++ b/src/source.rs @@ -433,20 +433,52 @@ impl< { return Err(PutRequestError::FileDoesNotExist); } - self.tstate = Some(TransferState::new( - TransactionId::new( - self.local_cfg().id, - UnsignedByteField::new( - SeqCountProvider::MAX_BIT_WIDTH / 8, - self.seq_count_provider.get_and_increment().into(), - ), + + let transaction_id = TransactionId::new( + self.local_cfg().id, + UnsignedByteField::new( + SeqCountProvider::MAX_BIT_WIDTH / 8, + self.seq_count_provider.get_and_increment().into(), ), - *remote_cfg, + ); + // Both the source entity and destination entity ID field must have the same size. + // We use the larger of either the Put Request destination ID or the local entity ID + // as the size for the new entity IDs. + let larger_entity_width = core::cmp::max( + self.local_cfg.id.size(), + self.put_request_cacher.static_fields.destination_id.size(), + ); + let create_id = |cached_id: &UnsignedByteField| { + if larger_entity_width != cached_id.size() { + UnsignedByteField::new(larger_entity_width, cached_id.value_const()) + } else { + *cached_id + } + }; + + // Set PDU configuration fields which are important for generating PDUs. + self.pdu_conf + .set_source_and_dest_id( + create_id(&self.local_cfg.id), + create_id(&self.put_request_cacher.static_fields.destination_id), + ) + .unwrap(); + // Set up other PDU configuration fields. + self.pdu_conf.direction = Direction::TowardsReceiver; + self.pdu_conf.crc_flag = remote_cfg.crc_on_transmission_by_default.into(); + self.pdu_conf.transaction_seq_num = *transaction_id.seq_num(); + self.pdu_conf.trans_mode = transmission_mode; + self.fparams.segment_len = self.calculate_max_file_seg_len(remote_cfg); + + // Set up the transfer context structure. + self.tstate = Some(TransferState { + transaction_id, + remote_cfg: *remote_cfg, transmission_mode, closure_requested, - None, - None, - )); + cond_code_eof: None, + finished_params: None, + }); self.state_helper.state = super::State::Busy; Ok(()) } @@ -655,32 +687,6 @@ impl< self.pdu_conf.file_flag = LargeFileFlag::Normal } } - // Both the source entity and destination entity ID field must have the same size. - // We use the larger of either the Put Request destination ID or the local entity ID - // as the size for the new entity IDs. - let larger_entity_width = core::cmp::max( - self.local_cfg.id.size(), - self.put_request_cacher.static_fields.destination_id.size(), - ); - let create_id = |cached_id: &UnsignedByteField| { - if larger_entity_width != cached_id.size() { - UnsignedByteField::new(larger_entity_width, cached_id.value_const()) - } else { - *cached_id - } - }; - self.pdu_conf - .set_source_and_dest_id( - create_id(&self.local_cfg.id), - create_id(&self.put_request_cacher.static_fields.destination_id), - ) - .unwrap(); - // Set up other PDU configuration fields. - self.pdu_conf.direction = Direction::TowardsReceiver; - self.pdu_conf.crc_flag = tstate.remote_cfg.crc_on_transmission_by_default.into(); - self.pdu_conf.transaction_seq_num = *tstate.transaction_id.seq_num(); - self.pdu_conf.trans_mode = tstate.transmission_mode; - self.fparams.segment_len = self.calculate_max_file_seg_len(&tstate.remote_cfg); cfdp_user.transaction_indication(&tstate.transaction_id); Ok(()) } @@ -892,7 +898,7 @@ impl< PduHeader::new_no_file_data(self.pdu_conf, 0), tstate.cond_code_eof.unwrap_or(ConditionCode::NoError), checksum, - self.fparams.file_size, + self.fparams.progress, None, ); self.pdu_send_helper(&eof_pdu)?; @@ -1112,6 +1118,7 @@ mod tests { use crate::{ filestore::NativeFilestore, request::PutRequestOwned, + source::TransactionStep, tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler}, FaultHandler, IndicationConfig, PduRawWithInfo, StdCountdown, StdRemoteEntityConfigProvider, StdTimerCreator, CRC_32, @@ -1149,6 +1156,13 @@ mod tests { check_idle_on_drop: bool, } + #[allow(dead_code)] + struct TransferInfo { + id: TransactionId, + closure_requested: bool, + pdu_header: PduHeader, + } + impl SourceHandlerTestbench { fn new( crc_on_transmission_by_default: bool, @@ -1215,6 +1229,15 @@ mod tests { .all_queues_empty() } + #[allow(dead_code)] + fn test_fault_handler(&self) -> &RefCell { + self.handler.local_cfg.user_fault_hook() + } + + fn test_fault_handler_mut(&mut self) -> &mut RefCell { + self.handler.local_cfg.user_fault_hook_mut() + } + fn pdu_queue_empty(&self) -> bool { self.handler.pdu_sender.queue_empty() } @@ -1270,7 +1293,7 @@ mod tests { Some(with_closure), ) .expect("creating put request failed"); - let (closure_requested, pdu_header) = self.common_no_acked_file_transfer( + let transaction_info = self.common_no_acked_file_transfer( cfdp_user, put_request, cfdp_user.expected_file_size, @@ -1278,7 +1301,7 @@ mod tests { let mut current_offset = 0; let chunks = file_data.chunks( calculate_max_file_seg_len_for_max_packet_len_and_pdu_header( - &pdu_header, + &transaction_info.pdu_header, self.max_packet_len, None, ), @@ -1292,21 +1315,19 @@ mod tests { } self.common_eof_pdu_check( cfdp_user, - closure_requested, + transaction_info.closure_requested, cfdp_user.expected_file_size, checksum, ); - (pdu_header, fd_pdus) + (transaction_info.pdu_header, fd_pdus) } - // Returns a tuple. First parameter: Closure requested. Second parameter: PDU header of - // metadata PDU. fn common_no_acked_file_transfer( &mut self, cfdp_user: &mut TestCfdpUser, put_request: PutRequestOwned, filesize: u64, - ) -> (bool, PduHeader) { + ) -> TransferInfo { assert_eq!(cfdp_user.transaction_indication_call_count, 0); assert_eq!(cfdp_user.eof_sent_call_count, 0); @@ -1314,6 +1335,7 @@ mod tests { .expect("put_request call failed"); assert_eq!(self.handler.state(), State::Busy); assert_eq!(self.handler.step(), TransactionStep::Idle); + let id = self.handler.transaction_id().unwrap(); let sent_packets = self .handler .state_machine_no_packet(cfdp_user) @@ -1363,7 +1385,11 @@ mod tests { metadata_pdu.metadata_params().closure_requested }; assert_eq!(metadata_pdu.options(), &[]); - (closure_requested, *pdu_header) + TransferInfo { + pdu_header: *pdu_header, + closure_requested, + id, + } } fn check_next_file_pdu(&mut self, expected_offset: u64, expected_data: &[u8]) { @@ -1485,11 +1511,11 @@ mod tests { ) .expect("creating put request failed"); let mut cfdp_user = tb.create_user(0, filesize); - let (closure_requested, _) = + let transaction_info = tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize); tb.common_eof_pdu_check( &mut cfdp_user, - closure_requested, + transaction_info.closure_requested, filesize, CRC_32.digest().finalize(), ) @@ -1569,15 +1595,15 @@ mod tests { ) .expect("creating put request failed"); let mut cfdp_user = tb.create_user(0, filesize); - let (closure_requested, pdu_header) = + let transaction_info = tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize); tb.common_eof_pdu_check( &mut cfdp_user, - closure_requested, + transaction_info.closure_requested, filesize, CRC_32.digest().finalize(), ); - tb.finish_handling(&mut cfdp_user, pdu_header) + tb.finish_handling(&mut cfdp_user, transaction_info.pdu_header) } #[test] @@ -1647,11 +1673,12 @@ mod tests { ) .expect("creating put request failed"); let mut cfdp_user = tb.create_user(0, filesize); - let (closure_requested, _pdu_header) = + let transaction_info = tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize); + let expected_id = tb.handler.transaction_id().unwrap(); tb.common_eof_pdu_check( &mut cfdp_user, - closure_requested, + transaction_info.closure_requested, filesize, CRC_32.digest().finalize(), ); @@ -1669,5 +1696,128 @@ mod tests { assert_eq!(eof_pdu.condition_code(), ConditionCode::CheckLimitReached); assert_eq!(eof_pdu.file_size(), 0); assert_eq!(eof_pdu.file_checksum(), 0); + + // Cancellation fault should have been triggered. + let fault_handler = tb.test_fault_handler_mut(); + let fh_ref_mut = fault_handler.get_mut(); + assert!(!fh_ref_mut.cancellation_queue_empty()); + assert_eq!(fh_ref_mut.notice_of_cancellation_queue.len(), 1); + let (id, cond_code, progress) = fh_ref_mut.notice_of_cancellation_queue.pop_back().unwrap(); + assert_eq!(id, expected_id); + assert_eq!(cond_code, ConditionCode::CheckLimitReached); + assert_eq!(progress, 0); + fh_ref_mut.all_queues_empty(); + } + + #[test] + fn test_cancelled_transfer_empty_file() { + let fault_handler = TestFaultHandler::default(); + let test_sender = TestCfdpSender::default(); + let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512); + let filesize = 0; + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Unacknowledged), + Some(false), + ) + .expect("creating put request failed"); + let mut cfdp_user = tb.create_user(0, filesize); + assert_eq!(cfdp_user.transaction_indication_call_count, 0); + assert_eq!(cfdp_user.eof_sent_call_count, 0); + + tb.put_request(&put_request) + .expect("put_request call failed"); + assert_eq!(tb.handler.state(), State::Busy); + assert_eq!(tb.handler.step(), TransactionStep::Idle); + assert!(tb.get_next_sent_pdu().is_none()); + let id = tb.handler.transaction_id().unwrap(); + tb.handler + .cancel_request(&mut cfdp_user, &id) + .expect("transaction cancellation failed"); + assert_eq!(tb.handler.state(), State::Idle); + assert_eq!(tb.handler.step(), TransactionStep::Idle); + // EOF (Cancel) PDU will be generated + let eof_pdu = tb + .get_next_sent_pdu() + .expect("no EOF PDU generated like expected"); + assert_eq!( + eof_pdu.file_directive_type.unwrap(), + FileDirectiveType::EofPdu + ); + let eof_pdu = EofPdu::from_bytes(&eof_pdu.raw_pdu).unwrap(); + assert_eq!( + eof_pdu.condition_code(), + ConditionCode::CancelRequestReceived + ); + assert_eq!(eof_pdu.file_checksum(), 0); + assert_eq!(eof_pdu.file_size(), 0); + tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc); + } + + #[test] + fn test_cancelled_transfer_mid_transfer() { + let fault_handler = TestFaultHandler::default(); + let test_sender = TestCfdpSender::default(); + let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 128); + let mut file = OpenOptions::new() + .write(true) + .open(&tb.srcfile) + .expect("opening file failed"); + let mut rand_data = [0u8; 140]; + rand::thread_rng().fill(&mut rand_data[..]); + file.write_all(&rand_data) + .expect("writing file content failed"); + drop(file); + let put_request = PutRequestOwned::new_regular_request( + REMOTE_ID.into(), + &tb.srcfile, + &tb.destfile, + Some(TransmissionMode::Unacknowledged), + Some(false), + ) + .expect("creating put request failed"); + let file_size = rand_data.len() as u64; + let mut cfdp_user = tb.create_user(0, file_size); + let transaction_info = + tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, file_size); + let mut chunks = rand_data.chunks( + calculate_max_file_seg_len_for_max_packet_len_and_pdu_header( + &transaction_info.pdu_header, + tb.max_packet_len, + None, + ), + ); + let mut digest = CRC_32.digest(); + let first_chunk = chunks.next().expect("no chunk found"); + digest.update(first_chunk); + let checksum = digest.finalize(); + let next_packet = tb.get_next_sent_pdu().unwrap(); + assert_eq!(next_packet.pdu_type, PduType::FileData); + let fd_pdu = FileDataPdu::from_bytes(&next_packet.raw_pdu).unwrap(); + assert_eq!(fd_pdu.file_data(), &rand_data[0..first_chunk.len()]); + let expected_id = tb.handler.transaction_id().unwrap(); + assert!(tb + .handler + .cancel_request(&mut cfdp_user, &expected_id) + .expect("cancellation failed")); + assert_eq!(tb.handler.state(), State::Idle); + assert_eq!(tb.handler.step(), TransactionStep::Idle); + let next_packet = tb.get_next_sent_pdu().unwrap(); + assert_eq!(next_packet.pdu_type, PduType::FileDirective); + assert_eq!( + next_packet.file_directive_type.unwrap(), + FileDirectiveType::EofPdu + ); + // As specified in 4.11.2.2 of the standard, the file size will be the progress of the + // file copy operation so far, and the checksum is calculated for that progress. + let eof_pdu = EofPdu::from_bytes(&next_packet.raw_pdu).expect("EOF PDU creation failed"); + assert_eq!(eof_pdu.file_size(), first_chunk.len() as u64); + assert_eq!(eof_pdu.file_checksum(), checksum); + assert_eq!( + eof_pdu.condition_code(), + ConditionCode::CancelRequestReceived + ); } }