From fc1d5b9f464770db6eabb91f2b6eb598dd283386 Mon Sep 17 00:00:00 2001 From: yanghua Date: Mon, 19 Aug 2024 14:27:44 +0800 Subject: [PATCH] [TOSFS #20] Override fssepc#info default implementation to optimize performance --- tosfs/core.py | 192 +++++++++++++++++++++++++++++++++++--- tosfs/tests/test_tosfs.py | 22 +++++ 2 files changed, 203 insertions(+), 11 deletions(-) diff --git a/tosfs/core.py b/tosfs/core.py index d2c1049..e12493f 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -29,6 +29,9 @@ # environment variable names ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL" +# constants +SERVER_RESPONSE_CODE_NOT_FOUND = 404 + logger = logging.getLogger("tosfs") @@ -124,6 +127,172 @@ def ls( return files if detail else sorted([o["name"] for o in files]) + def info( + self, + path: str, + bucket: Optional[str] = None, + key: Optional[str] = None, + refresh: bool = False, + version_id: Optional[str] = None, + ) -> dict: + """Give details of entry at path. + + Returns a single dictionary, with exactly the same information as ``ls`` + would with ``detail=True``. + + The default implementation should calls ls and could be overridden by a + shortcut. kwargs are passed on to ```ls()``. + + Some file systems might not be able to measure the file's size, in + which case, the returned dict will include ``'size': None``. + + Returns + ------- + dict with keys: name (full path in the FS), size (in bytes), type (file, + directory, or something else) and other FS-specific keys. + + """ + if path in ["/", ""]: + return {"name": path, "size": 0, "type": "directory"} + path = self._strip_protocol(path) + bucket, key, path_version_id = self._split_path(path) + fullpath = "/".join((bucket, key)) + + if version_id is not None and not self.version_aware: + raise ValueError( + "version_id cannot be specified due to the " + "filesystem is not support version aware." + ) + + if not refresh and (info := self._info_from_cache(path, fullpath, version_id)): + return info + + if not key: + return self._bucket_info(bucket) + + if info := self._object_info(bucket, key, version_id): + return info + + return self._try_dir_info(bucket, key, path, fullpath) + + def _info_from_cache( + self, path: str, fullpath: str, version_id: Optional[str] + ) -> dict: + out = self._ls_from_cache(fullpath) + if out is not None: + if self.version_aware and version_id is not None: + # If cached info does not match requested version_id, + # fallback to calling head_object + out = [ + o + for o in out + if o["name"] == fullpath and version_id == o.get("VersionId") + ] + if out: + return out[0] + else: + out = [ + o + for o in out + if o["name"] == fullpath or o["name"] == fullpath.rstrip("/") + ] + if out: + return out[0] + return {"name": path, "size": 0, "type": "directory"} + + return {} + + def _bucket_info(self, bucket: str) -> dict: + """Get the information of a bucket. + + :param bucket: The bucket name. + :return: The information of the bucket. + """ + try: + self.tos_client.head_bucket(bucket) + return self._fill_bucket_info(bucket) + except tos.exceptions.TosClientError as e: + logger.error("Tosfs failed with client error: %s", e) + raise e + except tos.exceptions.TosServerError as e: + if e.status_code == SERVER_RESPONSE_CODE_NOT_FOUND: + raise FileNotFoundError(bucket) from e + else: + logger.error("Tosfs failed with server error: %s", e) + raise e + except Exception as e: + logger.error("Tosfs failed with unknown error: %s", e) + raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + + def _object_info( + self, bucket: str, key: str, version_id: Optional[str] = None + ) -> dict: + """Get the information of an object. + + :param bucket: The bucket name. + :param key: The object key. + :param version_id: The version id of the object. + :return: The information of the object. + """ + try: + out = self.tos_client.head_object(bucket, key, version_id=version_id) + return { + "ETag": out.etag or "", + "LastModified": out.last_modified or "", + "size": out.content_length or 0, + "name": "/".join((bucket, key)), + "type": "file", + "StorageClass": out.storage_class or "STANDARD", + "VersionId": out.version_id or "", + "ContentType": out.content_type or "", + } + except tos.exceptions.TosClientError as e: + logger.error("Tosfs failed with client error: %s", e) + raise e + except tos.exceptions.TosServerError as e: + if e.status_code == SERVER_RESPONSE_CODE_NOT_FOUND: + pass + else: + logger.error("Tosfs failed with server error: %s", e) + raise e + except Exception as e: + logger.error("Tosfs failed with unknown error: %s", e) + raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + + return {} + + def _try_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict: + try: + # We check to see if the path is a directory by attempting to list its + # contexts. If anything is found, it is indeed a directory + out = self.tos_client.list_objects_type2( + bucket, + prefix=key.rstrip("/") + "/" if key else "", + delimiter="/", + max_keys=1, + ) + + if out.key_count > 0 or out.contents or out.common_prefixes: + return { + "name": fullpath, + "type": "directory", + "size": 0, + "StorageClass": "DIRECTORY", + } + + raise FileNotFoundError(path) + except tos.exceptions.TosClientError as e: + logger.error("Tosfs failed with client error: %s", e) + raise e + except tos.exceptions.TosServerError as e: + logger.error("Tosfs failed with server error: %s", e) + raise e + except FileNotFoundError as e: + raise e + except Exception as e: + logger.error("Tosfs failed with unknown error: %s", e) + raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + def _lsbuckets(self, refresh: bool = False) -> List[dict]: """List all buckets in the account. @@ -143,17 +312,7 @@ def _lsbuckets(self, refresh: bool = False) -> List[dict]: logger.error("Tosfs failed with unknown error: %s", e) raise TosfsError(f"Tosfs failed with unknown error: {e}") from e - buckets = [ - { - "Key": bucket.name, - "Size": 0, - "StorageClass": "BUCKET", - "size": 0, - "type": "directory", - "name": bucket.name, - } - for bucket in resp.buckets - ] + buckets = [self._fill_bucket_info(bucket.name) for bucket in resp.buckets] self.dircache[""] = buckets return self.dircache[""] @@ -354,3 +513,14 @@ def _fill_object_info( ): result["name"] += f"?versionId={obj.version_id}" return result + + @staticmethod + def _fill_bucket_info(bucket_name: str) -> dict: + return { + "Key": bucket_name, + "Size": 0, + "StorageClass": "BUCKET", + "size": 0, + "type": "directory", + "name": bucket_name, + } diff --git a/tosfs/tests/test_tosfs.py b/tosfs/tests/test_tosfs.py index b242fb6..6c35db5 100644 --- a/tosfs/tests/test_tosfs.py +++ b/tosfs/tests/test_tosfs.py @@ -83,3 +83,25 @@ def test_inner_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) - assert tosfs.ls(f"{bucket}/{temporary_workspace}", detail=False) == [] tosfs._rm(f"{bucket}/{temporary_workspace}/{file_name}") + + +def test_info(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None: + assert tosfs.info("") == {"name": "", "size": 0, "type": "directory"} + assert tosfs.info("/") == {"name": "/", "size": 0, "type": "directory"} + assert tosfs.info(bucket) == { + "Key": "proton-ci", + "Size": 0, + "StorageClass": "BUCKET", + "name": "proton-ci", + "size": 0, + "type": "directory", + } + assert tosfs.info(f"{bucket}/{temporary_workspace}") == { + "name": f"{bucket}/{temporary_workspace}", + "type": "directory", + "size": 0, + "StorageClass": "DIRECTORY", + } + + with pytest.raises(FileNotFoundError): + tosfs.info(f"{bucket}/nonexistent")