Skip to content

Commit

Permalink
Core: Optimize rm API via batch delete (#72)
Browse files Browse the repository at this point in the history
* Core: Optimize rm API via batch delete

* Fix fsspec rm test issue

* Fix code style issue
  • Loading branch information
yanghua authored Sep 11, 2024
1 parent ea46a84 commit 28870a8
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
78 changes: 78 additions & 0 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import setup_logging as setup_logger
from tos.exceptions import TosClientError, TosServerError
from tos.models import CommonPrefixInfo
from tos.models2 import (
CreateMultipartUploadOutput,
Expand Down Expand Up @@ -394,6 +395,48 @@ def rmdir(self, path: str) -> None:
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def rm(
self, path: str, recursive: bool = False, maxdepth: Optional[int] = None
) -> None:
"""Delete files.
Parameters
----------
path: str or list of str
File(s) to delete.
recursive: bool
If file(s) are directories, recursively delete contents and then
also remove the directory
maxdepth: int or None
Depth to pass to walk for finding files to delete, if recursive.
If None, there will be no limit and infinite recursion may be
possible.
"""
if isinstance(path, str):
if not self.exists(path):
raise FileNotFoundError(path)

bucket, key, _ = self._split_path(path)
if not key:
raise TosfsError(f"Cannot remove a bucket {bucket} using rm api.")

if not recursive or maxdepth:
return super().rm(path, recursive=recursive, maxdepth=maxdepth)

if self.isfile(path):
self.rm_file(path)
else:
try:
self._list_and_batch_delete_objects(bucket, key)
except (TosClientError, TosServerError) as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
else:
for single_path in path:
self.rm(single_path, recursive=recursive, maxdepth=maxdepth)

def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
"""Create directory entry at path.
Expand Down Expand Up @@ -1064,6 +1107,41 @@ def glob(

return out if detail else list(out)

def _list_and_batch_delete_objects(self, bucket: str, key: str) -> None:
is_truncated = True
continuation_token = ""
all_results = []

class DeletingObject:
def __init__(self, key: str, version_id: Optional[str] = None):
self.key = key
self.version_id = version_id

while is_truncated:
resp = self.tos_client.list_objects_type2(
bucket,
prefix=key.rstrip("/") + "/",
delimiter="/",
max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS,
continuation_token=continuation_token,
)
is_truncated = resp.is_truncated
continuation_token = resp.next_continuation_token
all_results.extend(resp.contents + resp.common_prefixes)

deleting_objects = [
DeletingObject(o.key if hasattr(o, "key") else o.prefix)
for o in all_results
]

if deleting_objects:
delete_resp = self.tos_client.delete_multi_objects(
bucket, deleting_objects, quiet=True
)
if delete_resp.error:
for d in delete_resp.error:
logger.warning("Deleted object: %s failed", d)

def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None:
"""Copy file between locations on tos.
Expand Down
35 changes: 35 additions & 0 deletions tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,41 @@ def test_glob(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> No
)


def test_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
file_name = random_str()
dir_name = random_str()
sub_dir_name = random_str()
sub_file_name = random_str()

# Test Non-Recursive Deletion
tosfs.touch(f"{bucket}/{temporary_workspace}/{file_name}")
assert tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")
tosfs.rm(f"{bucket}/{temporary_workspace}/{file_name}")
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")

# Test Recursive Deletion
tosfs.makedirs(f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}")
tosfs.touch(f"{bucket}/{temporary_workspace}/{dir_name}/{sub_file_name}")
tosfs.touch(
f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}/{sub_file_name}"
)
assert tosfs.exists(f"{bucket}/{temporary_workspace}/{dir_name}/{sub_file_name}")
assert tosfs.exists(
f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}/{sub_file_name}"
)
tosfs.rm(f"{bucket}/{temporary_workspace}/{dir_name}", recursive=True)
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}")
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{dir_name}")

# Test Deletion of Non-Existent Path
with pytest.raises(FileNotFoundError):
tosfs.rm(f"{bucket}/{temporary_workspace}/nonexistent")

# Test Deletion of Bucket
with pytest.raises(TosfsError):
tosfs.rm(bucket)


###########################################################
# File operation tests #
###########################################################
Expand Down

0 comments on commit 28870a8

Please sign in to comment.