From bc8818b1f767d426654a06fa5e168aad3644615f Mon Sep 17 00:00:00 2001
From: Sam
Date: Wed, 7 Aug 2024 22:37:00 -0700
Subject: [PATCH] Support for decompressive transcoding (e.g. Content-Encoding: gzip)

This fixes https://github.com/fsspec/gcsfs/issues/461 and
https://github.com/fsspec/gcsfs/issues/233 without requiring users to change
their existing code.

The bugs were caused by assumptions in fsspec about the veracity and
non-ambiguity of the 'size' information returned by AbstractBufferedFile
subclasses like GCSFile and AbstractFileSystem subclasses like GCSFileSystem
(e.g. `self.size = self.details["size"]` in `AbstractBufferedFile`, which is
used by all base caches to truncate requests and responses). When a GCS object
uses compression at rest (compressive transcoding), there is no way to retrieve
the real size of the object's *content* without decompressing the whole thing
either server side or client side, so fixing these issues required overriding
some behaviors in the underlying base classes. Care was taken to preserve
behavior for storage objects that do not use compression at rest.

This commit:

1) adds a read() implementation in GCSFile which allows calls to succeed even
   when size isn't well-defined.
2) adds a GCSTranscodingReadAheadCache, which is mostly identical to the
   readahead cache that GCSFile already uses, but allows end = None to mean
   "read until the end of the file" while still handling cached data prefixes.
3) changes GCSFileSystem._info() to set size = None if contentEncoding is gzip.
4) changes _open() to fetch information about the object being opened; if it
   uses compressive transcoding, the resulting GCSFile uses the
   GCSTranscodingReadAheadCache instead of the incompatible readahead cache.
   We could probably use the new cache for everything, since it should behave
   equivalently for files with a well-defined size, but restricting it lowers
   the risk of having missed an edge case.

The fix keeps the data handling identical for GCS files which do not use
compression at rest/compressive transcoding, while adding control flow to
detect when transcoding is in use and logic to handle those edge cases.
Unfortunately this meant implementing variant methods with only minor changes
to how they perform the underlying operations (e.g. read() in GCSFile), which
were previously just inherited from AbstractBufferedFile.

It does introduce one new semantic to gcsfs. [In line with fsspec's
ArchiveFileSystem](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.archive.AbstractArchiveFileSystem.info)
semantics, gcsfs will return size = None when the size of a file cannot be
determined in advance. This allows us to distinguish a known zero size from an
unknown size, which was a major issue.

The only new performance overhead, seen even by users who do not use
decompressive transcoding, is a single info() call (a HEAD request) made before
the point where we create the GCSFile object in GCSFileSystem, because we need
to swap in a cache compatible with an unknown file size but do not yet have the
information to make that control-flow decision. This means we make two requests
instead of one. We could probably switch to the new transcoding cache wholesale
in the future, once we trust that it holds up, and eliminate this extra call;
for now it works this way to keep the data and control flow unchanged for the
base case where users are not using compressive transcoding.
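To illustrate the new semantics, here is a minimal usage sketch. The bucket and
object names are hypothetical, and the object is assumed to have been uploaded
gzip-compressed at rest (e.g. via `gsutil cp -Z`) so it is served with
Content-Encoding: gzip:

    import gcsfs

    fs = gcsfs.GCSFileSystem()
    info = fs.info("my-bucket/data.json")   # hypothetical transcoded object
    assert info["size"] is None             # decoded size is unknown, not zero
    with fs.open("my-bucket/data.json", "rb") as f:
        data = f.read()                     # reads to EOF via the new cache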
Since the compression-at-rest case was completely broken, doing it this way
means that even if these changes turn out to be disastrous (they shouldn't be),
they will only break something which was already broken.
---
 gcsfs/core.py | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 122 insertions(+)

diff --git a/gcsfs/core.py b/gcsfs/core.py
index 6fd05eae..66fa636f 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -64,6 +64,85 @@
     "custom_time": "customTime",
 }
 
+class GCSTranscodingReadAheadCache(fsspec.caching.BaseCache):
+    """Cache which reads only when we get beyond a block of data, but with additional support for transcoding.
+
+    The default fsspec caches all *require* knowing the full file size ahead of time and will either error or truncate
+    aggressively if you don't provide meaningful values, so we implement our own cache which is mostly the same
+    but with special handling for the case where we can't know the file size ahead of time.
+
+    In practice, this is done by allowing end=None to mean "read to end". Although that may look strange,
+    in fsspec's ArchiveFileSystem, size = None means the file size is undeterminable, so re-using those semantics and
+    making end = None mean 'read to end' for reading/caching allows clearly meaningful syntax like
+    `some_file.read(length=some_file.size)` to work as expected without throwing errors, while still erroring if you
+    try something like some_file.size - 10, since that is concretely an undeterminable value.
+
+    This is a much simpler version of BytesCache, and does not attempt to
+    fill holes in the cache or keep fragments alive. It is best suited to
+    many small reads in a sequential order (e.g., reading lines from a file).
+    """
+
+    name = "gcstranscodingreadahead"
+
+    def __init__(self, blocksize: int, fetcher: fsspec.caching.Fetcher, size: int) -> None:
+        super().__init__(blocksize, fetcher, size)
+        self.cache = b""
+        self.start = 0
+        self.end = 0
+
+    def _fetch(self, start: int | None, end: int | None) -> bytes:
+        """Fetches data from a remote source, rounding up to a minimum block size for the amount of data to fetch
+        at once. When requesting the full object, caching isn't performed."""
+        if start is None:
+            start = 0
+        # when we use this cache, end == None means "read to end";
+        # whenever end is not None, we're requesting a specific range, so we should honor it
+        if end is not None:
+            if start is None:
+                start = 0
+            if end is None or end > self.size:
+                end = self.size
+            if start >= self.size or start >= end:
+                return b""
+            l = end - start
+            if start >= self.start and end <= self.end:
+                # cache hit
+                self.hit_count += 1
+                return self.cache[start - self.start : end - self.start]
+            elif self.start <= start < self.end:
+                # partial hit
+                self.miss_count += 1
+                part = self.cache[start - self.start :]
+                l -= len(part)
+                start = self.end
+            else:
+                # miss
+                self.miss_count += 1
+                part = b""
+            end = min(self.size, end + self.blocksize)
+            self.total_requested_bytes += end - start
+            self.cache = self.fetcher(start, end)  # new block replaces old
+            self.start = start
+            self.end = self.start + len(self.cache)
+            return part + self.cache[:l]
+        else:
+            # end=None -> read to end
+            # this may look wasteful, but it's equivalent to the above
+            # in the case where end = file.size, which is very common (e.g. open() followed by read())
+            part = b""
+            if start >= self.start and self.cache:
+                # cache hit, we have at least some prefix we don't have to re-download
+                self.hit_count += 1
+                part = self.cache[start - self.start :]
+                start = self.end
+            self.cache = self.fetcher(start, None)
+            self.start = start
+            # since we're discarding previous data, we have to use the length of the cache to update self.end
+            self.end = self.start + len(self.cache)
+            return part + self.cache
+
+
+fsspec.caching.register_cache(GCSTranscodingReadAheadCache)
 
 def quote(s):
     """
@@ -994,6 +1073,9 @@ async def _info(self, path, generation=None, **kwargs):
             exact = await self._get_object(path)
             # this condition finds a "placeholder" - still need to check if it's a directory
             if exact["size"] or not exact["name"].endswith("/"):
+                if exact.get("contentEncoding", None) == "gzip":
+                    # if the file is compressed at rest, we can't trust the reported size since it is the compressed size
+                    exact["size"] = None
                 return exact
         except FileNotFoundError:
             pass
@@ -1052,6 +1134,7 @@ def url(self, path):
             f"&generation={generation}" if generation else "",
         )
 
+
     async def _cat_file(self, path, start=None, end=None, **kwargs):
         """Simple one-shot get of file data"""
         u2 = self.url(path)
@@ -1581,12 +1664,18 @@ def _open(
         if block_size is None:
             block_size = self.default_block_size
         const = consistency or self.consistency
+        # check whether the file is compressed at rest (transcoding); if so, we can't use any of the standard
+        # fsspec read caches and have to fall back to a custom one which can handle unknown file sizes.
+        gcs_file_info = self.info(path)
+        # See GCSTranscodingReadAheadCache for the implementation; gcstranscodingreadahead is its fsspec cache registry name.
+        cache_type = "gcstranscodingreadahead" if gcs_file_info.get("contentEncoding", None) == "gzip" else "readahead"
         return GCSFile(
             self,
             path,
             mode,
             block_size,
             cache_options=cache_options,
+            cache_type=cache_type,
             consistency=const,
             metadata=metadata,
             acl=acl,
@@ -1914,6 +2003,39 @@ def _simple_upload(self):
             timeout=self.timeout,
         )
 
+    def read(self, length=-1):
+        """
+        Return data from cache, or fetch pieces as necessary.
+        This is almost equivalent to AbstractBufferedFile.read, but with special
+        handling for the case where we cannot know the file size in advance (e.g. decompressive transcoding).
+
+        Parameters
+        ----------
+        length: int (-1)
+            Number of bytes to read; if <0, all remaining bytes.
+        """
+        length = -1 if length is None else int(length)
+        if self.mode != "rb":
+            raise ValueError("File not in read mode")
+        if length < 0 and self.size is not None:
+            length = self.size - self.loc
+        if self.closed:
+            raise ValueError("I/O operation on closed file.")
+        if length == 0:
+            # don't even bother calling fetch
+            return b""
+        out = self.cache._fetch(self.loc, self.loc + length if length > 0 else None)
+
+        logger.debug(
+            "%s read: %i - %i %s",
+            self,
+            self.loc,
+            self.loc + length,
+            self.cache._log_stats(),
+        )
+        self.loc += len(out)
+        return out
+
     def _fetch_range(self, start=None, end=None):
         """Get data from GCS