Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance: Read optimize #88

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 39 additions & 65 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ def __init__(
socket_timeout=socket_timeout,
high_latency_log_threshold=high_latency_log_threshold,
credentials_provider=credentials_provider,
enable_crc=False,
enable_verify_ssl=False,
disable_encoding_meta=True,
)
self.version_aware = version_aware
self.default_block_size = (
Expand Down Expand Up @@ -1560,10 +1563,31 @@ def exists(self, path: str, **kwargs: Any) -> bool:
# if the path is a bucket
if not key:
return self._exists_bucket(bucket)
elif self.isfile(path):
return self._exists_object(bucket, key, path, version_id)
else:
return self._exists_object(bucket, key.rstrip("/") + "/", path, version_id)

try:
return retryable_func_executor(
lambda: self.tos_client.head_object(bucket, key) or True,
max_retry_num=self.max_retry_num,
)
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
try:
return retryable_func_executor(
lambda: self.tos_client.head_object(
bucket, key.rstrip("/") + "/"
)
or True,
max_retry_num=self.max_retry_num,
)
except TosServerError as ex:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
return False
else:
raise ex
else:
raise e
except Exception as ex:
raise TosfsError(f"Tosfs failed with unknown error: {ex}") from ex

def _exists_bucket(self, bucket: str) -> bool:
"""Check if a bucket exists in the TOS.
Expand Down Expand Up @@ -1612,60 +1636,6 @@ def _exists_bucket(self, bucket: str) -> bool:
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _exists_object(
self, bucket: str, key: str, path: str, version_id: Optional[str] = None
) -> bool:
"""Check if an object exists in the TOS.

Parameters
----------
bucket : str
The name of the bucket.
key : str
The key of the object.
path : str
The full path of the object.
version_id : str, optional
The version ID of the object (default is None).

Returns
-------
bool
True if the object exists, False otherwise.

Raises
------
tos.exceptions.TosClientError
If there is a client error while checking the object.
tos.exceptions.TosServerError
If there is a server error while checking the object.
TosfsError
If there is an unknown error while checking the object.

Examples
--------
>>> fs = TosFileSystem()
>>> fs._exists_object("mybucket", "myfile", "tos://mybucket/myfile")
True
>>> fs._exists_object("mybucket", "nonexistentfile", "tos://mybucket/nonexistentfile")
False

"""
try:
return retryable_func_executor(
lambda: self.tos_client.head_object(bucket, key) or True,
max_retry_num=self.max_retry_num,
)
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
return False
else:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _lsbuckets(self) -> List[dict]:
"""List all buckets in the account.

Expand Down Expand Up @@ -2003,6 +1973,7 @@ def __init__(
self.fs = fs
self.bucket = bucket
self.key = key
self.version_id = path_version_id
self.path = path
self.mode = mode
self.autocommit = autocommit
Expand Down Expand Up @@ -2163,22 +2134,25 @@ def _call_upload_part(
)

def _fetch_range(self, start: int, end: int) -> bytes:
bucket, key, version_id = self.fs._split_path(self.path)
if start == end:
logger.debug(
"skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d",
bucket,
key,
self.bucket,
self.key,
start,
end,
)
return b""
logger.debug("Fetch: %s/%s, %s-%s", bucket, key, start, end)
logger.debug("Fetch: %s/%s, %s-%s", self.bucket, self.key, start, end)

def fetch() -> bytes:
return self.fs.tos_client.get_object(
bucket, key, version_id, range_start=start, range_end=end
).read()
temp_buffer = io.BytesIO()
for chunk in self.fs.tos_client.get_object(
self.bucket, self.key, self.version_id, range_start=start, range_end=end
):
temp_buffer.write(chunk)
temp_buffer.seek(0)
return temp_buffer.read()

return retryable_func_executor(fetch, max_retry_num=self.fs.max_retry_num)

Expand Down