From 7d1a0d2d1f7231ba291c79bf1535c843828cc12b Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 5 Dec 2024 07:13:49 -0500 Subject: [PATCH] Don't panic if a whole read isn't consumed --- rust/src/hdfs/block_reader.rs | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/rust/src/hdfs/block_reader.rs b/rust/src/hdfs/block_reader.rs index 1063969..ead9d23 100644 --- a/rust/src/hdfs/block_reader.rs +++ b/rust/src/hdfs/block_reader.rs @@ -222,6 +222,15 @@ impl ReplicatedBlockStream { )) } + async fn get_next_packet( + connection: &mut DatanodeConnection, + checksum_info: Option, + ) -> Result<(PacketHeaderProto, Bytes)> { + let packet = connection.read_packet().await?; + let header = packet.header; + Ok((header, packet.get_data(&checksum_info)?)) + } + fn start_packet_listener( mut connection: DatanodeConnection, checksum_info: Option, @@ -229,18 +238,20 @@ impl ReplicatedBlockStream { ) -> JoinHandle> { tokio::spawn(async move { loop { - let packet = connection.read_packet().await?; - let header = packet.header; - let data = packet.get_data(&checksum_info)?; - - // If the packet is empty it means it's the last packet - // so tell the DataNode the read was a success and finish this task - if data.is_empty() { + let next_packet = Self::get_next_packet(&mut connection, checksum_info).await; + if next_packet.as_ref().is_ok_and(|(_, data)| data.is_empty()) { + // If the packet is empty it means it's the last packet + // so tell the DataNode the read was a success and finish this task connection.send_read_success().await?; break; } - sender.send(Ok((header, data))).await.unwrap(); + if sender.send(next_packet).await.is_err() { + // The block reader was dropped, so just kill the listener + return Err(HdfsError::DataTransferError( + "Reader was dropped without consuming all data".to_string(), + )); + } } Ok(connection) })