Skip to content

Commit

Permalink
Core: Remove cache in all tosfs APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Aug 23, 2024
1 parent a81701d commit b21db03
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 121 deletions.
132 changes: 33 additions & 99 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,10 @@ def __init__(
self.version_aware = version_aware
super().__init__(**kwargs)

def invalidate_cache(self, path: Optional[str] = None) -> None:
"""Invalidate the cache for the given path."""
if path is None:
self.dircache.clear()
else:
path = self._strip_protocol(path)
self.dircache.pop(path, None)
while path:
self.dircache.pop(path, None)
path = self._parent(path)

def ls(
self,
path: str,
detail: bool = False,
refresh: bool = False,
versions: bool = False,
**kwargs: Union[str, bool, float, None],
) -> Union[List[dict], List[str]]:
Expand All @@ -105,8 +93,6 @@ def ls(
The path to list.
detail : bool, optional
Whether to return detailed information (default is False).
refresh : bool, optional
Whether to refresh the cache (default is False).
versions : bool, optional
Whether to list object versions (default is False).
**kwargs : dict, optional
Expand Down Expand Up @@ -136,15 +122,13 @@ def ls(
"""
path = self._strip_protocol(path).rstrip("/")
if path in ["", "/"]:
files = self._lsbuckets(refresh)
files = self._lsbuckets()
return files if detail else sorted([o["name"] for o in files])

files = self._lsdir(path, refresh, versions=versions)
files = self._lsdir(path, versions=versions)
if not files and "/" in path:
try:
files = self._lsdir(
self._parent(path), refresh=refresh, versions=versions
)
files = self._lsdir(self._parent(path), versions=versions)
except IOError:
pass
files = [
Expand All @@ -160,7 +144,6 @@ def info(
path: str,
bucket: Optional[str] = None,
key: Optional[str] = None,
refresh: bool = False,
version_id: Optional[str] = None,
) -> dict:
"""Give details of entry at path.
Expand Down Expand Up @@ -192,9 +175,6 @@ def info(
"filesystem is not support version aware."
)

if not refresh and (info := self._info_from_cache(path, fullpath, version_id)):
return info

if not key:
return self._bucket_info(bucket)

Expand Down Expand Up @@ -244,41 +224,13 @@ def rmdir(self, path: str) -> None:

try:
self.tos_client.delete_object(bucket, key.rstrip("/") + "/")
self.invalidate_cache(path.rstrip("/"))
except tos.exceptions.TosClientError as e:
raise e
except tos.exceptions.TosServerError as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _info_from_cache(
self, path: str, fullpath: str, version_id: Optional[str]
) -> dict:
out = self._ls_from_cache(fullpath)
if out is not None:
if self.version_aware and version_id is not None:
# If cached info does not match requested version_id,
# fallback to calling head_object
out = [
o
for o in out
if o["name"] == fullpath and version_id == o.get("VersionId")
]
if out:
return out[0]
else:
out = [
o
for o in out
if o["name"] == fullpath or o["name"] == fullpath.rstrip("/")
]
if out:
return out[0]
return {"name": path, "size": 0, "type": "directory"}

return {}

def _bucket_info(self, bucket: str) -> dict:
"""Get the information of a bucket.
Expand Down Expand Up @@ -413,14 +365,9 @@ def _try_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _lsbuckets(self, refresh: bool = False) -> List[dict]:
def _lsbuckets(self) -> List[dict]:
"""List all buckets in the account.
Parameters
----------
refresh : bool, optional
Whether to refresh the cache (default is False).
Returns
-------
List[dict]
Expand All @@ -443,38 +390,31 @@ def _lsbuckets(self, refresh: bool = False) -> List[dict]:
If there is an unknown error while listing the buckets.
"""
if "" not in self.dircache or refresh:
try:
resp = self.tos_client.list_buckets()
except tos.exceptions.TosClientError as e:
raise e
except tos.exceptions.TosServerError as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

buckets = [self._fill_bucket_info(bucket.name) for bucket in resp.buckets]
self.dircache[""] = buckets
try:
resp = self.tos_client.list_buckets()
except tos.exceptions.TosClientError as e:
raise e
except tos.exceptions.TosServerError as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

return self.dircache[""]
return [self._fill_bucket_info(bucket.name) for bucket in resp.buckets]

def _lsdir(
self,
path: str,
refresh: bool = False,
max_items: int = 1000,
delimiter: str = "/",
prefix: str = "",
versions: bool = False,
) -> List[Union[CommonPrefixInfo, ListedObject, ListedObjectVersion]]:
) -> List[dict]:
"""List objects in a directory.
Parameters
----------
path : str
The path to list.
refresh : bool, optional
Whether to refresh the cache (default is False).
max_items : int, optional
The maximum number of items to return (default is 1000).
delimiter : str, optional
Expand All @@ -486,10 +426,8 @@ def _lsdir(
Returns
-------
List[Union[CommonPrefixInfo, ListedObject, ListedObjectVersion]]
List[dict]
A list of objects in the directory.
The list may contain `CommonPrefixInfo` for directories,
`ListedObject` for files, and `ListedObjectVersion` for versioned objects.
Raises
------
Expand All @@ -508,27 +446,24 @@ def _lsdir(
prefix = ""
if key:
prefix = key.lstrip("/") + "/" + prefix
if path not in self.dircache or refresh or not delimiter or versions:
logger.debug("Get directory listing for %s", path)
dirs = []
files = []
for obj in self._listdir(
bucket,
max_items=max_items,
delimiter=delimiter,
prefix=prefix,
versions=versions,
):
if isinstance(obj, CommonPrefixInfo):
dirs.append(self._fill_common_prefix_info(obj, bucket))
else:
files.append(self._fill_object_info(obj, bucket, versions))
files += dirs

if delimiter and files and not versions:
self.dircache[path] = files
return files
return self.dircache[path]

logger.debug("Get directory listing for %s", path)
dirs = []
files = []
for obj in self._listdir(
bucket,
max_items=max_items,
delimiter=delimiter,
prefix=prefix,
versions=versions,
):
if isinstance(obj, CommonPrefixInfo):
dirs.append(self._fill_common_prefix_info(obj, bucket))
else:
files.append(self._fill_object_info(obj, bucket, versions))
files += dirs

return files

def _listdir(
self,
Expand Down Expand Up @@ -627,7 +562,6 @@ def _listdir(

def _rm(self, path: str) -> None:
bucket, key, _ = self._split_path(path)
self.invalidate_cache(path)

try:
self.tos_client.delete_object(bucket, key)
Expand Down
22 changes: 0 additions & 22 deletions tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from unittest.mock import MagicMock

import pytest
from tos.exceptions import TosServerError
Expand Down Expand Up @@ -52,26 +50,6 @@ def test_ls_dir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) ->
assert tosfs.ls(f"{bucket}/{temporary_workspace}/nonexistent", detail=False) == []


def test_ls_cache(tosfs: TosFileSystem, bucket: str) -> None:
with unittest.mock.patch.object(
tosfs.tos_client,
"list_objects_type2",
return_value=MagicMock(
is_truncated=False,
contents=[MagicMock(key="mock_key", size=123)],
common_prefixes=[],
next_continuation_token=None,
),
):
# Call ls method and get result from server
tosfs.ls(bucket, detail=False, refresh=True)
# Get result from cache
tosfs.ls(bucket, detail=False, refresh=False)

# Verify that list_objects_type2 was called only once
assert tosfs.tos_client.list_objects_type2.call_count == 1


def test_inner_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
file_name = random_path()
tosfs.tos_client.put_object(bucket=bucket, key=f"{temporary_workspace}/{file_name}")
Expand Down

0 comments on commit b21db03

Please sign in to comment.