diff --git a/tosfs/core.py b/tosfs/core.py index 9584bb1..2969308 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -2012,7 +2012,10 @@ def __init__( self.buffer: Optional[io.BytesIO] = io.BytesIO() if "a" in mode and fs.exists(path): - head = self.fs.tos_client.head_object(bucket, key) + head = retryable_func_executor( + lambda: self.fs.tos_client.head_object(bucket, key), + max_retry_num=self.fs.max_retry_num, + ) loc = head.content_length if loc < APPEND_OPERATION_SMALL_FILE_THRESHOLD: @@ -2030,16 +2033,22 @@ def _initiate_upload(self) -> None: logger.debug("Initiate upload for %s", self) self.parts = [] - self.mpu = self.fs.tos_client.create_multipart_upload(self.bucket, self.key) + self.mpu = retryable_func_executor( + lambda: self.fs.tos_client.create_multipart_upload(self.bucket, self.key), + max_retry_num=self.fs.max_retry_num, + ) if self.append_block: # use existing data in key when appending, # and block is big enough - out = self.fs.tos_client.upload_part_copy( - bucket=self.bucket, - key=self.key, - part_number=1, - upload_id=self.mpu.upload_id, + out = retryable_func_executor( + lambda: self.fs.tos_client.upload_part_copy( + bucket=self.bucket, + key=self.key, + part_number=1, + upload_id=self.mpu.upload_id, + ), + max_retry_num=self.fs.max_retry_num, ) self.parts.append({"PartNumber": out.part_number, "ETag": out.etag}) @@ -2123,12 +2132,19 @@ def handle_remainder( part = len(self.parts) + 1 if self.parts is not None else 1 logger.debug("Upload chunk %s, %s", self, part) - out: UploadPartOutput = self.fs.tos_client.upload_part( - bucket=bucket, - key=key, - part_number=part, - upload_id=self.mpu.upload_id, - content=previous_chunk, + def _call_upload_part( + part: int = part, previous_chunk: Optional[bytes] = previous_chunk + ) -> UploadPartOutput: + self.fs.tos_client.upload_part( + bucket=bucket, + key=key, + part_number=part, + upload_id=self.mpu.upload_id, + content=previous_chunk, + ) + + out = retryable_func_executor( + _call_upload_part, max_retry_num=self.fs.max_retry_num ) ( @@ -2179,15 +2195,24 @@ def commit(self) -> None: logger.debug("One-shot upload of %s", self) self.buffer.seek(0) data = self.buffer.read() - write_result = self.fs.tos_client.put_object( - self.bucket, self.key, content=data + write_result = retryable_func_executor( + lambda: self.fs.tos_client.put_object( + self.bucket, self.key, content=data + ), + max_retry_num=self.fs.max_retry_num, ) else: raise RuntimeError else: logger.debug("Complete multi-part upload for %s ", self) - write_result = self.fs.tos_client.complete_multipart_upload( - self.bucket, self.key, upload_id=self.mpu.upload_id, parts=self.parts + write_result = retryable_func_executor( + lambda: self.fs.tos_client.complete_multipart_upload( + self.bucket, + self.key, + upload_id=self.mpu.upload_id, + parts=self.parts, + ), + max_retry_num=self.fs.max_retry_num, ) if self.fs.version_aware: @@ -2202,7 +2227,10 @@ def discard(self) -> None: def _abort_mpu(self) -> None: if self.mpu: - self.fs.tos_client.abort_multipart_upload( - self.bucket, self.key, self.mpu.upload_id + retryable_func_executor( + lambda: self.fs.tos_client.abort_multipart_upload( + self.bucket, self.key, self.mpu.upload_id + ), + max_retry_num=self.fs.max_retry_num, ) self.mpu = None diff --git a/tosfs/tests/test_stability.py b/tosfs/tests/test_stability.py new file mode 100644 index 0000000..e243faa --- /dev/null +++ b/tosfs/tests/test_stability.py @@ -0,0 +1,13 @@ +# ByteDance Volcengine EMR, Copyright 2024. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.