feat(filesystem): implement a csv reader with duckdb engine #319

Merged · 23 commits · Jan 31, 2024
75 changes: 75 additions & 0 deletions sources/csv_reader/__init__.py
@@ -0,0 +1,75 @@
"""
Source, which uses the `filesystem` source and DuckDB
to extract CSV files data from the given locations.
"""
from typing import Iterable, List

import duckdb
import fsspec
import pendulum

import dlt
from dlt.common.typing import TDataItem, TAnyDateTime
from dlt.common.storages.fsspec_filesystem import prepare_fsspec_args
from dlt.common.storages.configuration import FilesystemConfiguration
from sources.filesystem.helpers import FilesystemConfigurationResource

try:
    from filesystem import filesystem
except ImportError:
    from sources.filesystem import filesystem

from .helpers import add_columns


@dlt.resource(spec=FilesystemConfigurationResource)
def read_location(files, bucket, credentials=dlt.secrets.value):
    """A resource to extract data from the given CSV files.

    Args:
        files (List[FileItem]): A list of files to read.
        bucket (str): A bucket URL.
        credentials: Filesystem credentials; taken from `dlt.secrets` by default.

    Returns:
        Iterable[TDataItem]: Data items, read from the given CSV files.
    """
    config = FilesystemConfiguration(bucket, credentials)
    kwargs = prepare_fsspec_args(config)

    state = dlt.current.resource_state()
    start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1))

    results = []
    connection = duckdb.connect()

    for file in files:
        if file["modification_date"] <= start_from:
            continue

        with fsspec.open(file["file_url"], mode="rb", **kwargs) as f:
            file_res = connection.read_csv(f)
            results += add_columns(file_res.columns, file_res.fetchall())

        state["last_modified"] = max(file["modification_date"], state["last_modified"])

    yield results


@dlt.source
def csv_reader(bucket: str, globs: List[str] = ("*",)) -> Iterable[TDataItem]:
    """
    A source to extract data from CSV files in
    one or several locations.

    Args:
        bucket (str): A bucket URL.
        globs (Optional[List[str]]):
            A list of glob patterns to match files.
            Every glob will be extracted into a separate table.

    Returns:
        Iterable[TDataItem]:
            Data items, read from the matched CSV files.
    """
    for glob in globs:
        files = filesystem(bucket_url=bucket, file_glob=glob)
        yield dlt.resource(read_location(files, bucket))
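
A minimal sketch of the incremental filter used by `read_location` (the file names and timestamps below are hypothetical, not from this PR): files whose `modification_date` is not strictly newer than the stored `last_modified` are skipped, and the stored value advances to the newest date among the files that were read.

import pendulum

# Hypothetical listing, mimicking the FileItem fields read_location relies on.
files = [
    {"file_url": "file://data/a.csv", "modification_date": pendulum.datetime(2024, 1, 10)},
    {"file_url": "file://data/b.csv", "modification_date": pendulum.datetime(2024, 1, 20)},
]
state = {"last_modified": pendulum.datetime(2024, 1, 15)}

# Same comparison as in read_location: only strictly newer files are read.
to_read = [f for f in files if f["modification_date"] > state["last_modified"]]
state["last_modified"] = max(
    [f["modification_date"] for f in to_read] + [state["last_modified"]]
)
# Only b.csv is selected; last_modified advances to 2024-01-20.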
16 changes: 16 additions & 0 deletions sources/csv_reader/helpers.py
@@ -0,0 +1,16 @@
def add_columns(columns, rows):
    """Add column names to the given rows.

    Args:
        columns (List[str]): A list of column names.
        rows (List[Tuple[Any]]): A list of rows.

    Returns:
        List[Dict[str, Any]]:
            A list of rows with column names.
    """
    result = []
    for row in rows:
        result.append(dict(zip(columns, row)))

    return result
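
For illustration only (not part of the diff), a quick check of what `add_columns` produces when DuckDB column names are zipped onto fetched rows:

from sources.csv_reader.helpers import add_columns

columns = ["col1", "col2", "col3"]
rows = [(1, "yes", 3), (2, "yes", 66)]

assert add_columns(columns, rows) == [
    {"col1": 1, "col2": "yes", "col3": 3},
    {"col1": 2, "col2": "yes", "col3": 66},
]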
35 changes: 35 additions & 0 deletions sources/csv_reader_pipeline.py
@@ -0,0 +1,35 @@
import dlt

from csv_reader import csv_reader


def csv_with_duck_db():
    pipeline = dlt.pipeline(
        pipeline_name="csv_to_duckdb",
        destination="postgres",
        dataset_name="files",
        full_refresh=True,
    )

    reader = csv_reader("protocol:///bucket_url", ("*file*.csv",))
    load_info = pipeline.run(reader)
    print(load_info)


def csv_with_duck_db_hints():
    pipeline = dlt.pipeline(
        pipeline_name="csv_to_duckdb",
        destination="postgres",
        dataset_name="files",
        full_refresh=True,
    )

    reader = csv_reader("protocol:///bucket_url", ("*file*.csv",))
    reader.resources["read_location"].apply_hints(primary_key="col1")
    load_info = pipeline.run(reader)
    print(load_info)


if __name__ == "__main__":
    # csv_with_duck_db()
    csv_with_duck_db_hints()
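
As a variant of the demo functions above (a sketch; the pipeline name and local path are placeholders, not part of the PR), the same source can be pointed at a local folder, in which case no cloud credentials are needed:

import dlt

from csv_reader import csv_reader


def csv_with_local_files():
    # Assumes the repository's sample CSVs; adjust the path for your own data.
    pipeline = dlt.pipeline(
        pipeline_name="csv_local_demo",
        destination="duckdb",
        dataset_name="files",
        full_refresh=True,
    )
    reader = csv_reader("file://tests/filesystem/samples", ("test*.csv",))
    print(pipeline.run(reader))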
91 changes: 91 additions & 0 deletions tests/csv_reader/test_csv_reader.py
@@ -0,0 +1,91 @@
import pytest
from unittest import mock

import pendulum

import dlt
from sources.csv_reader import csv_reader
from tests.utils import assert_load_info, assert_query_data, load_table_counts


TESTS_BUCKET_URLS = [
    (
        "file://tests/filesystem/samples",
        ("test*.csv",),
        pendulum.datetime(2024, 1, 19, 8, 56, 56),
    ),
    (
        "s3://dlt-ci-test-bucket/standard_source/samples",
        ("test*.csv",),
        pendulum.datetime(2024, 1, 19, 10, 49, 20),
    ),
    # ("gs://ci-test-bucket/standard_source/samples", ("*",)),
    # ("az://dlt-ci-test-bucket/standard_source/samples", ("*",)),
]


@pytest.mark.parametrize("globs", TESTS_BUCKET_URLS)
def test_extract_data(globs):
    bucket_url = globs[0]
    globs = globs[1]

    pipeline = dlt.pipeline(
        pipeline_name="csv_to_duckdb",
        destination="postgres",
        dataset_name="files",
    )

    res = csv_reader(bucket_url, globs)
    load_info = pipeline.run(res)

    assert_load_info(load_info)

    table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
    table_counts = load_table_counts(pipeline, *table_names)

    assert table_counts["read_location"] == 3

    assert_query_data(
        pipeline, f"SELECT col1 FROM read_location ORDER BY col1", (1, 2, 3)
    )

    assert_query_data(
        pipeline, f"SELECT col2 FROM read_location ORDER BY col1", ("yes", "yes", "no")
    )

    assert_query_data(
        pipeline, f"SELECT col3 FROM read_location ORDER BY col1", (3, 66, 8)
    )


@pytest.mark.parametrize("globs", TESTS_BUCKET_URLS)
def test_extract_incremental(globs):
    bucket_url = globs[0]
    date = globs[2]
    globs = globs[1]

    pipeline = dlt.pipeline(
        pipeline_name="csv_to_duckdb",
        destination="postgres",
        dataset_name="files",
    )

    res = csv_reader(bucket_url, globs)

    with mock.patch("dlt.current.resource_state", return_value={"last_modified": date}):
        load_info = pipeline.run(res)

    assert_load_info(load_info)

    table_names = [t["name"] for t in pipeline.default_schema.data_tables()]
    table_counts = load_table_counts(pipeline, *table_names)

    assert table_counts["read_location"] == 2

    assert_query_data(
        pipeline, f"SELECT col2 FROM read_location ORDER BY col1", ("yes", "no")
    )

    assert_query_data(
        pipeline, f"SELECT col3 FROM read_location ORDER BY col1", (66, 8)
    )
2 changes: 2 additions & 0 deletions tests/filesystem/samples/test1.csv
@@ -0,0 +1,2 @@
col1,col2,col3
1,"yes",3
3 changes: 3 additions & 0 deletions tests/filesystem/samples/test2.csv
@@ -0,0 +1,3 @@
col1,col2,col3
2,"yes",66
3,"no",8