feat(filesystem): implement a csv reader with duckdb engine #319

Merged: 23 commits, Jan 31, 2024
5,883 changes: 3,047 additions & 2,836 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -11,8 +11,8 @@ readme = "README.md"
packages = [{include = "sources"}]

[tool.poetry.dependencies]
python = "^3.8.1"
dlt = {version = "^0.3.23", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}
python = ">=3.8.1,<3.13"
dlt = {version = "0.4.3a0", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}

[tool.poetry.group.dev.dependencies]
mypy = "1.6.1"
18 changes: 9 additions & 9 deletions sources/filesystem/README.md
@@ -1,18 +1,18 @@
# Readers Source & Filesystem

This verified source easily streams files from AWS S3, GCS, Azure, or local filesystem using the
reader source.
This verified source easily streams files from AWS S3, GCS, Azure, or local filesystem using the reader source.

Sources and resources that can be used with this verified source are:


| Name | Type | Description |
|--------------|----------------------|---------------------------------------------------------------------------|
| readers | Source | Lists and reads files with resource `filesystem` and readers transformers |
| filesystem | Resource | Lists files in `bucket_url` using `file_glob` pattern |
| read_csv | Resource-transformer | Reads CSV file with "Pandas" chunk by chunk |
| read_jsonl | Resource-transformer | Reads JSONL file content and extracts the data |
| read_parquet | Resource-transformer | Reads Parquet file content and extracts the data with "Pyarrow" |
| Name | Type | Description |
|-----------------|----------------------|---------------------------------------------------------------------------|
| readers | Source | Lists and reads files with resource `filesystem` and readers transformers |
| filesystem | Resource | Lists files in `bucket_url` using `file_glob` pattern |
| read_csv | Resource-transformer | Reads CSV file with "Pandas" chunk by chunk |
| read_csv_duckdb | Resource-transformer | Reads CSV file with DuckDB engine chunk by chunk |
| read_jsonl | Resource-transformer | Reads JSONL file content and extracts the data |
| read_parquet | Resource-transformer | Reads Parquet file content and extracts the data with "Pyarrow" |
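For orientation, a minimal pipeline using the new `read_csv_duckdb` transformer might look like the sketch below. It mirrors the `read_csv_with_duckdb()` example added to `filesystem_pipeline.py` in this PR; the bucket URL and glob are placeholders.

```python
import dlt
from filesystem import readers

# Placeholder bucket URL and glob; point these at your own CSV files.
met_files = readers(
    bucket_url="s3://my-bucket", file_glob="met_csv/A801/*.csv"
).read_csv_duckdb(chunk_size=1000, header=True)

pipeline = dlt.pipeline(
    pipeline_name="standard_filesystem",
    destination="duckdb",
    dataset_name="met_data_duckdb",
)
print(pipeline.run(met_files))
```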


## Initialize the source
12 changes: 10 additions & 2 deletions sources/filesystem/__init__.py
@@ -10,9 +10,14 @@
from .helpers import (
AbstractFileSystem,
FilesystemConfigurationResource,
fsspec_from_resource,
)
from .readers import ReadersSource, _read_csv, _read_jsonl, _read_parquet
from .readers import (
ReadersSource,
_read_csv,
_read_csv_duckdb,
_read_jsonl,
_read_parquet,
)
from .settings import DEFAULT_CHUNK_SIZE


@@ -39,6 +44,8 @@ def readers(
| dlt.transformer(name="read_jsonl")(_read_jsonl),
filesystem(bucket_url, credentials, file_glob=file_glob)
| dlt.transformer(name="read_parquet")(_read_parquet),
filesystem(bucket_url, credentials, file_glob=file_glob)
| dlt.transformer(name="read_csv_duckdb")(_read_csv_duckdb),
)


@@ -89,3 +96,4 @@ def filesystem(
read_csv = dlt.transformer(standalone=True)(_read_csv)
read_jsonl = dlt.transformer(standalone=True)(_read_jsonl)
read_parquet = dlt.transformer(standalone=True)(_read_parquet)
read_csv_duckdb = dlt.transformer(standalone=True)(_read_csv_duckdb)
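Since `read_csv_duckdb` is exported as a standalone transformer here, it can presumably also be piped straight from a `filesystem` resource, the same way the other readers are composed inside the `readers` source above. A sketch under that assumption, with placeholder bucket URL and glob:

```python
import dlt
from filesystem import filesystem, read_csv_duckdb

# Placeholder bucket URL and glob pattern.
files = filesystem(bucket_url="s3://my-bucket", file_glob="csv/*.csv")

# Pipe the listed files into the DuckDB-backed CSV reader, mirroring the
# `filesystem(...) | dlt.transformer(...)` composition used in `readers()`.
csv_data = (files | read_csv_duckdb(chunk_size=1000)).with_name("csv_data")

pipeline = dlt.pipeline(pipeline_name="csv_duckdb_demo", destination="duckdb")
print(pipeline.run(csv_data))
```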
54 changes: 52 additions & 2 deletions sources/filesystem/helpers.py
@@ -1,10 +1,11 @@
"""Helpers for the filesystem resource."""
from typing import Optional, Type, Union, TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Type, Union
from fsspec import AbstractFileSystem # type: ignore

from dlt.common.configuration import resolve_type
from dlt.common.typing import TDataItem

from dlt.sources import DltResource, DltSource
from dlt.sources import DltResource
from dlt.sources.filesystem import fsspec_filesystem
from dlt.sources.config import configspec, with_config
from dlt.sources.credentials import (
@@ -46,3 +47,52 @@ def _get_fsspec(
filesystem_instance.explicit_args.get("bucket_url", None),
filesystem_instance.explicit_args.get("credentials", None),
)


def add_columns(columns: List[str], rows: List[List[Any]]) -> List[Dict[str, Any]]:
"""Adds column names to the given rows.

Args:
columns (List[str]): The column names.
rows (List[List[Any]]): The rows.

Returns:
List[Dict[str, Any]]: The rows with column names.
"""
result = []
for row in rows:
result.append(dict(zip(columns, row)))

return result


def fetch_arrow(file_data, chunk_size: int) -> Iterable[TDataItem]: # type: ignore
"""Fetches data from the given CSV file.

Args:
file_data (DuckDBPyRelation): The CSV file data.
chunk_size (int): The number of rows to read at once.

Yields:
Iterable[TDataItem]: Data items, read from the given CSV file.
"""
batcher = file_data.fetch_arrow_reader(batch_size=chunk_size)
yield from batcher


def fetch_json(file_data, chunk_size: int) -> Iterable[TDataItem]:  # type: ignore
"""Fetches data from the given CSV file.

Args:
file_data (DuckDBPyRelation): The CSV file data.
chunk_size (int): The number of rows to read at once.

Yields:
Iterable[TDataItem]: Data items, read from the given CSV file.
"""
while True:
batch = file_data.fetchmany(chunk_size)
if not batch:
break

yield add_columns(file_data.columns, batch)
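Both helpers operate on a `DuckDBPyRelation`: `fetch_arrow` streams Arrow record batches, while `fetch_json` pulls `chunk_size` rows at a time and zips them with the relation's column names via `add_columns`. A minimal sketch of the JSON path, assuming a relation built from one of the repository's sample CSVs stands in for the fsspec file handle used in `_read_csv_duckdb`:

```python
import duckdb

from filesystem.helpers import fetch_json  # assumes running from sources/, like filesystem_pipeline.py

# Stand-in relation; in the source this comes from an open fsspec file handle.
rel = duckdb.from_csv_auto("tests/filesystem/samples/csv/mlb_players.csv")

# Each batch is a list of dicts, e.g. add_columns(["a", "b"], [[1, 2]])
# yields [{"a": 1, "b": 2}].
for batch in fetch_json(rel, chunk_size=1000):
    print(len(batch), batch[0])
```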
42 changes: 41 additions & 1 deletion sources/filesystem/readers.py
@@ -1,10 +1,12 @@
from typing import TYPE_CHECKING, Any, Iterator
from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional

from dlt.common import json
from dlt.common.typing import copy_sig
from dlt.sources import TDataItems, DltResource, DltSource
from dlt.sources.filesystem import FileItemDict

from .helpers import fetch_arrow, fetch_json


def _read_csv(
items: Iterator[FileItemDict], chunksize: int = 10000, **pandas_kwargs: Any
@@ -74,6 +76,40 @@ def _read_parquet(
yield rows.to_pylist()


def _read_csv_duckdb(
items: Iterator[FileItemDict],
chunk_size: Optional[int] = 5000,
use_pyarrow: bool = False,
**duckdb_kwargs: Any
) -> Iterator[TDataItems]:
"""A resource to extract data from the given CSV files.

Uses DuckDB engine to import and cast CSV data.

Args:
items (Iterator[FileItemDict]): CSV files to read.
chunk_size (Optional[int]):
The number of rows to read at once. Defaults to 5000.
use_pyarrow (bool):
Whether to use `pyarrow` to read the data and designate
data schema. If set to False (by default), JSON is used.
duckdb_kwargs (Dict):
Additional keyword arguments to pass to the `read_csv()`.

Returns:
Iterable[TDataItem]: Data items, read from the given CSV files.
"""
import duckdb

helper = fetch_arrow if use_pyarrow else fetch_json

for item in items:
with item.open() as f:
file_data = duckdb.from_csv_auto(f, **duckdb_kwargs) # type: ignore

yield from helper(file_data, chunk_size)


if TYPE_CHECKING:

class ReadersSource(DltSource):
@@ -91,5 +127,9 @@ def read_jsonl(self) -> DltResource:
def read_parquet(self) -> DltResource:
...

@copy_sig(_read_csv_duckdb)
def read_csv_duckdb(self) -> DltResource:
...

else:
ReadersSource = DltSource
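The `use_pyarrow` flag in `_read_csv_duckdb` switches the output from lists of dicts (`fetch_json`) to Arrow record batches (`fetch_arrow`). A hedged sketch of toggling it through the `readers` source, again with placeholder bucket URL and glob:

```python
from filesystem import readers

# With use_pyarrow=True the transformer yields pyarrow record batches
# produced by fetch_arrow() instead of lists of dicts from fetch_json().
arrow_reader = readers(
    bucket_url="s3://my-bucket", file_glob="csv/*.csv"
).read_csv_duckdb(chunk_size=5000, use_pyarrow=True)
```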
45 changes: 40 additions & 5 deletions sources/filesystem_pipeline.py
@@ -1,21 +1,18 @@
import json
import os
import posixpath
from typing import Iterator

import dlt
from dlt.sources import TDataItem, TDataItems
from dlt.sources import TDataItems

try:
from .filesystem import FileItemDict, filesystem, readers, read_csv, read_jsonl, read_parquet # type: ignore
from .filesystem import FileItemDict, filesystem, readers, read_csv # type: ignore
except ImportError:
from filesystem import (
FileItemDict,
filesystem,
readers,
read_csv,
read_jsonl,
read_parquet,
)


@@ -54,6 +51,42 @@ def stream_and_merge_csv() -> None:
print(pipeline.last_trace.last_normalize_info)


def read_csv_with_duckdb() -> None:
pipeline = dlt.pipeline(
pipeline_name="standard_filesystem",
destination="duckdb",
dataset_name="met_data_duckdb",
)

# load all the CSV data, excluding headers
met_files = readers(
bucket_url=TESTS_BUCKET_URL, file_glob="met_csv/A801/*.csv"
).read_csv_duckdb(chunk_size=1000, header=True)

load_info = pipeline.run(met_files)

print(load_info)
print(pipeline.last_trace.last_normalize_info)


def read_csv_duckdb_compressed() -> None:
pipeline = dlt.pipeline(
pipeline_name="standard_filesystem",
destination="duckdb",
dataset_name="taxi_data",
full_refresh=True,
)

met_files = readers(
bucket_url=TESTS_BUCKET_URL,
file_glob="gzip/*",
).read_csv_duckdb()

load_info = pipeline.run(met_files)
print(load_info)
print(pipeline.last_trace.last_normalize_info)


def read_parquet_and_jsonl_chunked() -> None:
pipeline = dlt.pipeline(
pipeline_name="standard_filesystem",
@@ -177,3 +210,5 @@ def read_files_incrementally_mtime() -> None:
read_parquet_and_jsonl_chunked()
read_custom_file_type_excel()
read_files_incrementally_mtime()
read_csv_with_duckdb()
read_csv_duckdb_compressed()
2 changes: 1 addition & 1 deletion tests/filesystem/samples/csv/freshman_lbs.csv
@@ -1,4 +1,4 @@
Sex,Weight(lbs,Sep),Weight(lbs,Apr),BMI(Sep),BMI(Apr)
"Sex","Weight(lbs,Sep)","Weight(lbs,Apr)","BMI(Sep)","BMI(Apr)"
M,159,130,22.02,18.14
M,214,190,19.70,17.44
M,163,152,24.09,22.43
Binary file added tests/filesystem/samples/gzip/taxi.csv.gz
Binary file not shown.
1 change: 1 addition & 0 deletions tests/filesystem/settings.py
@@ -19,6 +19,7 @@
"csv/freshman_lbs.csv",
"csv/mlb_players.csv",
"csv/mlb_teams_2012.csv",
"gzip/taxi.csv.gz",
"jsonl/mlb_players.jsonl",
"parquet/mlb_players.parquet",
],
21 changes: 16 additions & 5 deletions tests/filesystem/test_filesystem.py
@@ -8,10 +8,10 @@
from sources.filesystem import (
filesystem,
readers,
fsspec_from_resource,
FileItem,
FileItemDict,
)
from sources.filesystem.helpers import fsspec_from_resource
from tests.utils import (
assert_load_info,
load_table_counts,
@@ -144,9 +144,11 @@ def test_standard_readers(bucket_url: str) -> None:
# extract pipes with standard readers
jsonl_reader = readers(bucket_url, file_glob="**/*.jsonl").read_jsonl()
parquet_reader = readers(bucket_url, file_glob="**/*.parquet").read_parquet()
csv_reader = readers(bucket_url, file_glob="**/*.csv").read_csv(
# also read zipped csvs
csv_reader = readers(bucket_url, file_glob="**/*.csv*").read_csv(
float_precision="high"
)
csv_duckdb_reader = readers(bucket_url, file_glob="**/*.csv*").read_csv_duckdb()

# a step that copies files into test storage
def _copy(item: FileItemDict):
@@ -174,16 +176,25 @@ def _copy(item: FileItemDict):
parquet_reader.with_name("parquet_example"),
downloader.with_name("listing"),
csv_reader.with_name("csv_example"),
csv_duckdb_reader.with_name("csv_duckdb_example"),
]
)
# pandas incorrectly guesses that taxi dataset has headers so it skips one row
# so we have 1 less row in csv_example than in csv_duckdb_example
assert_load_info(load_info)
assert load_table_counts(
pipeline, "jsonl_example", "parquet_example", "listing", "csv_example"
pipeline,
"jsonl_example",
"parquet_example",
"listing",
"csv_example",
"csv_duckdb_example",
) == {
"jsonl_example": 1034,
"parquet_example": 1034,
"listing": 10,
"csv_example": 1270,
"listing": 11,
"csv_example": 1279,
"csv_duckdb_example": 1280,
}
# print(pipeline.last_trace.last_normalize_info)
# print(pipeline.default_schema.to_pretty_yaml())