Skip to content

Commit

Permalink
Fix clippy warnings and enforce in CI (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman authored Oct 24, 2023
1 parent 1213cd3 commit b53240a
Show file tree
Hide file tree
Showing 19 changed files with 264 additions and 260 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ jobs:
- name: Install native libs
run: sudo apt-get install -y libkrb5-dev libgsasl-dev

# - name: build and lint with clippy
# run: cargo clippy --tests
- name: build and lint with clippy
run: cargo clippy --tests --features kerberos,token,integration-test,rs

- name: Check docs
run: cargo doc
Expand Down
46 changes: 27 additions & 19 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl MountTable {
let path = Path::new(src);
for link in self.mounts.iter() {
if let Some(resolved) = link.resolve(path) {
return (&link, resolved.to_string_lossy().into());
return (link, resolved.to_string_lossy().into());
}
}
(
Expand All @@ -112,19 +112,7 @@ impl Client {
/// host is treated as a name service that will be resolved using the HDFS config.
pub fn new(url: &str) -> Result<Self> {
let parsed_url = Url::parse(url)?;
Ok(Self::with_config(&parsed_url, Configuration::new()?)?)
}

/// Creates a new HDFS Client based on the fs.defaultFs setting.
pub fn default() -> Result<Self> {
let config = Configuration::new()?;
let url = config
.get(config::DEFAULT_FS)
.ok_or(HdfsError::InvalidArgument(format!(
"No {} setting found",
config::DEFAULT_FS
)))?;
Ok(Self::with_config(&Url::parse(&url)?, config)?)
Self::with_config(&parsed_url, Configuration::new()?)
}

fn with_config(url: &Url, config: Configuration) -> Result<Self> {
Expand Down Expand Up @@ -170,7 +158,7 @@ impl Client {
"Only hdfs mounts are supported for viewfs".to_string(),
));
}
let proxy = NameServiceProxy::new(&url, &config);
let proxy = NameServiceProxy::new(&url, config);
let protocol = Arc::new(NamenodeProtocol::new(proxy));

if let Some(prefix) = viewfs_path {
Expand Down Expand Up @@ -202,7 +190,7 @@ impl Client {
pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
let (link, resolved_path) = self.mount_table.resolve(path);
match link.protocol.get_file_info(&resolved_path).await?.fs {
Some(status) => Ok(FileStatus::from(status, &path)),
Some(status) => Ok(FileStatus::from(status, path)),
None => Err(HdfsError::FileNotFound(path.to_string())),
}
}
Expand Down Expand Up @@ -340,6 +328,26 @@ impl Client {
}
}

impl Default for Client {
/// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
/// no defaultFS is defined, or the defaultFS is invalid.
fn default() -> Self {
let config = Configuration::new().expect("Failed to load configuration");
let url = config
.get(config::DEFAULT_FS)
.ok_or(HdfsError::InvalidArgument(format!(
"No {} setting found",
config::DEFAULT_FS
)))
.expect("No fs.defaultFS config defined");
Self::with_config(
&Url::parse(&url).expect("Failed to parse fs.defaultFS"),
config,
)
.expect("Failed to create default client")
}
}

pub(crate) struct DirListingIterator {
path: String,
resolved_path: String,
Expand Down Expand Up @@ -386,14 +394,14 @@ impl DirListingIterator {
.into_iter()
.filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
.collect();
Ok(self.partial_listing.len() > 0)
Ok(!self.partial_listing.is_empty())
} else {
Err(HdfsError::FileNotFound(self.path.clone()))
}
}

pub async fn next(&mut self) -> Option<Result<FileStatus>> {
if self.partial_listing.len() == 0 && self.remaining > 0 {
if self.partial_listing.is_empty() && self.remaining > 0 {
if let Err(error) = self.get_next_batch().await {
self.remaining = 0;
return Some(Err(error));
Expand Down Expand Up @@ -482,7 +490,7 @@ impl FileStatus {
fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
let mut path = PathBuf::from(base_path);
if let Ok(relative_path) = std::str::from_utf8(&value.path) {
if relative_path.len() > 0 {
if !relative_path.is_empty() {
path.push(relative_path)
}
}
Expand Down
25 changes: 11 additions & 14 deletions rust/src/common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,33 @@ impl Configuration {
pub fn new() -> io::Result<Self> {
let mut map: HashMap<String, String> = HashMap::new();

match Self::get_conf_dir() {
Some(conf_dir) => {
for file in ["core-site.xml", "hdfs-site.xml"] {
let config_path = conf_dir.join(file);
if config_path.as_path().exists() {
Self::read_from_file(config_path.as_path())?
.into_iter()
.for_each(|(key, value)| {
map.insert(key, value);
})
}
if let Some(conf_dir) = Self::get_conf_dir() {
for file in ["core-site.xml", "hdfs-site.xml"] {
let config_path = conf_dir.join(file);
if config_path.as_path().exists() {
Self::read_from_file(config_path.as_path())?
.into_iter()
.for_each(|(key, value)| {
map.insert(key, value);
})
}
}
None => (),
}

Ok(Configuration { map })
}

/// Get a value from the config, returning None if the key wasn't defined.
pub fn get(&self, key: &str) -> Option<String> {
self.map.get(key).map(|v| v.clone())
self.map.get(key).cloned()
}

pub(crate) fn get_urls_for_nameservice(&self, nameservice: &str) -> Vec<String> {
self.map
.get(&format!("{}.{}", HA_NAMENODES_PREFIX, nameservice))
.into_iter()
.flat_map(|namenodes| {
namenodes.split(",").flat_map(|namenode_id| {
namenodes.split(',').flat_map(|namenode_id| {
self.map
.get(&format!(
"{}.{}.{}",
Expand Down
7 changes: 3 additions & 4 deletions rust/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ impl FileReader {
} else {
let offset = self.position;
self.position = usize::min(self.position + len, self.file_length());
self.read_range(offset, self.position - offset as usize)
.await
self.read_range(offset, self.position - offset).await
}
}

Expand Down Expand Up @@ -174,12 +173,12 @@ impl FileWriter {
)
.await?;

Ok(BlockWriter::new(
BlockWriter::new(
new_block.block,
self.status.blocksize() as usize,
self.server_defaults.clone(),
)
.await?)
.await
}

async fn get_block_writer(&mut self) -> Result<&mut BlockWriter> {
Expand Down
97 changes: 51 additions & 46 deletions rust/src/hdfs/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::security::sasl::{SaslReader, SaslRpcClient, SaslWriter};
use crate::security::user::UserInfo;
use crate::{HdfsError, Result};

const PROTOCOL: &'static str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
const PROTOCOL: &str = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
const DATA_TRANSFER_VERSION: u16 = 28;

const MAX_PACKET_HEADER_SIZE: usize = 33;
Expand Down Expand Up @@ -160,30 +160,34 @@ impl RpcConnection {
call_id: i32,
retry_count: i32,
) -> common::RpcRequestHeaderProto {
let mut request_header = common::RpcRequestHeaderProto::default();
request_header.rpc_kind = Some(common::RpcKindProto::RpcProtocolBuffer as i32);
// RPC_FINAL_PACKET
request_header.rpc_op = Some(0);
request_header.call_id = call_id;
request_header.client_id = self.client_id.clone();
request_header.retry_count = Some(retry_count);
request_header.state_id = Some(self.alignment_context.state_id.load(Ordering::SeqCst));
request_header.router_federated_state = self
.alignment_context
.router_federated_state
.as_ref()
.map(|state| state.lock().unwrap().clone());
request_header
common::RpcRequestHeaderProto {
rpc_kind: Some(common::RpcKindProto::RpcProtocolBuffer as i32),
// RPC_FINAL_PACKET
rpc_op: Some(0),
call_id,
client_id: self.client_id.clone(),
retry_count: Some(retry_count),
state_id: Some(self.alignment_context.state_id.load(Ordering::SeqCst)),
router_federated_state: self
.alignment_context
.router_federated_state
.as_ref()
.map(|state| state.lock().unwrap().clone()),
..Default::default()
}
}

fn get_connection_context(&self) -> common::IpcConnectionContextProto {
let mut context = common::IpcConnectionContextProto::default();
context.protocol = Some(PROTOCOL.to_string());
let user_info = common::UserInformationProto {
effective_user: self.user_info.effective_user.clone(),
real_user: self.user_info.real_user.clone(),
};

let context = common::IpcConnectionContextProto {
protocol: Some(PROTOCOL.to_string()),
user_info: Some(user_info),
};

let mut user_info = common::UserInformationProto::default();
user_info.effective_user = self.user_info.effective_user.clone();
user_info.real_user = self.user_info.real_user.clone();
context.user_info = Some(user_info);
debug!("Connection context: {:?}", context);
context
}
Expand Down Expand Up @@ -220,11 +224,11 @@ impl RpcConnection {

let conn_header_buf = conn_header.encode_length_delimited_to_vec();

let mut msg_header = common::RequestHeaderProto::default();
msg_header.method_name = method_name.to_string();
msg_header.declaring_class_protocol_name = PROTOCOL.to_string();
msg_header.client_protocol_version = 1;

let msg_header = common::RequestHeaderProto {
method_name: method_name.to_string(),
declaring_class_protocol_name: PROTOCOL.to_string(),
client_protocol_version: 1,
};
debug!("RPC request header: {:?}", msg_header);

let header_buf = msg_header.encode_length_delimited_to_vec();
Expand Down Expand Up @@ -370,9 +374,11 @@ impl Packet {
bytes_per_checksum: u32,
max_packet_size: u32,
) -> Self {
let mut header = hdfs::PacketHeaderProto::default();
header.offset_in_block = offset;
header.seqno = seqno;
let header = hdfs::PacketHeaderProto {
offset_in_block: offset,
seqno,
..Default::default()
};

let num_chunks = Self::max_packet_chunks(bytes_per_checksum, max_packet_size);

Expand All @@ -394,8 +400,8 @@ impl Packet {
fn max_packet_chunks(bytes_per_checksum: u32, max_packet_size: u32) -> usize {
let data_size = max_packet_size as usize - MAX_PACKET_HEADER_SIZE;
let chunk_size = bytes_per_checksum as usize + CHECKSUM_BYTES;
let chunks = data_size / chunk_size;
chunks

data_size / chunk_size
}

pub(crate) fn write(&mut self, buf: &mut Bytes) {
Expand Down Expand Up @@ -470,7 +476,7 @@ pub(crate) struct DatanodeConnection {

impl DatanodeConnection {
pub(crate) async fn connect(url: &str) -> Result<Self> {
let stream = connect(&url).await?;
let stream = connect(url).await?;

let (reader, writer) = stream.into_split();

Expand Down Expand Up @@ -499,14 +505,16 @@ impl DatanodeConnection {
block: &hdfs::ExtendedBlockProto,
token: Option<common::TokenProto>,
) -> hdfs::ClientOperationHeaderProto {
let mut base_header = hdfs::BaseHeaderProto::default();
base_header.block = block.clone();
base_header.token = token;
let base_header = hdfs::BaseHeaderProto {
block: block.clone(),
token,
..Default::default()
};

let mut header = hdfs::ClientOperationHeaderProto::default();
header.base_header = base_header;
header.client_name = self.client_name.clone();
header
hdfs::ClientOperationHeaderProto {
base_header,
client_name: self.client_name.clone(),
}
}

pub(crate) async fn read_block_op_response(&mut self) -> Result<hdfs::BlockOpResponseProto> {
Expand Down Expand Up @@ -624,13 +632,10 @@ mod test {
#[test]
fn test_max_packet_header_size() {
// Create a dummy header to get its size
let mut header = hdfs::PacketHeaderProto::default();
header.offset_in_block = 0;
header.seqno = 0;
header.data_len = 0;
header.last_packet_in_block = false;
header.sync_block = Some(false);

let header = hdfs::PacketHeaderProto {
sync_block: Some(false),
..Default::default()
};
// Add 4 bytes for size of whole packet and 2 bytes for size of header
assert_eq!(MAX_PACKET_HEADER_SIZE, header.encoded_len() + 4 + 2);
}
Expand Down
Loading

0 comments on commit b53240a

Please sign in to comment.