From 0fa9cbe8274f3b0e673339c39ffd3e329c206924 Mon Sep 17 00:00:00 2001 From: Ilya Gurov Date: Tue, 30 Jan 2024 22:30:48 +0400 Subject: [PATCH] feat(filesystem): add compression flag if the read file is GZ (#912) * feat(filesystem): add compression flag if the read file is GZ * skips pseudo files, applies compression flag consistently * make glob test strict on file list --------- Co-authored-by: Marcin Rudolf --- dlt/common/storages/fsspec_filesystem.py | 72 +++++++++++++++--- .../common/storages/samples/gzip/taxi.csv.gz | Bin 0 -> 899 bytes .../common/storages/test_local_filesystem.py | 24 +++++- tests/common/storages/utils.py | 23 ++++-- 4 files changed, 100 insertions(+), 19 deletions(-) create mode 100644 tests/common/storages/samples/gzip/taxi.csv.gz diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index bfea605ee2..865c728a84 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -1,9 +1,23 @@ import io +import gzip import mimetypes import pathlib import posixpath from io import BytesIO -from typing import cast, Tuple, TypedDict, Optional, Union, Iterator, Any, IO, Dict, Callable +from typing import ( + Literal, + cast, + Tuple, + TypedDict, + Optional, + Union, + Iterator, + Any, + IO, + Dict, + Callable, + Sequence, +) from urllib.parse import urlparse from fsspec import AbstractFileSystem @@ -28,6 +42,7 @@ class FileItem(TypedDict, total=False): file_url: str file_name: str mime_type: str + encoding: Optional[str] modification_date: pendulum.DateTime size_in_bytes: int file_content: Optional[bytes] @@ -157,23 +172,48 @@ def fsspec(self) -> AbstractFileSystem: else: return fsspec_filesystem(self["file_url"], self.credentials)[0] - def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003 + def open( # noqa: A003 + self, + mode: str = "rb", + compression: Literal["auto", "disable", "enable"] = "auto", + **kwargs: Any, + ) -> IO[Any]: """Open the file as a fsspec file. This method opens the file represented by this dictionary as a file-like object using the fsspec library. Args: + mode (Optional[str]): Open mode. + compression (Optional[str]): A flag to enable/disable compression. + Can have one of three values: "disable" - no compression applied, + "enable" - gzip compression applied, "auto" (default) - + compression applied only for files compressed with gzip. **kwargs (Any): The arguments to pass to the fsspec open function. Returns: IOBase: The fsspec file. """ + if compression == "auto": + compression_arg = "gzip" if self["encoding"] == "gzip" else None + elif compression == "enable": + compression_arg = "gzip" + elif compression == "disable": + compression_arg = None + else: + raise ValueError("""The argument `compression` must have one of the following values: + "auto", "enable", "disable".""") + opened_file: IO[Any] - # if the user has already extracted the content, we use it so there will be no need to + # if the user has already extracted the content, we use it so there is no need to # download the file again. if "file_content" in self: - bytes_io = BytesIO(self["file_content"]) + content = ( + gzip.decompress(self["file_content"]) + if compression_arg == "gzip" + else self["file_content"] + ) + bytes_io = BytesIO(content) if "t" not in mode: return bytes_io @@ -185,7 +225,9 @@ def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003 **text_kwargs, ) else: - opened_file = self.fsspec.open(self["file_url"], mode=mode, **kwargs) + opened_file = self.fsspec.open( + self["file_url"], mode=mode, compression=compression_arg, **kwargs + ) return opened_file def read_bytes(self) -> bytes: @@ -194,7 +236,6 @@ def read_bytes(self) -> bytes: Returns: bytes: The file content. """ - content: bytes return ( # type: ignore self["file_content"] if "file_content" in self and self["file_content"] is not None @@ -202,10 +243,13 @@ def read_bytes(self) -> bytes: ) -def guess_mime_type(file_name: str) -> str: - return mimetypes.guess_type(posixpath.basename(file_name), strict=False)[ - 0 - ] or "application/" + (posixpath.splitext(file_name)[1][1:] or "octet-stream") +def guess_mime_type(file_name: str) -> Sequence[str]: + type_ = list(mimetypes.guess_type(posixpath.basename(file_name), strict=False)) + + if not type_[0]: + type_[0] = "application/" + (posixpath.splitext(file_name)[1][1:] or "octet-stream") + + return type_ def glob_files( @@ -237,22 +281,26 @@ def glob_files( glob_result = fs_client.glob(filter_url, detail=True) if isinstance(glob_result, list): raise NotImplementedError( - "Cannot request details when using fsspec.glob. For ADSL (Azure) please use version" + "Cannot request details when using fsspec.glob. For adlfs (Azure) please use version" " 2023.9.0 or later" ) for file, md in glob_result.items(): if md["type"] != "file": continue + # make that absolute path on a file:// if bucket_url_parsed.scheme == "file" and not file.startswith("/"): file = f"/{file}" file_name = posixpath.relpath(file, bucket_path) file_url = f"{bucket_url_parsed.scheme}://{file}" + + mime_type, encoding = guess_mime_type(file_name) yield FileItem( file_name=file_name, file_url=file_url, - mime_type=guess_mime_type(file_name), + mime_type=mime_type, + encoding=encoding, modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md), size_in_bytes=int(md["size"]), ) diff --git a/tests/common/storages/samples/gzip/taxi.csv.gz b/tests/common/storages/samples/gzip/taxi.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..184c0740d47bfefe12c14ed8728d63fa59a4620a GIT binary patch literal 899 zcmV-}1AP1+iwFo2P_tzK19V|{X)a@Pb^w)CU2EG&6n*cnFnJz{x!*H>YfYCFQdrj8 zUHT{*(TEx;m6ewK`@JK{Nolmu;tyHlk$uj&=Nw%D#+Z~d!Bx1z2vS9>BJ5(^`xEHJ zD)64m-pZ)XIkFOpCX&p7V{NPes+3?^ns-@-WD&spAoH%oY9cuSvrv#gwQvA`57A!` zVgRXAI2^CT`|frczW3F7+#QF9=`MV@zi-FgzwLS29lP81)m7mU!I`8@9{O?>_hKs# zbFhgWuc3+-g#J5fmQXU9(ZVA|WXzB=UU(>*`n4v=3{D_G%@TAb5&dB`g~MNO!|QIm zAI9l&oY4aNRT8(*;>%TleY~K}Ni1_zql5`&JsqU$X=D`H94yx{azd~Io>tMxIGJV4 zTHw3PwR24u&Fid1cBoBf-QPB@?LXb=)O>EkEcALZqet?Y8*ZWJ4NkIWozWLy=+H$a zgJoNQC8C%ZqPiVc^y&ivb;O0xY%B-uW@=1b>q-iANtO}Jbry>#??EC+_xLvaosn(llMqZ(C&iSrrG;DB zq|sa;F6&WC!fU_yDJZgz@Nnw-Y3R2@zYI#r4Uo5hZ>M`0|Hh<#)13eb6gSf5q&KD|d%(-I1RT7(4_x@A zdQriPPRSd3r7Y<2Q(;R2%Sg>PfH$3A4mMNus6Lmot=?Q(cy5Z)!quS}`}T9!?yfzB z;o~rV`Q{(igt;Bu4O<*=BX{}7#z^P|n@YK@gLCgvBqx?jxvhPNZbNTzi?ozt^gm>O z_j8oFoDKfJ?|v;j;Y~C7+T-reX8f`{bkk#XGkkXu-Z%Z7f7SFeSwgQg-HaE%92oK* z^{Gx=_BAHB^tP2ODVin`StbVF;X+EL+3%lozTC6Eg><>e`c1V)&skpS`kAxEh3ZY$ Ze;Q6tW_*2~hOuj2{RgnA?CcB(006}u&7}YU literal 0 HcmV?d00001 diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_local_filesystem.py index 3827535fbd..ea6adec2c7 100644 --- a/tests/common/storages/test_local_filesystem.py +++ b/tests/common/storages/test_local_filesystem.py @@ -4,7 +4,7 @@ import pathlib from dlt.common.storages import fsspec_from_config, FilesystemConfiguration -from dlt.common.storages.fsspec_filesystem import glob_files +from dlt.common.storages.fsspec_filesystem import FileItemDict, glob_files from tests.common.storages.utils import assert_sample_files @@ -32,3 +32,25 @@ def test_filesystem_dict_local(bucket_url: str, load_content: bool) -> None: assert_sample_files(all_file_items, filesystem, config, load_content) except NotImplementedError as ex: pytest.skip("Skipping due to " + str(ex)) + + +def test_filesystem_decompress() -> None: + config = FilesystemConfiguration(bucket_url=TEST_SAMPLE_FILES) + filesystem, _ = fsspec_from_config(config) + gzip_files = list(glob_files(filesystem, TEST_SAMPLE_FILES, "**/*.gz")) + assert len(gzip_files) > 0 + for file in gzip_files: + assert file["encoding"] == "gzip" + assert file["file_name"].endswith(".gz") + file_dict = FileItemDict(file, filesystem) + # read as is (compressed gzip) + with file_dict.open(compression="disable") as f: + assert f.read() == file_dict.read_bytes() + # read as uncompressed text + with file_dict.open(mode="tr") as f: + lines = f.readlines() + assert len(lines) > 1 + assert lines[0].startswith('"1200864931","2015-07-01 00:00:13"') + # read as uncompressed binary + with file_dict.open(compression="enable") as f: + assert f.read().startswith(b'"1200864931","2015-07-01 00:00:13"') diff --git a/tests/common/storages/utils.py b/tests/common/storages/utils.py index d8a0e16998..3146642536 100644 --- a/tests/common/storages/utils.py +++ b/tests/common/storages/utils.py @@ -1,4 +1,5 @@ import pytest +import gzip from typing import List, Sequence, Tuple from fsspec import AbstractFileSystem @@ -40,16 +41,19 @@ def assert_sample_files( "met_csv/A803/A803_20230919.csv", "met_csv/A803/A803_20230920.csv", "parquet/mlb_players.parquet", + "gzip/taxi.csv.gz", "sample.txt", } assert len(all_file_items) >= 10 - assert set([item["file_name"] for item in all_file_items]) >= minimally_expected_file_items for item in all_file_items: - # only run tests on file items we know - if item["file_name"] not in minimally_expected_file_items: + # skip pseudo files that look like folders + if item["file_url"].endswith("/"): continue + # only accept file items we know + assert item["file_name"] in minimally_expected_file_items + assert isinstance(item["file_name"], str) assert item["file_url"].endswith(item["file_name"]) assert item["file_url"].startswith(config.protocol) @@ -66,15 +70,20 @@ def assert_sample_files( dict_content = file_dict.read_bytes() assert content == dict_content with file_dict.open() as f: - assert content == f.read() + # content will be decompressed for gzip encoding + if item["encoding"] == "gzip": + content = gzip.decompress(content) + open_content = f.read() + assert content == open_content # read via various readers - print(item) if item["mime_type"] == "text/csv": # parse csv with file_dict.open(mode="rt") as f: from csv import DictReader - elements = list(DictReader(f)) + # fieldnames below are not really correct but allow to load first 3 columns + # even if first row does not have header names + elements = list(DictReader(f, fieldnames=["A", "B", "C"])) assert len(elements) > 0 if item["mime_type"] == "application/parquet": # verify it is a real parquet @@ -86,6 +95,8 @@ def assert_sample_files( lines = f_txt.readlines() assert len(lines) >= 1 assert isinstance(lines[0], str) + if item["file_name"].endswith(".gz"): + assert item["encoding"] == "gzip" def start_loading_file(