Skip to content

Commit

Permalink
Core: Optimize rm API via batch delete
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 11, 2024
1 parent ea46a84 commit 09c5316
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 0 deletions.
69 changes: 69 additions & 0 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,41 @@ def rmdir(self, path: str) -> None:
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def rm(
    self, path: str, recursive: bool = False, maxdepth: Optional[int] = None
) -> None:
    """Delete a file or, recursively, a directory tree.

    Parameters
    ----------
    path: str
        Path to delete. It must include a key; a bare bucket is rejected.
    recursive: bool
        If the path is a directory, recursively delete its contents and
        then also remove the directory.
    maxdepth: int or None
        Depth to pass to walk for finding files to delete, if recursive.
        If None, there is no depth limit and the whole tree is removed
        through the batch-delete fast path.

    Raises
    ------
    FileNotFoundError
        If ``path`` does not exist.
    TosfsError
        If ``path`` names a bucket, or an unexpected error occurs while
        batch deleting.

    """
    if not self.exists(path):
        raise FileNotFoundError(path)

    bucket, key, _ = self._split_path(path)
    if not key:
        raise TosfsError(f"Cannot remove a bucket {bucket} using rm api.")

    # The batch path only suits an unbounded recursive delete of a directory:
    # - an explicit ``maxdepth`` (including 0) needs the generic walk so the
    #   depth limit is honoured or validated rather than silently ignored;
    # - a plain file has no keys under ``key + "/"``, so the batch path would
    #   list nothing and leave the file in place.
    if not recursive or maxdepth is not None or self.isfile(path):
        return super().rm(path, recursive=recursive, maxdepth=maxdepth)

    try:
        self._list_and_batch_delete_objects(bucket, key)
    except (tos.exceptions.TosClientError, tos.exceptions.TosServerError):
        # Let SDK errors through unwrapped, keeping the original traceback.
        raise
    except Exception as e:
        raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
"""Create directory entry at path.
Expand Down Expand Up @@ -1064,6 +1099,40 @@ def glob(

return out if detail else list(out)

def _list_and_batch_delete_objects(self, bucket: str, key: str) -> None:
    """Delete every object stored under the ``key`` prefix in ``bucket``.

    Objects are listed page by page and each page is removed with a single
    batch-delete request, so neither memory use nor request size grows with
    the size of the tree.

    Parameters
    ----------
    bucket: str
        Name of the bucket holding the objects.
    key: str
        Directory key; all objects whose key starts with ``key + "/"`` are
        deleted. A trailing slash on ``key`` is tolerated.

    Notes
    -----
    Per-object failures reported by the batch delete are logged as warnings
    and do not raise.

    """

    class DeletingObject:
        """Key/version pair in the shape passed to ``delete_multi_objects``."""

        def __init__(self, key: str, version_id: Optional[str] = None):
            self.key = key
            self.version_id = version_id

    prefix = key.rstrip("/") + "/"
    is_truncated = True
    continuation_token = ""

    while is_truncated:
        # No delimiter: the listing must descend through every level so that
        # nested objects are deleted, not only the direct children.
        resp = self.tos_client.list_objects_type2(
            bucket,
            prefix=prefix,
            max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS,
            continuation_token=continuation_token,
        )
        is_truncated = resp.is_truncated
        continuation_token = resp.next_continuation_token

        # Delete one page at a time so a batch never exceeds the listing
        # page size.
        # NOTE(review): assumes the page size is within the service's
        # per-request delete limit - confirm against the TOS docs.
        deleting_objects = [DeletingObject(o.key) for o in resp.contents]
        if not deleting_objects:
            # Nothing on this page; skip the pointless (and possibly
            # rejected) empty batch-delete request.
            continue

        delete_resp = self.tos_client.delete_multi_objects(
            bucket, deleting_objects, quiet=True
        )
        if delete_resp.error:
            for d in delete_resp.error:
                logger.warning("Deleted object: %s failed", d)

def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None:
"""Copy file between locations on tos.
Expand Down
35 changes: 35 additions & 0 deletions tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,41 @@ def test_glob(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> No
)


def test_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
    """Check ``rm`` on a single file, a directory tree, and both error paths."""
    file_name = random_str()
    dir_name = random_str()
    sub_dir_name = random_str()
    sub_file_name = random_str()

    workspace = f"{bucket}/{temporary_workspace}"
    file_path = f"{workspace}/{file_name}"
    dir_path = f"{workspace}/{dir_name}"
    sub_dir_path = f"{dir_path}/{sub_dir_name}"
    top_level_file = f"{dir_path}/{sub_file_name}"
    nested_file = f"{sub_dir_path}/{sub_file_name}"

    # Non-recursive deletion of a single file.
    tosfs.touch(file_path)
    assert tosfs.exists(file_path)
    tosfs.rm(file_path)
    assert not tosfs.exists(file_path)

    # Recursive deletion of a directory holding a file and a populated subdir.
    tosfs.makedirs(sub_dir_path)
    tosfs.touch(top_level_file)
    tosfs.touch(nested_file)
    assert tosfs.exists(top_level_file)
    assert tosfs.exists(nested_file)
    tosfs.rm(dir_path, recursive=True)
    assert not tosfs.exists(sub_dir_path)
    assert not tosfs.exists(dir_path)

    # Removing a path that does not exist must raise FileNotFoundError.
    with pytest.raises(FileNotFoundError):
        tosfs.rm(f"{workspace}/nonexistent")

    # Removing a bare bucket through rm is rejected.
    with pytest.raises(TosfsError):
        tosfs.rm(bucket)


###########################################################
# File operation tests #
###########################################################
Expand Down

0 comments on commit 09c5316

Please sign in to comment.