From 6f86cc131cd8bcaefff28420f3e75ebe0794515b Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 11 Sep 2024 23:35:53 +0800 Subject: [PATCH] Infra: introduce retry func warpper --- poetry.lock | 16 +- pyproject.toml | 1 + tosfs/consts.py | 14 +- tosfs/core.py | 397 ++++++++++++++++++++++++++++----------------- tosfs/stability.py | 127 +++++++++++++++ tosfs/utils.py | 38 +---- 6 files changed, 395 insertions(+), 198 deletions(-) create mode 100644 tosfs/stability.py diff --git a/poetry.lock b/poetry.lock index fa70cdd..4b17890 100644 --- a/poetry.lock +++ b/poetry.lock @@ -613,6 +613,20 @@ pytz = "*" requests = ">=2.19.1,<3.dev0" six = "*" +[[package]] +name = "types-requests" +version = "2.32.0.20240907" +description = "Typing stubs for requests" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types-requests-2.32.0.20240907.tar.gz", hash = "sha256:ff33935f061b5e81ec87997e91050f7b4af4f82027a7a7a9d9aaea04a963fdf8"}, + {file = "types_requests-2.32.0.20240907-py3-none-any.whl", hash = "sha256:1d1e79faeaf9d42def77f3c304893dea17a97cae98168ac69f3cb465516ee8da"}, +] + +[package.dependencies] +urllib3 = ">=2" + [[package]] name = "typing-extensions" version = "4.12.2" @@ -723,4 +737,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "4e5dfa8983ef184a7ecc10626b7ada201d7c852379100f28640f56a2d4a03af6" +content-hash = "2afca8d4ce9d407ec9c308bfb562ae0c046061066ad9c7a115a215e1c7941238" diff --git a/pyproject.toml b/pyproject.toml index f6140d7..68b4966 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ pytest = "==8.1.1" pytest-cov = "==5.0.0" coverage = "==7.5.0" ruff = "==0.6.0" +types-requests = "==2.32.0.20240907" [tool.pydocstyle] convention = "numpy" diff --git a/tosfs/consts.py b/tosfs/consts.py index 207a363..ef41521 100644 --- a/tosfs/consts.py +++ b/tosfs/consts.py @@ -14,18 +14,8 @@ """The module contains constants for the tosfs package.""" -# Tos server response codes -TOS_SERVER_RESPONSE_CODE_NOT_FOUND = 404 - -TOS_SERVER_RETRYABLE_ERROR_CODE_SET = { - "IncompleteBody", - "ExceedAccountQPSLimit", - "ExceedAccountRateLimit", - "ExceedBucketQPSLimit", - "ExceedBucketRateLimit", - "InternalError", - "ServiceUnavailable", -} +# Tos server response status codes +TOS_SERVER_STATUS_CODE_NOT_FOUND = 404 MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30 MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20 diff --git a/tosfs/core.py b/tosfs/core.py index ea142df..21e1860 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -31,7 +31,10 @@ CreateMultipartUploadOutput, ListedObject, ListedObjectVersion, + ListObjectType2Output, + ListObjectVersionsOutput, PartInfo, + UploadPartCopyOutput, UploadPartOutput, ) @@ -46,12 +49,13 @@ PART_MAX_SIZE, PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, RETRY_NUM, - TOS_SERVER_RESPONSE_CODE_NOT_FOUND, + TOS_SERVER_STATUS_CODE_NOT_FOUND, TOSFS_LOG_FORMAT, ) from tosfs.exceptions import TosfsError from tosfs.fsspec_utils import glob_translate -from tosfs.utils import find_bucket_key, get_brange, retryable_func_wrapper +from tosfs.stability import retryable_func +from tosfs.utils import find_bucket_key, get_brange logger = logging.getLogger("tosfs") @@ -98,7 +102,7 @@ def __init__( key: str = "", secret: str = "", region: Optional[str] = None, - max_retry_count: int = 20, + max_retry_num: int = 20, max_connections: int = 1024, connection_time: int = 10, socket_timeout: int = 30, @@ -122,7 +126,7 @@ def __init__( The secret access key(sk) to access the TOS service. region : str, optional The region of the TOS service. - max_retry_count : int, optional + max_retry_num : int, optional The maximum number of retries for a failed request (default is 20). max_connections : int, optional The maximum number of HTTP connections that can be opened in the @@ -161,7 +165,7 @@ def __init__( secret, endpoint_url, region, - max_retry_count=max_retry_count, + max_retry_count=0, max_connections=max_connections, connection_time=connection_time, socket_timeout=socket_timeout, @@ -174,6 +178,7 @@ def __init__( ) self.default_fill_cache = default_fill_cache self.default_cache_type = default_cache_type + self.max_retry_num = max_retry_num super().__init__(**kwargs) @@ -386,12 +391,10 @@ def rmdir(self, path: str) -> None: if len(self._listdir(bucket, max_items=1, prefix=key.rstrip("/") + "/")) > 0: raise TosfsError(f"Directory {path} is not empty.") - try: - self.tos_client.delete_object(bucket, key.rstrip("/") + "/") - except (TosClientError, TosServerError) as e: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + retryable_func( + lambda: self.tos_client.delete_object(bucket, key.rstrip("/") + "/"), + max_retry_num=self.max_retry_num, + ) def rm( self, path: str, recursive: bool = False, maxdepth: Optional[int] = None @@ -456,25 +459,25 @@ def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None: if not key: raise TosfsError(f"Cannot create a bucket {bucket} using mkdir api.") - try: - if create_parents: - parent = self._parent(f"{bucket}/{key}".rstrip("/") + "/") - if not self.exists(parent): - # here we need to create the parent directory recursively - self.mkdir(parent, create_parents=True) - self.tos_client.put_object(bucket, key.rstrip("/") + "/") + if create_parents: + parent = self._parent(f"{bucket}/{key}".rstrip("/") + "/") + if not self.exists(parent): + # here we need to create the parent directory recursively + self.mkdir(parent, create_parents=True) + + retryable_func( + lambda: self.tos_client.put_object(bucket, key.rstrip("/") + "/"), + max_retry_num=self.max_retry_num, + ) + else: + parent = self._parent(path) + if not self.exists(parent): + raise FileNotFoundError(f"Parent directory {parent} does not exist.") else: - parent = self._parent(path) - if not self.exists(parent): - raise FileNotFoundError( - f"Parent directory {parent} does not exist." - ) - else: - self.tos_client.put_object(bucket, key.rstrip("/") + "/") - except (TosClientError, TosServerError, FileNotFoundError) as e: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + retryable_func( + lambda: self.tos_client.put_object(bucket, key.rstrip("/") + "/"), + max_retry_num=self.max_retry_num, + ) def makedirs(self, path: str, exist_ok: bool = False) -> None: """Recursively make directories. @@ -534,12 +537,10 @@ def touch(self, path: str, truncate: bool = True, **kwargs: Any) -> None: if not truncate and self.exists(path): raise FileExistsError(f"File {path} already exists.") - try: - self.tos_client.put_object(bucket, key) - except (TosClientError, TosServerError) as e: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + retryable_func( + lambda: self.tos_client.put_object(bucket, key), + max_retry_num=self.max_retry_num, + ) def isdir(self, path: str) -> bool: """Check if the path is a directory. @@ -577,12 +578,14 @@ def isdir(self, path: str) -> bool: key = key.rstrip("/") + "/" try: - self.tos_client.head_object(bucket, key) - return True + return retryable_func( + lambda: self.tos_client.head_object(bucket, key) or True, + max_retry_num=self.max_retry_num, + ) except TosClientError as e: raise e except TosServerError as e: - if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND: + if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: return False else: raise e @@ -611,13 +614,14 @@ def isfile(self, path: str) -> bool: return False try: - # Attempt to get the object metadata - self.tos_client.head_object(bucket, key) - return True + return retryable_func( + lambda: self.tos_client.head_object(bucket, key) or True, + max_retry_num=self.max_retry_num, + ) except TosClientError as e: raise e except TosServerError as e: - if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND: + if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: return False raise e except Exception as e: @@ -674,40 +678,45 @@ def put_file( if "ContentType" not in kwargs: content_type, _ = mimetypes.guess_type(lpath) - bucket, key, _ = self._split_path(rpath) - - try: - if self.isfile(rpath): - self.makedirs(self._parent(rpath), exist_ok=True) + if self.isfile(rpath): + self.makedirs(self._parent(rpath), exist_ok=True) - if self.isdir(rpath): - rpath = os.path.join(rpath, os.path.basename(lpath)) + if self.isdir(rpath): + rpath = os.path.join(rpath, os.path.basename(lpath)) - bucket, key, _ = self._split_path(rpath) + bucket, key, _ = self._split_path(rpath) - with open(lpath, "rb") as f: - if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize): - chunk = f.read() - self.tos_client.put_object( + with open(lpath, "rb") as f: + if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize): + chunk = f.read() + retryable_func( + lambda: self.tos_client.put_object( bucket, key, content=chunk, content_type=content_type, - ) - else: - mpu = self.tos_client.create_multipart_upload( + ), + max_retry_num=self.max_retry_num, + ) + else: + mpu = retryable_func( + lambda: self.tos_client.create_multipart_upload( bucket, key, content_type=content_type - ) - self.tos_client.upload_part_from_file( + ), + max_retry_num=self.max_retry_num, + ) + retryable_func( + lambda: self.tos_client.upload_part_from_file( bucket, key, mpu.upload_id, file_path=lpath, part_number=1 - ) - self.tos_client.complete_multipart_upload( + ), + max_retry_num=self.max_retry_num, + ) + retryable_func( + lambda: self.tos_client.complete_multipart_upload( bucket, key, mpu.upload_id, complete_all=True - ) - except (TosClientError, TosServerError) as e: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + ), + max_retry_num=self.max_retry_num, + ) def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None: """Get a file from the TOS filesystem and write to a local path. @@ -1108,12 +1117,22 @@ def __init__(self, key: str, version_id: Optional[str] = None): self.version_id = version_id while is_truncated: - resp = self.tos_client.list_objects_type2( - bucket, - prefix=key.rstrip("/") + "/", - delimiter="/", - max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, - continuation_token=continuation_token, + + def _call_list_objects_type2( + continuation_token: str = continuation_token, + ) -> ListObjectType2Output: + return self.tos_client.list_objects_type2( + bucket, + prefix=key.rstrip("/") + "/", + delimiter="/", + max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, + continuation_token=continuation_token, + ) + + resp = retryable_func( + _call_list_objects_type2, + args=(continuation_token,), + max_retry_num=self.max_retry_num, ) is_truncated = resp.is_truncated continuation_token = resp.next_continuation_token @@ -1125,8 +1144,11 @@ def __init__(self, key: str, version_id: Optional[str] = None): ] if deleting_objects: - delete_resp = self.tos_client.delete_multi_objects( - bucket, deleting_objects, quiet=True + delete_resp = retryable_func( + lambda: self.tos_client.delete_multi_objects( + bucket, deleting_objects, quiet=True + ), + max_retry_num=self.max_retry_num, ) if delete_resp.error: for d in delete_resp.error: @@ -1141,18 +1163,17 @@ def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None: buc2, key2, ver2 = self._split_path(path2) if ver2: raise ValueError("Cannot copy to a versioned file!") - try: - self.tos_client.copy_object( + + retryable_func( + lambda: self.tos_client.copy_object( bucket=buc2, key=key2, src_bucket=buc1, src_key=key1, src_version_id=ver1, - ) - except (TosClientError, TosServerError) as e: - raise e - except Exception as e: - raise TosfsError("Copy failed (%r -> %r): %s" % (path1, path2, e)) from e + ), + max_retry_num=self.max_retry_num, + ) def _copy_etag_preserved( self, path1: str, path2: str, size: int, total_parts: int, **kwargs: Any @@ -1164,7 +1185,10 @@ def _copy_etag_preserved( upload_id = None try: - mpu = self.tos_client.create_multipart_upload(bucket2, key2) + mpu = retryable_func( + lambda: self.tos_client.create_multipart_upload(bucket2, key2), + max_retry_num=self.max_retry_num, + ) upload_id = mpu.upload_id parts = [] @@ -1176,15 +1200,26 @@ def _copy_etag_preserved( if brange_last > size: brange_last = size - 1 - part = self.tos_client.upload_part_copy( - bucket=bucket2, - key=key2, - part_number=i, - upload_id=upload_id, - src_bucket=bucket1, - src_key=key1, - copy_source_range_start=brange_first, - copy_source_range_end=brange_last, + def _call_upload_part_copy( + i: int = i, + brange_first: int = brange_first, + brange_last: int = brange_last, + ) -> UploadPartCopyOutput: + return self.tos_client.upload_part_copy( + bucket=bucket2, + key=key2, + part_number=i, + upload_id=upload_id, + src_bucket=bucket1, + src_key=key1, + copy_source_range_start=brange_first, + copy_source_range_end=brange_last, + ) + + part = retryable_func( + _call_upload_part_copy, + args=(i, brange_first, brange_last), + max_retry_num=self.max_retry_num, ) parts.append( PartInfo( @@ -1198,9 +1233,19 @@ def _copy_etag_preserved( ) brange_first += part_size - self.tos_client.complete_multipart_upload(bucket2, key2, upload_id, parts) + retryable_func( + lambda: self.tos_client.complete_multipart_upload( + bucket2, key2, upload_id, parts + ), + max_retry_num=self.max_retry_num, + ) except Exception as e: - self.tos_client.abort_multipart_upload(bucket2, key2, upload_id) + retryable_func( + lambda: self.tos_client.abort_multipart_upload( + bucket2, key2, upload_id + ), + max_retry_num=self.max_retry_num, + ) raise TosfsError(f"Copy failed ({path1} -> {path2}): {e}") from e def _copy_managed( @@ -1227,10 +1272,16 @@ def _copy_managed( upload_id = None try: - mpu = self.tos_client.create_multipart_upload(bucket2, key2) + mpu = retryable_func( + lambda: self.tos_client.create_multipart_upload(bucket2, key2), + max_retry_num=self.max_retry_num, + ) upload_id = mpu.upload_id - out = [ - self.tos_client.upload_part_copy( + + def _call_upload_part_copy( + i: int, brange_first: int, brange_last: int + ) -> UploadPartCopyOutput: + return self.tos_client.upload_part_copy( bucket=bucket2, key=key2, part_number=i + 1, @@ -1240,6 +1291,13 @@ def _copy_managed( copy_source_range_start=brange_first, copy_source_range_end=brange_last, ) + + out = [ + retryable_func( + _call_upload_part_copy, + args=(i, brange_first, brange_last), + max_retry_num=self.max_retry_num, + ) for i, (brange_first, brange_last) in enumerate(get_brange(size, block)) ] @@ -1255,9 +1313,19 @@ def _copy_managed( for i, o in enumerate(out) ] - self.tos_client.complete_multipart_upload(bucket2, key2, upload_id, parts) + retryable_func( + lambda: self.tos_client.complete_multipart_upload( + bucket2, key2, upload_id, parts + ), + max_retry_num=self.max_retry_num, + ) except Exception as e: - self.tos_client.abort_multipart_upload(bucket2, key2, upload_id) + retryable_func( + lambda: self.tos_client.abort_multipart_upload( + bucket2, key2, upload_id + ), + max_retry_num=self.max_retry_num, + ) raise TosfsError(f"Copy failed ({path1} -> {path2}): {e}") from e def _find_file_dir( @@ -1299,19 +1367,17 @@ def _open_remote_file( range_start: int, **kwargs: Any, ) -> Tuple[BinaryIO, int]: - try: - resp = self.tos_client.get_object( + resp = retryable_func( + lambda: self.tos_client.get_object( bucket, key, version_id=version_id, range_start=range_start, **kwargs, - ) - return resp.content, resp.content_length - except (TosClientError, TosServerError) as e: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + ), + max_retry_num=self.max_retry_num, + ) + return resp.content, resp.content_length def _bucket_info(self, bucket: str) -> dict: """Get the information of a bucket. @@ -1345,12 +1411,15 @@ def _bucket_info(self, bucket: str) -> dict: """ try: - self.tos_client.head_bucket(bucket) + retryable_func( + lambda: self.tos_client.head_bucket(bucket), + max_retry_num=self.max_retry_num, + ) return self._fill_bucket_info(bucket) except TosClientError as e: raise e except TosServerError as e: - if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND: + if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: raise FileNotFoundError(bucket) from e else: raise e @@ -1395,7 +1464,10 @@ def _object_info( """ try: - out = self.tos_client.head_object(bucket, key, version_id=version_id) + out = retryable_func( + lambda: self.tos_client.head_object(bucket, key, version_id=version_id), + max_retry_num=self.max_retry_num, + ) return { "ETag": out.etag or "", "LastModified": out.last_modified or "", @@ -1409,7 +1481,7 @@ def _object_info( except TosClientError as e: raise e except TosServerError as e: - if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND: + if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: pass else: raise e @@ -1422,11 +1494,14 @@ def _try_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict try: # We check to see if the path is a directory by attempting to list its # contexts. If anything is found, it is indeed a directory - out = self.tos_client.list_objects_type2( - bucket, - prefix=key.rstrip("/") + "/" if key else "", - delimiter="/", - max_keys=1, + out = retryable_func( + lambda: self.tos_client.list_objects_type2( + bucket, + prefix=key.rstrip("/") + "/" if key else "", + delimiter="/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, ) if out.key_count > 0 or out.contents or out.common_prefixes: @@ -1522,12 +1597,15 @@ def _exists_bucket(self, bucket: str) -> bool: """ try: - self.tos_client.head_bucket(bucket) + retryable_func( + lambda: self.tos_client.head_bucket(bucket), + max_retry_num=self.max_retry_num, + ) return True except TosClientError as e: raise e except TosServerError as e: - if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND: + if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: return False else: raise e @@ -1574,12 +1652,14 @@ def _exists_object( """ try: - self.tos_client.head_object(bucket, key) - return True + return retryable_func( + lambda: self.tos_client.head_object(bucket, key) or True, + max_retry_num=self.max_retry_num, + ) except TosClientError as e: raise e except TosServerError as e: - if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND: + if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: return False else: raise e @@ -1612,7 +1692,9 @@ def _lsbuckets(self) -> List[dict]: """ try: - resp = self.tos_client.list_buckets() + resp = retryable_func( + lambda: self.tos_client.list_buckets(), max_retry_num=self.max_retry_num + ) except (TosClientError, TosServerError) as e: raise e except Exception as e: @@ -1744,11 +1826,15 @@ def _listdir( all_results = [] is_truncated = True - try: - if self.version_aware: - key_marker, version_id_marker = None, None - while is_truncated: - resp = self.tos_client.list_object_versions( + if self.version_aware: + key_marker, version_id_marker = None, None + while is_truncated: + + def _call_list_object_versions( + key_marker: Optional[Any] = key_marker, + version_id_marker: Optional[Any] = version_id_marker, + ) -> ListObjectVersionsOutput: + return self.tos_client.list_object_versions( bucket, prefix, delimiter=delimiter, @@ -1756,18 +1842,28 @@ def _listdir( key_marker=key_marker, version_id_marker=version_id_marker, ) - is_truncated = resp.is_truncated - all_results.extend( - resp.versions + resp.common_prefixes + resp.delete_markers - ) - key_marker, version_id_marker = ( - resp.next_key_marker, - resp.next_version_id_marker, - ) - else: - continuation_token = "" - while is_truncated: - resp = self.tos_client.list_objects_type2( + + resp = retryable_func( + _call_list_object_versions, + args=(key_marker, version_id_marker), + max_retry_num=self.max_retry_num, + ) + is_truncated = resp.is_truncated + all_results.extend( + resp.versions + resp.common_prefixes + resp.delete_markers + ) + key_marker, version_id_marker = ( + resp.next_key_marker, + resp.next_version_id_marker, + ) + else: + continuation_token = "" + while is_truncated: + + def _call_list_objects_type2( + continuation_token: str = continuation_token, + ) -> ListObjectType2Output: + return self.tos_client.list_objects_type2( bucket, prefix, start_after=prefix if not include_self else None, @@ -1775,16 +1871,18 @@ def _listdir( max_keys=max_items, continuation_token=continuation_token, ) - is_truncated = resp.is_truncated - continuation_token = resp.next_continuation_token - all_results.extend(resp.contents + resp.common_prefixes) + resp = retryable_func( + _call_list_objects_type2, + args=(continuation_token,), + max_retry_num=self.max_retry_num, + ) + is_truncated = resp.is_truncated + continuation_token = resp.next_continuation_token - return all_results - except (TosClientError, TosServerError) as e: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + all_results.extend(resp.contents + resp.common_prefixes) + + return all_results def _rm(self, path: str) -> None: logger.info("Removing path: %s", path) @@ -1794,7 +1892,10 @@ def _rm(self, path: str) -> None: key = key.rstrip("/") + "/" try: - self.tos_client.delete_object(bucket, key) + retryable_func( + lambda: self.tos_client.delete_object(bucket, key), + max_retry_num=self.max_retry_num, + ) except (TosClientError, TosServerError) as e: raise e except Exception as e: @@ -2063,7 +2164,7 @@ def fetch() -> bytes: bucket, key, version_id, range_start=start, range_end=end ).read() - return retryable_func_wrapper(fetch, retries=RETRY_NUM) + return retryable_func(fetch, max_retry_num=self.fs.max_retry_num) def commit(self) -> None: """Complete multipart upload or PUT.""" diff --git a/tosfs/stability.py b/tosfs/stability.py new file mode 100644 index 0000000..9a01b68 --- /dev/null +++ b/tosfs/stability.py @@ -0,0 +1,127 @@ +# ByteDance Volcengine EMR, Copyright 2024. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The module contains utility functions for the tosfs stability.""" + +import time +from typing import Any, Optional, Tuple + +import requests +from requests.exceptions import ( + ConnectTimeout, + HTTPError, + ProxyError, + ReadTimeout, + RetryError, + SSLError, + StreamConsumedError, + Timeout, +) +from tos.exceptions import TosClientError, TosError, TosServerError + +from tosfs.exceptions import TosfsError + +TOS_SERVER_RETRYABLE_STATUS_CODES = { + "409", # CONFLICT + "429", # TOO_MANY_REQUESTS + "500", # INTERNAL_SERVER_ERROR +} + +TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES = { + "0026-00000013", # DELETE_NON_EMPTY_DIR + "0026-00000020", # LOCATED_UNDER_A_FILE + "0026-00000021", # COPY_BETWEEN_DIR_AND_FILE + "0026-00000022", # PATH_LOCK_CONFLICT + "0026-00000025", # RENAME_TO_AN_EXISTED_DIR + "0026-00000026", # RENAME_TO_SUB_DIR + "0026-00000027", # RENAME_BETWEEN_DIR_AND_FILE + "0017-00000208", # APPEND_OFFSET_NOT_MATCHED + "0017-00000209", # APPEND_NOT_APPENDABLE +} + +TOS_CLIENT_RETRYABLE_EXCEPTIONS = { + HTTPError, + requests.ConnectionError, + ProxyError, + SSLError, + Timeout, + ConnectTimeout, + ReadTimeout, + StreamConsumedError, + RetryError, + InterruptedError, + ConnectionResetError, + ConnectionError, +} + +MAX_RETRY_NUM = 20 + + +def retryable_func( + func: Any, + *, + args: Tuple[Any, ...] = (), + kwargs: Optional[Any] = None, + max_retry_num: int = MAX_RETRY_NUM, +) -> Any: + """Retry a function in case of catch errors.""" + if kwargs is None: + kwargs = {} + + attempt = 0 + + while attempt < max_retry_num: + attempt += 1 + try: + return func(*args, **kwargs) + except TosError as e: + from tosfs.core import logger + + if attempt >= max_retry_num: + logger.error("Retry exhausted after %d times.", max_retry_num) + raise e + + if is_retryable_exception(e): + logger.warn("Retry TOS request in the %d times, error: %s", attempt, e) + try: + time.sleep(min(1.7**attempt * 0.1, 15)) + except InterruptedError as ie: + raise TosfsError(f"Request {func} interrupted.") from ie + else: + raise e + except Exception as e: + raise TosfsError(f"{e}") from e + + +def is_retryable_exception(e: TosError) -> bool: + """Check if the exception is retryable.""" + return _is_retryable_tos_server_exception(e) or _is_retryable_tos_client_exception( + e + ) + + +def _is_retryable_tos_server_exception(e: TosError) -> bool: + return ( + isinstance(e, TosServerError) + and e.status_code in TOS_SERVER_RETRYABLE_STATUS_CODES + # exclude some special error code under 409(conflict) status code + # let it fast fail + and e.code not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES + ) + + +def _is_retryable_tos_client_exception(e: TosError) -> bool: + return isinstance(e, TosClientError) and any( + isinstance(e.cause, excp) for excp in TOS_CLIENT_RETRYABLE_EXCEPTIONS + ) diff --git a/tosfs/utils.py b/tosfs/utils.py index 36f1944..569d6d8 100644 --- a/tosfs/utils.py +++ b/tosfs/utils.py @@ -17,12 +17,7 @@ import re import string import tempfile -import time -from typing import Any, Generator, Optional, Tuple - -from tos.exceptions import TosServerError - -from tosfs.consts import TOS_SERVER_RETRYABLE_ERROR_CODE_SET +from typing import Generator, Tuple def random_str(length: int = 5) -> str: @@ -103,34 +98,3 @@ def get_brange(size: int, block: int) -> Generator[Tuple[int, int], None, None]: """ for offset in range(0, size, block): yield offset, min(offset + block - 1, size - 1) - - -def retryable_func_wrapper( - func: Any, *, args: tuple[()] = (), kwargs: Optional[Any] = None, retries: int = 5 -) -> Any: - """Retry a function in case of server errors.""" - if kwargs is None: - kwargs = {} - - err = None - - for i in range(retries): - try: - return func(*args, **kwargs) - except TosServerError as e: - err = e - from tosfs.core import logger - - logger.debug("Server error (maybe retryable): %s", e) - if e.code in TOS_SERVER_RETRYABLE_ERROR_CODE_SET: - time.sleep(min(1.7**i * 0.1, 15)) - else: - break - except Exception as e: - err = e - from tosfs.core import logger - - logger.debug("Nonretryable error: %s", e) - break - - raise err if err is not None else ""