Support for decompressive transcoding (e.g. Content-Encoding: gzip )
This fixes fsspec#461 and fsspec#233  without needing users to change their existing code.

The bugs were caused by fsspec assuming that the 'size' information returned by AbstractBufferedFile subclasses like GCSFile and AbstractFileSystem subclasses like GCSFileSystem is accurate and unambiguous (e.g. `self.size = self.details["size"]` in `AbstractBufferedFile`, which all base caches use to truncate requests and responses). When GCS uses compression at rest/compression transcoding, there is no way to retrieve the real size of the object's *content* without decompressing the whole thing, either server side or client side, so fixing these issues required overriding some behaviors in the underlying base classes. Care was taken to preserve behavior for storage objects that do not use compression at rest.
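To make the size problem concrete, here is a minimal sketch using only the standard library. It does not touch GCS or fsspec; the variable names are illustrative assumptions. It shows that the size of the stored (compressed) bytes says nothing reliable about the length of the decompressed content, so clamping a read to that size silently drops data.

```python
import gzip

# The logical content a reader expects to receive after transcoding.
content = b"hello world\n" * 1000

# What is actually stored at rest; the reported "size" describes these bytes.
stored = gzip.compress(content)
reported_size = len(stored)

# A cache that clamps reads to the reported size returns only a prefix
# of the decompressed content.
truncated = content[:reported_size]
```

With highly compressible data like this, `reported_size` is far smaller than `len(content)`, which is the truncation the commit message describes.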

This commit:
1) adds a read() implementation in GCSFile which allows calls to succeed even when size isn't well-defined. It's
2) adds a GCSTranscodingReadAheadCache, which is mostly identical to the readahead cache that GCSFile already uses but allows end = None to mean "read until the end of the file", while still reusing cached data prefixes.
3) changes the FileSystem _info() to set size = None if contentEncoding is gzip.
4) changes _cat_file() to fetch information on the object we want to
   cat, and if it uses compressive transcoding then the resulting
   GCSFile uses the GCSTranscodingReadAheadCache instead of the incompatible ReadAhead
   cache. We could probably use the new cache for everything since it
   should function equivalently for files which have a well-defined size,
   but this lowers the risk of having missed an edge case.
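The "end = None means read to end" idea from item 2 can be sketched in isolation. This is a simplified toy, not the cache added by this commit: `ReadToEndCache`, `fetcher`, and `calls` are hypothetical names, and block-size rounding and hit/miss statistics are left out.

```python
class ReadToEndCache:
    """Toy read-ahead cache where end=None means "read to end"."""

    def __init__(self, fetcher):
        self.fetcher = fetcher  # callable(start, end) -> bytes; end may be None
        self.cache = b""
        self.start = 0
        self.end = 0

    def fetch(self, start, end=None):
        if end is not None and self.start <= start and end <= self.end:
            # Bounded range that lies fully inside the cached block.
            return self.cache[start - self.start : end - self.start]
        part = b""
        if self.cache and self.start <= start <= self.end:
            # Reuse the cached prefix and only download what follows it.
            part = self.cache[start - self.start :]
            start = self.end
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = start + len(self.cache)
        return part + self.cache


data = bytes(range(100))
calls = []


def fetcher(start, end):
    # In-memory stand-in for a ranged download; records each request.
    calls.append((start, end))
    return data[start:end]


cache = ReadToEndCache(fetcher)
first = cache.fetch(0, 10)  # downloads bytes 0..9
rest = cache.fetch(4)       # reuses bytes 4..9, downloads from 10 to the end
```

The second call never needs to know the total size: it asks the fetcher for everything after the cached block and stitches the two pieces together.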

The fix keeps the data handling for GCS files which do not use compression at rest/compressive transcoding identical, while adding new control flow to detect when transcoding is done and some logic for handling those edge cases. This unfortunately meant implementing variant methods with only minor changes to how they perform underlying operations (e.g. read() in GCSFile), which were previously just inherited from AbstractBufferedFile.

It does introduce one new semantic to GCSFs. [In line with fsspec's ArchiveFileSystem](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.archive.AbstractArchiveFileSystem.info) semantics, GCSFs will return size = None when the file size cannot be determined in advance. This allows us to distinguish a known zero size from an unknown size, which was a major issue.
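For callers, the practical consequence is that `size` must be tested against `None` rather than for truthiness. A small sketch of that distinction, where `describe_size` is a hypothetical helper and the dicts stand in for what an info() call might return:

```python
def describe_size(info: dict) -> str:
    """Distinguish an unknown size from a known size of zero."""
    size = info.get("size")
    if size is None:
        # Unknown in advance: the reader must read until the stream ends.
        return "unknown size"
    if size == 0:
        return "empty object"
    return f"{size} bytes"


transcoded = describe_size({"name": "bucket/a.txt.gz", "size": None})
empty = describe_size({"name": "bucket/empty.txt", "size": 0})
regular = describe_size({"name": "bucket/b.txt", "size": 2048})
```

A check such as `if not info["size"]` would treat the first two cases identically, which is exactly the ambiguity the new semantic removes.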

The only new performance overhead seen by non-users of compressive transcoding is a single info() call, resulting in a HEAD request, made before we create the GCSFile object in GCSFileSystem. We need to swap the cache for one compatible with the lack of a concrete file size, but do not yet have the information to make that control flow decision, so we make two requests instead of one. We can probably switch to the new transcoding cache wholesale in the future, once we have faith it holds up, to eliminate this call, but I made it work this way to keep the data and control flow the same for the base case where users are not using compressive transcoding. Since the compression-at-rest case was completely broken, doing it this way means that even if these changes end up disastrous (they shouldn't), they will only break something which is already broken.
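The cost described above can be pictured with a small stand-in. `FakeGCS` and its methods are hypothetical and exist only for illustration; the two cache names are the registry names used in this commit.

```python
class FakeGCS:
    """Hypothetical in-memory stand-in for the object store; not part of gcsfs."""

    def __init__(self, objects):
        self.objects = objects      # path -> metadata dict
        self.metadata_requests = 0  # counts the extra HEAD-style lookups

    def info(self, path):
        self.metadata_requests += 1
        return self.objects[path]

    def pick_cache_type(self, path):
        # One metadata lookup per open, needed before the cache can be chosen.
        info = self.info(path)
        if info.get("contentEncoding") == "gzip":
            return "gcstranscodingreadahead"
        return "readahead"


store = FakeGCS({
    "bucket/plain.txt": {"size": 42},
    "bucket/packed.txt": {"size": None, "contentEncoding": "gzip"},
})
plain = store.pick_cache_type("bucket/plain.txt")
packed = store.pick_cache_type("bucket/packed.txt")
```

Every open pays for one metadata lookup, including opens of objects that are not transcoded. That lookup is the overhead that could be dropped if the new cache were used for all files.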
the-xentropy committed Aug 8, 2024
1 parent 5cb4479 commit bc8818b
Showing 1 changed file with 122 additions and 0 deletions.
122 changes: 122 additions & 0 deletions gcsfs/core.py
@@ -64,6 +64,85 @@
    "custom_time": "customTime",
}

class GCSTranscodingReadAheadCache(fsspec.caching.BaseCache):
    """Cache which reads only when we get beyond a block of data, but with additional support for transcoding.

    The default fsspec caches all *require* knowing the full file size ahead of time and will either error or truncate
    aggressively if you don't provide meaningful values, so we implement our own cache which is mostly the same
    but with special handling for the case where we can't know the file size ahead of time.

    In practice, this is done by allowing end=None to mean "read to end". Although that may look strange,
    in fsspec ArchiveFileSystem, size = None means the file size is undeterminable, so re-using those semantics and
    making end=None mean 'read to end' for reading/caching allows clearly meaningful syntax like
    `some_file.read(length=some_file.size)` to work as expected without throwing errors, but still error if you try
    something like some_file.size-10, since that's concretely an undeterminable value.

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "gcstranscodingreadahead"

    def __init__(self, blocksize: int, fetcher: fsspec.caching.Fetcher, size: int | None) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Fetch data from a remote source, rounding the amount fetched up to a minimum block size.

        When reading to the end of the object (end=None), only an already-cached prefix is reused."""
        if start is None:
            start = 0
        # When we use this cache, end=None means "read to end".
        # Whenever end is not None, we're requesting a specific range, so we should honor it.
        if end is not None:
            # self.size is None for transcoded objects, so only clamp against it when it is known
            if self.size is not None:
                end = min(end, self.size)
                if start >= self.size:
                    return b""
            if start >= end:
                return b""
            l = end - start
            if start >= self.start and end <= self.end:
                # cache hit
                self.hit_count += 1
                return self.cache[start - self.start : end - self.start]
            elif self.start <= start < self.end:
                # partial hit
                self.miss_count += 1
                part = self.cache[start - self.start :]
                l -= len(part)
                start = self.end
            else:
                # miss
                self.miss_count += 1
                part = b""
            end = end + self.blocksize
            if self.size is not None:
                end = min(self.size, end)
            self.total_requested_bytes += end - start
            self.cache = self.fetcher(start, end)  # new block replaces old
            self.start = start
            self.end = self.start + len(self.cache)
            return part + self.cache[:l]
        else:
            # end=None -> read to end
            # this may look wasteful, but it's equivalent to the above
            # in the case where end = file.size, which is very common (e.g. open() followed by read())
            part = b""
            # only reuse the cache when start falls inside (or directly after) the cached block
            if self.cache and self.start <= start <= self.end:
                # cache hit, we have at least some prefix we don't have to re-download
                self.hit_count += 1
                part = self.cache[start - self.start :]
                start = self.end
            self.cache = self.fetcher(start, None)
            self.start = start
            # since we're discarding previous data, we have to use the length of the cache to update self.end
            self.end = self.start + len(self.cache)
            return part + self.cache


fsspec.caching.register_cache(GCSTranscodingReadAheadCache)

def quote(s):
    """
@@ -994,6 +1073,9 @@ async def _info(self, path, generation=None, **kwargs):
            exact = await self._get_object(path)
            # this condition finds a "placeholder" - still need to check if it's a directory
            if exact["size"] or not exact["name"].endswith("/"):
                # .get() avoids a KeyError for objects without a contentEncoding field
                if exact.get("contentEncoding") == "gzip":
                    # if the file is compressed at rest, we can't trust the size returned since that's the compressed size
                    exact["size"] = None
                return exact
        except FileNotFoundError:
            pass
@@ -1052,6 +1134,7 @@ def url(self, path):
            f"&generation={generation}" if generation else "",
        )


    async def _cat_file(self, path, start=None, end=None, **kwargs):
        """Simple one-shot get of file data"""
        u2 = self.url(path)
@@ -1581,12 +1664,18 @@ def _open(
        if block_size is None:
            block_size = self.default_block_size
        const = consistency or self.consistency
        # Check whether the file is compressed at rest (transcoding). If so, we can't use any of the standard
        # fsspec read caches and have to fall back to a custom one which can handle unknown file sizes.
        gcs_file_info = self.info(path)
        # See GCSTranscodingReadAheadCache for implementation. gcstranscodingreadahead is the fsspec cache registry name.
        cache_type = "gcstranscodingreadahead" if gcs_file_info.get("contentEncoding", None) == "gzip" else "readahead"
        return GCSFile(
            self,
            path,
            mode,
            block_size,
            cache_options=cache_options,
            cache_type=cache_type,
            consistency=const,
            metadata=metadata,
            acl=acl,
@@ -1914,6 +2003,39 @@ def _simple_upload(self):
            timeout=self.timeout,
        )

    def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary.

        This is almost equivalent to AbstractBufferedFile.read, but with special
        handling for the case where we can not know the file size in advance (e.g. decompressive transcoding).

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0 and self.size is not None:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = self.cache._fetch(self.loc, self.loc + length if length > 0 else None)

        logger.debug(
            "%s read: %i - %i %s",
            self,
            self.loc,
            self.loc + length,
            self.cache._log_stats(),
        )
        self.loc += len(out)
        return out

    def _fetch_range(self, start=None, end=None):
        """Get data from GCS