Add get to CLI
Kimahriman committed Jan 19, 2025
1 parent a0c43cc commit bde3e29
Showing 11 changed files with 249 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Diff not shown: generated file.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -8,6 +8,7 @@ resolver = "2"
[workspace.dependencies]
bytes = "1"
env_logger = "0.11"
futures = "0.3"
log = "0.4"
thiserror = "2"
tokio = "1"
1 change: 1 addition & 0 deletions python/Cargo.toml
@@ -27,6 +27,7 @@ name = "hdfs_native._internal"
[dependencies]
bytes = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
hdfs-native = { path = "../rust" }
log = { workspace = true }
pyo3 = { version = "0.23", features = ["extension-module", "abi3", "abi3-py39"] }
13 changes: 10 additions & 3 deletions python/conftest.py
@@ -43,9 +43,16 @@ def minidfs():
child.kill()


@pytest.fixture(scope="module")
def client(minidfs: str) -> Client:
return Client(minidfs)
@pytest.fixture
def client(minidfs: str):
client = Client(minidfs)

try:
yield client
finally:
statuses = list(client.list_status("/"))
for status in statuses:
client.delete(status.path, True)


@pytest.fixture(scope="module")
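For context, a minimal sketch (test body, paths, and the FileStatus `length` field are illustrative assumptions, not part of this commit) of what the reworked fixture enables: each test now gets a fresh function-scoped client, and whatever it leaves under "/" is removed by the fixture's cleanup loop.

def test_creates_file(client):
    with client.create("/example.txt") as writer:
        writer.write(b"hello")

    assert client.get_file_info("/example.txt").length == 5
    # No manual cleanup needed; the fixture deletes "/example.txt" once the test returns.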
12 changes: 12 additions & 0 deletions python/hdfs_native/__init__.py
@@ -39,6 +39,9 @@ def __init__(self, inner: "RawFileReader"):
def __len__(self) -> int:
return self.inner.file_length()

def __iter__(self) -> Iterator[bytes]:
return self.read_range_stream(0, len(self))

def __enter__(self):
# Don't need to do anything special here
return self
@@ -82,6 +85,15 @@ def read_range(self, offset: int, len: int) -> bytes:
"""Read `len` bytes from the file starting at `offset`. Doesn't affect the position in the file"""
return self.inner.read_range(offset, len)

def read_range_stream(self, offset: int, len: int) -> Iterator[bytes]:
"""
Read `len` bytes from the file starting at `offset` as an iterator of bytes. Doesn't affect
the position in the file.
This is the most efficient way to iteratively read a file.
"""
return self.inner.read_range_stream(offset, len)

def close(self) -> None:
pass

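A quick usage sketch of the new iteration API (the namenode address and paths are illustrative): iterating a FileReader now streams the file in chunks instead of buffering it whole.

from hdfs_native import Client

client = Client("hdfs://localhost:9000")
with client.read("/some/large/file") as reader:
    with open("local_copy", "wb") as out:
        # __iter__ delegates to read_range_stream(0, len(reader))
        for chunk in reader:
            out.write(chunk)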
6 changes: 6 additions & 0 deletions python/hdfs_native/_internal.pyi
@@ -80,6 +80,12 @@ class RawFileReader:
def read_range(self, offset: int, len: int) -> bytes:
"""Read `len` bytes from the file starting at `offset`. Doesn't affect the position in the file"""

def read_range_stream(self, offset: int, len: int) -> Iterator[bytes]:
"""
Read `len` bytes from the file starting at `offset` as an iterator of bytes. Doesn't affect
the position in the file.
"""

class RawFileWriter:
def write(self, buf: Buffer) -> int:
"""Writes `buf` to the file"""
121 changes: 120 additions & 1 deletion python/hdfs_native/cli.py
@@ -1,9 +1,11 @@
import functools
import os
import re
import shutil
import sys
from argparse import ArgumentParser, Namespace
from typing import List, Optional, Sequence
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional, Sequence, Tuple
from urllib.parse import urlparse

from hdfs_native import Client
@@ -50,6 +52,42 @@ def _glob_path(client: Client, glob: str) -> List[str]:
return [glob]


def _download_file(
client: Client,
remote_src: str,
local_dst: str,
force: bool = False,
preserve: bool = False,
) -> None:
if not force and os.path.exists(local_dst):
raise FileExistsError(f"{local_dst} already exists, use --force to overwrite")

with client.read(remote_src) as remote_file:
with open(local_dst, "wb") as local_file:
for chunk in remote_file.read_range_stream(0, len(remote_file)):
local_file.write(chunk)

if preserve:
status = client.get_file_info(remote_src)
os.utime(
local_dst,
(status.access_time / 1000, status.modification_time / 1000),
)
os.chmod(local_dst, status.permission)


def _upload_file(
client: Client,
local_src: str,
remote_dst: str,
force: bool = False,
preserve: bool = False,
) -> None:
with open(local_src, "rb") as local_file:
with client.create(remote_dst) as remote_file:
shutil.copyfileobj(local_file, remote_file)


def cat(args: Namespace):
for src in args.src:
client = _client_for_url(src)
@@ -99,6 +137,48 @@ def chown(args: Namespace):
client.set_owner(path, owner, group)


def get(args: Namespace):
paths: List[Tuple[Client, str]] = []

for url in args.src:
client = _client_for_url(url)
for path in _glob_path(client, _path_for_url(url)):
paths.append((client, path))

dst_is_dir = os.path.isdir(args.localdst)

if len(paths) > 1 and not dst_is_dir:
raise ValueError("Destination must be directory when copying multiple files")
elif not dst_is_dir:
_download_file(
paths[0][0],
paths[0][1],
args.localdst,
force=args.force,
preserve=args.preserve,
)
else:
with ThreadPoolExecutor(args.threads) as executor:
futures = []
for client, path in paths:
filename = os.path.basename(path)

futures.append(
executor.submit(
_download_file,
client,
path,
os.path.join(args.localdst, filename),
force=args.force,
preserve=args.preserve,
)
)

# Iterate to raise any exceptions thrown
for f in as_completed(futures):
f.result()


def mkdir(args: Namespace):
create_parent = args.parent

@@ -193,6 +273,45 @@ def main(in_args: Optional[Sequence[str]] = None):
chown_parser.add_argument("path", nargs="+", help="File pattern to modify")
chown_parser.set_defaults(func=chown)

get_parser = subparsers.add_parser(
"get",
aliases=["copyToLocal"],
help="Copy files to a local destination",
description="""Copy files matching a pattern to a local destination.
When copying multiple files, the destination must be a directory""",
)
get_parser.add_argument(
"-p",
"--preserve",
action="store_true",
default=False,
help="Preserve timestamps and the mode",
)
get_parser.add_argument(
"-f",
"--force",
action="store_true",
default=False,
help="Overwrite the destination if it already exists",
)
get_parser.add_argument(
"-t",
"--threads",
type=int,
help="Number of threads to use",
default=1,
)
get_parser.add_argument(
"src",
nargs="+",
help="Source patterns to copy",
)
get_parser.add_argument(
"localdst",
help="Local destination to write to",
)
get_parser.set_defaults(func=get)

mkdir_parser = subparsers.add_parser(
"mkdir",
help="Create a directory",
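A hedged example of the resulting command line (the console-script name hdfscli is an assumption; substitute however main() is exposed in your install):

# Download every matching file with 4 threads, overwriting existing files and preserving metadata
hdfscli get -f -p -t 4 "/data/2025-01/*.parquet" ./downloads

# The copyToLocal alias behaves identically
hdfscli copyToLocal /logs/app.log .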
40 changes: 39 additions & 1 deletion python/src/lib.rs
@@ -1,6 +1,6 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use ::hdfs_native::file::{FileReader, FileWriter};
use ::hdfs_native::WriteOptions;
@@ -9,6 +9,8 @@ use ::hdfs_native::{
Client,
};
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use hdfs_native::acl::{AclEntry, AclStatus};
use hdfs_native::client::ContentSummary;
use pyo3::{exceptions::PyRuntimeError, prelude::*};
@@ -220,6 +222,32 @@ impl PyAclEntry {
}
}

#[pyclass(name = "FileReadStream")]
struct PyFileReadStream {
inner: Arc<Mutex<BoxStream<'static, hdfs_native::Result<Bytes>>>>,
rt: Arc<Runtime>,
}

#[pymethods]
impl PyFileReadStream {
pub fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}

fn __next__(slf: PyRefMut<'_, Self>) -> PyHdfsResult<Option<Cow<[u8]>>> {
let inner = Arc::clone(&slf.inner);
let rt = Arc::clone(&slf.rt);
if let Some(result) = slf
.py()
.allow_threads(|| rt.block_on(inner.lock().unwrap().next()))
{
Ok(Some(Cow::from(result?.to_vec())))
} else {
Ok(None)
}
}
}

#[pyclass]
struct RawFileReader {
inner: FileReader,
@@ -258,6 +286,16 @@ impl RawFileReader {
.to_vec(),
))
}

pub fn read_range_stream(&self, offset: usize, len: usize) -> PyFileReadStream {
let stream = Arc::new(Mutex::new(
self.inner.read_range_stream(offset, len).boxed(),
));
PyFileReadStream {
inner: stream,
rt: Arc::clone(&self.rt),
}
}
}

#[pyclass(get_all, set_all, name = "WriteOptions")]
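On the Rust side, the stream returned by read_range_stream is boxed and wrapped in Arc<Mutex<...>> so that each __next__ call can clone the handles, release the GIL with allow_threads, and drive the stream one chunk at a time on the shared Tokio runtime via block_on; each yielded Bytes chunk is copied out as a Cow<[u8]> for Python.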