diff --git a/README.md b/README.md index 1b23ab8..b81211b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,5 @@ # Native Rust HDFS client -This is a proof-of-concept HDFS client written natively in Rust. All other clients I have found in any other language are simply wrappers around libhdfs and require all the same Java dependencies, so I wanted to see if I could write one from scratch given that HDFS isn't really changing very often anymore. Several basic features are working, however it is not nearly as robust and the real HDFS client. - -What this is not trying to do is implement all HDFS client/FileSystem interfaces, just things involving reading and writing data. +This is an experimental HDFS client written natively in Rust. Several basic features are working, however it is not nearly as robust as the real HDFS client. ## Supported HDFS features Here is a list of currently supported and unsupported but possible future features. @@ -12,10 +10,15 @@ Here is a list of currently supported and unsupported but possible future featur - [x] Writing - [x] Rename - [x] Delete +- [x] Basic Permissions and ownership +- [x] ACLs +- [x] Content summary +- [x] Set replication +- [x] Set timestamps ### HDFS Features - [x] Name Services -- [ ] Observer reads (state ID tracking is supported, but needs improvements on tracking Observer/Active NameNode) +- [x] Observer reads - [x] ViewFS - [x] Router based federation - [x] Erasure coded reads and writes diff --git a/python/hdfs_native/__init__.py b/python/hdfs_native/__init__.py index d64d40e..44fa40a 100644 --- a/python/hdfs_native/__init__.py +++ b/python/hdfs_native/__init__.py @@ -1,14 +1,24 @@ import io import os -from typing import TYPE_CHECKING, Dict, Iterator, Optional +from typing import TYPE_CHECKING, Dict, Iterator, List, Optional # For some reason mypy doesn't think this exists from typing_extensions import Buffer # type: ignore -from ._internal import RawClient, WriteOptions +from ._internal import ( + AclEntry, 
AclStatus, + ContentSummary, + FileStatus, + RawClient, + WriteOptions, +) if TYPE_CHECKING: - from ._internal import ContentSummary, FileStatus, RawFileReader, RawFileWriter + from ._internal import ( + RawFileReader, + RawFileWriter, + ) class FileReader(io.RawIOBase): @@ -95,11 +105,11 @@ def __init__( ): self.inner = RawClient(url, config) - def get_file_info(self, path: str) -> "FileStatus": + def get_file_info(self, path: str) -> FileStatus: """Gets the file status for the file at `path`""" return self.inner.get_file_info(path) - def list_status(self, path: str, recursive: bool = False) -> Iterator["FileStatus"]: + def list_status(self, path: str, recursive: bool = False) -> Iterator[FileStatus]: """Gets the status of files rooted at `path`. If `recursive` is true, lists all files recursively.""" return self.inner.list_status(path, recursive) @@ -181,8 +191,46 @@ def set_replication(self, path: str, replication: int) -> bool: """ return self.inner.set_replication(path, replication) - def get_content_summary(self, path: str) -> "ContentSummary": + def get_content_summary(self, path: str) -> ContentSummary: """ Gets a content summary for `path` """ return self.inner.get_content_summary(path) + + def modify_acl_entries(self, path: str, entries: List[AclEntry]) -> None: + """ + Update ACL entries for file or directory at `path`. Existing entries will remain. + """ + return self.inner.modify_acl_entries(path, entries) + + def remove_acl_entries(self, path: str, entries: List[AclEntry]) -> None: + """ + Remove specific ACL entries for file or directory at `path`. + """ + return self.inner.remove_acl_entries(path, entries) + + def remove_default_acl(self, path: str) -> None: + """ + Remove all default ACLs for file or directory at `path`. + """ + return self.inner.remove_default_acl(path) + + def remove_acl(self, path: str) -> None: + """ + Remove all ACL entries for file or directory at `path`. 
+ """ + return self.inner.remove_acl(path) + + def set_acl(self, path: str, entries: List[AclEntry]) -> None: + """ + Override all ACL entries for file or directory at `path`. If only access ACLs are provided, + default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are + maintained. + """ + return self.inner.set_acl(path, entries) + + def get_acl_status(self, path: str) -> AclStatus: + """ + Get the ACL status for the file or directory at `path`. + """ + return self.inner.get_acl_status(path) diff --git a/python/hdfs_native/_internal.pyi b/python/hdfs_native/_internal.pyi index d013466..49174dd 100644 --- a/python/hdfs_native/_internal.pyi +++ b/python/hdfs_native/_internal.pyi @@ -1,4 +1,4 @@ -from typing import Dict, Iterator, Optional +from typing import Dict, Iterator, List, Literal, Optional # For some reason mypy doesn't think this exists from typing_extensions import Buffer # type: ignore @@ -23,6 +23,31 @@ class ContentSummary: space_consumed: int space_quota: int +AclEntryType = Literal["user", "group", "mask", "other"] +AclEntryScope = Literal["access", "default"] +FsAction = Literal["---", "--x", "-w-", "-wx", "r--", "r-x", "rw-", "rwx"] + +class AclEntry: + type: AclEntryType + scope: AclEntryScope + permissions: FsAction + name: Optional[str] + + def __init__( + self, + type: AclEntryType, + scope: AclEntryScope, + permissions: FsAction, + name: Optional[str] = None, + ): ... + +class AclStatus: + owner: str + group: str + sticky: bool + entries: List[AclEntry] + permission: int + class WriteOptions: block_size: Optional[int] replication: Optional[int] @@ -85,4 +110,10 @@ class RawClient: ) -> None: ... def set_permission(self, path: str, permission: int) -> None: ... def set_replication(self, path: str, replication: int) -> bool: ... - def get_content_summary(self, path) -> ContentSummary: ... + def get_content_summary(self, path: str) -> ContentSummary: ... 
+ def modify_acl_entries(self, path: str, entries: List[AclEntry]) -> None: ... + def remove_acl_entries(self, path: str, entries: List[AclEntry]) -> None: ... + def remove_default_acl(self, path: str) -> None: ... + def remove_acl(self, path: str) -> None: ... + def set_acl(self, path: str, entries: List[AclEntry]) -> None: ... + def get_acl_status(self, path: str) -> AclStatus: ... diff --git a/python/src/lib.rs b/python/src/lib.rs index 597ae40..9c4263e 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -9,6 +9,7 @@ use ::hdfs_native::{ Client, }; use bytes::Bytes; +use hdfs_native::acl::{AclEntry, AclStatus}; use hdfs_native::client::ContentSummary; use pyo3::{exceptions::PyRuntimeError, prelude::*}; use tokio::runtime::Runtime; @@ -131,6 +132,93 @@ impl PyContentSummary { } } +#[pyclass(get_all, frozen, name = "AclStatus")] +struct PyAclStatus { + owner: String, + group: String, + sticky: bool, + entries: Vec, + permission: u16, +} + +impl From for PyAclStatus { + fn from(value: AclStatus) -> Self { + Self { + owner: value.owner, + group: value.group, + sticky: value.sticky, + entries: value.entries.into_iter().map(PyAclEntry::from).collect(), + permission: value.permission, + } + } +} + +#[pymethods] +impl PyAclStatus { + /// Return a dataclass-esque format for the repr + fn __repr__(&self) -> String { + format!("AclStatus(owner='{}')", self.owner) + } +} + +#[pyclass(get_all, set_all, name = "AclEntry")] +#[derive(Clone, Default)] +struct PyAclEntry { + r#type: String, + scope: String, + permissions: String, + name: Option, +} + +impl From for PyAclEntry { + fn from(value: AclEntry) -> Self { + Self { + r#type: value.r#type.to_string(), + scope: value.scope.to_string(), + permissions: value.permissions.to_string(), + name: value.name, + } + } +} + +impl From for AclEntry { + fn from(value: PyAclEntry) -> Self { + Self { + r#type: value.r#type.into(), + scope: value.scope.into(), + permissions: value.permissions.into(), + name: value.name, + } + } +} + 
+impl FromIterator for Vec { + fn from_iter>(iter: T) -> Self { + iter.into_iter().map(AclEntry::from).collect() + } +} + +#[pymethods] +impl PyAclEntry { + #[new] + pub fn new(r#type: String, scope: String, permissions: String, name: Option) -> Self { + Self { + r#type, + scope, + permissions, + name, + } + } + + /// Return a dataclass-esque format for the repr + fn __repr__(&self) -> String { + format!( + "AclEntry(type='{}', scope='{}', permissions='{}', name='{:?}')", + self.r#type, self.scope, self.permissions, self.name + ) + } +} + #[pyclass] struct RawFileReader { inner: FileReader, @@ -400,12 +488,65 @@ impl RawClient { .allow_threads(|| self.rt.block_on(self.inner.get_content_summary(path)))? .into()) } + + pub fn modify_acl_entries( + &self, + path: &str, + acl_spec: Vec, + py: Python, + ) -> PyHdfsResult<()> { + Ok(py.allow_threads(|| { + self.rt.block_on( + self.inner + .modify_acl_entries(path, acl_spec.into_iter().collect()), + ) + })?) + } + + pub fn remove_acl_entries( + &self, + path: &str, + acl_spec: Vec, + py: Python, + ) -> PyHdfsResult<()> { + Ok(py.allow_threads(|| { + self.rt.block_on( + self.inner + .remove_acl_entries(path, acl_spec.into_iter().collect()), + ) + })?) + } + + pub fn remove_default_acl(&self, path: &str, py: Python) -> PyHdfsResult<()> { + Ok(py.allow_threads(|| self.rt.block_on(self.inner.remove_default_acl(path)))?) + } + + pub fn remove_acl(&self, path: &str, py: Python) -> PyHdfsResult<()> { + Ok(py.allow_threads(|| self.rt.block_on(self.inner.remove_acl(path)))?) + } + + pub fn set_acl(&self, path: &str, acl_spec: Vec, py: Python) -> PyHdfsResult<()> { + Ok(py.allow_threads(|| { + self.rt + .block_on(self.inner.set_acl(path, acl_spec.into_iter().collect())) + })?) + } + + pub fn get_acl_status(&self, path: &str, py: Python) -> PyHdfsResult { + Ok(py + .allow_threads(|| self.rt.block_on(self.inner.get_acl_status(path)))? + .into()) + } } /// A Python module implemented in Rust. 
#[pymodule] fn _internal(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py index 38af4a0..3291e52 100644 --- a/python/tests/test_integration.py +++ b/python/tests/test_integration.py @@ -1,6 +1,6 @@ import io -from hdfs_native import Client, WriteOptions +from hdfs_native import AclEntry, Client, WriteOptions def test_integration(client: Client): @@ -115,3 +115,34 @@ def test_write_options(client: Client): assert file_info.length == 0 assert file_info.permission == 0o700 assert file_info.blocksize == 1024 * 1024 + + +def test_acls(client: Client): + client.create("/test").close() + + acl_status = client.get_acl_status("/test") + assert len(acl_status.entries) == 0 + + client.modify_acl_entries("/test", [AclEntry("user", "access", "r-x", "testuser")]) + # Should be 2 entries now, a default group entry gets added as well + acl_status = client.get_acl_status("/test") + assert len(acl_status.entries) == 2 + + client.remove_acl("/test") + acl_status = client.get_acl_status("/test") + assert len(acl_status.entries) == 0 + + client.delete("/test") + + client.mkdirs("/testdir") + + client.modify_acl_entries( + "/testdir", [AclEntry("user", "default", "rwx", "testuser")] + ) + # 4 other defaults get added automatically + acl_status = client.get_acl_status("/testdir") + assert len(acl_status.entries) == 5 + + client.remove_default_acl("/testdir") + acl_status = client.get_acl_status("/testdir") + assert len(acl_status.entries) == 0 diff --git a/rust/src/acl.rs b/rust/src/acl.rs new file mode 100644 index 0000000..3a9ae74 --- /dev/null +++ b/rust/src/acl.rs @@ -0,0 +1,287 @@ +use std::fmt::Display; + +use crate::proto::hdfs::{ + acl_entry_proto::{AclEntryScopeProto, AclEntryTypeProto, FsActionProto}, + AclEntryProto, AclStatusProto, +}; + +#[derive(Clone, Debug, PartialEq)] +pub 
enum AclEntryType { + User, + Group, + Mask, + Other, +} + +impl From for AclEntryTypeProto { + fn from(value: AclEntryType) -> Self { + match value { + AclEntryType::User => AclEntryTypeProto::User, + AclEntryType::Group => AclEntryTypeProto::Group, + AclEntryType::Mask => AclEntryTypeProto::Mask, + AclEntryType::Other => AclEntryTypeProto::Other, + } + } +} + +impl From for AclEntryType { + fn from(value: AclEntryTypeProto) -> Self { + match value { + AclEntryTypeProto::User => AclEntryType::User, + AclEntryTypeProto::Group => AclEntryType::Group, + AclEntryTypeProto::Mask => AclEntryType::Mask, + AclEntryTypeProto::Other => AclEntryType::Other, + } + } +} + +impl From<&str> for AclEntryType { + fn from(value: &str) -> Self { + match value.to_ascii_lowercase().as_ref() { + "user" => AclEntryType::User, + "group" => AclEntryType::Group, + "mask" => AclEntryType::Mask, + "other" => AclEntryType::Other, + _ => panic!("Unknown ACL entry type {}", value), + } + } +} + +impl From for AclEntryType { + fn from(value: String) -> Self { + Self::from(value.as_ref()) + } +} + +impl Display for AclEntryType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + AclEntryType::User => "user", + AclEntryType::Group => "group", + AclEntryType::Mask => "mask", + AclEntryType::Other => "other", + } + ) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub enum AclEntryScope { + Access, + Default, +} + +impl From for AclEntryScopeProto { + fn from(value: AclEntryScope) -> Self { + match value { + AclEntryScope::Access => AclEntryScopeProto::Access, + AclEntryScope::Default => AclEntryScopeProto::Default, + } + } +} + +impl From for AclEntryScope { + fn from(value: AclEntryScopeProto) -> Self { + match value { + AclEntryScopeProto::Access => AclEntryScope::Access, + AclEntryScopeProto::Default => AclEntryScope::Default, + } + } +} + +impl From<&str> for AclEntryScope { + fn from(value: &str) -> Self { + match 
value.to_ascii_lowercase().as_ref() { + "access" => AclEntryScope::Access, + "default" => AclEntryScope::Default, + _ => panic!("Unknown ACL entry scope {}", value), + } + } +} + +impl From for AclEntryScope { + fn from(value: String) -> Self { + Self::from(value.as_ref()) + } +} + +impl Display for AclEntryScope { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + AclEntryScope::Access => "access", + AclEntryScope::Default => "default", + } + ) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub enum FsAction { + None = 0, + Execute = 1, + Write = 2, + WriteExecute = 3, + Read = 4, + ReadExecute = 5, + ReadWrite = 6, + PermAll = 7, +} + +impl From for FsActionProto { + fn from(value: FsAction) -> Self { + match value { + FsAction::None => FsActionProto::None, + FsAction::Execute => FsActionProto::Execute, + FsAction::Write => FsActionProto::Write, + FsAction::WriteExecute => FsActionProto::WriteExecute, + FsAction::Read => FsActionProto::Read, + FsAction::ReadExecute => FsActionProto::ReadExecute, + FsAction::ReadWrite => FsActionProto::ReadWrite, + FsAction::PermAll => FsActionProto::PermAll, + } + } +} + +impl From for FsAction { + fn from(value: FsActionProto) -> Self { + match value { + FsActionProto::None => FsAction::None, + FsActionProto::Execute => FsAction::Execute, + FsActionProto::Write => FsAction::Write, + FsActionProto::WriteExecute => FsAction::WriteExecute, + FsActionProto::Read => FsAction::Read, + FsActionProto::ReadExecute => FsAction::ReadExecute, + FsActionProto::ReadWrite => FsAction::ReadWrite, + FsActionProto::PermAll => FsAction::PermAll, + } + } +} + +impl From<&str> for FsAction { + fn from(value: &str) -> Self { + match value { + "---" => FsAction::None, + "--x" => FsAction::Execute, + "-w-" => FsAction::Write, + "-wx" => FsAction::WriteExecute, + "r--" => FsAction::Read, + "r-x" => FsAction::ReadExecute, + "rw-" => FsAction::ReadWrite, + "rwx" => FsAction::PermAll, + _ => 
panic!("Unknown file system permission {}", value), + } + } +} + +impl From for FsAction { + fn from(value: String) -> Self { + Self::from(value.as_ref()) + } +} + +impl Display for FsAction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + FsAction::None => "---", + FsAction::Execute => "--x", + FsAction::Write => "-w-", + FsAction::WriteExecute => "-wx", + FsAction::Read => "r--", + FsAction::ReadExecute => "r-x", + FsAction::ReadWrite => "rw-", + FsAction::PermAll => "rwx", + } + ) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct AclEntry { + pub r#type: AclEntryType, + pub scope: AclEntryScope, + pub permissions: FsAction, + pub name: Option, +} + +impl AclEntry { + /// Create a new ACL entry. + pub fn new( + r#type: impl Into, + scope: impl Into, + permissions: impl Into, + name: Option, + ) -> Self { + Self { + r#type: r#type.into(), + scope: scope.into(), + permissions: permissions.into(), + name, + } + } +} + +impl From for AclEntryProto { + fn from(value: AclEntry) -> Self { + let r#type: AclEntryTypeProto = value.r#type.into(); + let scope: AclEntryScopeProto = value.scope.into(); + let permissions: FsActionProto = value.permissions.into(); + Self { + r#type: r#type as i32, + scope: scope as i32, + permissions: permissions as i32, + name: value.name, + } + } +} + +impl FromIterator for Vec { + fn from_iter>(iter: T) -> Self { + iter.into_iter().map(AclEntryProto::from).collect() + } +} + +impl From for AclEntry { + fn from(value: AclEntryProto) -> Self { + Self { + r#type: value.r#type().into(), + scope: value.scope().into(), + permissions: value.permissions().into(), + name: value.name, + } + } +} + +impl FromIterator for Vec { + fn from_iter>(iter: T) -> Self { + iter.into_iter().map(AclEntry::from).collect() + } +} + +pub struct AclStatus { + pub owner: String, + pub group: String, + pub sticky: bool, + pub entries: Vec, + pub permission: u16, +} + +impl From for AclStatus { + fn 
from(value: AclStatusProto) -> Self { + Self { + owner: value.owner, + group: value.group, + sticky: value.sticky, + entries: value.entries.into_iter().collect(), + permission: value.permission.unwrap().perm as u16, + } + } +} diff --git a/rust/src/client.rs b/rust/src/client.rs index 7f0d48e..341f1eb 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -6,6 +6,7 @@ use futures::stream::BoxStream; use futures::{stream, StreamExt}; use url::Url; +use crate::acl::{AclEntry, AclStatus}; use crate::common::config::{self, Configuration}; use crate::ec::resolve_ec_policy; use crate::error::{HdfsError, Result}; @@ -497,7 +498,7 @@ impl Client { Ok(result) } - /// Gets a content summary for a file or directory rooted at `path + /// Gets a content summary for a file or directory rooted at `path`. pub async fn get_content_summary(&self, path: &str) -> Result { let (link, resolved_path) = self.mount_table.resolve(path); let result = link @@ -508,6 +509,63 @@ impl Client { Ok(result.into()) } + + /// Update ACL entries for file or directory at `path`. Existing entries will remain. + pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec) -> Result<()> { + let (link, resolved_path) = self.mount_table.resolve(path); + link.protocol + .modify_acl_entries(&resolved_path, acl_spec) + .await?; + + Ok(()) + } + + /// Remove specific ACL entries for file or directory at `path`. + pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec) -> Result<()> { + let (link, resolved_path) = self.mount_table.resolve(path); + link.protocol + .remove_acl_entries(&resolved_path, acl_spec) + .await?; + + Ok(()) + } + + /// Remove all default ACLs for file or directory at `path`. + pub async fn remove_default_acl(&self, path: &str) -> Result<()> { + let (link, resolved_path) = self.mount_table.resolve(path); + link.protocol.remove_default_acl(&resolved_path).await?; + + Ok(()) + } + + /// Remove all ACL entries for file or directory at `path`. 
+ pub async fn remove_acl(&self, path: &str) -> Result<()> { + let (link, resolved_path) = self.mount_table.resolve(path); + link.protocol.remove_acl(&resolved_path).await?; + + Ok(()) + } + + /// Override all ACL entries for file or directory at `path`. If only access ACLs are provided, + /// default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are + /// maintained. + pub async fn set_acl(&self, path: &str, acl_spec: Vec) -> Result<()> { + let (link, resolved_path) = self.mount_table.resolve(path); + link.protocol.set_acl(&resolved_path, acl_spec).await?; + + Ok(()) + } + + /// Get the ACL status for the file or directory at `path`. + pub async fn get_acl_status(&self, path: &str) -> Result { + let (link, resolved_path) = self.mount_table.resolve(path); + Ok(link + .protocol + .get_acl_status(&resolved_path) + .await? + .result + .into()) + } } impl Default for Client { diff --git a/rust/src/hdfs/protocol.rs b/rust/src/hdfs/protocol.rs index 30d2965..a7f371c 100644 --- a/rust/src/hdfs/protocol.rs +++ b/rust/src/hdfs/protocol.rs @@ -8,6 +8,7 @@ use prost::Message; use tokio::task::JoinHandle; use uuid::Uuid; +use crate::acl::AclEntry; use crate::proto::hdfs::{ self, DataEncryptionKeyProto, FsServerDefaultsProto, GetDataEncryptionKeyResponseProto, }; @@ -48,27 +49,32 @@ impl NamenodeProtocol { } } - pub(crate) async fn get_file_info(&self, src: &str) -> Result { - let message = hdfs::GetFileInfoRequestProto { - src: src.to_string(), - }; - debug!("get_file_info request: {:?}", &message); + async fn call( + &self, + method_name: &'static str, + message: impl Message, + write: bool, + ) -> Result { + debug!("{} request: {:?}", method_name, &message); let response = self .proxy - .call( - "getFileInfo", - message.encode_length_delimited_to_vec(), - false, - ) + .call(method_name, message.encode_length_delimited_to_vec(), write) .await?; - let decoded = hdfs::GetFileInfoResponseProto::decode_length_delimited(response)?; - 
debug!("get_file_info response: {:?}", &decoded); + let decoded = T::decode_length_delimited(response)?; + debug!("{} response: {:?}", method_name, &decoded); Ok(decoded) } + pub(crate) async fn get_file_info(&self, src: &str) -> Result { + let message = hdfs::GetFileInfoRequestProto { + src: src.to_string(), + }; + self.call("getFileInfo", message, false).await + } + pub(crate) async fn get_listing( &self, src: &str, @@ -80,20 +86,8 @@ impl NamenodeProtocol { start_after, need_location, }; - debug!("get_listing request: {:?}", &message); - let response = self - .proxy - .call( - "getListing", - message.encode_length_delimited_to_vec(), - false, - ) - .await?; - - let decoded = hdfs::GetListingResponseProto::decode_length_delimited(response)?; - debug!("get_listing response: {:?}", &decoded); - Ok(decoded) + self.call("getListing", message, false).await } pub(crate) async fn get_located_file_info( @@ -104,37 +98,13 @@ impl NamenodeProtocol { src: Some(src.to_string()), need_block_token: Some(true), }; - debug!("getLocatedFileInfo request: {:?}", &message); - - let response = self - .proxy - .call( - "getLocatedFileInfo", - message.encode_length_delimited_to_vec(), - false, - ) - .await?; - - let decoded = hdfs::GetLocatedFileInfoResponseProto::decode_length_delimited(response)?; - debug!("getLocatedFileInfo response: {:?}", &decoded); - Ok(decoded) + self.call("getLocatedFileInfo", message, false).await } pub(crate) async fn get_server_defaults(&self) -> Result { let message = hdfs::GetServerDefaultsRequestProto::default(); - let response = self - .proxy - .call( - "getServerDefaults", - message.encode_length_delimited_to_vec(), - false, - ) - .await?; - - let decoded = hdfs::GetServerDefaultsResponseProto::decode_length_delimited(response)?; - debug!("get_server_defaults response: {:?}", &decoded); - Ok(decoded) + self.call("getServerDefaults", message, false).await } pub(crate) async fn get_cached_server_defaults(&self) -> Result { @@ -153,18 +123,7 @@ impl 
NamenodeProtocol { ) -> Result { let message = hdfs::GetDataEncryptionKeyRequestProto::default(); - let response = self - .proxy - .call( - "getDataEncryptionKey", - message.encode_length_delimited_to_vec(), - false, - ) - .await?; - - let decoded = hdfs::GetDataEncryptionKeyResponseProto::decode_length_delimited(response)?; - debug!("get_data_encryption_key response: {:?}", &decoded); - Ok(decoded) + self.call("getDataEncryptionKey", message, false).await } pub(crate) async fn get_cached_data_encryption_key( @@ -219,16 +178,7 @@ impl NamenodeProtocol { ..Default::default() }; - debug!("create request: {:?}", &message); - - let response = self - .proxy - .call("create", message.encode_length_delimited_to_vec(), true) - .await?; - - let decoded = hdfs::CreateResponseProto::decode_length_delimited(response)?; - debug!("create response: {:?}", &decoded); - Ok(decoded) + self.call("create", message, true).await } pub(crate) async fn append( @@ -247,16 +197,7 @@ impl NamenodeProtocol { flag: Some(flag), }; - debug!("append request: {:?}", &message); - - let response = self - .proxy - .call("append", message.encode_length_delimited_to_vec(), true) - .await?; - - let decoded = hdfs::AppendResponseProto::decode_length_delimited(response)?; - debug!("append response: {:?}", &decoded); - Ok(decoded) + self.call("append", message, true).await } pub(crate) async fn add_block( @@ -273,16 +214,20 @@ impl NamenodeProtocol { ..Default::default() }; - debug!("add_block request: {:?}", &message); + self.call("addBlock", message, true).await + } - let response = self - .proxy - .call("addBlock", message.encode_length_delimited_to_vec(), true) - .await?; + #[allow(dead_code)] + pub(crate) async fn update_block_for_pipeline( + &self, + block: hdfs::ExtendedBlockProto, + ) -> Result { + let message = hdfs::UpdateBlockForPipelineRequestProto { + block, + client_name: self.client_name.clone(), + }; - let decoded = hdfs::AddBlockResponseProto::decode_length_delimited(response)?; - 
debug!("add_block response: {:?}", &decoded); - Ok(decoded) + self.call("updateBlockForPipeline", message, true).await } pub(crate) async fn complete( @@ -297,16 +242,7 @@ impl NamenodeProtocol { last, file_id, }; - debug!("complete request: {:?}", &message); - - let response = self - .proxy - .call("complete", message.encode_length_delimited_to_vec(), true) - .await?; - - let decoded = hdfs::CompleteResponseProto::decode_length_delimited(response)?; - debug!("complete response: {:?}", &decoded); - Ok(decoded) + self.call("complete", message, true).await } pub(crate) async fn mkdirs( @@ -323,16 +259,7 @@ impl NamenodeProtocol { create_parent, ..Default::default() }; - debug!("mkdirs request: {:?}", &message); - - let response = self - .proxy - .call("mkdirs", message.encode_length_delimited_to_vec(), true) - .await?; - - let decoded = hdfs::MkdirsResponseProto::decode_length_delimited(response)?; - debug!("mkdirs response: {:?}", &decoded); - Ok(decoded) + self.call("mkdirs", message, true).await } pub(crate) async fn rename( @@ -347,16 +274,7 @@ impl NamenodeProtocol { overwrite_dest: overwrite, ..Default::default() }; - debug!("rename request: {:?}", &message); - - let response = self - .proxy - .call("rename2", message.encode_length_delimited_to_vec(), true) - .await?; - - let decoded = hdfs::Rename2ResponseProto::decode_length_delimited(response)?; - debug!("rename response: {:?}", &decoded); - Ok(decoded) + self.call("rename2", message, true).await } pub(crate) async fn delete( @@ -368,16 +286,7 @@ impl NamenodeProtocol { src: src.to_string(), recursive, }; - debug!("delete request: {:?}", &message); - - let response = self - .proxy - .call("delete", message.encode_length_delimited_to_vec(), true) - .await?; - - let decoded = hdfs::DeleteResponseProto::decode_length_delimited(response)?; - debug!("delete response: {:?}", &decoded); - Ok(decoded) + self.call("delete", message, true).await } pub(crate) async fn renew_lease( @@ -388,16 +297,7 @@ impl 
NamenodeProtocol { client_name: self.client_name.clone(), namespaces, }; - debug!("renewLease request: {:?}", &message); - - let response = self - .proxy - .call("renewLease", message.encode_length_delimited_to_vec(), true) - .await?; - - let decoded = hdfs::RenewLeaseResponseProto::decode_length_delimited(response)?; - debug!("renewLease response: {:?}", &decoded); - Ok(decoded) + self.call("renewLease", message, true).await } pub(crate) async fn set_times( @@ -411,16 +311,7 @@ impl NamenodeProtocol { mtime, atime, }; - debug!("setTimes request: {:?}", &message); - - let response = self - .proxy - .call("setTimes", message.encode_length_delimited_to_vec(), true) - .await?; - - let decoded = hdfs::SetTimesResponseProto::decode_length_delimited(response)?; - debug!("setTimes response: {:?}", &decoded); - Ok(decoded) + self.call("setTimes", message, true).await } pub(crate) async fn set_owner( @@ -435,16 +326,7 @@ impl NamenodeProtocol { groupname: group.map(str::to_string), }; - debug!("setOwner request: {:?}", &message); - - let response = self - .proxy - .call("setOwner", message.encode_length_delimited_to_vec(), true) - .await?; - - let decoded = hdfs::SetOwnerResponseProto::decode_length_delimited(response)?; - debug!("setOwner response: {:?}", &decoded); - Ok(decoded) + self.call("setOwner", message, true).await } pub(crate) async fn set_permission( @@ -456,21 +338,7 @@ impl NamenodeProtocol { src: src.to_string(), permission: hdfs::FsPermissionProto { perm: permission }, }; - - debug!("setPermission request: {:?}", &message); - - let response = self - .proxy - .call( - "setPermission", - message.encode_length_delimited_to_vec(), - true, - ) - .await?; - - let decoded = hdfs::SetPermissionResponseProto::decode_length_delimited(response)?; - debug!("setPermission response: {:?}", &decoded); - Ok(decoded) + self.call("setPermission", message, true).await } pub(crate) async fn set_replication( @@ -482,21 +350,7 @@ impl NamenodeProtocol { src: src.to_string(), 
replication, }; - - debug!("setReplication request: {:?}", &message); - - let response = self - .proxy - .call( - "setReplication", - message.encode_length_delimited_to_vec(), - true, - ) - .await?; - - let decoded = hdfs::SetReplicationResponseProto::decode_length_delimited(response)?; - debug!("setReplication response: {:?}", &decoded); - Ok(decoded) + self.call("setReplication", message, true).await } pub(crate) async fn get_content_summary( @@ -506,21 +360,72 @@ impl NamenodeProtocol { let message = hdfs::GetContentSummaryRequestProto { path: path.to_string(), }; + self.call("getContentSummary", message, false).await + } - debug!("getContentSummary request: {:?}", &message); + pub(crate) async fn modify_acl_entries( + &self, + path: &str, + acl_spec: Vec, + ) -> Result { + let message = hdfs::ModifyAclEntriesRequestProto { + src: path.to_string(), + acl_spec: acl_spec.into_iter().collect(), + }; - let response = self - .proxy - .call( - "getContentSummary", - message.encode_length_delimited_to_vec(), - false, - ) - .await?; + self.call("modifyAclEntries", message, true).await + } - let decoded = hdfs::GetContentSummaryResponseProto::decode_length_delimited(response)?; - debug!("getContentSummary response: {:?}", &decoded); - Ok(decoded) + pub(crate) async fn remove_acl_entries( + &self, + path: &str, + acl_spec: Vec, + ) -> Result { + let message = hdfs::RemoveAclEntriesRequestProto { + src: path.to_string(), + acl_spec: acl_spec.into_iter().collect(), + }; + + self.call("removeAclEntries", message, true).await + } + + pub(crate) async fn remove_default_acl( + &self, + path: &str, + ) -> Result { + let message = hdfs::RemoveDefaultAclRequestProto { + src: path.to_string(), + }; + self.call("removeDefaultAcl", message, true).await + } + + pub(crate) async fn remove_acl(&self, path: &str) -> Result { + let message = hdfs::RemoveAclRequestProto { + src: path.to_string(), + }; + self.call("removeAcl", message, true).await + } + + pub(crate) async fn set_acl( + 
&self, + path: &str, + acl_spec: Vec, + ) -> Result { + let message = hdfs::SetAclRequestProto { + src: path.to_string(), + acl_spec: acl_spec.into_iter().collect(), + }; + self.call("setAcl", message, true).await + } + + pub(crate) async fn get_acl_status( + &self, + path: &str, + ) -> Result { + let message = hdfs::GetAclStatusRequestProto { + src: path.to_string(), + }; + self.call("getAclStatus", message, false).await } } diff --git a/rust/src/lib.rs b/rust/src/lib.rs index f4cffdf..5ace5d1 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -28,7 +28,7 @@ //! # Ok(()) //! # } //! ``` - +pub mod acl; pub mod client; pub(crate) mod common; #[cfg(feature = "benchmark")] diff --git a/rust/tests/test_integration.rs b/rust/tests/test_integration.rs index 5502d0b..096de74 100644 --- a/rust/tests/test_integration.rs +++ b/rust/tests/test_integration.rs @@ -5,7 +5,9 @@ mod common; mod test { use crate::common::{assert_bufs_equal, setup, TEST_FILE_INTS}; use bytes::{BufMut, BytesMut}; - use hdfs_native::{client::FileStatus, minidfs::DfsFeatures, Client, Result, WriteOptions}; + use hdfs_native::{ + acl::AclEntry, client::FileStatus, minidfs::DfsFeatures, Client, Result, WriteOptions, + }; use serial_test::serial; use std::collections::HashSet; @@ -184,6 +186,7 @@ mod test { test_set_permission(&client).await?; test_set_replication(&client).await?; test_get_content_summary(&client).await?; + test_acls(&client).await?; Ok(()) } @@ -439,6 +442,102 @@ mod test { // Test file plus the two we made above assert_eq!(content_summary.length, TEST_FILE_INTS as u64 * 4 + 4 + 6); + client.delete("/test", false).await?; + client.delete("/test2", false).await?; + + Ok(()) + } + + async fn test_acls(client: &Client) -> Result<()> { + client + .create("/test", WriteOptions::default()) + .await? 
+ .close() + .await?; + + let acl_status = client.get_acl_status("/test").await?; + + assert!(acl_status.entries.is_empty()); + assert!(!acl_status.sticky); + + let user_entry = AclEntry::new("user", "access", "r--", Some("testuser".to_string())); + + let group_entry = AclEntry::new("group", "access", "-w-", Some("testgroup".to_string())); + + client + .modify_acl_entries("/test", vec![user_entry.clone()]) + .await?; + + let acl_status = client.get_acl_status("/test").await?; + + // Empty group permission added automatically + assert_eq!(acl_status.entries.len(), 2, "{:?}", acl_status.entries); + assert!(acl_status.entries.contains(&user_entry)); + + client + .modify_acl_entries("/test", vec![group_entry.clone()]) + .await?; + + let acl_status = client.get_acl_status("/test").await?; + + // Still contains the empty group + assert_eq!(acl_status.entries.len(), 3, "{:?}", acl_status.entries); + assert!(acl_status.entries.contains(&user_entry)); + assert!(acl_status.entries.contains(&group_entry)); + + client + .remove_acl_entries("/test", vec![group_entry.clone()]) + .await?; + + let acl_status = client.get_acl_status("/test").await?; + + assert_eq!(acl_status.entries.len(), 2, "{:?}", acl_status.entries); + assert!(acl_status.entries.contains(&user_entry)); + assert!(!acl_status.entries.contains(&group_entry)); + + client.remove_acl("/test").await?; + + let acl_status = client.get_acl_status("/test").await?; + + assert_eq!(acl_status.entries.len(), 0); + + client.delete("/test", false).await?; + + // Default acl + client.mkdirs("/testdir", 0o755, true).await?; + + client + .modify_acl_entries( + "/testdir", + vec![AclEntry { + r#type: hdfs_native::acl::AclEntryType::User, + scope: hdfs_native::acl::AclEntryScope::Default, + permissions: hdfs_native::acl::FsAction::Read, + name: Some("testuser".to_string()), + }], + ) + .await?; + + let acl_status = client.get_acl_status("/testdir").await?; + + // All defaults get added automatically based on permissions + 
assert_eq!(acl_status.entries.len(), 5, "{:?}", acl_status.entries); + + client + .create("/testdir/test", WriteOptions::default()) + .await? + .close() + .await?; + + let acl_status = client.get_acl_status("/testdir/test").await?; + + // Default user acl added above plus the empty group permission + assert_eq!(acl_status.entries.len(), 2, "{:?}", acl_status.entries); + + client.remove_default_acl("/testdir").await?; + + client.delete("/testdir", true).await?; + Ok(()) } }