Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for cloudflare's R2 storage #888

Merged
merged 12 commits into from
Sep 30, 2024
50 changes: 20 additions & 30 deletions s3fs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2331,41 +2331,31 @@ def _upload_chunk(self, final=False):
and self.tell() < self.blocksize
):
# only happens when closing small file, use on-shot PUT
data1 = False
pass
else:
self.buffer.seek(0)
(data0, data1) = (None, self.buffer.read(self.blocksize))

while data1:
(data0, data1) = (data1, self.buffer.read(self.blocksize))
data1_size = len(data1)

if 0 < data1_size < self.blocksize:
remainder = data0 + data1
remainder_size = self.blocksize + data1_size

if remainder_size <= self.part_max:
(data0, data1) = (remainder, None)
else:
partition = remainder_size // 2
(data0, data1) = (remainder[:partition], remainder[partition:])

part = len(self.parts) + 1
logger.debug("Upload chunk %s, %s" % (self, part))
# previously last small chunk was merged into previous one
# (and in some cases two chunks were equalized)
# However, R2 imposes additional restriction that all but last chunk
# are of same size, and last chunk can't be larger.
while data := self.buffer.read(self.blocksize):
martindurant marked this conversation as resolved.
Show resolved Hide resolved
part = len(self.parts) + 1
logger.debug("Upload chunk %s, %s" % (self, part))

out = self._call_s3(
"upload_part",
Bucket=bucket,
PartNumber=part,
UploadId=self.mpu["UploadId"],
Body=data0,
Key=key,
)
out = self._call_s3(
"upload_part",
Bucket=bucket,
PartNumber=part,
UploadId=self.mpu["UploadId"],
Body=data,
Key=key,
)

part_header = {"PartNumber": part, "ETag": out["ETag"]}
if "ChecksumSHA256" in out:
part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
self.parts.append(part_header)
part_header = {"PartNumber": part, "ETag": out["ETag"]}
if "ChecksumSHA256" in out:
part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
self.parts.append(part_header)

if self.autocommit and final:
self.commit()
Expand Down
Loading