From 77fe0910e91f5972a780d792ec75fdafa953c253 Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Wed, 31 Jul 2024 21:19:06 -0700
Subject: [PATCH 01/12] add support for cloudflare's R2 storage

---
 s3fs/core.py | 50 ++++++++++++++++++++------------------------------
 1 file changed, 20 insertions(+), 30 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 2da6f0bd..aec14ba0 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -2331,41 +2331,31 @@ def _upload_chunk(self, final=False):
             and self.tell() < self.blocksize
         ):
             # only happens when closing small file, use on-shot PUT
-            data1 = False
+            pass
         else:
             self.buffer.seek(0)
-            (data0, data1) = (None, self.buffer.read(self.blocksize))
-
-            while data1:
-                (data0, data1) = (data1, self.buffer.read(self.blocksize))
-                data1_size = len(data1)
-
-                if 0 < data1_size < self.blocksize:
-                    remainder = data0 + data1
-                    remainder_size = self.blocksize + data1_size
-
-                    if remainder_size <= self.part_max:
-                        (data0, data1) = (remainder, None)
-                    else:
-                        partition = remainder_size // 2
-                        (data0, data1) = (remainder[:partition], remainder[partition:])
-            part = len(self.parts) + 1
-            logger.debug("Upload chunk %s, %s" % (self, part))
+            # previously last small chunk was merged into previous one
+            # (and in some cases two chunks were equalized)
+            # However, R2 imposes additional restriction that all but last chunk
+            # are of same size, and last chunk can't be larger.
+            while data := self.buffer.read(self.blocksize):
+                part = len(self.parts) + 1
+                logger.debug("Upload chunk %s, %s" % (self, part))
 
-            out = self._call_s3(
-                "upload_part",
-                Bucket=bucket,
-                PartNumber=part,
-                UploadId=self.mpu["UploadId"],
-                Body=data0,
-                Key=key,
-            )
+                out = self._call_s3(
+                    "upload_part",
+                    Bucket=bucket,
+                    PartNumber=part,
+                    UploadId=self.mpu["UploadId"],
+                    Body=data,
+                    Key=key,
+                )
 
-            part_header = {"PartNumber": part, "ETag": out["ETag"]}
-            if "ChecksumSHA256" in out:
-                part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
-            self.parts.append(part_header)
+                part_header = {"PartNumber": part, "ETag": out["ETag"]}
+                if "ChecksumSHA256" in out:
+                    part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
+                self.parts.append(part_header)
 
         if self.autocommit and final:
             self.commit()

From 02765ca7fcbc89b93005394b0c3eb514a9c2c023 Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Fri, 2 Aug 2024 10:10:03 -0700
Subject: [PATCH 02/12] remove trailing whitespace

---
 s3fs/core.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index aec14ba0..bd0006e2 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -2335,7 +2335,7 @@ def _upload_chunk(self, final=False):
         else:
             self.buffer.seek(0)
-            # previously last small chunk was merged into previous one 
+            # previously last small chunk was merged into previous one
             # (and in some cases two chunks were equalized)
             # However, R2 imposes additional restriction that all but last chunk
             # are of same size, and last chunk can't be larger.
             while data := self.buffer.read(self.blocksize):
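Why PATCH 01 rewrites the loop: R2 rejects a multipart upload unless every part except the last has the same size (and the last is no larger). The deleted logic merged a short tail into the previous part, or split the merged remainder into two pieces when it exceeded part_max, so the final parts could come out unequal. A minimal sketch of the part layout the new `while data := ...` loop produces -- toy blocksize, illustration only, not s3fs code:

    def fixed_size_parts(payload: bytes, blocksize: int) -> list[bytes]:
        # every part is exactly `blocksize`; only the final part may be smaller
        return [payload[i : i + blocksize] for i in range(0, len(payload), blocksize)]

    # the old code could fold the 2-byte tail into the previous part, yielding
    # sizes like [5, 7] for a 12-byte payload; R2 only accepts the layout below
    assert [len(p) for p in fixed_size_parts(b"x" * 12, 5)] == [5, 5, 2]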
From 45091933a56e8d5858952b806673f813169dc779 Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Fri, 2 Aug 2024 18:03:08 -0700
Subject: [PATCH 03/12] account for multiple calls of upload_chunk

---
 s3fs/core.py | 24 +++++++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index bd0006e2..09b0ceac 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -2161,6 +2161,7 @@ def __init__(
             raise ValueError("ACL not in %s", key_acls)
         self.mpu = None
         self.parts = None
+        self.awaits_upload = b''  # for multi-part uploads
         self.fill_cache = fill_cache
         self.s3_additional_kwargs = s3_additional_kwargs or {}
         self.req_kw = {"RequestPayer": "requester"} if requester_pays else {}
@@ -2323,12 +2324,13 @@ def _upload_chunk(self, final=False):
         logger.debug(
             "Upload for %s, final=%s, loc=%s, buffer loc=%s"
             % (self, final, self.loc, self.buffer.tell())
-        )
+        ) 
         if (
             self.autocommit
             and not self.append_block
             and final
             and self.tell() < self.blocksize
+            and not self.awaits_upload
         ):
             # only happens when closing small file, use on-shot PUT
             pass
@@ -2339,7 +2341,10 @@ def _upload_chunk(self, final=False):
             # (and in some cases two chunks were equalized)
             # However, R2 imposes additional restriction that all but last chunk
             # are of same size, and last chunk can't be larger.
-            while data := self.buffer.read(self.blocksize):
+
+            def upload_part(part_data: bytes):
+                if len(part_data) == 0:
+                    return
                 part = len(self.parts) + 1
                 logger.debug("Upload chunk %s, %s" % (self, part))
 
@@ -2348,7 +2353,7 @@ def _upload_chunk(self, final=False):
                     Bucket=bucket,
                     PartNumber=part,
                     UploadId=self.mpu["UploadId"],
-                    Body=data,
+                    Body=part_data,
                     Key=key,
                 )
 
@@ -2357,6 +2362,19 @@ def _upload_chunk(self, final=False):
                 part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
             self.parts.append(part_header)
 
+
+            while received := self.buffer.read(self.blocksize):
+                # send in chunks
+                self.awaits_upload += received
+                while len(self.awaits_upload) >= self.blocksize:
+                    upload_part(self.awaits_upload[:self.blocksize])
+                    self.awaits_upload = self.awaits_upload[self.blocksize:]
+
+            if final:
+                # send anything in the buffer
+                upload_part(self.awaits_upload)
+                self.awaits_upload = b''
+
         if self.autocommit and final:
             self.commit()
         return not final

From b13c2d9ac9ef699689a5bb8b87365ddabea7c3d2 Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Fri, 2 Aug 2024 18:11:30 -0700
Subject: [PATCH 04/12] satisfy pre-commit requirements

---
 s3fs/core.py | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 09b0ceac..3a52e320 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -2161,7 +2161,7 @@ def __init__(
             raise ValueError("ACL not in %s", key_acls)
         self.mpu = None
         self.parts = None
-        self.awaits_upload = b''  # for multi-part uploads
+        self.awaits_upload = b""  # for multi-part uploads
         self.fill_cache = fill_cache
         self.s3_additional_kwargs = s3_additional_kwargs or {}
         self.req_kw = {"RequestPayer": "requester"} if requester_pays else {}
@@ -2324,7 +2324,7 @@ def _upload_chunk(self, final=False):
         logger.debug(
             "Upload for %s, final=%s, loc=%s, buffer loc=%s"
             % (self, final, self.loc, self.buffer.tell())
-        ) 
+        )
         if (
             self.autocommit
             and not self.append_block
@@ -2362,18 +2362,17 @@ def _upload_chunk(self, final=False):
                 part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
             self.parts.append(part_header)
 
-
             while received := self.buffer.read(self.blocksize):
-                # send in chunks
+                # send in chunks of same size
                 self.awaits_upload += received
                 while len(self.awaits_upload) >= self.blocksize:
-                    upload_part(self.awaits_upload[:self.blocksize])
-                    self.awaits_upload = self.awaits_upload[self.blocksize:]
-
+                    upload_part(self.awaits_upload[: self.blocksize])
+                    self.awaits_upload = self.awaits_upload[self.blocksize :]
+
             if final:
                 # send anything in the buffer
                 upload_part(self.awaits_upload)
-                self.awaits_upload = b''
+                self.awaits_upload = b""
 
         if self.autocommit and final:
             self.commit()
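Context for PATCH 03: `_upload_chunk` can run several times while one file is written, and a call may end mid-block. The `awaits_upload` bytestring carries that partial tail across calls so every part sent to S3 (except the final one) still comes out at exactly `blocksize`. The same accumulator pattern in isolation -- a sketch with invented names, not the s3fs implementation:

    class PartAccumulator:
        """Collect incoming bytes; emit fixed-size parts, flush the tail at the end."""

        def __init__(self, blocksize: int):
            self.blocksize = blocksize
            self.awaits_upload = b""
            self.sent: list[bytes] = []  # stands in for upload_part() calls

        def feed(self, data: bytes) -> None:
            self.awaits_upload += data
            while len(self.awaits_upload) >= self.blocksize:
                self.sent.append(self.awaits_upload[: self.blocksize])
                self.awaits_upload = self.awaits_upload[self.blocksize :]

        def finalize(self) -> None:
            if self.awaits_upload:
                self.sent.append(self.awaits_upload)
                self.awaits_upload = b""

    acc = PartAccumulator(blocksize=5)
    acc.feed(b"x" * 7)  # one 5-byte part emitted, 2 bytes retained
    acc.feed(b"x" * 7)  # another 5-byte part emitted, 4 bytes retained
    acc.finalize()      # trailing 4 bytes flushed as the last part
    assert [len(p) for p in acc.sent] == [5, 5, 4]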
From 39534068b3afae74504c6656bb779ac436eebb0c Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Sat, 3 Aug 2024 15:15:02 -0700
Subject: [PATCH 05/12] use `fixed_upload_size` argument to switch between old
 and new behavior

---
 s3fs/core.py | 34 ++++++++++++++++------------------
 1 file changed, 16 insertions(+), 18 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 3a52e320..3fa97dd4 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -296,6 +296,7 @@ def __init__(
         asynchronous=False,
         loop=None,
         max_concurrency=1,
+        fixed_upload_size: int | None = None,
         **kwargs,
     ):
         if key and username:
@@ -333,6 +334,9 @@ def __init__(
         self.cache_regions = cache_regions
         self._s3 = None
         self.session = session
+        if fixed_upload_size is not None and fixed_upload_size < self.blocksize:
+            raise ValueError(f"{fixed_upload_size=} is less than {self.blocksize=}")
+        self.fixed_upload_size = fixed_upload_size
         if max_concurrency < 1:
             raise ValueError("max_concurrency must be >= 1")
         self.max_concurrency = max_concurrency
@@ -2161,7 +2165,6 @@ def __init__(
             raise ValueError("ACL not in %s", key_acls)
         self.mpu = None
         self.parts = None
-        self.awaits_upload = b""  # for multi-part uploads
         self.fill_cache = fill_cache
         self.s3_additional_kwargs = s3_additional_kwargs or {}
         self.req_kw = {"RequestPayer": "requester"} if requester_pays else {}
@@ -2330,18 +2333,12 @@ def _upload_chunk(self, final=False):
             and not self.append_block
             and final
             and self.tell() < self.blocksize
-            and not self.awaits_upload
         ):
             # only happens when closing small file, use on-shot PUT
             pass
         else:
             self.buffer.seek(0)
 
-            # previously last small chunk was merged into previous one
-            # (and in some cases two chunks were equalized)
-            # However, R2 imposes additional restriction that all but last chunk
-            # are of same size, and last chunk can't be larger.
-
             def upload_part(part_data: bytes):
                 if len(part_data) == 0:
                     return
@@ -2362,17 +2359,18 @@ def _upload_chunk(self, final=False):
                 part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
             self.parts.append(part_header)
 
-            while received := self.buffer.read(self.blocksize):
-                # send in chunks of same size
-                self.awaits_upload += received
-                while len(self.awaits_upload) >= self.blocksize:
-                    upload_part(self.awaits_upload[: self.blocksize])
-                    self.awaits_upload = self.awaits_upload[self.blocksize :]
-
-            if final:
-                # send anything in the buffer
-                upload_part(self.awaits_upload)
-                self.awaits_upload = b""
+            def n_bytes_left() -> int:
+                return len(self.buffer.getbuffer()) - self.buffer.tell()
+
+            if self.fs.fixed_upload_size is not None:
+                # all chunks have fixed size, exception: last one can be smaller
+                min_chunk = 1 if final else self.fs.fixed_upload_size
+                while n_bytes_left() >= self.fs.fixed_upload_size:
+                    upload_part(self.buffer.read(self.fixed_upload_size))
+            else:
+                min_chunk = 1 if final else self.blocksize
+                while n_bytes_left() >= min_chunk:
+                    upload_part(self.buffer.read(self.part_max))
 
         if self.autocommit and final:
             self.commit()

From 270930861c5484a5b59cae7235a90ec1cdd910f7 Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Tue, 6 Aug 2024 13:33:06 -0700
Subject: [PATCH 06/12] use boolean fixed_upload_size

---
 s3fs/core.py | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 3fa97dd4..a238c13a 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -296,7 +296,7 @@ def __init__(
         asynchronous=False,
         loop=None,
         max_concurrency=1,
-        fixed_upload_size: int | None = None,
+        fixed_upload_size: bool = False,
         **kwargs,
     ):
         if key and username:
@@ -334,8 +334,6 @@ def __init__(
         self.cache_regions = cache_regions
         self._s3 = None
         self.session = session
-        if fixed_upload_size is not None and fixed_upload_size < self.blocksize:
-            raise ValueError(f"{fixed_upload_size=} is less than {self.blocksize=}")
         self.fixed_upload_size = fixed_upload_size
         if max_concurrency < 1:
             raise ValueError("max_concurrency must be >= 1")
@@ -2362,13 +2360,12 @@ def _upload_chunk(self, final=False):
             def n_bytes_left() -> int:
                 return len(self.buffer.getbuffer()) - self.buffer.tell()
 
-            if self.fs.fixed_upload_size is not None:
+            min_chunk = 1 if final else self.blocksize
+            if self.fs.fixed_upload_size:
                 # all chunks have fixed size, exception: last one can be smaller
-                min_chunk = 1 if final else self.fs.fixed_upload_size
-                while n_bytes_left() >= self.fs.fixed_upload_size:
-                    upload_part(self.buffer.read(self.fixed_upload_size))
+                while n_bytes_left() >= min_chunk:
+                    upload_part(self.buffer.read(self.blocksize))
             else:
-                min_chunk = 1 if final else self.blocksize
                 while n_bytes_left() >= min_chunk:
                     upload_part(self.buffer.read(self.part_max))
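Where PATCHES 05-06 land: `fixed_upload_size` ends up a plain boolean on the filesystem, and `_upload_chunk` drains its buffer in one of two modes. A side-by-side sketch of that drain logic (the `drain` helper and its asserts are mine; the mode selection mirrors the diff above):

    import io

    def drain(buffer: io.BytesIO, blocksize: int, part_max: int,
              fixed_upload_size: bool, final: bool) -> list[bytes]:
        def n_bytes_left() -> int:
            return len(buffer.getbuffer()) - buffer.tell()

        # a non-final flush keeps any tail shorter than a full block in the buffer
        min_chunk = 1 if final else blocksize
        # fixed mode reads exactly blocksize per part; flexible mode reads up to part_max
        read_size = blocksize if fixed_upload_size else part_max
        parts = []
        while n_bytes_left() >= min_chunk:
            parts.append(buffer.read(read_size))
        return parts

    buf = io.BytesIO(b"x" * 12)
    assert [len(p) for p in drain(buf, 5, 50, fixed_upload_size=True, final=False)] == [5, 5]
    assert [len(p) for p in drain(buf, 5, 50, fixed_upload_size=True, final=True)] == [2]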
From 6058a5c0ac750de535386142bd3a9a87615497ba Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Thu, 8 Aug 2024 09:44:13 -0700
Subject: [PATCH 07/12] add fixed_upload_size to docstring

---
 s3fs/core.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/s3fs/core.py b/s3fs/core.py
index a238c13a..3a403c72 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -248,6 +248,9 @@ class S3FileSystem(AsyncFileSystem):
         this parameter to affect ``pipe()``, ``cat()`` and ``get()``. Increasing
         this value will result in higher memory usage during multipart upload
         operations (by ``max_concurrency * chunksize`` bytes per file).
+    fixed_upload_size : bool (False)
+        Use same chunk size for all parts in multipart upload (last part can be smaller).
+        Cloudflare R2 storage requires fixed_upload_size=True for multipart uploads.
 
     The following parameters are passed on to fsspec:
 

From 34eeabc25c227b7e14906cf4e2b24ccce6b4b0f5 Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Sun, 22 Sep 2024 01:07:02 -0700
Subject: [PATCH 08/12] typo

---
 s3fs/core.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 3a403c72..558d790c 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -2335,7 +2335,7 @@ def _upload_chunk(self, final=False):
             and final
             and self.tell() < self.blocksize
         ):
-            # only happens when closing small file, use on-shot PUT
+            # only happens when closing small file, use one-shot PUT
             pass
         else:
             self.buffer.seek(0)

From 61fa0e26d01da918a5876a974c15dff13c4765b2 Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Sun, 22 Sep 2024 01:08:02 -0700
Subject: [PATCH 09/12] manually recreate buffer in multipart upload, do not
 rely on fsspec.flush

---
 s3fs/core.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 558d790c..43acd603 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import asyncio
 import errno
+import io
 import logging
 import mimetypes
 import os
@@ -2374,7 +2375,14 @@ def _upload_chunk(self, final=False):
 
         if self.autocommit and final:
             self.commit()
-        return not final
+        else:
+            # update 'upload offset'
+            self.offset += self.buffer.tell()
+            # create new smaller buffer, seek to file end
+            self.buffer = io.BytesIO(self.buffer.read())
+            self.buffer.seek(0, 2)
+
+        return False  # instruct fsspec.flush to NOT clear self.buffer
 
     def commit(self):
         logger.debug("Commit %s" % self)
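The `return False` in PATCH 09 is the crux: as the in-line comment notes, fsspec's `flush()` would otherwise discard `self.buffer` wholesale, losing the sub-blocksize tail that fixed-size parts leave behind. So the file object does its own bookkeeping -- uploaded bytes are folded into `offset`, the unread tail is copied into a fresh `BytesIO`, and the cursor is parked at its end so later `write()` calls append. The same three steps in isolation (a sketch outside fsspec, not the real class):

    import io

    offset = 0
    buffer = io.BytesIO(b"x" * 7)
    buffer.read(5)                      # pretend these 5 bytes went out as a part

    offset += buffer.tell()             # remember how much is already on S3
    buffer = io.BytesIO(buffer.read())  # keep only the 2 not-yet-uploaded bytes
    buffer.seek(0, 2)                   # seek to end so new writes append

    assert offset == 5
    assert len(buffer.getbuffer()) == 2 and buffer.tell() == 2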
From ccb8698c7ab10da4241193441605159f7d220712 Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Sun, 22 Sep 2024 01:09:08 -0700
Subject: [PATCH 10/12] add test for fs with fixed_upload_size=True

---
 s3fs/tests/test_s3fs.py | 32 ++++++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py
index d3d90899..2a2df2cf 100644
--- a/s3fs/tests/test_s3fs.py
+++ b/s3fs/tests/test_s3fs.py
@@ -884,6 +884,9 @@ def test_seek(s3):
     with s3.open(a, "wb") as f:
         f.write(b"123")
 
+    with s3.open(a) as f:
+        assert f.read() == b"123"
+
     with s3.open(a) as f:
         f.seek(1000)
         with pytest.raises(ValueError):
@@ -2749,3 +2752,32 @@ def test_bucket_versioning(s3):
     assert s3.is_bucket_versioned("maybe_versioned")
     s3.make_bucket_versioned("maybe_versioned", False)
     assert not s3.is_bucket_versioned("maybe_versioned")
+
+
+@pytest.fixture()
+def s3_fixed_upload_size(s3):
+    s3_fixed = S3FileSystem(
+        anon=False,
+        client_kwargs={"endpoint_url": endpoint_uri},
+        fixed_upload_size=True,
+    )
+    s3_fixed.invalidate_cache()
+    yield s3_fixed
+
+
+def test_upload_parts(s3_fixed_upload_size):
+    with s3_fixed_upload_size.open(a, "wb", block_size=6_000_000) as f:
+        f.write(b" " * 6_001_000)
+        assert len(f.buffer.getbuffer()) == 1000
+        # check we are at the right position
+        assert f.tell() == 6_001_000
+        # offset is introduced in fsspec.core, but never used.
+        # apparently it should keep offset for part that is already uploaded
+        assert f.offset == 6_000_000
+        f.write(b" " * 6_001_000)
+        assert len(f.buffer.getbuffer()) == 2000
+        assert f.tell() == 2 * 6_001_000
+        assert f.offset == 2 * 6_000_000
+
+    with s3_fixed_upload_size.open(a, "r") as f:
+        assert len(f.read()) == 6_001_000 * 2

From 1f4370f8383a25ca9766a30a26679a93fea8c84e Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Tue, 24 Sep 2024 10:23:34 -0700
Subject: [PATCH 11/12] add a test with prime pad sizes to exclude cases of
 divisibility by block_size, min_upload_size

---
 s3fs/tests/test_s3fs.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py
index 2a2df2cf..2275519a 100644
--- a/s3fs/tests/test_s3fs.py
+++ b/s3fs/tests/test_s3fs.py
@@ -2781,3 +2781,21 @@ def test_upload_parts(s3_fixed_upload_size):
 
     with s3_fixed_upload_size.open(a, "r") as f:
         assert len(f.read()) == 6_001_000 * 2
+
+
+def test_upload_part_with_prime_pads(s3_fixed_upload_size):
+    block = 6_000_000
+    pad1, pad2 = 1013, 1019  # prime pad sizes to exclude divisibility
+    with s3_fixed_upload_size.open(a, "wb", block_size=block) as f:
+        f.write(b" " * (block + pad1))
+        assert len(f.buffer.getbuffer()) == pad1
+        # check we are at the right position
+        assert f.tell() == block + pad1
+        assert f.offset == block
+        f.write(b" " * (block + pad2))
+        assert len(f.buffer.getbuffer()) == pad1 + pad2
+        assert f.tell() == 2 * block + pad1 + pad2
+        assert f.offset == 2 * block
+
+    with s3_fixed_upload_size.open(a, "r") as f:
+        assert len(f.read()) == 2 * block + pad1 + pad2

From 7c31604ec8eb340daf36907a9ccf3200b83f4e49 Mon Sep 17 00:00:00 2001
From: Alex Rogozhnikov
Date: Sun, 29 Sep 2024 09:22:53 -0700
Subject: [PATCH 12/12] add Martin's patch

---
 .github/workflows/ci.yml | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d5d28eee..99d11beb 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -24,11 +24,10 @@ jobs:
           fetch-depth: 0
 
       - name: Setup conda
-        uses: mamba-org/setup-micromamba@v1
+        uses: conda-incubator/setup-miniconda@v3
         with:
           environment-file: ci/env.yaml
-          create-args: >-
-            python=${{ matrix.PY }}
+          python-version: ${{ matrix.PY }}
 
       - name: Install
         shell: bash -l {0}
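With the series applied, the feature might be exercised against R2 roughly like this (endpoint URL, bucket name and credentials below are placeholders, not taken from the PR):

    import s3fs

    fs = s3fs.S3FileSystem(
        key="ACCESS_KEY_ID",
        secret="SECRET_ACCESS_KEY",
        client_kwargs={"endpoint_url": "https://ACCOUNT_ID.r2.cloudflarestorage.com"},
        fixed_upload_size=True,  # required for R2 multipart uploads
    )
    with fs.open("my-bucket/large.bin", "wb", block_size=50 * 2**20) as f:
        f.write(b"\0" * (120 * 2**20))  # sent as equal 50 MiB parts plus a 20 MiB tail

On AWS S3 itself the flag stays off by default, so the existing behavior (parts up to part_max) is unchanged.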