From 611ee30793e6e8c6967a52cbbc0578e4ff43b312 Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 27 Sep 2024 19:35:11 +0800 Subject: [PATCH] Run test cases on hns --- .DS_Store | Bin 0 -> 8196 bytes .github/workflows/ci.yml | 4 +- .github/workflows/ci_hns.yml | 66 +++++++ tosfs/consts.py | 8 +- tosfs/core.py | 337 +++++++++++++++++++++++++++++------ tosfs/retry.py | 9 +- tosfs/tests/test_tosfs.py | 134 ++++++++------ tosfs/utils.py | 16 +- 8 files changed, 444 insertions(+), 130 deletions(-) create mode 100644 .DS_Store create mode 100644 .github/workflows/ci_hns.yml diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5a61d3d50f1b520975ee32c7940daf78ed7e71f2 GIT binary patch literal 8196 zcmeHMYit!o6rOKsVRxZ)O3Tt#s#j`wwcv$H%S*lV!BZZgPs-zR@7)Dfw!7u--dm6m zYYd4GG@7XYL}S$G4@uN$jFATa5r6o@G)8>z$HYg}XyPvu44&E9Vju9sgcuTbl9@B- zoH?^IXTGy@&tAqDS_|e1#;O@(Dpf)GOlq!EgkIDsMTsP(L_zi}JLC^$9WP7#M^i?} zg4hGG2VxJz9*8{@n)hoEBAKG?Gg{rAq znpdAZ*_3Q)Osr^Gb*eFOa?Q$QW8%K1=2NFMb#DE=>vpG)Is>lvl-vTM*8nSw0BXZ_)PctYhuT_&LipZTE2ZkZlG> z`pI){U>~xryrxx@Hg~z10c*6Zd^R`c88s@eSNjTC`&hvaPK@%|b6HsBvFPmg`BQ+sR}t zN8_e6YtmO4y+!Fmjc=2})%seaFXs&m(2{+v0*z99zww}PKs@=Oj34YAv|O>O5vgl6 z+KqnyNG5OX93m8{{Gya^B6)Z=q!DHvrX6^BT64JdI%B)4?h#XNFip#p!d=F0m4Bhq zG1zX=ChXGjN(igCW5C_A!eMvJ>nKdzqbO@30Tq2)oEW zXP4O5>>GBOeb0Vnzp>xhAM7tEC#!aj*otk~jveU5ejLC- zWbiQZ@KHb!kKzeDi8FWxFX0WmiMQ}J-otr}-~v9yXZRN1;WB>2Pxu*s;tKx8RfQ>) zN|jQlEKnNcvX!xz=$i;HrIGk+F-|cNeWO#3T`wkTH*VT|YsAE*St1H5=2TYAy;Fon z^SabE6iL{PA#FT)LfyHS*N6WXogs(>t?5skikfl6rS zr4hSu5wY9&bjj8>G^!L?TnSmdHn~Eh_~FwcwqX^qkc_@&twxc=OGazvRU7N|2XxLv zBx})3ZS6GE2v`3*NZ(~2vkL^$Q3B}?>=%OORj35aS|qR#iwK$xScYb_5IEO{(A!op$LAw+`^qQiI$kK;6+#dCNbFW^O-#VdFfuiqH>p{CFT1JM*sNxra$6fLqD}=Fuf|i20K=6DNteo92OOa;9pL|99N@`~OXQUOZ9k zf!G7Lo(E9U-r3$t>l*t3F?oq1*G^D9NL3UuZdlULgqkoNCmN>XM6dl}Nc{v26*j4e d4ogZBD*yY3fPa5&%9}R6{}JE+Arx<`<{xww6ITEL literal 0 HcmV?d00001 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2ccb597..7b8caab 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -name: CI +name: CI-FNS on: push: @@ -36,6 +36,7 @@ jobs: matrix: python-version: ["3.9", "3.10", "3.11", "3.12"] fsspec-version: ["2023.5.0", "2024.9.0", "2024.10.0"] + bucket-name: ["proton-ci"] steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} @@ -60,5 +61,6 @@ jobs: echo "TOS_ENDPOINT=${{ vars.TOS_ENDPOINT }}" >> $GITHUB_ENV echo "TOSFS_LOGGING_LEVEL=${{ vars.TOSFS_LOGGING_LEVEL }}" >> $GITHUB_ENV echo "TOS_SDK_LOGGING_LEVEL=${{ vars.TOS_SDK_LOGGING_LEVEL }}" >> $GITHUB_ENV + echo "TOS_BUCKET=${{ matrix.bucket-name }}" >> $GITHUB_ENV - name: Run tests run: make test diff --git a/.github/workflows/ci_hns.yml b/.github/workflows/ci_hns.yml new file mode 100644 index 0000000..d927974 --- /dev/null +++ b/.github/workflows/ci_hns.yml @@ -0,0 +1,66 @@ +# ByteDance Volcengine EMR, Copyright 2024. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: CI-HNS + +on: + push: + paths-ignore: + - '*.md' + - 'README.md' + - 'pyproject.toml' + - 'poetry.lock' + - 'Makefile' + - 'LICENSE' + - '.github/ISSUE_TEMPLATE/**' + - '.gitignore' + - 'docs/**' + - 'examples/**' + +jobs: + build: + runs-on: ubuntu-latest + timeout-minutes: 60 + strategy: + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12"] + fsspec-version: ["2023.5.0", "2024.9.0", "2024.10.0"] + bucket-name: ["proton-ci-hns"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + set -x + python -m pip install --upgrade pip + pip install --user poetry + poetry lock + poetry install --with dev + poetry add fsspec==${{ matrix.fsspec-version }} + poetry show fsspec + - name: Prepare Env + run: | + echo "Preparing environment variables" + echo "TOS_ACCESS_KEY=${{ secrets.TOS_ACCESS_KEY }}" >> $GITHUB_ENV + echo "TOS_SECRET_KEY=${{ secrets.TOS_SECRET_KEY }}" >> $GITHUB_ENV + echo "TOS_REGION=${{ vars.TOS_REGION }}" >> $GITHUB_ENV + echo "TOS_ENDPOINT=${{ vars.TOS_ENDPOINT }}" >> $GITHUB_ENV + echo "TOSFS_LOGGING_LEVEL=${{ vars.TOSFS_LOGGING_LEVEL }}" >> $GITHUB_ENV + echo "TOS_SDK_LOGGING_LEVEL=${{ vars.TOS_SDK_LOGGING_LEVEL }}" >> $GITHUB_ENV + echo "TOS_BUCKET=${{ matrix.bucket-name }}" >> $GITHUB_ENV + - name: Run tests + run: make test diff --git a/tosfs/consts.py b/tosfs/consts.py index 398f3bf..4f405b8 100644 --- a/tosfs/consts.py +++ b/tosfs/consts.py @@ -17,8 +17,12 @@ # Tos server response status codes TOS_SERVER_STATUS_CODE_NOT_FOUND = 404 -MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30 # 5GB -MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20 # 5MB +# tos bucket type (hns, fns) +TOS_BUCKET_TYPE_HNS = "hns" +TOS_BUCKET_TYPE_FNS = "fns" + +MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30 +MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20 RETRY_NUM = 5 PART_MIN_SIZE = 5 * 2**20 diff --git a/tosfs/core.py b/tosfs/core.py index 144278f..81b875d 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -48,6 +48,8 @@ MPU_PART_SIZE_THRESHOLD, PART_MAX_SIZE, PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, + TOS_BUCKET_TYPE_FNS, + TOS_BUCKET_TYPE_HNS, TOS_SERVER_STATUS_CODE_NOT_FOUND, TOSFS_LOG_FORMAT, ) @@ -55,9 +57,9 @@ from tosfs.fsspec_utils import glob_translate from tosfs.models import DeletingObject from tosfs.mpu import MultipartUploader -from tosfs.retry import INVALID_RANGE_CODE, retryable_func_executor +from tosfs.retry import CONFLICT_CODE, INVALID_RANGE_CODE, retryable_func_executor from tosfs.tag import BucketTagMgr -from tosfs.utils import find_bucket_key, get_brange, is_dir +from tosfs.utils import find_bucket_key, get_brange from tosfs.version import Version logger = logging.getLogger("tosfs") @@ -445,6 +447,10 @@ def ls_iterate( path = self._strip_protocol(path) bucket, key, _ = self._split_path(path) + + if recursive and self._is_hns_bucket(bucket): + raise ValueError("Recursive listing is not supported for HNS bucket.") + prefix = key.lstrip("/") + "/" if key else "" continuation_token = "" is_truncated = True @@ -524,10 +530,23 @@ def info( if not key: return self._bucket_info(bucket) - if info := self._object_info(bucket, key, version_id): - return info + bucket_type = self._get_bucket_type(bucket) + if bucket_type == TOS_BUCKET_TYPE_FNS: + result = self._object_info(bucket, key, version_id) + + if not result: + result = self._get_dir_info(bucket, key, fullpath) + else: + # Priority is given to judging dir, followed by file. + result = self._get_dir_info(bucket, key, fullpath) + + if not result: + result = self._object_info(bucket, key, version_id) + + if not result: + raise FileNotFoundError(f"Can not get information for path: {path}") - return self._get_dir_info(bucket, key, path, fullpath) + return result def exists(self, path: str, **kwargs: Any) -> bool: """Check if a path exists in the TOS. @@ -816,14 +835,19 @@ def isdir(self, path: str) -> bool: if not key: return False - key = key.rstrip("/") + "/" - try: - resp = retryable_func_executor( - lambda: self.tos_client.get_file_status(bucket, key), - max_retry_num=self.max_retry_num, - ) - return resp.key is not None and is_dir(resp.key, key) + if self._is_fns_bucket(bucket): + resp = retryable_func_executor( + lambda: self.tos_client.get_file_status(bucket, key), + max_retry_num=self.max_retry_num, + ) + return resp.key != key + else: + resp = retryable_func_executor( + lambda: self.tos_client.head_object(bucket, key), + max_retry_num=self.max_retry_num, + ) + return resp.is_directory except TosServerError as e: if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: return False @@ -851,10 +875,14 @@ def isfile(self, path: str) -> bool: return False try: - return retryable_func_executor( - lambda: self.tos_client.head_object(bucket, key) and True, + resp = retryable_func_executor( + lambda: self.tos_client.head_object(bucket, key), max_retry_num=self.max_retry_num, ) + if self._is_fns_bucket(bucket): + return True + else: + return not resp.is_directory except TosClientError as e: raise e except TosServerError as e: @@ -1233,14 +1261,21 @@ def cp_file( If there is an unknown error while copying the file. """ + path1 = self._strip_protocol(path1) + path2 = self._strip_protocol(path2) if path1 == path2: logger.warning("Source and destination are the same: %s", path1) return - path1 = self._strip_protocol(path1) + if self.isdir(path1) and self.isdir(path2): + return + bucket, key, vers = self._split_path(path1) info = self.info(path1, bucket, key, version_id=vers) + if not info: + raise FileNotFoundError(f"Can not get information for path: {path1}") + if info["type"] == "directory": logger.warning("Do not support copy directory %s.", path1) return @@ -1361,11 +1396,53 @@ def _rm(self, path: str) -> None: ######################## private methods ######################## def _list_and_batch_delete_objs(self, bucket: str, key: str) -> None: + bucket_type = self._get_bucket_type(bucket) is_truncated = True continuation_token = "" all_results = [] - def delete_objects(deleting_objects: List[DeletingObject]) -> None: + if bucket_type == TOS_BUCKET_TYPE_FNS: + + def _call_list_objects( + continuation_token: str = "", + ) -> ListObjectType2Output: + return retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + prefix=key.rstrip("/") + "/", + max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, + continuation_token=continuation_token, + ), + max_retry_num=self.max_retry_num, + ) + + while is_truncated: + resp = _call_list_objects(continuation_token) + is_truncated = resp.is_truncated + continuation_token = resp.next_continuation_token + all_results = resp.contents + + deleting_objects = [ + DeletingObject(o.key if hasattr(o, "key") else o.prefix) + for o in all_results + ] + + if deleting_objects: + self._delete_objects(bucket, deleting_objects) + elif bucket_type == TOS_BUCKET_TYPE_HNS: + all_results = self._list_and_collect_objects( + bucket, bucket_type, key.rstrip("/") + "/" + ) + if all_results: + self._delete_objects(bucket, all_results) + else: + raise ValueError(f"Unsupported bucket type: {bucket_type}") + + def _delete_objects( + self, bucket: str, deleting_objects: list[DeletingObject] + ) -> None: + bucket_type = self._get_bucket_type(bucket) + if bucket_type == TOS_BUCKET_TYPE_FNS: delete_resp = retryable_func_executor( lambda: self.tos_client.delete_multi_objects( bucket, deleting_objects, quiet=True @@ -1375,35 +1452,71 @@ def delete_objects(deleting_objects: List[DeletingObject]) -> None: if delete_resp.error: for d in delete_resp.error: logger.warning("Deleted object: %s failed", d) + else: + + def _call_delete_object(obj: DeletingObject) -> None: + retryable_func_executor( + lambda: self.tos_client.delete_object(bucket, obj.key), + max_retry_num=self.max_retry_num, + ) + + # Preferentially delete subpaths with longer keys + for obj in sorted(deleting_objects, key=lambda x: len(x.key), reverse=True): + _call_delete_object(obj) + + def _list_and_collect_objects( + self, + bucket: str, + bucket_type: str, + prefix: str, + collected_objects: Optional[List[DeletingObject]] = None, + ) -> List[DeletingObject]: + + if collected_objects is None: + collected_objects = [] + + collected_keys = {obj.key for obj in collected_objects} + + is_truncated = True + continuation_token = "" while is_truncated: def _call_list_objects_type2( - continuation_token: str = continuation_token, + continuation_token: str = continuation_token, prefix: str = prefix ) -> ListObjectType2Output: return self.tos_client.list_objects_type2( bucket, - prefix=key.rstrip("/") + "/", + prefix=prefix, max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, continuation_token=continuation_token, + delimiter="/" if bucket_type == TOS_BUCKET_TYPE_HNS else None, ) resp = retryable_func_executor( _call_list_objects_type2, - args=(continuation_token,), max_retry_num=self.max_retry_num, ) is_truncated = resp.is_truncated continuation_token = resp.next_continuation_token - all_results = resp.contents - deleting_objects = [ - DeletingObject(o.key if hasattr(o, "key") else o.prefix) - for o in all_results - ] + for obj in resp.contents: + key = obj.key if hasattr(obj, "key") else obj.prefix + if key not in collected_keys: + collected_objects.append(DeletingObject(key=key)) + collected_keys.add(key) + + for common_prefix in resp.common_prefixes: + key = common_prefix.prefix + if key not in collected_keys: + collected_objects.append(DeletingObject(key=key)) + collected_keys.add(key) + if bucket_type == TOS_BUCKET_TYPE_HNS: + self._list_and_collect_objects( + bucket, bucket_type, common_prefix.prefix, collected_objects + ) - if deleting_objects: - delete_objects(deleting_objects) + return collected_objects def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None: """Copy file between locations on tos. @@ -1583,13 +1696,18 @@ def _find_file_dir( self, key: str, path: str, prefix: str, withdirs: bool, kwargs: Any ) -> List[dict]: out = self._ls_dirs_and_files( - path, delimiter="", include_self=True, prefix=prefix, **kwargs + path, + delimiter="", + include_self=True, + prefix=prefix, + recursive=True, ) if not out and key: try: out = [self.info(path)] except FileNotFoundError: out = [] + dirs = { self._parent(o["name"]): { "Key": self._parent(o["name"]).rstrip("/"), @@ -1635,7 +1753,9 @@ def _open_remote_file( except TosServerError as e: if e.status_code == INVALID_RANGE_CODE: obj_info = self._object_info(bucket=bucket, key=key) - if obj_info["size"] == 0 or range_start == obj_info["size"]: + if obj_info and ( + obj_info["size"] == 0 or range_start == obj_info["size"] + ): return io.BytesIO(), 0 else: raise e @@ -1691,7 +1811,7 @@ def _bucket_info(self, bucket: str) -> dict: def _object_info( self, bucket: str, key: str, version_id: Optional[str] = None - ) -> dict: + ) -> Optional[dict]: """Get the information of an object. Parameters @@ -1744,16 +1864,20 @@ def _object_info( except TosClientError as e: raise e except TosServerError as e: - if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: + if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND or ( + self._get_bucket_type(bucket) == TOS_BUCKET_TYPE_HNS + and e.status_code == CONFLICT_CODE + and e.header._store["x-tos-ec"][1] == "0026-00000020" + ): pass else: raise e except Exception as e: raise TosfsError(f"Tosfs failed with unknown error: {e}") from e - return {} + return None - def _get_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict: + def _get_dir_info(self, bucket: str, key: str, fullpath: str) -> Optional[dict]: try: # We check to see if the path is a directory by attempting to list its # contexts. If anything is found, it is indeed a directory @@ -1776,8 +1900,8 @@ def _get_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict "type": "directory", } - raise FileNotFoundError(path) - except (TosClientError, TosServerError, FileNotFoundError) as e: + return None + except (TosClientError, TosServerError) as e: raise e except Exception as e: raise TosfsError(f"Tosfs failed with unknown error: {e}") from e @@ -1873,6 +1997,7 @@ def _ls_dirs_and_files( prefix: str = "", include_self: bool = False, versions: bool = False, + recursive: bool = False, ) -> List[dict]: bucket, key, _ = self._split_path(path) if not prefix: @@ -1883,6 +2008,8 @@ def _ls_dirs_and_files( logger.debug("Get directory listing for %s", path) dirs = [] files = [] + seen_names = set() + for obj in self._ls_objects( bucket, max_items=max_items, @@ -1890,13 +2017,26 @@ def _ls_dirs_and_files( prefix=prefix, include_self=include_self, versions=versions, + recursive=recursive, ): - if isinstance(obj, CommonPrefixInfo) and delimiter == "/": - dirs.append(self._fill_dir_info(bucket, obj)) + if isinstance(obj, CommonPrefixInfo): + dir_info = self._fill_dir_info(bucket, obj) + dir_name = dir_info["name"] + if dir_name not in seen_names: + dirs.append(dir_info) + seen_names.add(dir_name) elif obj.key.endswith("/"): - dirs.append(self._fill_dir_info(bucket, None, obj.key)) + dir_info = self._fill_dir_info(bucket, None, obj.key) + dir_name = dir_info["name"] + if dir_name not in seen_names: + dirs.append(dir_info) + seen_names.add(dir_name) else: - files.append(self._fill_file_info(obj, bucket, versions)) + file_info = self._fill_file_info(obj, bucket, versions) + file_name = file_info["name"] + if file_name not in seen_names: + files.append(file_info) + seen_names.add(file_name) files += dirs return files @@ -1909,6 +2049,7 @@ def _ls_objects( prefix: str = "", include_self: bool = False, versions: bool = False, + recursive: bool = False, ) -> List[Union[CommonPrefixInfo, ListedObject, ListedObjectVersion]]: if versions: raise ValueError( @@ -1916,35 +2057,104 @@ def _ls_objects( "not version aware." ) + bucket_type = self._get_bucket_type(bucket) all_results = [] - is_truncated = True - continuation_token = "" - while is_truncated: + if recursive and bucket_type == TOS_BUCKET_TYPE_HNS: - def _call_list_objects_type2( - continuation_token: str = continuation_token, - ) -> ListObjectType2Output: - return self.tos_client.list_objects_type2( - bucket, - prefix, - start_after=prefix if not include_self else None, - delimiter=delimiter, - max_keys=max_items, - continuation_token=continuation_token, + def _recursive_list(bucket: str, prefix: str) -> None: + resp = retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + prefix=prefix, + delimiter="/", + max_keys=max_items, + ), + max_retry_num=self.max_retry_num, ) + all_results.extend(resp.contents + resp.common_prefixes) + for common_prefix in resp.common_prefixes: + _recursive_list(bucket, common_prefix.prefix) + + _recursive_list(bucket, prefix) + else: + is_truncated = True + + continuation_token = "" + while is_truncated: + + def _call_list_objects_type2( + continuation_token: str = continuation_token, + ) -> ListObjectType2Output: + return self.tos_client.list_objects_type2( + bucket, + prefix, + start_after=prefix if not include_self else None, + delimiter=delimiter, + max_keys=max_items, + continuation_token=continuation_token, + ) + + resp = retryable_func_executor( + _call_list_objects_type2, + args=(continuation_token,), + max_retry_num=self.max_retry_num, + ) + is_truncated = resp.is_truncated + continuation_token = resp.next_continuation_token + + all_results.extend(resp.contents + resp.common_prefixes) + + return all_results + + def _prefix_search_for_exists(self, bucket: str, key: str) -> bool: + bucket_type = self._get_bucket_type(bucket) + + if bucket_type == TOS_BUCKET_TYPE_FNS: resp = retryable_func_executor( - _call_list_objects_type2, - args=(continuation_token,), + lambda: self.tos_client.list_objects_type2( + bucket, + key.rstrip("/") + "/", + start_after=key.rstrip("/") + "/", + max_keys=1, + ), max_retry_num=self.max_retry_num, ) - is_truncated = resp.is_truncated - continuation_token = resp.next_continuation_token + return len(resp.contents) > 0 + elif bucket_type == TOS_BUCKET_TYPE_HNS: - all_results.extend(resp.contents + resp.common_prefixes) + def search_in_common_prefixes(bucket: str, prefix: str) -> bool: + resp = retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + prefix, + delimiter="/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, + ) + if len(resp.contents) > 0: + return True + for common_prefix in resp.common_prefixes: + if search_in_common_prefixes(bucket, common_prefix): + return True + return False - return all_results + resp = retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + key.rstrip("/") + "/", + delimiter="/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, + ) + if len(resp.contents) > 0: + return True + return search_in_common_prefixes(bucket, key.rstrip("/") + "/") + else: + raise ValueError(f"Unsupported bucket type {bucket_type}") def _split_path(self, path: str) -> Tuple[str, str, Optional[str]]: """Normalise tos path string into bucket and key. @@ -1980,6 +2190,19 @@ def _split_path(self, path: str) -> Tuple[str, str, Optional[str]]: version_id if self.version_aware and version_id else None, ) + def _get_bucket_type(self, bucket: str) -> str: + bucket_type = self.tos_client._get_bucket_type(bucket) + if not bucket_type: + return TOS_BUCKET_TYPE_FNS + + return bucket_type + + def _is_hns_bucket(self, bucket: str) -> bool: + return self._get_bucket_type(bucket) == TOS_BUCKET_TYPE_HNS + + def _is_fns_bucket(self, bucket: str) -> bool: + return self._get_bucket_type(bucket) == TOS_BUCKET_TYPE_FNS + def _init_tag_manager(self) -> None: auth = self.tos_client.auth if isinstance(auth, CredentialProviderAuth): diff --git a/tosfs/retry.py b/tosfs/retry.py index 5842a56..4bab0f1 100644 --- a/tosfs/retry.py +++ b/tosfs/retry.py @@ -139,7 +139,14 @@ def _is_retryable_tos_server_exception(e: TosError) -> bool: # not all conflict errors are retryable if e.status_code == CONFLICT_CODE: - return e.ec not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES + return ( + e.ec not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES + and + # TODO: currently, hack for supporting hns, + # need to refactor when tos python sdk GA + e.header._store["x-tos-ec"][1] + not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES + ) return e.status_code in TOS_SERVER_RETRYABLE_STATUS_CODES diff --git a/tosfs/tests/test_tosfs.py b/tosfs/tests/test_tosfs.py index 4313638..efdc344 100644 --- a/tosfs/tests/test_tosfs.py +++ b/tosfs/tests/test_tosfs.py @@ -129,19 +129,22 @@ def test_ls_iterate( assert len(result) == len([dir_name, another_dir_name]) # Test list recursively - expected = [ - f"{bucket}/{temporary_workspace}/{dir_name}", - f"{bucket}/{temporary_workspace}/{dir_name}/{file_name}", - f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}", - f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}/{sub_file_name}", - f"{bucket}/{temporary_workspace}/{another_dir_name}", - ] - result = [ - item - for batch in tosfs.ls_iterate(f"{bucket}/{temporary_workspace}", recursive=True) - for item in batch - ] - assert sorted(result) == sorted(expected) + if tosfs._is_fns_bucket(bucket): + expected = [ + f"{bucket}/{temporary_workspace}/{dir_name}", + f"{bucket}/{temporary_workspace}/{dir_name}/{file_name}", + f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}", + f"{bucket}/{temporary_workspace}/{dir_name}/{sub_dir_name}/{sub_file_name}", + f"{bucket}/{temporary_workspace}/{another_dir_name}", + ] + result = [ + item + for batch in tosfs.ls_iterate( + f"{bucket}/{temporary_workspace}", recursive=True + ) + for item in batch + ] + assert sorted(result) == sorted(expected) def test_inner_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None: @@ -160,10 +163,10 @@ def test_info(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> No assert tosfs.info("") == {"name": "", "size": 0, "type": "directory"} assert tosfs.info("/") == {"name": "/", "size": 0, "type": "directory"} assert tosfs.info(bucket) == { - "Key": "proton-ci", + "Key": bucket, "Size": 0, "StorageClass": "BUCKET", - "name": "proton-ci", + "name": bucket, "size": 0, "type": "directory", } @@ -424,43 +427,44 @@ def test_put(tosfs: TosFileSystem, bucket: str, temporary_workspace: str): ) as file: assert file.read() == "hello world" - with tempfile.TemporaryDirectory() as local_temp_dir: - dir_2 = f"{local_temp_dir}/生技??174号文/" - dir_3 = f"{local_temp_dir}/生技**174号文/" - dir_4 = f"{local_temp_dir}/生技_=+&^%#174号文/" - os.makedirs(dir_2) - os.makedirs(dir_3) - os.makedirs(dir_4) - with open(f"{dir_2}/test.txt", "w") as f: - f.write("hello world") - tosfs.put( - local_temp_dir, - f"{bucket}/{temporary_workspace}", - recursive=True, - disable_glob=True, - ) - assert tosfs.exists( - f"{bucket}/{temporary_workspace}/{os.path.basename(local_temp_dir)}" - f"/生技??174号文/" - ) - assert tosfs.exists( - f"{bucket}/{temporary_workspace}/{os.path.basename(local_temp_dir)}" - f"/生技??174号文/test.txt" - ) - assert tosfs.exists( - f"{bucket}/{temporary_workspace}/{os.path.basename(local_temp_dir)}" - f"/生技**174号文/" - ) - assert tosfs.exists( - f"{bucket}/{temporary_workspace}/{os.path.basename(local_temp_dir)}" - f"/生技_=+&^%#174号文/" - ) - with tosfs.open( - f"{bucket}/{temporary_workspace}/" - f"{os.path.basename(local_temp_dir)}/生技??174号文/test.txt", - mode="r", - ) as file: - assert file.read() == "hello world" + if tosfs._is_fns_bucket(bucket): + with tempfile.TemporaryDirectory() as local_temp_dir: + dir_2 = f"{local_temp_dir}/生技??174号文/" + dir_3 = f"{local_temp_dir}/生技**174号文/" + dir_4 = f"{local_temp_dir}/生技_=+&^%#174号文/" + os.makedirs(dir_2) + os.makedirs(dir_3) + os.makedirs(dir_4) + with open(f"{dir_2}/test.txt", "w") as f: + f.write("hello world") + tosfs.put( + local_temp_dir, + f"{bucket}/{temporary_workspace}", + recursive=True, + disable_glob=True, + ) + assert tosfs.exists( + f"{bucket}/{temporary_workspace}/{os.path.basename(local_temp_dir)}" + f"/生技??174号文/" + ) + assert tosfs.exists( + f"{bucket}/{temporary_workspace}/{os.path.basename(local_temp_dir)}" + f"/生技??174号文/test.txt" + ) + assert tosfs.exists( + f"{bucket}/{temporary_workspace}/{os.path.basename(local_temp_dir)}" + f"/生技**174号文/" + ) + assert tosfs.exists( + f"{bucket}/{temporary_workspace}/{os.path.basename(local_temp_dir)}" + f"/生技_=+&^%#174号文/" + ) + with tosfs.open( + f"{bucket}/{temporary_workspace}/" + f"{os.path.basename(local_temp_dir)}/生技??174号文/test.txt", + mode="r", + ) as file: + assert file.read() == "hello world" # test let special-char dir as the lpath with tempfile.TemporaryDirectory() as local_temp_dir: @@ -996,11 +1000,24 @@ def test_file_write_append( content = "hello world" with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "w") as f: f.write(content) - with pytest.raises(TosServerError): + + if tosfs._is_fns_bucket(bucket): + with pytest.raises(TosServerError): + with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "a") as f: + f.write(content) + else: with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "a") as f: f.write(content) + assert tosfs.info(f"{bucket}/{temporary_workspace}/{file_name}")[ + "size" + ] == 2 * len(content) + with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f: + assert f.read() == content + content another_file = random_str() + if tosfs._is_hns_bucket(bucket): + tosfs.touch(f"{bucket}/{temporary_workspace}/{another_file}") + with tosfs.open(f"{bucket}/{temporary_workspace}/{another_file}", "a") as f: f.write(content) with tosfs.open(f"{bucket}/{temporary_workspace}/{another_file}", "a") as f: @@ -1021,11 +1038,20 @@ def test_big_file_append( f.write(content) append_content = "a" * 1024 * 1024 - with pytest.raises(TosServerError): + if tosfs._is_fns_bucket(bucket): + with pytest.raises(TosServerError): + with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "a") as f: + f.write(append_content) + else: with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "a") as f: f.write(append_content) + with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f: + assert f.read() == content + append_content + another_file = random_str() + if tosfs._is_hns_bucket(bucket): + tosfs.touch(f"{bucket}/{temporary_workspace}/{another_file}") with tosfs.open(f"{bucket}/{temporary_workspace}/{another_file}", "a") as f: f.write(content) diff --git a/tosfs/utils.py b/tosfs/utils.py index 878a9e5..7f39e5b 100644 --- a/tosfs/utils.py +++ b/tosfs/utils.py @@ -17,7 +17,7 @@ import re import string import tempfile -from typing import Generator, Optional, Tuple +from typing import Generator, Tuple def random_str(length: int = 5) -> str: @@ -99,17 +99,3 @@ def get_brange(size: int, block: int) -> Generator[Tuple[int, int], None, None]: """ for offset in range(0, size, block): yield offset, min(offset + block - 1, size - 1) - - -def is_dir(key: str, prefix: Optional[str]) -> bool: - """Check if the key is a directory. - - Parameters - ---------- - key : str - The key to check. - prefix: str, optional - The key prefix. - - """ - return key.endswith("/") or (key.startswith(prefix) if prefix is not None else True)