Skip to content

Commit

Permalink
Don't panic if a whole read isn't consumed
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman committed Dec 5, 2024
1 parent c7d40bd commit 7d1a0d2
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions rust/src/hdfs/block_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,25 +222,36 @@ impl ReplicatedBlockStream {
))
}

async fn get_next_packet(
connection: &mut DatanodeConnection,
checksum_info: Option<ReadOpChecksumInfoProto>,
) -> Result<(PacketHeaderProto, Bytes)> {
let packet = connection.read_packet().await?;
let header = packet.header;
Ok((header, packet.get_data(&checksum_info)?))
}

fn start_packet_listener(
mut connection: DatanodeConnection,
checksum_info: Option<ReadOpChecksumInfoProto>,
sender: Sender<Result<(PacketHeaderProto, Bytes)>>,
) -> JoinHandle<Result<DatanodeConnection>> {
tokio::spawn(async move {
loop {
let packet = connection.read_packet().await?;
let header = packet.header;
let data = packet.get_data(&checksum_info)?;

// If the packet is empty it means it's the last packet
// so tell the DataNode the read was a success and finish this task
if data.is_empty() {
let next_packet = Self::get_next_packet(&mut connection, checksum_info).await;
if next_packet.as_ref().is_ok_and(|(_, data)| data.is_empty()) {
// If the packet is empty it means it's the last packet
// so tell the DataNode the read was a success and finish this task
connection.send_read_success().await?;
break;
}

sender.send(Ok((header, data))).await.unwrap();
if sender.send(next_packet).await.is_err() {
// The block reader was dropped, so just kill the listener
return Err(HdfsError::DataTransferError(
"Reader was dropped without consuming all data".to_string(),
));
}
}
Ok(connection)
})
Expand Down

0 comments on commit 7d1a0d2

Please sign in to comment.