Full support for decompressive transcoding (e.g. Content-Encoding: gzip) during reading #635

Status: Open. Wants to merge 1 commit into base: main.
124 changes: 124 additions & 0 deletions gcsfs/core.py
@@ -64,6 +64,85 @@
"custom_time": "customTime",
}

class GCSTranscodingReadAheadCache(fsspec.caching.BaseCache):
    """Cache which reads only when we get beyond a block of data, with additional support for transcoding.

    The default fsspec caches all *require* knowing the full file size ahead of time and will either error or
    truncate aggressively if you don't provide meaningful values, so we implement our own cache, which is
    mostly the same but has special handling for the case where the file size cannot be known ahead of time.

    In practice, this is done by allowing end=None to mean "read to end". Although that may look strange,
    fsspec's ArchiveFileSystem already uses size=None to mean "the file size is undeterminable", so reusing
    those semantics and making end=None mean "read to end" for reading/caching allows semantically meaningful
    syntax like `some_file.read(length=some_file.size)` to work as expected without throwing errors, while
    still erroring on something like `some_file.size - 10`, since that is concretely an undeterminable value.

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in sequential order (e.g., reading lines from a file).
    """

    name = "gcstranscodingreadahead"

    def __init__(self, blocksize: int, fetcher: fsspec.caching.Fetcher, size: int | None) -> None:
Review comment:
    Can you provide some examples of how I can pass `fetcher`? I see it is of type

        Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes

    What implementation can one pass? I agree that having unit tests would be good.
    (A sketch of one possible fetcher appears after the class definition below.)

        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Fetches data from a remote source, rounding up to a minimum block size for the amount of data
        to fetch at once. When requesting the full object, caching isn't performed."""
        if start is None:
            start = 0
        # when we use this cacher, end == None means "read to end";
        # whenever end is not None, we're requesting a specific range, so we should honor it
        if end is not None:
            if end is None or end > self.size:
Review comment:
    Not sure if `end is None` will ever be true here, since the outer block already checked `if end is not None`.

                end = self.size
            if start >= self.size or start >= end:
                return b""
            l = end - start
            if start >= self.start and end <= self.end:
                # cache hit
                self.hit_count += 1
                return self.cache[start - self.start : end - self.start]
Review comment (on lines +108 to +111):
    Okay, this took me some time to understand. So the cache only contains one contiguous segment and cannot
    hold more than one segment, as evidenced by self.start and self.end. Maybe those should be named
    self.cache_start and self.cache_end?

    The cache holds a subset of the data as a byte string and stores its location within the actual content.
    Would it make sense to have a multi-segment cache?

            elif self.start <= start < self.end:
                # partial hit
                self.miss_count += 1
                part = self.cache[start - self.start :]
                l -= len(part)
                start = self.end
            else:
                # miss
                self.miss_count += 1
                part = b""
            end = min(self.size, end + self.blocksize)
            self.total_requested_bytes += end - start
            self.cache = self.fetcher(start, end)  # new block replaces old
            self.start = start
            self.end = self.start + len(self.cache)
            return part + self.cache[:l]
        else:
            # end=None -> read to end
            # this may look wasteful, but it's equivalent to the branch above
            # in the case where end == file.size, which is very common (e.g. open() followed by read())
            part = b""
            if self.start <= start <= self.end and self.cache:
                # cache hit: we have at least some prefix we don't have to re-download
                # (bounding start by self.end guards against slicing beyond the cached
                # segment, which would silently return the wrong bytes)
                self.hit_count += 1
                part = self.cache[start - self.start :]
                start = self.end
            self.cache = self.fetcher(start, None)
            self.start = start
            # since we're discarding previous data, we have to use the length of the cache to update self.end
            self.end = self.start + len(self.cache)
            return part + self.cache


fsspec.caching.register_cache(GCSTranscodingReadAheadCache)
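
In reply to the fetcher question above: a minimal sketch (not part of this PR) of a fetcher compatible with
this cache. In gcsfs itself the fetcher fsspec passes in is the file object's `_fetch_range` method (shown at
the bottom of this diff), which issues ranged HTTP GETs; here an in-memory stand-in illustrates the calling
convention, with end=None meaning "read to end":

blob = b"0123456789" * 4

def fake_fetcher(start: int, end: int | None) -> bytes:
    # end=None mirrors an open-ended range request (Range: bytes={start}-)
    return blob[start:] if end is None else blob[start:end]

# size=None signals "length undeterminable", as for a gzip-transcoded object
cache = GCSTranscodingReadAheadCache(blocksize=16, fetcher=fake_fetcher, size=None)
assert cache._fetch(0, None) == blob         # first call reads to the end
assert cache._fetch(10, None) == blob[10:]   # prefix served from the cached segment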

def quote(s):
"""
Expand Down Expand Up @@ -994,6 +1073,9 @@ async def _info(self, path, generation=None, **kwargs):
            exact = await self._get_object(path)
            # this condition finds a "placeholder" - still need to check if it's a directory
            if exact["size"] or not exact["name"].endswith("/"):
                if exact.get("contentEncoding") == "gzip":
                    # if the file is compressed at rest, we can't trust the returned size,
                    # since that is the compressed size
                    exact["size"] = None
                return exact
        except FileNotFoundError:
            pass
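
For illustration, the observable effect of this change, as a sketch with hypothetical bucket/object names:

import gcsfs

fs = gcsfs.GCSFileSystem()
info = fs.info("my-bucket/compressed-at-rest.txt")
if info.get("contentEncoding") == "gzip":
    assert info["size"] is None  # the decompressed length cannot be known up front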
@@ -1052,6 +1134,7 @@ def url(self, path):
f"&generation={generation}" if generation else "",
)


    async def _cat_file(self, path, start=None, end=None, **kwargs):
        """Simple one-shot get of file data"""
        u2 = self.url(path)
@@ -1581,12 +1664,18 @@ def _open(
        if block_size is None:
            block_size = self.default_block_size
        const = consistency or self.consistency
        # check whether the file is compressed at rest (transcoding); if so, we can't use any of the
        # standard fsspec read caches and have to fall back to a custom one that can handle unknown file sizes
        gcs_file_info = self.info(path)
        # see GCSTranscodingReadAheadCache for the implementation;
        # "gcstranscodingreadahead" is its fsspec cache-registry name
        cache_type = (
            "gcstranscodingreadahead"
            if gcs_file_info.get("contentEncoding") == "gzip"
            else "readahead"
        )
        return GCSFile(
            self,
            path,
            mode,
            block_size,
            cache_options=cache_options,
            cache_type=cache_type,
            consistency=const,
            metadata=metadata,
            acl=acl,
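
The diff truncates the rest of this call, but the intended end-to-end behavior can be sketched as follows
(hypothetical object name; the object is assumed to have been uploaded with Content-Encoding: gzip, e.g. via
gsutil cp -Z):

import gcsfs

fs = gcsfs.GCSFileSystem()
# for a transcoded object, _open() now picks the transcoding-aware cache
# automatically; callers do not need to pass cache_type themselves
with fs.open("my-bucket/compressed-at-rest.txt", "rb") as f:
    assert f.size is None  # size is undeterminable until decompressed
    body = f.read()        # read-to-end works despite the unknown size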
@@ -1914,6 +2003,41 @@ def _simple_upload(self):
            timeout=self.timeout,
        )
    def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary.

        This is almost equivalent to AbstractBufferedFile.read, but with special
        handling for the case where we cannot know the file size in advance
        (e.g. decompressive transcoding).

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
Review comment:
    Good idea to provide a more helpful error message, e.g.:

        raise ValueError(f"File '{self.path}' is not in read mode")

        if length < 0 and self.size is not None:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
Review comment (on lines +2022 to +2023):
    Shouldn't this check be at the top? If the file is closed, nothing else should be done.

        if length == 0:
            # don't even bother calling fetch
            return b""
        out = self.cache._fetch(self.loc, self.loc + length if length > 0 else None)

        logger.debug(
            "%s read: %i - %s %s",
            self,
            self.loc,
            # this keeps the debug message similar to the "Range: bytes={start}-{end}"
            # header you may see when observing the resulting HTTP requests
            str(self.loc + length) if length > 0 else "",
            self.cache._log_stats(),
        )
        self.loc += len(out)
        return out
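
To summarize the paths through this read() for a transcoded file (where self.size is None), per the code
above:

    f.read()      ->  cache._fetch(self.loc, None)   # length -1 with unknown size: read to end
    f.read(None)  ->  length normalized to -1, same as above
    f.read(0)     ->  returns b"" immediately, without calling _fetch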

    def _fetch_range(self, start=None, end=None):
        """Get data from GCS
