Skip to content

Commit

Permalink
Store router state as map instead of bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman committed Dec 4, 2023
1 parent b915664 commit 79f7653
Showing 1 changed file with 17 additions and 23 deletions.
40 changes: 17 additions & 23 deletions rust/src/hdfs/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn connect(addr: &str) -> Result<TcpStream> {
#[derive(Debug)]
pub(crate) struct AlignmentContext {
state_id: i64,
router_federated_state: Option<Vec<u8>>,
router_federated_state: Option<HashMap<String, i64>>,
}

impl AlignmentContext {
Expand All @@ -67,11 +67,11 @@ impl AlignmentContext {
let new_map = hdfs::RouterFederatedStateProto::decode(Bytes::from(new_router_state))?
.namespace_state_ids;

let mut current_map = if let Some(current_state) = self.router_federated_state.take() {
hdfs::RouterFederatedStateProto::decode(Bytes::from(current_state))?
.namespace_state_ids
let current_map = if let Some(cur) = self.router_federated_state.as_mut() {
cur
} else {
HashMap::new()
self.router_federated_state = Some(HashMap::new());
self.router_federated_state.as_mut().unwrap()
};

for (key, value) in new_map.into_iter() {
Expand All @@ -80,16 +80,19 @@ impl AlignmentContext {
i64::max(value, *current_map.get(&key).unwrap_or(&i64::MIN)),
);
}
self.router_federated_state = Some(
hdfs::RouterFederatedStateProto {
namespace_state_ids: current_map,
}
.encode_to_vec(),
);
}

Ok(())
}

fn encode_router_state(&self) -> Option<Vec<u8>> {
self.router_federated_state.as_ref().map(|state| {
hdfs::RouterFederatedStateProto {
namespace_state_ids: state.clone(),
}
.encode_to_vec()
})
}
}

impl Default for AlignmentContext {
Expand Down Expand Up @@ -209,7 +212,7 @@ impl RpcConnection {
client_id: self.client_id.clone(),
retry_count: Some(retry_count),
state_id: Some(context.state_id),
router_federated_state: context.router_federated_state.clone(),
router_federated_state: context.encode_router_state(),
..Default::default()
}
}
Expand Down Expand Up @@ -660,7 +663,6 @@ impl DatanodeConnection {
mod test {
use std::collections::HashMap;

use bytes::Bytes;
use prost::Message;

use crate::{hdfs::connection::MAX_PACKET_HEADER_SIZE, proto::hdfs};
Expand All @@ -685,14 +687,6 @@ mod test {
.encode_to_vec()
}

fn get_router_state(alignment_context: &AlignmentContext) -> HashMap<String, i64> {
hdfs::RouterFederatedStateProto::decode(Bytes::from(
alignment_context.router_federated_state.clone().unwrap(),
))
.unwrap()
.namespace_state_ids
}

#[test]
fn test_router_federated_state() {
let mut alignment_context = AlignmentContext::default();
Expand All @@ -708,7 +702,7 @@ mod test {

assert!(alignment_context.router_federated_state.is_some());

let router_state = get_router_state(&alignment_context);
let router_state = alignment_context.router_federated_state.as_ref().unwrap();

assert_eq!(router_state.len(), 1);
assert_eq!(*router_state.get("ns-1").unwrap(), 3);
Expand All @@ -720,7 +714,7 @@ mod test {
.update(None, Some(encode_router_state(&state_map)))
.unwrap();

let router_state = get_router_state(&alignment_context);
let router_state = alignment_context.router_federated_state.as_ref().unwrap();

assert_eq!(router_state.len(), 2);
assert_eq!(*router_state.get("ns-1").unwrap(), 5);
Expand Down

0 comments on commit 79f7653

Please sign in to comment.