diff --git a/gcsfs/core.py b/gcsfs/core.py index 6fd05eae..54d94f19 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -64,6 +64,85 @@ "custom_time": "customTime", } +class GCSTranscodingReadAheadCache(fsspec.caching.BaseCache): + """Cache which reads only when we get beyond a block of data, but with additional support for transcoding. + + The default fsspec caches all *require* knowing the full file size ahead of time and will either error or truncate + aggressively if you don't provide meaningful values, so we implement our own cache which is mostly the same + but with special handling for the case where we can't know the file size ahead of time. + + In practice, this is done by allowing end=None to mean "read to end". Although that may look strange, + In fsspec ArchiveFileSystem, size = None means the file size is undeterminable, so re-using those semantics and + making end mean 'read to end' for reading/caching it allows for clearly semantically meaningful syntax like + `some_file.read(length=some_file.size)` to work as expected without throwing errors, but still error if you try + something like some_file.size-10, since that's concretely an undeterminable value. + + This is a much simpler version of BytesCache, and does not attempt to + fill holes in the cache or keep fragments alive. It is best suited to + many small reads in a sequential order (e.g., reading lines from a file). + """ + + name = "gcstranscodingreadahead" + + def __init__(self, blocksize: int, fetcher: fsspec.caching.Fetcher, size: int) -> None: + super().__init__(blocksize, fetcher, size) + self.cache = b"" + self.start = 0 + self.end = 0 + + def _fetch(self, start: int | None, end: int | None) -> bytes: + """Fetches data from a remote source, rounding up to a minimum block size for the amount of data to fetch + at once. When requesting the full object, caching isn't performed""" + if start is None: + start = 0 + # when we use the this cacher, end == None means "read to end" + # whenever end is not None, it means we're requesting a specific range, so we should honor it + if end is not None: + if start is None: + start = 0 + if end is None or end > self.size: + end = self.size + if start >= self.size or start >= end: + return b"" + l = end - start + if start >= self.start and end <= self.end: + # cache hit + self.hit_count += 1 + return self.cache[start - self.start: end - self.start] + elif self.start <= start < self.end: + # partial hit + self.miss_count += 1 + part = self.cache[start - self.start:] + l -= len(part) + start = self.end + else: + # miss + self.miss_count += 1 + part = b"" + end = min(self.size, end + self.blocksize) + self.total_requested_bytes += end - start + self.cache = self.fetcher(start, end) # new block replaces old + self.start = start + self.end = self.start + len(self.cache) + return part + self.cache[:l] + else: + # end=None -> read to end + # this may look wasteful, but it's equivalent to the above + # in the case where end = file.size, which is very common (e.g. open() followed by read()) + part = b"" + if start >= self.start and self.cache: + # cache hit, we have at least some prefix we don't have to re-download + self.hit_count += 1 + part = self.cache[start - self.start:] + start = self.end + self.cache = self.fetcher(start, None) + self.start = start + # since we're discarding previous data, we have to use the length of the cache to update self.end + self.end = self.start + len(self.cache) + return part + self.cache + + +fsspec.caching.register_cache(GCSTranscodingReadAheadCache) def quote(s): """ @@ -994,6 +1073,9 @@ async def _info(self, path, generation=None, **kwargs): exact = await self._get_object(path) # this condition finds a "placeholder" - still need to check if it's a directory if exact["size"] or not exact["name"].endswith("/"): + if exact["contentEncoding"] == "gzip": + #if the file is compressed at rest, we can't trust the size returned since that's the compressed size + exact["size"] = None return exact except FileNotFoundError: pass @@ -1052,6 +1134,7 @@ def url(self, path): f"&generation={generation}" if generation else "", ) + async def _cat_file(self, path, start=None, end=None, **kwargs): """Simple one-shot get of file data""" u2 = self.url(path) @@ -1581,12 +1664,18 @@ def _open( if block_size is None: block_size = self.default_block_size const = consistency or self.consistency + # if the file is compressed at rest (transcoding), if so, we can't use any of the standard fsspec read caches + # and have to fall back to a custom one which can handle unknown file sizes. + gcs_file_info = self.info(path) + # See GCSTranscodingReadAheadCache for implementation. gcstranscodingreadahead is the fsspec cache registry name. + cache_type = "gcstranscodingreadahead" if gcs_file_info.get("contentEncoding",None) == "gzip" else "readahead" return GCSFile( self, path, mode, block_size, cache_options=cache_options, + cache_type=cache_type, consistency=const, metadata=metadata, acl=acl, @@ -1914,6 +2003,41 @@ def _simple_upload(self): timeout=self.timeout, ) + def read(self, length=-1): + """ + Return data from cache, or fetch pieces as necessary. + This is almost equivalent to AbstractBufferedFile.read, but with special + handling for the case where we can not know the file size in advance (e.g. decompressive transcoding). + + Parameters + ---------- + length: int (-1) + Number of bytes to read; if <0, all remaining bytes. + """ + length = -1 if length is None else int(length) + if self.mode != "rb": + raise ValueError("File not in read mode") + if length < 0 and self.size is not None: + length = self.size - self.loc + if self.closed: + raise ValueError("I/O operation on closed file.") + if length == 0: + # don't even bother calling fetch + return b"" + out = self.cache._fetch(self.loc, self.loc + length if length > 0 else None) + + logger.debug( + "%s read: %i - %s %s", + self, + self.loc, + # this allows the debug message to be similar to the Range: bytes={start}-{end} you may see if + # you are observing the resulting HTTP requests. + str(self.loc + length) if length > 0 else "", + self.cache._log_stats(), + ) + self.loc += len(out) + return out + def _fetch_range(self, start=None, end=None): """Get data from GCS