Skip to content

Commit

Permalink
feat(filesystem): add compression flag if the read file is GZ (#912)
Browse files Browse the repository at this point in the history
* feat(filesystem): add compression flag if the read file is GZ

* skips pseudo files, applies compression flag consistently

* make glob test strict on file list

---------

Co-authored-by: Marcin Rudolf <[email protected]>
  • Loading branch information
IlyaFaer and rudolfix authored Jan 30, 2024
1 parent 93ddd19 commit 0fa9cbe
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 19 deletions.
72 changes: 60 additions & 12 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import io
import gzip
import mimetypes
import pathlib
import posixpath
from io import BytesIO
from typing import cast, Tuple, TypedDict, Optional, Union, Iterator, Any, IO, Dict, Callable
from typing import (
Literal,
cast,
Tuple,
TypedDict,
Optional,
Union,
Iterator,
Any,
IO,
Dict,
Callable,
Sequence,
)
from urllib.parse import urlparse

from fsspec import AbstractFileSystem
Expand All @@ -28,6 +42,7 @@ class FileItem(TypedDict, total=False):
file_url: str
file_name: str
mime_type: str
encoding: Optional[str]
modification_date: pendulum.DateTime
size_in_bytes: int
file_content: Optional[bytes]
Expand Down Expand Up @@ -157,23 +172,48 @@ def fsspec(self) -> AbstractFileSystem:
else:
return fsspec_filesystem(self["file_url"], self.credentials)[0]

def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003
def open( # noqa: A003
self,
mode: str = "rb",
compression: Literal["auto", "disable", "enable"] = "auto",
**kwargs: Any,
) -> IO[Any]:
"""Open the file as a fsspec file.
This method opens the file represented by this dictionary as a file-like object using
the fsspec library.
Args:
mode (Optional[str]): Open mode.
compression (Optional[str]): A flag to enable/disable compression.
Can have one of three values: "disable" - no compression applied,
"enable" - gzip compression applied, "auto" (default) -
compression applied only for files compressed with gzip.
**kwargs (Any): The arguments to pass to the fsspec open function.
Returns:
IOBase: The fsspec file.
"""
if compression == "auto":
compression_arg = "gzip" if self["encoding"] == "gzip" else None
elif compression == "enable":
compression_arg = "gzip"
elif compression == "disable":
compression_arg = None
else:
raise ValueError("""The argument `compression` must have one of the following values:
"auto", "enable", "disable".""")

opened_file: IO[Any]
# if the user has already extracted the content, we use it so there will be no need to
# if the user has already extracted the content, we use it so there is no need to
# download the file again.
if "file_content" in self:
bytes_io = BytesIO(self["file_content"])
content = (
gzip.decompress(self["file_content"])
if compression_arg == "gzip"
else self["file_content"]
)
bytes_io = BytesIO(content)

if "t" not in mode:
return bytes_io
Expand All @@ -185,7 +225,9 @@ def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003
**text_kwargs,
)
else:
opened_file = self.fsspec.open(self["file_url"], mode=mode, **kwargs)
opened_file = self.fsspec.open(
self["file_url"], mode=mode, compression=compression_arg, **kwargs
)
return opened_file

def read_bytes(self) -> bytes:
Expand All @@ -194,18 +236,20 @@ def read_bytes(self) -> bytes:
Returns:
bytes: The file content.
"""
content: bytes
return ( # type: ignore
self["file_content"]
if "file_content" in self and self["file_content"] is not None
else self.fsspec.read_bytes(self["file_url"])
)


def guess_mime_type(file_name: str) -> str:
return mimetypes.guess_type(posixpath.basename(file_name), strict=False)[
0
] or "application/" + (posixpath.splitext(file_name)[1][1:] or "octet-stream")
def guess_mime_type(file_name: str) -> Sequence[str]:
type_ = list(mimetypes.guess_type(posixpath.basename(file_name), strict=False))

if not type_[0]:
type_[0] = "application/" + (posixpath.splitext(file_name)[1][1:] or "octet-stream")

return type_


def glob_files(
Expand Down Expand Up @@ -237,22 +281,26 @@ def glob_files(
glob_result = fs_client.glob(filter_url, detail=True)
if isinstance(glob_result, list):
raise NotImplementedError(
"Cannot request details when using fsspec.glob. For ADSL (Azure) please use version"
"Cannot request details when using fsspec.glob. For adlfs (Azure) please use version"
" 2023.9.0 or later"
)

for file, md in glob_result.items():
if md["type"] != "file":
continue

# make that absolute path on a file://
if bucket_url_parsed.scheme == "file" and not file.startswith("/"):
file = f"/{file}"
file_name = posixpath.relpath(file, bucket_path)
file_url = f"{bucket_url_parsed.scheme}://{file}"

mime_type, encoding = guess_mime_type(file_name)
yield FileItem(
file_name=file_name,
file_url=file_url,
mime_type=guess_mime_type(file_name),
mime_type=mime_type,
encoding=encoding,
modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md),
size_in_bytes=int(md["size"]),
)
Binary file added tests/common/storages/samples/gzip/taxi.csv.gz
Binary file not shown.
24 changes: 23 additions & 1 deletion tests/common/storages/test_local_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pathlib

from dlt.common.storages import fsspec_from_config, FilesystemConfiguration
from dlt.common.storages.fsspec_filesystem import glob_files
from dlt.common.storages.fsspec_filesystem import FileItemDict, glob_files

from tests.common.storages.utils import assert_sample_files

Expand Down Expand Up @@ -32,3 +32,25 @@ def test_filesystem_dict_local(bucket_url: str, load_content: bool) -> None:
assert_sample_files(all_file_items, filesystem, config, load_content)
except NotImplementedError as ex:
pytest.skip("Skipping due to " + str(ex))


def test_filesystem_decompress() -> None:
config = FilesystemConfiguration(bucket_url=TEST_SAMPLE_FILES)
filesystem, _ = fsspec_from_config(config)
gzip_files = list(glob_files(filesystem, TEST_SAMPLE_FILES, "**/*.gz"))
assert len(gzip_files) > 0
for file in gzip_files:
assert file["encoding"] == "gzip"
assert file["file_name"].endswith(".gz")
file_dict = FileItemDict(file, filesystem)
# read as is (compressed gzip)
with file_dict.open(compression="disable") as f:
assert f.read() == file_dict.read_bytes()
# read as uncompressed text
with file_dict.open(mode="tr") as f:
lines = f.readlines()
assert len(lines) > 1
assert lines[0].startswith('"1200864931","2015-07-01 00:00:13"')
# read as uncompressed binary
with file_dict.open(compression="enable") as f:
assert f.read().startswith(b'"1200864931","2015-07-01 00:00:13"')
23 changes: 17 additions & 6 deletions tests/common/storages/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import gzip
from typing import List, Sequence, Tuple
from fsspec import AbstractFileSystem

Expand Down Expand Up @@ -40,16 +41,19 @@ def assert_sample_files(
"met_csv/A803/A803_20230919.csv",
"met_csv/A803/A803_20230920.csv",
"parquet/mlb_players.parquet",
"gzip/taxi.csv.gz",
"sample.txt",
}

assert len(all_file_items) >= 10
assert set([item["file_name"] for item in all_file_items]) >= minimally_expected_file_items

for item in all_file_items:
# only run tests on file items we know
if item["file_name"] not in minimally_expected_file_items:
# skip pseudo files that look like folders
if item["file_url"].endswith("/"):
continue
# only accept file items we know
assert item["file_name"] in minimally_expected_file_items

assert isinstance(item["file_name"], str)
assert item["file_url"].endswith(item["file_name"])
assert item["file_url"].startswith(config.protocol)
Expand All @@ -66,15 +70,20 @@ def assert_sample_files(
dict_content = file_dict.read_bytes()
assert content == dict_content
with file_dict.open() as f:
assert content == f.read()
# content will be decompressed for gzip encoding
if item["encoding"] == "gzip":
content = gzip.decompress(content)
open_content = f.read()
assert content == open_content
# read via various readers
print(item)
if item["mime_type"] == "text/csv":
# parse csv
with file_dict.open(mode="rt") as f:
from csv import DictReader

elements = list(DictReader(f))
# fieldnames below are not really correct but allow to load first 3 columns
# even if first row does not have header names
elements = list(DictReader(f, fieldnames=["A", "B", "C"]))
assert len(elements) > 0
if item["mime_type"] == "application/parquet":
# verify it is a real parquet
Expand All @@ -86,6 +95,8 @@ def assert_sample_files(
lines = f_txt.readlines()
assert len(lines) >= 1
assert isinstance(lines[0], str)
if item["file_name"].endswith(".gz"):
assert item["encoding"] == "gzip"


def start_loading_file(
Expand Down

0 comments on commit 0fa9cbe

Please sign in to comment.