Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filesystem): add compression flag if the read file is GZ #912

Merged
merged 13 commits into from
Jan 30, 2024
77 changes: 64 additions & 13 deletions dlt/common/storages/fsspec_filesystem.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import io
import gzip
import mimetypes
import pathlib
import posixpath
from io import BytesIO
from typing import cast, Tuple, TypedDict, Optional, Union, Iterator, Any, IO, Dict, Callable
from typing import (
Literal,
cast,
Tuple,
TypedDict,
Optional,
Union,
Iterator,
Any,
IO,
Dict,
Callable,
Sequence,
)
from urllib.parse import urlparse

from fsspec import AbstractFileSystem
Expand All @@ -28,6 +42,7 @@ class FileItem(TypedDict, total=False):
file_url: str
file_name: str
mime_type: str
encoding: Optional[str]
modification_date: pendulum.DateTime
size_in_bytes: int
file_content: Optional[bytes]
Expand Down Expand Up @@ -157,23 +172,48 @@ def fsspec(self) -> AbstractFileSystem:
else:
return fsspec_filesystem(self["file_url"], self.credentials)[0]

def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003
def open( # noqa: A003
self,
mode: str = "rb",
compression: Literal["auto", "disable", "enable"] = "auto",
**kwargs: Any,
) -> IO[Any]:
"""Open the file as a fsspec file.

This method opens the file represented by this dictionary as a file-like object using
the fsspec library.

Args:
mode (Optional[str]): Open mode.
compression (Optional[str]): A flag to enable/disable compression.
Can have one of three values: "disable" - no compression applied,
"enable" - gzip compression applied, "auto" (default) -
compression applied only for files compressed with gzip.
**kwargs (Any): The arguments to pass to the fsspec open function.

Returns:
IOBase: The fsspec file.
"""
if compression == "auto":
compression_arg = "gzip" if self["encoding"] == "gzip" else None
elif compression == "enable":
compression_arg = "gzip"
elif compression == "disable":
compression_arg = None
else:
raise ValueError("""The argument `compression` must have one of the following values:
"auto", "enable", "disable".""")
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved

opened_file: IO[Any]
# if the user has already extracted the content, we use it so there will be no need to
# if the user has already extracted the content, we use it so there is no need to
# download the file again.
if "file_content" in self:
bytes_io = BytesIO(self["file_content"])
content = (
gzip.decompress(self["file_content"])
if compression_arg == "gzip"
else self["file_content"]
)
bytes_io = BytesIO(content)

if "t" not in mode:
return bytes_io
Expand All @@ -185,7 +225,9 @@ def open(self, mode: str = "rb", **kwargs: Any) -> IO[Any]: # noqa: A003
**text_kwargs,
)
else:
opened_file = self.fsspec.open(self["file_url"], mode=mode, **kwargs)
opened_file = self.fsspec.open(
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
self["file_url"], mode=mode, compression=compression_arg, **kwargs
)
return opened_file

def read_bytes(self) -> bytes:
Expand All @@ -194,18 +236,20 @@ def read_bytes(self) -> bytes:
Returns:
bytes: The file content.
"""
content: bytes
return ( # type: ignore
self["file_content"]
if "file_content" in self and self["file_content"] is not None
else self.fsspec.read_bytes(self["file_url"])
)


def guess_mime_type(file_name: str) -> str:
return mimetypes.guess_type(posixpath.basename(file_name), strict=False)[
0
] or "application/" + (posixpath.splitext(file_name)[1][1:] or "octet-stream")
def guess_mime_type(file_name: str) -> Sequence[str]:
type_ = list(mimetypes.guess_type(posixpath.basename(file_name), strict=False))

if not type_[0]:
type_[0] = "application/" + (posixpath.splitext(file_name)[1][1:] or "octet-stream")

return type_


def glob_files(
Expand Down Expand Up @@ -237,22 +281,29 @@ def glob_files(
glob_result = fs_client.glob(filter_url, detail=True)
if isinstance(glob_result, list):
raise NotImplementedError(
"Cannot request details when using fsspec.glob. For ADSL (Azure) please use version"
"Cannot request details when using fsspec.glob. For adlfs (Azure) please use version"
" 2023.9.0 or later"
)

for file, md in glob_result.items():
if md["type"] != "file":
continue
# filter out weird files that look like folders
size = int(md["size"])
if file.endswith("/") and size == 0:
continue
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
# make that absolute path on a file://
if bucket_url_parsed.scheme == "file" and not file.startswith("/"):
file = f"/{file}"
file_name = posixpath.relpath(file, bucket_path)
file_url = f"{bucket_url_parsed.scheme}://{file}"

mime_type, encoding = guess_mime_type(file_name)
yield FileItem(
file_name=file_name,
file_url=file_url,
mime_type=guess_mime_type(file_name),
mime_type=mime_type,
encoding=encoding,
modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md),
size_in_bytes=int(md["size"]),
size_in_bytes=size,
)
Binary file added tests/common/storages/samples/gzip/taxi.csv.gz
Binary file not shown.
24 changes: 23 additions & 1 deletion tests/common/storages/test_local_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pathlib

from dlt.common.storages import fsspec_from_config, FilesystemConfiguration
from dlt.common.storages.fsspec_filesystem import glob_files
from dlt.common.storages.fsspec_filesystem import FileItemDict, glob_files

from tests.common.storages.utils import assert_sample_files

Expand Down Expand Up @@ -32,3 +32,25 @@ def test_filesystem_dict_local(bucket_url: str, load_content: bool) -> None:
assert_sample_files(all_file_items, filesystem, config, load_content)
except NotImplementedError as ex:
pytest.skip("Skipping due to " + str(ex))


def test_filesystem_decompress() -> None:
config = FilesystemConfiguration(bucket_url=TEST_SAMPLE_FILES)
filesystem, _ = fsspec_from_config(config)
gzip_files = list(glob_files(filesystem, TEST_SAMPLE_FILES, "**/*.gz"))
assert len(gzip_files) > 0
for file in gzip_files:
assert file["encoding"] == "gzip"
assert file["file_name"].endswith(".gz")
file_dict = FileItemDict(file, filesystem)
# read as is (compressed gzip)
with file_dict.open(compression="disable") as f:
assert f.read() == file_dict.read_bytes()
# read as uncompressed text
with file_dict.open(mode="tr") as f:
lines = f.readlines()
assert len(lines) > 1
assert lines[0].startswith('"1200864931","2015-07-01 00:00:13"')
# read as uncompressed binary
with file_dict.open(compression="enable") as f:
assert f.read().startswith(b'"1200864931","2015-07-01 00:00:13"')
13 changes: 11 additions & 2 deletions tests/common/storages/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import gzip
from typing import List, Sequence, Tuple
from fsspec import AbstractFileSystem

Expand Down Expand Up @@ -47,14 +48,20 @@ def assert_sample_files(
dict_content = file_dict.read_bytes()
assert content == dict_content
with file_dict.open() as f:
assert content == f.read()
# content will be decompressed for gzip encoding
if item["encoding"] == "gzip":
content = gzip.decompress(content)
open_content = f.read()
assert content == open_content
# read via various readers
if item["mime_type"] == "text/csv":
# parse csv
with file_dict.open(mode="rt") as f:
from csv import DictReader

elements = list(DictReader(f))
# fieldnames below are not really correct but allow to load first 3 columns
# even if first row does not have header names
elements = list(DictReader(f, fieldnames=["A", "B", "C"]))
assert len(elements) > 0
if item["mime_type"] == "application/parquet":
# verify it is a real parquet
Expand All @@ -66,6 +73,8 @@ def assert_sample_files(
lines = f_txt.readlines()
assert len(lines) >= 1
assert isinstance(lines[0], str)
if item["file_name"].endswith(".gz"):
assert item["encoding"] == "gzip"

assert len(all_file_items) >= 10
assert set([item["file_name"] for item in all_file_items]) >= {
Expand Down
Loading