Skip to content

Commit

Permalink
[TOSFS #20] Override fssepc#info default implementation to optimize p…
Browse files Browse the repository at this point in the history
…erformance
  • Loading branch information
yanghua committed Aug 19, 2024
1 parent fa1ade8 commit fc1d5b9
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 11 deletions.
192 changes: 181 additions & 11 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
# environment variable names
ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL"

# constants
SERVER_RESPONSE_CODE_NOT_FOUND = 404

logger = logging.getLogger("tosfs")


Expand Down Expand Up @@ -124,6 +127,172 @@ def ls(

return files if detail else sorted([o["name"] for o in files])

def info(
self,
path: str,
bucket: Optional[str] = None,
key: Optional[str] = None,
refresh: bool = False,
version_id: Optional[str] = None,
) -> dict:
"""Give details of entry at path.
Returns a single dictionary, with exactly the same information as ``ls``
would with ``detail=True``.
The default implementation should calls ls and could be overridden by a
shortcut. kwargs are passed on to ```ls()``.
Some file systems might not be able to measure the file's size, in
which case, the returned dict will include ``'size': None``.
Returns
-------
dict with keys: name (full path in the FS), size (in bytes), type (file,
directory, or something else) and other FS-specific keys.
"""
if path in ["/", ""]:
return {"name": path, "size": 0, "type": "directory"}
path = self._strip_protocol(path)
bucket, key, path_version_id = self._split_path(path)
fullpath = "/".join((bucket, key))

if version_id is not None and not self.version_aware:
raise ValueError(
"version_id cannot be specified due to the "
"filesystem is not support version aware."
)

if not refresh and (info := self._info_from_cache(path, fullpath, version_id)):
return info

if not key:
return self._bucket_info(bucket)

if info := self._object_info(bucket, key, version_id):
return info

return self._try_dir_info(bucket, key, path, fullpath)

def _info_from_cache(
self, path: str, fullpath: str, version_id: Optional[str]
) -> dict:
out = self._ls_from_cache(fullpath)
if out is not None:
if self.version_aware and version_id is not None:
# If cached info does not match requested version_id,
# fallback to calling head_object
out = [
o
for o in out
if o["name"] == fullpath and version_id == o.get("VersionId")
]
if out:
return out[0]
else:
out = [
o
for o in out
if o["name"] == fullpath or o["name"] == fullpath.rstrip("/")
]
if out:
return out[0]
return {"name": path, "size": 0, "type": "directory"}

return {}

def _bucket_info(self, bucket: str) -> dict:
"""Get the information of a bucket.
:param bucket: The bucket name.
:return: The information of the bucket.
"""
try:
self.tos_client.head_bucket(bucket)
return self._fill_bucket_info(bucket)
except tos.exceptions.TosClientError as e:
logger.error("Tosfs failed with client error: %s", e)
raise e
except tos.exceptions.TosServerError as e:
if e.status_code == SERVER_RESPONSE_CODE_NOT_FOUND:
raise FileNotFoundError(bucket) from e
else:
logger.error("Tosfs failed with server error: %s", e)
raise e
except Exception as e:
logger.error("Tosfs failed with unknown error: %s", e)
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _object_info(
self, bucket: str, key: str, version_id: Optional[str] = None
) -> dict:
"""Get the information of an object.
:param bucket: The bucket name.
:param key: The object key.
:param version_id: The version id of the object.
:return: The information of the object.
"""
try:
out = self.tos_client.head_object(bucket, key, version_id=version_id)
return {
"ETag": out.etag or "",
"LastModified": out.last_modified or "",
"size": out.content_length or 0,
"name": "/".join((bucket, key)),
"type": "file",
"StorageClass": out.storage_class or "STANDARD",
"VersionId": out.version_id or "",
"ContentType": out.content_type or "",
}
except tos.exceptions.TosClientError as e:
logger.error("Tosfs failed with client error: %s", e)
raise e
except tos.exceptions.TosServerError as e:
if e.status_code == SERVER_RESPONSE_CODE_NOT_FOUND:
pass
else:
logger.error("Tosfs failed with server error: %s", e)
raise e
except Exception as e:
logger.error("Tosfs failed with unknown error: %s", e)
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

return {}

def _try_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict:
try:
# We check to see if the path is a directory by attempting to list its
# contexts. If anything is found, it is indeed a directory
out = self.tos_client.list_objects_type2(
bucket,
prefix=key.rstrip("/") + "/" if key else "",
delimiter="/",
max_keys=1,
)

if out.key_count > 0 or out.contents or out.common_prefixes:
return {
"name": fullpath,
"type": "directory",
"size": 0,
"StorageClass": "DIRECTORY",
}

raise FileNotFoundError(path)
except tos.exceptions.TosClientError as e:
logger.error("Tosfs failed with client error: %s", e)
raise e
except tos.exceptions.TosServerError as e:
logger.error("Tosfs failed with server error: %s", e)
raise e
except FileNotFoundError as e:
raise e
except Exception as e:
logger.error("Tosfs failed with unknown error: %s", e)
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _lsbuckets(self, refresh: bool = False) -> List[dict]:
"""List all buckets in the account.
Expand All @@ -143,17 +312,7 @@ def _lsbuckets(self, refresh: bool = False) -> List[dict]:
logger.error("Tosfs failed with unknown error: %s", e)
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

buckets = [
{
"Key": bucket.name,
"Size": 0,
"StorageClass": "BUCKET",
"size": 0,
"type": "directory",
"name": bucket.name,
}
for bucket in resp.buckets
]
buckets = [self._fill_bucket_info(bucket.name) for bucket in resp.buckets]
self.dircache[""] = buckets

return self.dircache[""]
Expand Down Expand Up @@ -354,3 +513,14 @@ def _fill_object_info(
):
result["name"] += f"?versionId={obj.version_id}"
return result

@staticmethod
def _fill_bucket_info(bucket_name: str) -> dict:
return {
"Key": bucket_name,
"Size": 0,
"StorageClass": "BUCKET",
"size": 0,
"type": "directory",
"name": bucket_name,
}
22 changes: 22 additions & 0 deletions tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,25 @@ def test_inner_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -
assert tosfs.ls(f"{bucket}/{temporary_workspace}", detail=False) == []

tosfs._rm(f"{bucket}/{temporary_workspace}/{file_name}")


def test_info(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
assert tosfs.info("") == {"name": "", "size": 0, "type": "directory"}
assert tosfs.info("/") == {"name": "/", "size": 0, "type": "directory"}
assert tosfs.info(bucket) == {
"Key": "proton-ci",
"Size": 0,
"StorageClass": "BUCKET",
"name": "proton-ci",
"size": 0,
"type": "directory",
}
assert tosfs.info(f"{bucket}/{temporary_workspace}") == {
"name": f"{bucket}/{temporary_workspace}",
"type": "directory",
"size": 0,
"StorageClass": "DIRECTORY",
}

with pytest.raises(FileNotFoundError):
tosfs.info(f"{bucket}/nonexistent")

0 comments on commit fc1d5b9

Please sign in to comment.