Support for decompressive transcoding (e.g. Content-Encoding: gzip)
This fixes fsspec#461 and fsspec#233 without needing users to change their existing code.

The bugs were caused by assumptions in fsspec about the veracity and unambiguity of the 'size' information returned by AbstractBufferedFile subclasses like GCSFile and AbstractFileSystem subclasses like GCSFileSystem (e.g. `self.size = self.details["size"]` in `AbstractBufferedFile`, which all the base caches use to truncate requests and responses). When GCS applies compression at rest (decompressive transcoding), there is no way to retrieve the real size of the object's *content* without decompressing the whole thing either server- or client-side, so fixing these issues required overriding some behaviors in the underlying base classes. Care was taken to preserve behavior for storage objects not using compression at rest, however.
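
A minimal sketch of the failure mode this fixes (bucket and object names are hypothetical):

```python
import gcsfs

fs = gcsfs.GCSFileSystem()

# Hypothetical object stored with gzip compression at rest,
# e.g. uploaded via `gsutil cp -Z data.json gs://my-bucket/data.json`.
info = fs.info("my-bucket/data.json")
print(info["size"])  # the *compressed* byte count

# Reads are served decompressed, so size-based truncation in the caches
# cut reads off at the compressed size (or raised) before this commit:
with fs.open("my-bucket/data.json", "rb") as f:
    data = f.read()  # previously could come back silently truncated
```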

This commit:
1) adds a read() implementation in GCSFile which allows calls to succeed even when the size isn't well-defined.
2) adds a GCSTranscodingReadAheadCache, which is mostly identical to the readahead cache that GCSFile already uses but allows end=None to mean "read until the end of the file", while still handling cached data prefixes.
3) changes GCSFileSystem._info() to set size = None if contentEncoding is gzip.

The fix keeps the data handling for non-gzip GCS files identical, while adding new control flow to detect when transcoding is in use and logic to handle those edge cases. This did unfortunately mean implementing variant methods with only minor changes to how they perform the underlying operations (e.g. read() in GCSFile), which were previously just inherited from AbstractBufferedFile.

It does introduce two user-visible changes, though. First, [in line with fsspec's ArchiveFileSystem](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.archive.AbstractArchiveFileSystem.info) semantics, gcsfs will return size = None when the file size cannot be determined in advance.

Second, the only performance overhead seen by files that don't use decompressive transcoding is a single extra HEAD-style metadata request, issued before the point where we create the GCSFile object in GCSFileSystem: we need to swap out the cache for one compatible with an unknown file size, but do not yet have the information to make that control-flow decision.
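
A sketch of the post-fix semantics, using the same hypothetical object:

```python
import gcsfs

fs = gcsfs.GCSFileSystem()
with fs.open("my-bucket/data.json", "rb") as f:
    # _open() saw contentEncoding == "gzip", so f was built with the
    # "gcstranscodingreadahead" cache and size=None (ArchiveFileSystem semantics).
    assert f.size is None
    data = f.read()  # length defaults to -1 -> cache._fetch(loc, None): read to end
    # f.read(length=f.size) is equivalent: length=None is coerced to -1
```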
the-xentropy authored Aug 8, 2024
1 parent 5cb4479 commit fc2f801
Showing 1 changed file with 122 additions and 0 deletions.
122 changes: 122 additions & 0 deletions gcsfs/core.py
@@ -64,6 +64,85 @@
"custom_time": "customTime",
}

class GCSTranscodingReadAheadCache(fsspec.caching.BaseCache):
"""Cache which reads only when we get beyond a block of data, but with additional support for transcoding.
The default fsspec caches all *require* knowing the full file size ahead of time and will either error or truncate
aggressively if you don't provide meaningful values, so we implement our own cache which is mostly the same
but with special handling for the case where we can't know the file size ahead of time.
In practice, this is done by allowing end=None to mean "read to end". Although that may look strange,
In fsspec ArchiveFileSystem, size = None means the file size is undeterminable, so re-using those semantics and
making end mean 'read to end' for reading/caching it allows for clearly semantically meaningful syntax like
`some_file.read(length=some_file.size)` to work as expected without throwing errors, but still error if you try
something like some_file.size-10, since that's concretely an undeterminable value.
This is a much simpler version of BytesCache, and does not attempt to
fill holes in the cache or keep fragments alive. It is best suited to
many small reads in a sequential order (e.g., reading lines from a file).
"""

name = "gcstranscodingreadahead"

    def __init__(self, blocksize: int, fetcher: fsspec.caching.Fetcher, size: int | None) -> None:
super().__init__(blocksize, fetcher, size)
self.cache = b""
self.start = 0
self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Fetches data from a remote source, rounding up to a minimum block size for the amount of data to
        fetch at once; when requesting the full object, caching isn't performed."""
        if start is None:
            start = 0
        # when we use this cacher, end == None means "read to end";
        # whenever end is not None, a specific range was requested, so we honor it
        if end is not None:
            if end > self.size:
                end = self.size
            if start >= self.size or start >= end:
                return b""
l = end - start
if start >= self.start and end <= self.end:
# cache hit
self.hit_count += 1
return self.cache[start - self.start: end - self.start]
elif self.start <= start < self.end:
# partial hit
self.miss_count += 1
part = self.cache[start - self.start:]
l -= len(part)
start = self.end
else:
# miss
self.miss_count += 1
part = b""
end = min(self.size, end + self.blocksize)
self.total_requested_bytes += end - start
self.cache = self.fetcher(start, end) # new block replaces old
self.start = start
self.end = self.start + len(self.cache)
return part + self.cache[:l]
        else:
            # end=None -> read to end
            # this may look wasteful, but it's equivalent to the branch above
            # when end == file.size, which is very common (e.g. open() followed by read())
            part = b""
            if self.start <= start <= self.end and self.cache:
                # cache hit: we hold at least some prefix we don't have to re-download;
                # a forward seek past the cached range must fall through to a full miss,
                # or we would splice in data from the wrong offset
                self.hit_count += 1
                part = self.cache[start - self.start :]
                start = self.end
            else:
                # miss (or a seek beyond the cached range)
                self.miss_count += 1
            self.cache = self.fetcher(start, None)
            self.start = start
            # since we're discarding previous data, we have to use the length of the cache to update self.end
            self.end = self.start + len(self.cache)
            return part + self.cache


fsspec.caching.register_cache(GCSTranscodingReadAheadCache)
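
# Hypothetical usage sketch: after this registration, fsspec can resolve the
# cache by its registry name, so GCSFileSystem._open() can simply pass
# cache_type="gcstranscodingreadahead" and AbstractBufferedFile will construct
# a GCSTranscodingReadAheadCache with the file's (possibly None) size.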

def quote(s):
"""
@@ -994,6 +1073,9 @@ async def _info(self, path, generation=None, **kwargs):
exact = await self._get_object(path)
# this condition finds a "placeholder" - still need to check if it's a directory
if exact["size"] or not exact["name"].endswith("/"):
if exact["contentEncoding"] == "gzip":
#if the file is compressed at rest, we can't trust the size returned since that's the compressed size
exact["size"] = None
return exact
except FileNotFoundError:
pass
@@ -1052,6 +1134,7 @@ def url(self, path):
f"&generation={generation}" if generation else "",
)


async def _cat_file(self, path, start=None, end=None, **kwargs):
"""Simple one-shot get of file data"""
u2 = self.url(path)
@@ -1581,12 +1664,18 @@ def _open(
if block_size is None:
block_size = self.default_block_size
const = consistency or self.consistency
        # check whether the file is compressed at rest (transcoding); if so, we can't use any of the
        # standard fsspec read caches and must fall back to a custom one that can handle unknown file sizes.
        gcs_file_info = self.info(path)
        # see GCSTranscodingReadAheadCache for the implementation; "gcstranscodingreadahead" is its
        # fsspec cache-registry name.
        cache_type = (
            "gcstranscodingreadahead"
            if gcs_file_info.get("contentEncoding", None) == "gzip"
            else "readahead"
        )
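        # note: this info() call is the single extra metadata request mentioned
        # in the commit message; it is paid on every open, even for objects that
        # are not transcoded, because the cache must be chosen before the
        # GCSFile object exists.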
return GCSFile(
self,
path,
mode,
block_size,
cache_options=cache_options,
cache_type=cache_type,
consistency=const,
metadata=metadata,
acl=acl,
Expand Down Expand Up @@ -1914,6 +2003,39 @@ def _simple_upload(self):
timeout=self.timeout,
)

    def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary.

        This is almost equivalent to AbstractBufferedFile.read, but with special
        handling for the case where the file size cannot be known in advance
        (e.g. decompressive transcoding).

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
length = -1 if length is None else int(length)
if self.mode != "rb":
raise ValueError("File not in read mode")
if length < 0 and self.size is not None:
length = self.size - self.loc
if self.closed:
raise ValueError("I/O operation on closed file.")
if length == 0:
# don't even bother calling fetch
return b""
out = self.cache._fetch(self.loc, self.loc + length if length > 0 else None)

logger.debug(
"%s read: %i - %i %s",
self,
self.loc,
self.loc + length,
self.cache._log_stats(),
)
self.loc += len(out)
return out
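
    # Behavior sketch (hypothetical offsets): for a transcoded object, self.size
    # is None, so read() calls self.cache._fetch(self.loc, None) ("read to end"),
    # while read(n) still requests the bounded range [self.loc, self.loc + n).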

def _fetch_range(self, start=None, end=None):
"""Get data from GCS
