diff --git a/rust/src/hdfs/connection.rs b/rust/src/hdfs/connection.rs index f5bda50..0bc73d4 100644 --- a/rust/src/hdfs/connection.rs +++ b/rust/src/hdfs/connection.rs @@ -50,7 +50,7 @@ async fn connect(addr: &str) -> Result { #[derive(Debug)] pub(crate) struct AlignmentContext { state_id: i64, - router_federated_state: Option>, + router_federated_state: Option>, } impl AlignmentContext { @@ -67,11 +67,11 @@ impl AlignmentContext { let new_map = hdfs::RouterFederatedStateProto::decode(Bytes::from(new_router_state))? .namespace_state_ids; - let mut current_map = if let Some(current_state) = self.router_federated_state.take() { - hdfs::RouterFederatedStateProto::decode(Bytes::from(current_state))? - .namespace_state_ids + let current_map = if let Some(cur) = self.router_federated_state.as_mut() { + cur } else { - HashMap::new() + self.router_federated_state = Some(HashMap::new()); + self.router_federated_state.as_mut().unwrap() }; for (key, value) in new_map.into_iter() { @@ -80,16 +80,19 @@ impl AlignmentContext { i64::max(value, *current_map.get(&key).unwrap_or(&i64::MIN)), ); } - self.router_federated_state = Some( - hdfs::RouterFederatedStateProto { - namespace_state_ids: current_map, - } - .encode_to_vec(), - ); } Ok(()) } + + fn encode_router_state(&self) -> Option> { + self.router_federated_state.as_ref().map(|state| { + hdfs::RouterFederatedStateProto { + namespace_state_ids: state.clone(), + } + .encode_to_vec() + }) + } } impl Default for AlignmentContext { @@ -209,7 +212,7 @@ impl RpcConnection { client_id: self.client_id.clone(), retry_count: Some(retry_count), state_id: Some(context.state_id), - router_federated_state: context.router_federated_state.clone(), + router_federated_state: context.encode_router_state(), ..Default::default() } } @@ -660,7 +663,6 @@ impl DatanodeConnection { mod test { use std::collections::HashMap; - use bytes::Bytes; use prost::Message; use crate::{hdfs::connection::MAX_PACKET_HEADER_SIZE, proto::hdfs}; @@ -685,14 +687,6 @@ mod test { .encode_to_vec() } - fn get_router_state(alignment_context: &AlignmentContext) -> HashMap { - hdfs::RouterFederatedStateProto::decode(Bytes::from( - alignment_context.router_federated_state.clone().unwrap(), - )) - .unwrap() - .namespace_state_ids - } - #[test] fn test_router_federated_state() { let mut alignment_context = AlignmentContext::default(); @@ -708,7 +702,7 @@ mod test { assert!(alignment_context.router_federated_state.is_some()); - let router_state = get_router_state(&alignment_context); + let router_state = alignment_context.router_federated_state.as_ref().unwrap(); assert_eq!(router_state.len(), 1); assert_eq!(*router_state.get("ns-1").unwrap(), 3); @@ -720,7 +714,7 @@ mod test { .update(None, Some(encode_router_state(&state_map))) .unwrap(); - let router_state = get_router_state(&alignment_context); + let router_state = alignment_context.router_federated_state.as_ref().unwrap(); assert_eq!(router_state.len(), 2); assert_eq!(*router_state.get("ns-1").unwrap(), 5);