diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index bfea605ee2..865c728a84 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -1,9 +1,23 @@ import io +import gzip import mimetypes import pathlib import posixpath from io import BytesIO -from typing import cast, Tuple, TypedDict, Optional, Union, Iterator, Any, IO, Dict, Callable +from typing import ( + Literal, + cast, + Tuple, + TypedDict, + Optional, + Union, + Iterator, + Any, + IO, + Dict, + Callable, + Sequence, +) from urllib.parse import urlparse from fsspec import AbstractFileSystem @@ -28,6 +42,7 @@ class FileItem(TypedDict, total=False): file_url: str file_name: str mime_type: str + encoding: Optional[str] modification_date: pendulum.DateTime size_in_bytes: int file_content: Optional[bytes] @@ -157,23 +172,48 @@ def fsspec(self) -> AbstractFileSystem: else: return fsspec_filesystem(self["file_url"], self.credentials)[0] - def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003 + def open( # noqa: A003 + self, + mode: str = "rb", + compression: Literal["auto", "disable", "enable"] = "auto", + **kwargs: Any, + ) -> IO[Any]: """Open the file as a fsspec file. This method opens the file represented by this dictionary as a file-like object using the fsspec library. Args: + mode (Optional[str]): Open mode. + compression (Optional[str]): A flag to enable/disable compression. + Can have one of three values: "disable" - no compression applied, + "enable" - gzip compression applied, "auto" (default) - + compression applied only for files compressed with gzip. **kwargs (Any): The arguments to pass to the fsspec open function. Returns: IOBase: The fsspec file. 
""" + if compression == "auto": + compression_arg = "gzip" if self["encoding"] == "gzip" else None + elif compression == "enable": + compression_arg = "gzip" + elif compression == "disable": + compression_arg = None + else: + raise ValueError("""The argument `compression` must have one of the following values: + "auto", "enable", "disable".""") + opened_file: IO[Any] - # if the user has already extracted the content, we use it so there will be no need to + # if the user has already extracted the content, we use it so there is no need to # download the file again. if "file_content" in self: - bytes_io = BytesIO(self["file_content"]) + content = ( + gzip.decompress(self["file_content"]) + if compression_arg == "gzip" + else self["file_content"] + ) + bytes_io = BytesIO(content) if "t" not in mode: return bytes_io @@ -185,7 +225,9 @@ def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003 **text_kwargs, ) else: - opened_file = self.fsspec.open(self["file_url"], mode=mode, **kwargs) + opened_file = self.fsspec.open( + self["file_url"], mode=mode, compression=compression_arg, **kwargs + ) return opened_file def read_bytes(self) -> bytes: @@ -194,7 +236,6 @@ def read_bytes(self) -> bytes: Returns: bytes: The file content. 
def guess_mime_type(file_name: str) -> Tuple[str, Optional[str]]:
    """Guess the MIME type and the content encoding of a file from its name.

    The guess is delegated to `mimetypes.guess_type` (non-strict mode) applied to the
    base name of `file_name`. When no MIME type can be determined, a synthetic
    `application/<extension>` type is built from the file extension, falling back to
    `application/octet-stream` for names without an extension.

    Args:
        file_name (str): File name or posix-style path of the file.

    Returns:
        Tuple[str, Optional[str]]: A `(mime_type, encoding)` pair. `mime_type` is never
            empty. `encoding` is the value reported by `mimetypes.guess_type`
            (e.g. "gzip" for `*.gz` names) or None when no encoding is detected.
    """
    base_name = posixpath.basename(file_name)
    mime_type, encoding = mimetypes.guess_type(base_name, strict=False)

    if not mime_type:
        # unknown to `mimetypes`: derive a type from the extension (without the leading dot)
        extension = posixpath.splitext(base_name)[1][1:]
        mime_type = "application/" + (extension or "octet-stream")

    # return an immutable pair so the result matches the annotation and cannot be mutated
    return mime_type, encoding
def test_filesystem_decompress() -> None:
    """Exercise the `compression` argument of `FileItemDict.open` on the gzip samples.

    Every `*.gz` sample found under `TEST_SAMPLE_FILES` is opened three ways: with
    compression disabled (content must equal `read_bytes()`), in text mode with the
    default compression setting, and in binary mode with compression enabled.
    """
    fs_config = FilesystemConfiguration(bucket_url=TEST_SAMPLE_FILES)
    fs_client, _ = fsspec_from_config(fs_config)
    compressed_items = list(glob_files(fs_client, TEST_SAMPLE_FILES, "**/*.gz"))
    # the sample set must contain at least one gzip file
    assert len(compressed_items) > 0

    # beginning of the first row of the sample file, as text and as bytes
    first_row_text = '"1200864931","2015-07-01 00:00:13"'
    first_row_bytes = b'"1200864931","2015-07-01 00:00:13"'

    for item in compressed_items:
        assert item["encoding"] == "gzip"
        assert item["file_name"].endswith(".gz")
        item_dict = FileItemDict(item, fs_client)

        # compression disabled: the stream yields the same bytes as `read_bytes()`
        with item_dict.open(compression="disable") as raw_stream:
            assert raw_stream.read() == item_dict.read_bytes()

        # text mode with the default compression setting: lines are readable text
        with item_dict.open(mode="tr") as text_stream:
            text_lines = text_stream.readlines()
            assert len(text_lines) > 1
            assert text_lines[0].startswith(first_row_text)

        # compression enabled explicitly: the binary stream starts with the first row
        with item_dict.open(compression="enable") as binary_stream:
            assert binary_stream.read().startswith(first_row_bytes)
"sample.txt", } assert len(all_file_items) >= 10 - assert set([item["file_name"] for item in all_file_items]) >= minimally_expected_file_items for item in all_file_items: - # only run tests on file items we know - if item["file_name"] not in minimally_expected_file_items: + # skip pseudo files that look like folders + if item["file_url"].endswith("/"): continue + # only accept file items we know + assert item["file_name"] in minimally_expected_file_items + assert isinstance(item["file_name"], str) assert item["file_url"].endswith(item["file_name"]) assert item["file_url"].startswith(config.protocol) @@ -66,15 +70,20 @@ def assert_sample_files( dict_content = file_dict.read_bytes() assert content == dict_content with file_dict.open() as f: - assert content == f.read() + # content will be decompressed for gzip encoding + if item["encoding"] == "gzip": + content = gzip.decompress(content) + open_content = f.read() + assert content == open_content # read via various readers - print(item) if item["mime_type"] == "text/csv": # parse csv with file_dict.open(mode="rt") as f: from csv import DictReader - elements = list(DictReader(f)) + # fieldnames below are not really correct but allow to load first 3 columns + # even if first row does not have header names + elements = list(DictReader(f, fieldnames=["A", "B", "C"])) assert len(elements) > 0 if item["mime_type"] == "application/parquet": # verify it is a real parquet @@ -86,6 +95,8 @@ def assert_sample_files( lines = f_txt.readlines() assert len(lines) >= 1 assert isinstance(lines[0], str) + if item["file_name"].endswith(".gz"): + assert item["encoding"] == "gzip" def start_loading_file(