diff --git a/tosfs/consts.py b/tosfs/consts.py index fc82585..31ef130 100644 --- a/tosfs/consts.py +++ b/tosfs/consts.py @@ -27,8 +27,19 @@ "ServiceUnavailable", } -MANAGED_COPY_THRESHOLD = 5 * 2**30 +MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30 +MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20 RETRY_NUM = 5 PART_MIN_SIZE = 5 * 2**20 PART_MAX_SIZE = 5 * 2**30 + +FILE_OPERATION_READ_WRITE_BUFFER_SIZE = 5 * 2**20 # 5MB +PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD = 5 * 2**30 # 5GB +GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE = 2**16 # 64KB +APPEND_OPERATION_SMALL_FILE_THRESHOLD = 5 * 2**20 # 5MB + +LS_OPERATION_DEFAULT_MAX_ITEMS = 1000 + +# environment variable names +ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL" diff --git a/tosfs/core.py b/tosfs/core.py index 34f6857..f74f117 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -35,8 +35,15 @@ ) from tosfs.consts import ( - MANAGED_COPY_THRESHOLD, + APPEND_OPERATION_SMALL_FILE_THRESHOLD, + ENV_NAME_TOSFS_LOGGING_LEVEL, + FILE_OPERATION_READ_WRITE_BUFFER_SIZE, + GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE, + LS_OPERATION_DEFAULT_MAX_ITEMS, + MANAGED_COPY_MAX_THRESHOLD, + MANAGED_COPY_MIN_THRESHOLD, PART_MAX_SIZE, + PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, RETRY_NUM, TOS_SERVER_RESPONSE_CODE_NOT_FOUND, ) @@ -44,9 +51,6 @@ from tosfs.fsspec_utils import glob_translate from tosfs.utils import find_bucket_key, get_brange, retryable_func_wrapper -# environment variable names -ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL" - logger = logging.getLogger("tosfs") @@ -73,7 +77,6 @@ class TosFileSystem(AbstractFileSystem): """ protocol = ("tos", "tosfs") - default_block_size = 5 * 2**20 def __init__( self, @@ -97,7 +100,9 @@ def __init__( credentials_provider=credentials_provider, ) self.version_aware = version_aware - self.default_block_size = default_block_size or self.default_block_size + self.default_block_size = ( + default_block_size or FILE_OPERATION_READ_WRITE_BUFFER_SIZE + ) self.default_fill_cache = default_fill_cache self.default_cache_type = default_cache_type @@ -519,7 +524,7 @@ def put_file( self, lpath: str, rpath: str, - chunksize: int = 5 * 2**20, + chunksize: int = FILE_OPERATION_READ_WRITE_BUFFER_SIZE, **kwargs: Any, ) -> None: """Put a file from local to TOS. @@ -578,7 +583,7 @@ def put_file( bucket, key, _ = self._split_path(rpath) with open(lpath, "rb") as f: - if size < min(5 * 2**30, 2 * chunksize): + if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize): chunk = f.read() self.tos_client.put_object( bucket, @@ -642,10 +647,10 @@ def _read_chunks(body: BinaryIO, f: BinaryIO) -> None: bytes_read = 0 while True: try: - chunk = body.read(2**16) + chunk = body.read(GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE) except tos.exceptions.TosClientError as e: failed_reads += 1 - if failed_reads >= self.RETRY_NUM: + if failed_reads >= RETRY_NUM: raise e try: body.close() @@ -848,7 +853,7 @@ def cp_file( path1: str, path2: str, preserve_etag: Optional[bool] = None, - managed_copy_threshold: Optional[int] = MANAGED_COPY_THRESHOLD, + managed_copy_threshold: Optional[int] = MANAGED_COPY_MAX_THRESHOLD, **kwargs: Any, ) -> None: """Copy file between locations on tos. @@ -897,11 +902,11 @@ def cp_file( if preserve_etag and parts_suffix: self._copy_etag_preserved(path1, path2, size, total_parts=int(parts_suffix)) elif size <= min( - MANAGED_COPY_THRESHOLD, + MANAGED_COPY_MAX_THRESHOLD, ( managed_copy_threshold if managed_copy_threshold - else MANAGED_COPY_THRESHOLD + else MANAGED_COPY_MAX_THRESHOLD ), ): self._copy_basic(path1, path2, **kwargs) @@ -1065,17 +1070,17 @@ def _copy_managed( path1: str, path2: str, size: int, - block: int = MANAGED_COPY_THRESHOLD, + block: int = MANAGED_COPY_MAX_THRESHOLD, **kwargs: Any, ) -> None: """Copy file between locations on tos as multiple-part. block: int The size of the pieces, must be larger than 5MB and at - most MANAGED_COPY_THRESHOLD. + most MANAGED_COPY_MAX_THRESHOLD. Smaller blocks mean more calls, only useful for testing. """ - if block < 5 * 2**20 or block > MANAGED_COPY_THRESHOLD: + if block < MANAGED_COPY_MIN_THRESHOLD or block > MANAGED_COPY_MAX_THRESHOLD: raise ValueError("Copy block size must be 5MB<=block<=5GB") bucket1, key1, version1 = self._split_path(path1) @@ -1485,7 +1490,7 @@ def _lsbuckets(self) -> List[dict]: def _lsdir( self, path: str, - max_items: int = 1000, + max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS, delimiter: str = "/", prefix: str = "", include_self: bool = False, @@ -1555,7 +1560,7 @@ def _lsdir( def _listdir( self, bucket: str, - max_items: int = 1000, + max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS, delimiter: str = "/", prefix: str = "", include_self: bool = False, @@ -1779,7 +1784,7 @@ def __init__( head = self.fs.tos_client.head_object(bucket, key) loc = head.content_length - if loc < 5 * 2**20: + if loc < APPEND_OPERATION_SMALL_FILE_THRESHOLD: # existing file too small for multi-upload: download self.write(self.fs.cat(self.path)) else: