From 428072464ceaeb139909006261e7dfee2de2c82b Mon Sep 17 00:00:00 2001 From: yanghua Date: Thu, 12 Sep 2024 16:14:48 +0800 Subject: [PATCH] Infra: introduce retry func wrapper --- tosfs/core.py | 246 ++++++++++++++++++++++++++++++--------------- tosfs/stability.py | 4 +- 2 files changed, 168 insertions(+), 82 deletions(-) diff --git a/tosfs/core.py b/tosfs/core.py index 8bc9eaf..1533f5f 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -31,6 +31,8 @@ CreateMultipartUploadOutput, ListedObject, ListedObjectVersion, + ListObjectType2Output, + ListObjectVersionsOutput, PartInfo, UploadPartOutput, ) @@ -686,12 +688,6 @@ def put_file( with open(lpath, "rb") as f: if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize): chunk = f.read() - self.tos_client.put_object( - bucket, - key, - content=chunk, - content_type=content_type, - ) retryable_func( lambda: self.tos_client.put_object( bucket, @@ -1120,12 +1116,22 @@ def __init__(self, key: str, version_id: Optional[str] = None): self.version_id = version_id while is_truncated: - resp = self.tos_client.list_objects_type2( - bucket, - prefix=key.rstrip("/") + "/", - delimiter="/", - max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, - continuation_token=continuation_token, + + def _call_list_objects_type2( + continuation_token: str = continuation_token, + ) -> ListObjectType2Output: + return self.tos_client.list_objects_type2( + bucket, + prefix=key.rstrip("/") + "/", + delimiter="/", + max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, + continuation_token=continuation_token, + ) + + resp = retryable_func( + _call_list_objects_type2, + args=(continuation_token,), + max_retry_num=self.max_retry_num, ) is_truncated = resp.is_truncated continuation_token = resp.next_continuation_token @@ -1137,8 +1143,11 @@ def __init__(self, key: str, version_id: Optional[str] = None): ] if deleting_objects: - delete_resp = self.tos_client.delete_multi_objects( - bucket, deleting_objects, quiet=True + delete_resp = retryable_func( + lambda: 
self.tos_client.delete_multi_objects( + bucket, deleting_objects, quiet=True + ), + max_retry_num=self.max_retry_num, ) if delete_resp.error: for d in delete_resp.error: @@ -1153,18 +1162,17 @@ def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None: buc2, key2, ver2 = self._split_path(path2) if ver2: raise ValueError("Cannot copy to a versioned file!") - try: - self.tos_client.copy_object( + + retryable_func( + lambda: self.tos_client.copy_object( bucket=buc2, key=key2, src_bucket=buc1, src_key=key1, src_version_id=ver1, - ) - except (TosClientError, TosServerError) as e: - raise e - except Exception as e: - raise TosfsError("Copy failed (%r -> %r): %s" % (path1, path2, e)) from e + ), + max_retry_num=self.max_retry_num, + ) def _copy_etag_preserved( self, path1: str, path2: str, size: int, total_parts: int, **kwargs: Any @@ -1176,7 +1184,10 @@ def _copy_etag_preserved( upload_id = None try: - mpu = self.tos_client.create_multipart_upload(bucket2, key2) + mpu = retryable_func( + lambda: self.tos_client.create_multipart_upload(bucket2, key2), + max_retry_num=self.max_retry_num, + ) upload_id = mpu.upload_id parts = [] @@ -1188,15 +1199,26 @@ def _copy_etag_preserved( if brange_last > size: brange_last = size - 1 - part = self.tos_client.upload_part_copy( - bucket=bucket2, - key=key2, - part_number=i, - upload_id=upload_id, - src_bucket=bucket1, - src_key=key1, - copy_source_range_start=brange_first, - copy_source_range_end=brange_last, + def _call_upload_part_copy( + i: int = i, + brange_first: int = brange_first, + brange_last: int = brange_last, + ) -> None: + self.tos_client.upload_part_copy( + bucket=bucket2, + key=key2, + part_number=i, + upload_id=upload_id, + src_bucket=bucket1, + src_key=key1, + copy_source_range_start=brange_first, + copy_source_range_end=brange_last, + ) + + part = retryable_func( + _call_upload_part_copy, + args=(i, brange_first, brange_last), + max_retry_num=self.max_retry_num, ) parts.append( PartInfo( @@ -1210,9 +1232,19 
@@ def _copy_etag_preserved( ) brange_first += part_size - self.tos_client.complete_multipart_upload(bucket2, key2, upload_id, parts) + retryable_func( + lambda: self.tos_client.complete_multipart_upload( + bucket2, key2, upload_id, parts + ), + max_retry_num=self.max_retry_num, + ) except Exception as e: - self.tos_client.abort_multipart_upload(bucket2, key2, upload_id) + retryable_func( + lambda: self.tos_client.abort_multipart_upload( + bucket2, key2, upload_id + ), + max_retry_num=self.max_retry_num, + ) raise TosfsError(f"Copy failed ({path1} -> {path2}): {e}") from e def _copy_managed( @@ -1239,9 +1271,15 @@ def _copy_managed( upload_id = None try: - mpu = self.tos_client.create_multipart_upload(bucket2, key2) + mpu = retryable_func( + lambda: self.tos_client.create_multipart_upload(bucket2, key2), + max_retry_num=self.max_retry_num, + ) upload_id = mpu.upload_id - out = [ + + def _call_upload_part_copy( + i: int, brange_first: int, brange_last: int + ) -> None: self.tos_client.upload_part_copy( bucket=bucket2, key=key2, @@ -1252,6 +1290,13 @@ def _copy_managed( copy_source_range_start=brange_first, copy_source_range_end=brange_last, ) + + out = [ + retryable_func( + _call_upload_part_copy, + args=(i, brange_first, brange_last), + max_retry_num=self.max_retry_num, + ) for i, (brange_first, brange_last) in enumerate(get_brange(size, block)) ] @@ -1267,9 +1312,19 @@ def _copy_managed( for i, o in enumerate(out) ] - self.tos_client.complete_multipart_upload(bucket2, key2, upload_id, parts) + retryable_func( + lambda: self.tos_client.complete_multipart_upload( + bucket2, key2, upload_id, parts + ), + max_retry_num=self.max_retry_num, + ) except Exception as e: - self.tos_client.abort_multipart_upload(bucket2, key2, upload_id) + retryable_func( + lambda: self.tos_client.abort_multipart_upload( + bucket2, key2, upload_id + ), + max_retry_num=self.max_retry_num, + ) raise TosfsError(f"Copy failed ({path1} -> {path2}): {e}") from e def _find_file_dir( @@ -1311,19 
+1366,17 @@ def _open_remote_file( range_start: int, **kwargs: Any, ) -> Tuple[BinaryIO, int]: - try: - resp = self.tos_client.get_object( + resp = retryable_func( + lambda: self.tos_client.get_object( bucket, key, version_id=version_id, range_start=range_start, **kwargs, - ) - return resp.content, resp.content_length - except (TosClientError, TosServerError) as e: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + ), + max_retry_num=self.max_retry_num, + ) + return resp.content, resp.content_length def _bucket_info(self, bucket: str) -> dict: """Get the information of a bucket. @@ -1357,7 +1410,10 @@ def _bucket_info(self, bucket: str) -> dict: """ try: - self.tos_client.head_bucket(bucket) + retryable_func( + lambda: self.tos_client.head_bucket(bucket), + max_retry_num=self.max_retry_num, + ) return self._fill_bucket_info(bucket) except TosClientError as e: raise e @@ -1407,7 +1463,10 @@ def _object_info( """ try: - out = self.tos_client.head_object(bucket, key, version_id=version_id) + out = retryable_func( + lambda: self.tos_client.head_object(bucket, key, version_id=version_id), + max_retry_num=self.max_retry_num, + ) return { "ETag": out.etag or "", "LastModified": out.last_modified or "", @@ -1434,11 +1493,14 @@ def _try_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict try: # We check to see if the path is a directory by attempting to list its # contexts. 
If anything is found, it is indeed a directory - out = self.tos_client.list_objects_type2( - bucket, - prefix=key.rstrip("/") + "/" if key else "", - delimiter="/", - max_keys=1, + out = retryable_func( + lambda: self.tos_client.list_objects_type2( + bucket, + prefix=key.rstrip("/") + "/" if key else "", + delimiter="/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, ) if out.key_count > 0 or out.contents or out.common_prefixes: @@ -1534,7 +1596,10 @@ def _exists_bucket(self, bucket: str) -> bool: """ try: - self.tos_client.head_bucket(bucket) + retryable_func( + lambda: self.tos_client.head_bucket(bucket), + max_retry_num=self.max_retry_num, + ) return True except TosClientError as e: raise e @@ -1626,7 +1691,9 @@ def _lsbuckets(self) -> List[dict]: """ try: - resp = self.tos_client.list_buckets() + resp = retryable_func( + lambda: self.tos_client.list_buckets(), max_retry_num=self.max_retry_num + ) except (TosClientError, TosServerError) as e: raise e except Exception as e: @@ -1758,11 +1825,15 @@ def _listdir( all_results = [] is_truncated = True - try: - if self.version_aware: - key_marker, version_id_marker = None, None - while is_truncated: - resp = self.tos_client.list_object_versions( + if self.version_aware: + key_marker, version_id_marker = None, None + while is_truncated: + + def _call_list_object_versions( + key_marker: Optional[Any] = key_marker, + version_id_marker: Optional[Any] = version_id_marker, + ) -> ListObjectVersionsOutput: + return self.tos_client.list_object_versions( bucket, prefix, delimiter=delimiter, @@ -1770,18 +1841,28 @@ def _listdir( key_marker=key_marker, version_id_marker=version_id_marker, ) - is_truncated = resp.is_truncated - all_results.extend( - resp.versions + resp.common_prefixes + resp.delete_markers - ) - key_marker, version_id_marker = ( - resp.next_key_marker, - resp.next_version_id_marker, - ) - else: - continuation_token = "" - while is_truncated: - resp = self.tos_client.list_objects_type2( + + resp = 
retryable_func( + _call_list_object_versions, + args=(key_marker, version_id_marker), + max_retry_num=self.max_retry_num, + ) + is_truncated = resp.is_truncated + all_results.extend( + resp.versions + resp.common_prefixes + resp.delete_markers + ) + key_marker, version_id_marker = ( + resp.next_key_marker, + resp.next_version_id_marker, + ) + else: + continuation_token = "" + while is_truncated: + + def _call_list_objects_type2( + continuation_token: str = continuation_token, + ) -> ListObjectType2Output: + return self.tos_client.list_objects_type2( bucket, prefix, start_after=prefix if not include_self else None, @@ -1789,16 +1870,18 @@ def _listdir( max_keys=max_items, continuation_token=continuation_token, ) - is_truncated = resp.is_truncated - continuation_token = resp.next_continuation_token - all_results.extend(resp.contents + resp.common_prefixes) + resp = retryable_func( + _call_list_objects_type2, + args=(continuation_token,), + max_retry_num=self.max_retry_num, + ) + is_truncated = resp.is_truncated + continuation_token = resp.next_continuation_token - return all_results - except (TosClientError, TosServerError) as e: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + all_results.extend(resp.contents + resp.common_prefixes) + + return all_results def _rm(self, path: str) -> None: logger.info("Removing path: %s", path) @@ -1808,7 +1891,10 @@ def _rm(self, path: str) -> None: key = key.rstrip("/") + "/" try: - self.tos_client.delete_object(bucket, key) + retryable_func( + lambda: self.tos_client.delete_object(bucket, key), + max_retry_num=self.max_retry_num, + ) except (TosClientError, TosServerError) as e: raise e except Exception as e: diff --git a/tosfs/stability.py b/tosfs/stability.py index e53dc6e..e06373b 100644 --- a/tosfs/stability.py +++ b/tosfs/stability.py @@ -15,7 +15,7 @@ """The module contains utility functions for the tosfs stability.""" import time -from typing import Any, Optional +from 
typing import Any, Optional, Tuple import requests from requests.exceptions import ( @@ -71,7 +71,7 @@ def retryable_func( func: Any, *, - args: tuple[()] = (), + args: Tuple[Any, ...] = (), kwargs: Optional[Any] = None, max_retry_num: int = MAX_RETRY_NUM, ) -> Any: