From 1b6c93f203e21020cef21394854e4ad5ebb5a4eb Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 12 Jan 2024 11:48:14 +0400 Subject: [PATCH 01/22] WIP: a prototype of a csv reader with duckdb engine --- sources/csv_reader/__init__.py | 64 ++++++++++++++++++++++++++++++++++ sources/csv_reader/helpers.py | 16 +++++++++ sources/csv_reader_pipeline.py | 22 ++++++++++++ 3 files changed, 102 insertions(+) create mode 100644 sources/csv_reader/__init__.py create mode 100644 sources/csv_reader/helpers.py create mode 100644 sources/csv_reader_pipeline.py diff --git a/sources/csv_reader/__init__.py b/sources/csv_reader/__init__.py new file mode 100644 index 000000000..c6428ac61 --- /dev/null +++ b/sources/csv_reader/__init__.py @@ -0,0 +1,64 @@ +""" +Source, which uses the `filesystem` source and DuckDB +to extract CSV files data from the given locations. +""" +from typing import Iterable, List + +import duckdb +import pendulum + +import dlt +from dlt.common.typing import TDataItem, TAnyDateTime +from filesystem import filesystem +from fsspec.implementations.local import LocalFileSystem + +from .helpers import add_columns + + +@dlt.resource +def read_location(files): + """A resource to extract data from the given CSV files. + + Args: + files (List[FileItem]): A list of files to read. + + Returns: + Iterable[TDataItem]: Data items, read from the given CSV files. + """ + state = dlt.current.resource_state() + start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1)) + + results = [] + for file in files: + if file["modification_date"] <= start_from: + continue + + file_res = duckdb.sql(f"""SELECT * FROM read_csv_auto('{file["file_url"]}')""") + results += add_columns(file_res.columns, file_res.fetchall()) + + state["last_modified"] = max(file["modification_date"], state["last_modified"]) + + yield results + + +@dlt.source +def csv_reader(bucket: str, globs: List[str] = ("*",)) -> Iterable[TDataItem]: + """ + A source to extract data from CSV files from + one or several locations. + + Args: + bucket (str): A bucket URL. + globs (Optional[List[str]]): + A list of glob patterns to match files. + Every glob will be extracted into a separate table. + + Returns: + Iterable[TDataItem]: + Data items, read from the matched CSV files. + """ + duckdb.register_filesystem(LocalFileSystem()) + + for glob in globs: + files = filesystem(bucket_url=bucket, file_glob=glob) + yield dlt.resource(read_location(files)) diff --git a/sources/csv_reader/helpers.py b/sources/csv_reader/helpers.py new file mode 100644 index 000000000..1fed6c27f --- /dev/null +++ b/sources/csv_reader/helpers.py @@ -0,0 +1,16 @@ +def add_columns(columns, rows): + """Add column names to the given rows. + + Args: + columns (List[str]): A list of column names. + rows (List[Tuple[Any]]): A list of rows. + + Returns: + List[Dict[str, Any]]: + A list of rows with column names. 
+ """ + result = [] + for row in rows: + result.append(dict(zip(columns, row))) + + return result diff --git a/sources/csv_reader_pipeline.py b/sources/csv_reader_pipeline.py new file mode 100644 index 000000000..b66360814 --- /dev/null +++ b/sources/csv_reader_pipeline.py @@ -0,0 +1,22 @@ +import dlt + +try: + from .csv_reader import csv_reader # type: ignore +except ImportError: + from csv_reader import csv_reader + + +def csv_with_duck_db(): + pipeline = dlt.pipeline( + pipeline_name="csv_to_duckdb", + destination="postgres", + dataset_name="files", + ) + + res = csv_reader("/home/ilya/test_files/", ("*ddb*.csv",)) + load_info = pipeline.run(res) + print(load_info) + + +if __name__ == "__main__": + csv_with_duck_db() From bc85c23834a403005c7017aa0ede140d868053a9 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 17 Jan 2024 16:50:35 +0400 Subject: [PATCH 02/22] add simple tests and samples --- sources/csv_reader/__init__.py | 7 ++- sources/csv_reader_pipeline.py | 27 +++++++--- tests/csv_reader/test_csv_reader.py | 84 +++++++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 8 deletions(-) create mode 100644 tests/csv_reader/test_csv_reader.py diff --git a/sources/csv_reader/__init__.py b/sources/csv_reader/__init__.py index c6428ac61..394ea9ff6 100644 --- a/sources/csv_reader/__init__.py +++ b/sources/csv_reader/__init__.py @@ -9,7 +9,12 @@ import dlt from dlt.common.typing import TDataItem, TAnyDateTime -from filesystem import filesystem + +try: + from filesystem import filesystem +except ImportError: + from sources.filesystem import filesystem + from fsspec.implementations.local import LocalFileSystem from .helpers import add_columns diff --git a/sources/csv_reader_pipeline.py b/sources/csv_reader_pipeline.py index b66360814..f2f8430b7 100644 --- a/sources/csv_reader_pipeline.py +++ b/sources/csv_reader_pipeline.py @@ -1,9 +1,6 @@ import dlt -try: - from .csv_reader import csv_reader # type: ignore -except ImportError: - from csv_reader import csv_reader +from csv_reader import csv_reader def csv_with_duck_db(): @@ -11,12 +8,28 @@ def csv_with_duck_db(): pipeline_name="csv_to_duckdb", destination="postgres", dataset_name="files", + full_refresh=True, ) - res = csv_reader("/home/ilya/test_files/", ("*ddb*.csv",)) - load_info = pipeline.run(res) + reader = csv_reader("/home/ilya/test_files/", ("*ddb*.csv",)) + load_info = pipeline.run(reader) + print(load_info) + + +def csv_with_duck_db_hints(): + pipeline = dlt.pipeline( + pipeline_name="csv_to_duckdb", + destination="postgres", + dataset_name="files", + full_refresh=True, + ) + + reader = csv_reader("/home/ilya/test_files/", ("*ddb*.csv",)) + reader.resources["read_location"].apply_hints(primary_key="col1") + load_info = pipeline.run(reader) print(load_info) if __name__ == "__main__": - csv_with_duck_db() + # csv_with_duck_db() + csv_with_duck_db_hints() diff --git a/tests/csv_reader/test_csv_reader.py b/tests/csv_reader/test_csv_reader.py new file mode 100644 index 000000000..c35ba7ec5 --- /dev/null +++ b/tests/csv_reader/test_csv_reader.py @@ -0,0 +1,84 @@ +import pytest +from unittest import mock + +import pendulum + +import dlt +from sources.csv_reader import csv_reader +from tests.utils import assert_load_info, assert_query_data, load_table_counts + + +TESTS_BUCKET_URLS = [ + ("/home/ilya/test_files/", ("*ddb*.csv",)), +] + + +@pytest.mark.parametrize("globs", TESTS_BUCKET_URLS) +def test_extract_data(globs): + bucket_url = globs[0] + globs = globs[1] + + pipeline = dlt.pipeline( + pipeline_name="csv_to_duckdb", + 
destination="postgres", + dataset_name="files", + ) + + res = csv_reader(bucket_url, globs) + load_info = pipeline.run(res) + + assert_load_info(load_info) + + table_names = [t["name"] for t in pipeline.default_schema.data_tables()] + table_counts = load_table_counts(pipeline, *table_names) + + assert table_counts["read_location"] == 3 + + assert_query_data( + pipeline, f"SELECT col1 FROM read_location ORDER BY col1", (1, 2, 3) + ) + + assert_query_data( + pipeline, f"SELECT col2 FROM read_location ORDER BY col1", ("yes", "yes", "no") + ) + + assert_query_data( + pipeline, f"SELECT col3 FROM read_location ORDER BY col1", (3, 66, 8) + ) + + +@pytest.mark.parametrize("globs", TESTS_BUCKET_URLS) +def test_extract_incremental(globs): + bucket_url = globs[0] + globs = globs[1] + + pipeline = dlt.pipeline( + pipeline_name="csv_to_duckdb", + destination="postgres", + dataset_name="files", + ) + + res = csv_reader(bucket_url, globs) + + with mock.patch( + "dlt.current.resource_state", + return_value={ + "last_modified": pendulum.datetime(2024, 1, 17, 9, 24, 47), + }, + ): + load_info = pipeline.run(res) + + assert_load_info(load_info) + + table_names = [t["name"] for t in pipeline.default_schema.data_tables()] + table_counts = load_table_counts(pipeline, *table_names) + + assert table_counts["read_location"] == 2 + + assert_query_data( + pipeline, f"SELECT col2 FROM read_location ORDER BY col1", ("yes", "no") + ) + + assert_query_data( + pipeline, f"SELECT col3 FROM read_location ORDER BY col1", (66, 8) + ) From 85ecfa36b1a039912424fc346c15bb6fcf0d3a83 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 18 Jan 2024 11:18:33 +0400 Subject: [PATCH 03/22] use filesystem detection for DuckDB registration --- sources/csv_reader/__init__.py | 3 ++- sources/csv_reader_pipeline.py | 4 ++-- tests/csv_reader/test_csv_reader.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sources/csv_reader/__init__.py b/sources/csv_reader/__init__.py index 394ea9ff6..92e427fc6 100644 --- a/sources/csv_reader/__init__.py +++ b/sources/csv_reader/__init__.py @@ -5,6 +5,7 @@ from typing import Iterable, List import duckdb +import fsspec import pendulum import dlt @@ -62,7 +63,7 @@ def csv_reader(bucket: str, globs: List[str] = ("*",)) -> Iterable[TDataItem]: Iterable[TDataItem]: Data items, read from the matched CSV files. 
""" - duckdb.register_filesystem(LocalFileSystem()) + duckdb.register_filesystem(fsspec.filesystem(bucket.split(":")[0])) for glob in globs: files = filesystem(bucket_url=bucket, file_glob=glob) diff --git a/sources/csv_reader_pipeline.py b/sources/csv_reader_pipeline.py index f2f8430b7..1adcc7f2b 100644 --- a/sources/csv_reader_pipeline.py +++ b/sources/csv_reader_pipeline.py @@ -11,7 +11,7 @@ def csv_with_duck_db(): full_refresh=True, ) - reader = csv_reader("/home/ilya/test_files/", ("*ddb*.csv",)) + reader = csv_reader("protocol:///bucket_url", ("*file*.csv",)) load_info = pipeline.run(reader) print(load_info) @@ -24,7 +24,7 @@ def csv_with_duck_db_hints(): full_refresh=True, ) - reader = csv_reader("/home/ilya/test_files/", ("*ddb*.csv",)) + reader = csv_reader("protocol:///bucket_url", ("*file*.csv",)) reader.resources["read_location"].apply_hints(primary_key="col1") load_info = pipeline.run(reader) print(load_info) diff --git a/tests/csv_reader/test_csv_reader.py b/tests/csv_reader/test_csv_reader.py index c35ba7ec5..d09f9e4ae 100644 --- a/tests/csv_reader/test_csv_reader.py +++ b/tests/csv_reader/test_csv_reader.py @@ -9,7 +9,7 @@ TESTS_BUCKET_URLS = [ - ("/home/ilya/test_files/", ("*ddb*.csv",)), + ("file:///home/ilya/test_files/", ("*csv_reader_test*.csv",)), ] From be06d326ad84adb3ea3fc9d80a1f28d424ee1338 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 19 Jan 2024 15:56:42 +0400 Subject: [PATCH 04/22] use file-like objects for extracting the data --- sources/csv_reader/__init__.py | 9 +++++---- tests/csv_reader/test_csv_reader.py | 7 +++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sources/csv_reader/__init__.py b/sources/csv_reader/__init__.py index 92e427fc6..6f230785e 100644 --- a/sources/csv_reader/__init__.py +++ b/sources/csv_reader/__init__.py @@ -35,12 +35,15 @@ def read_location(files): start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1)) results = [] + connection = duckdb.connect() + for file in files: if file["modification_date"] <= start_from: continue - file_res = duckdb.sql(f"""SELECT * FROM read_csv_auto('{file["file_url"]}')""") - results += add_columns(file_res.columns, file_res.fetchall()) + with fsspec.open(file["file_url"], mode="rb") as f: + file_res = connection.read_csv(f) + results += add_columns(file_res.columns, file_res.fetchall()) state["last_modified"] = max(file["modification_date"], state["last_modified"]) @@ -63,8 +66,6 @@ def csv_reader(bucket: str, globs: List[str] = ("*",)) -> Iterable[TDataItem]: Iterable[TDataItem]: Data items, read from the matched CSV files. 
""" - duckdb.register_filesystem(fsspec.filesystem(bucket.split(":")[0])) - for glob in globs: files = filesystem(bucket_url=bucket, file_glob=glob) yield dlt.resource(read_location(files)) diff --git a/tests/csv_reader/test_csv_reader.py b/tests/csv_reader/test_csv_reader.py index d09f9e4ae..7ab37dfc0 100644 --- a/tests/csv_reader/test_csv_reader.py +++ b/tests/csv_reader/test_csv_reader.py @@ -9,7 +9,10 @@ TESTS_BUCKET_URLS = [ - ("file:///home/ilya/test_files/", ("*csv_reader_test*.csv",)), + # ("file:///home/ilya/test_files/", ("test*.csv",)), + ("s3://dlt-ci-test-bucket/standard_source/samples", ("*",)), + # ("gs://ci-test-bucket/standard_source/samples", ("*",)), + # ("az://dlt-ci-test-bucket/standard_source/samples", ("*",)), ] @@ -63,7 +66,7 @@ def test_extract_incremental(globs): with mock.patch( "dlt.current.resource_state", return_value={ - "last_modified": pendulum.datetime(2024, 1, 17, 9, 24, 47), + "last_modified": pendulum.datetime(2024, 1, 19, 8, 56, 58), }, ): load_info = pipeline.run(res) From 8c0b713ca37ebd37152abf5693b8cc6810ec0243 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 22 Jan 2024 11:29:45 +0400 Subject: [PATCH 05/22] implement S3 and local filesystem tests --- sources/csv_reader/__init__.py | 16 ++++++++++------ tests/csv_reader/test_csv_reader.py | 20 ++++++++++++-------- tests/filesystem/samples/test1.csv | 2 ++ tests/filesystem/samples/test2.csv | 3 +++ 4 files changed, 27 insertions(+), 14 deletions(-) create mode 100644 tests/filesystem/samples/test1.csv create mode 100644 tests/filesystem/samples/test2.csv diff --git a/sources/csv_reader/__init__.py b/sources/csv_reader/__init__.py index 6f230785e..2c411e65b 100644 --- a/sources/csv_reader/__init__.py +++ b/sources/csv_reader/__init__.py @@ -10,19 +10,20 @@ import dlt from dlt.common.typing import TDataItem, TAnyDateTime +from dlt.common.storages.fsspec_filesystem import prepare_fsspec_args +from dlt.common.storages.configuration import FilesystemConfiguration +from sources.filesystem.helpers import FilesystemConfigurationResource try: from filesystem import filesystem except ImportError: from sources.filesystem import filesystem -from fsspec.implementations.local import LocalFileSystem - from .helpers import add_columns -@dlt.resource -def read_location(files): +@dlt.resource(spec=FilesystemConfigurationResource) +def read_location(files, bucket, credentials=dlt.secrets.value): """A resource to extract data from the given CSV files. Args: @@ -31,6 +32,9 @@ def read_location(files): Returns: Iterable[TDataItem]: Data items, read from the given CSV files. 
""" + config = FilesystemConfiguration(bucket, credentials) + kwargs = prepare_fsspec_args(config) + state = dlt.current.resource_state() start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1)) @@ -41,7 +45,7 @@ def read_location(files): if file["modification_date"] <= start_from: continue - with fsspec.open(file["file_url"], mode="rb") as f: + with fsspec.open(file["file_url"], mode="rb", **kwargs) as f: file_res = connection.read_csv(f) results += add_columns(file_res.columns, file_res.fetchall()) @@ -68,4 +72,4 @@ def csv_reader(bucket: str, globs: List[str] = ("*",)) -> Iterable[TDataItem]: """ for glob in globs: files = filesystem(bucket_url=bucket, file_glob=glob) - yield dlt.resource(read_location(files)) + yield dlt.resource(read_location(files, bucket)) diff --git a/tests/csv_reader/test_csv_reader.py b/tests/csv_reader/test_csv_reader.py index 7ab37dfc0..f9e2b1593 100644 --- a/tests/csv_reader/test_csv_reader.py +++ b/tests/csv_reader/test_csv_reader.py @@ -9,8 +9,16 @@ TESTS_BUCKET_URLS = [ - # ("file:///home/ilya/test_files/", ("test*.csv",)), - ("s3://dlt-ci-test-bucket/standard_source/samples", ("*",)), + ( + "file://tests/filesystem/samples", + ("test*.csv",), + pendulum.datetime(2024, 1, 19, 8, 56, 56), + ), + ( + "s3://dlt-ci-test-bucket/standard_source/samples", + ("test*.csv",), + pendulum.datetime(2024, 1, 19, 10, 49, 20), + ), # ("gs://ci-test-bucket/standard_source/samples", ("*",)), # ("az://dlt-ci-test-bucket/standard_source/samples", ("*",)), ] @@ -53,6 +61,7 @@ def test_extract_data(globs): @pytest.mark.parametrize("globs", TESTS_BUCKET_URLS) def test_extract_incremental(globs): bucket_url = globs[0] + date = globs[2] globs = globs[1] pipeline = dlt.pipeline( @@ -63,12 +72,7 @@ def test_extract_incremental(globs): res = csv_reader(bucket_url, globs) - with mock.patch( - "dlt.current.resource_state", - return_value={ - "last_modified": pendulum.datetime(2024, 1, 19, 8, 56, 58), - }, - ): + with mock.patch("dlt.current.resource_state", return_value={"last_modified": date}): load_info = pipeline.run(res) assert_load_info(load_info) diff --git a/tests/filesystem/samples/test1.csv b/tests/filesystem/samples/test1.csv new file mode 100644 index 000000000..30f09beb4 --- /dev/null +++ b/tests/filesystem/samples/test1.csv @@ -0,0 +1,2 @@ +col1,col2,col3 +1,"yes",3 \ No newline at end of file diff --git a/tests/filesystem/samples/test2.csv b/tests/filesystem/samples/test2.csv new file mode 100644 index 000000000..0024b543e --- /dev/null +++ b/tests/filesystem/samples/test2.csv @@ -0,0 +1,3 @@ +col1,col2,col3 +2,"yes",66 +3,"no",8 \ No newline at end of file From 2e9165b005808061db46e248e76d0cdea8d29165 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 23 Jan 2024 10:06:00 +0400 Subject: [PATCH 06/22] update the code --- sources/csv_reader/__init__.py | 37 +++++++++++++++++++---------- sources/csv_reader/helpers.py | 16 ------------- sources/csv_reader/requirements.txt | 2 ++ sources/filesystem/__init__.py | 1 - tests/csv_reader/test_csv_reader.py | 16 +++++++++---- 5 files changed, 38 insertions(+), 34 deletions(-) delete mode 100644 sources/csv_reader/helpers.py create mode 100644 sources/csv_reader/requirements.txt diff --git a/sources/csv_reader/__init__.py b/sources/csv_reader/__init__.py index 2c411e65b..5cc3198b2 100644 --- a/sources/csv_reader/__init__.py +++ b/sources/csv_reader/__init__.py @@ -2,32 +2,44 @@ Source, which uses the `filesystem` source and DuckDB to extract CSV files data from the given locations. 
""" -from typing import Iterable, List +from typing import Iterable, List, Union import duckdb import fsspec import pendulum import dlt -from dlt.common.typing import TDataItem, TAnyDateTime +from dlt.common.typing import TDataItem from dlt.common.storages.fsspec_filesystem import prepare_fsspec_args from dlt.common.storages.configuration import FilesystemConfiguration -from sources.filesystem.helpers import FilesystemConfigurationResource try: from filesystem import filesystem + from filesystem import FileItemDict + from filesystem.helpers import ( + AbstractFileSystem, + FilesystemConfigurationResource, + FileSystemCredentials, + ) except ImportError: - from sources.filesystem import filesystem - -from .helpers import add_columns + from sources.filesystem import FileItemDict, filesystem + from sources.filesystem.helpers import ( + AbstractFileSystem, + FilesystemConfigurationResource, + FileSystemCredentials, + ) @dlt.resource(spec=FilesystemConfigurationResource) -def read_location(files, bucket, credentials=dlt.secrets.value): +def read_location( + files: List[FileItemDict], + bucket: str, + credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value, +): """A resource to extract data from the given CSV files. Args: - files (List[FileItem]): A list of files to read. + files (List[FileItemDict]): A list of files to read. Returns: Iterable[TDataItem]: Data items, read from the given CSV files. @@ -38,7 +50,6 @@ def read_location(files, bucket, credentials=dlt.secrets.value): state = dlt.current.resource_state() start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1)) - results = [] connection = duckdb.connect() for file in files: @@ -46,12 +57,12 @@ def read_location(files, bucket, credentials=dlt.secrets.value): continue with fsspec.open(file["file_url"], mode="rb", **kwargs) as f: - file_res = connection.read_csv(f) - results += add_columns(file_res.columns, file_res.fetchall()) + file_data = connection.read_csv(f).to_arrow_table() - state["last_modified"] = max(file["modification_date"], state["last_modified"]) + for batch in file_data.to_batches(max_chunksize=5000): + yield batch - yield results + state["last_modified"] = max(file["modification_date"], state["last_modified"]) @dlt.source diff --git a/sources/csv_reader/helpers.py b/sources/csv_reader/helpers.py deleted file mode 100644 index 1fed6c27f..000000000 --- a/sources/csv_reader/helpers.py +++ /dev/null @@ -1,16 +0,0 @@ -def add_columns(columns, rows): - """Add column names to the given rows. - - Args: - columns (List[str]): A list of column names. - rows (List[Tuple[Any]]): A list of rows. - - Returns: - List[Dict[str, Any]]: - A list of rows with column names. 
- """ - result = [] - for row in rows: - result.append(dict(zip(columns, row))) - - return result diff --git a/sources/csv_reader/requirements.txt b/sources/csv_reader/requirements.txt new file mode 100644 index 000000000..5edf18415 --- /dev/null +++ b/sources/csv_reader/requirements.txt @@ -0,0 +1,2 @@ +duckdb +fsspec \ No newline at end of file diff --git a/sources/filesystem/__init__.py b/sources/filesystem/__init__.py index f80b82b1f..0908f217e 100644 --- a/sources/filesystem/__init__.py +++ b/sources/filesystem/__init__.py @@ -10,7 +10,6 @@ from .helpers import ( AbstractFileSystem, FilesystemConfigurationResource, - fsspec_from_resource, ) from .readers import ReadersSource, _read_csv, _read_jsonl, _read_parquet from .settings import DEFAULT_CHUNK_SIZE diff --git a/tests/csv_reader/test_csv_reader.py b/tests/csv_reader/test_csv_reader.py index f9e2b1593..a5dacbd31 100644 --- a/tests/csv_reader/test_csv_reader.py +++ b/tests/csv_reader/test_csv_reader.py @@ -15,12 +15,20 @@ pendulum.datetime(2024, 1, 19, 8, 56, 56), ), ( - "s3://dlt-ci-test-bucket/standard_source/samples", + "s3://dlt-ci-test-bucket/standard_source/csv_reader", ("test*.csv",), - pendulum.datetime(2024, 1, 19, 10, 49, 20), + pendulum.datetime(2024, 1, 22, 12, 27, 53), + ), + # ( + # "gs://ci-test-bucket/standard_source/samples", + # ("*",), + # pendulum.datetime(2024, 1, 19, 8, 56, 56), + # ), + ( + "az://dlt-ci-test-bucket", + ("csv_reader_test*.csv",), + pendulum.datetime(2024, 1, 22, 10, 24, 30), ), - # ("gs://ci-test-bucket/standard_source/samples", ("*",)), - # ("az://dlt-ci-test-bucket/standard_source/samples", ("*",)), ] From f342455509dc5e1eb35b8c218d855471b4a1c419 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 23 Jan 2024 10:09:30 +0400 Subject: [PATCH 07/22] add dlt into requirements --- sources/csv_reader/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/sources/csv_reader/requirements.txt b/sources/csv_reader/requirements.txt index 5edf18415..b869b3929 100644 --- a/sources/csv_reader/requirements.txt +++ b/sources/csv_reader/requirements.txt @@ -1,2 +1,3 @@ +dlt>=0.3.5 duckdb fsspec \ No newline at end of file From 0ea1457d7daf1f03b111fcdd06eaff6aa86416db Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 24 Jan 2024 13:42:32 +0400 Subject: [PATCH 08/22] make csv reader a helper --- sources/csv_reader/__init__.py | 86 -------------------- sources/csv_reader/requirements.txt | 3 - sources/filesystem/__init__.py | 11 ++- sources/filesystem/readers.py | 121 +++++++++++++++++++++++++++- tests/csv_reader/test_csv_reader.py | 99 ----------------------- tests/filesystem/samples/test1.csv | 2 - tests/filesystem/samples/test2.csv | 3 - tests/filesystem/test_filesystem.py | 12 ++- 8 files changed, 141 insertions(+), 196 deletions(-) delete mode 100644 sources/csv_reader/__init__.py delete mode 100644 sources/csv_reader/requirements.txt delete mode 100644 tests/csv_reader/test_csv_reader.py delete mode 100644 tests/filesystem/samples/test1.csv delete mode 100644 tests/filesystem/samples/test2.csv diff --git a/sources/csv_reader/__init__.py b/sources/csv_reader/__init__.py deleted file mode 100644 index 5cc3198b2..000000000 --- a/sources/csv_reader/__init__.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -Source, which uses the `filesystem` source and DuckDB -to extract CSV files data from the given locations. 
-""" -from typing import Iterable, List, Union - -import duckdb -import fsspec -import pendulum - -import dlt -from dlt.common.typing import TDataItem -from dlt.common.storages.fsspec_filesystem import prepare_fsspec_args -from dlt.common.storages.configuration import FilesystemConfiguration - -try: - from filesystem import filesystem - from filesystem import FileItemDict - from filesystem.helpers import ( - AbstractFileSystem, - FilesystemConfigurationResource, - FileSystemCredentials, - ) -except ImportError: - from sources.filesystem import FileItemDict, filesystem - from sources.filesystem.helpers import ( - AbstractFileSystem, - FilesystemConfigurationResource, - FileSystemCredentials, - ) - - -@dlt.resource(spec=FilesystemConfigurationResource) -def read_location( - files: List[FileItemDict], - bucket: str, - credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value, -): - """A resource to extract data from the given CSV files. - - Args: - files (List[FileItemDict]): A list of files to read. - - Returns: - Iterable[TDataItem]: Data items, read from the given CSV files. - """ - config = FilesystemConfiguration(bucket, credentials) - kwargs = prepare_fsspec_args(config) - - state = dlt.current.resource_state() - start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1)) - - connection = duckdb.connect() - - for file in files: - if file["modification_date"] <= start_from: - continue - - with fsspec.open(file["file_url"], mode="rb", **kwargs) as f: - file_data = connection.read_csv(f).to_arrow_table() - - for batch in file_data.to_batches(max_chunksize=5000): - yield batch - - state["last_modified"] = max(file["modification_date"], state["last_modified"]) - - -@dlt.source -def csv_reader(bucket: str, globs: List[str] = ("*",)) -> Iterable[TDataItem]: - """ - A source to extract data from CSV files from - one or several locations. - - Args: - bucket (str): A bucket URL. - globs (Optional[List[str]]): - A list of glob patterns to match files. - Every glob will be extracted into a separate table. - - Returns: - Iterable[TDataItem]: - Data items, read from the matched CSV files. 
- """ - for glob in globs: - files = filesystem(bucket_url=bucket, file_glob=glob) - yield dlt.resource(read_location(files, bucket)) diff --git a/sources/csv_reader/requirements.txt b/sources/csv_reader/requirements.txt deleted file mode 100644 index b869b3929..000000000 --- a/sources/csv_reader/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -dlt>=0.3.5 -duckdb -fsspec \ No newline at end of file diff --git a/sources/filesystem/__init__.py b/sources/filesystem/__init__.py index 0908f217e..8c7115bd2 100644 --- a/sources/filesystem/__init__.py +++ b/sources/filesystem/__init__.py @@ -11,7 +11,13 @@ AbstractFileSystem, FilesystemConfigurationResource, ) -from .readers import ReadersSource, _read_csv, _read_jsonl, _read_parquet +from .readers import ( + ReadersSource, + _read_csv, + _read_csv_duckdb, + _read_jsonl, + _read_parquet, +) from .settings import DEFAULT_CHUNK_SIZE @@ -38,6 +44,8 @@ def readers( | dlt.transformer(name="read_jsonl")(_read_jsonl), filesystem(bucket_url, credentials, file_glob=file_glob) | dlt.transformer(name="read_parquet")(_read_parquet), + filesystem(bucket_url, credentials, file_glob=file_glob) + | dlt.transformer(name="read_csv_duckdb")(_read_csv_duckdb), ) @@ -88,3 +96,4 @@ def filesystem( read_csv = dlt.transformer(standalone=True)(_read_csv) read_jsonl = dlt.transformer(standalone=True)(_read_jsonl) read_parquet = dlt.transformer(standalone=True)(_read_parquet) +read_csv_duckdb = dlt.transformer(standalone=True)(_read_csv_duckdb) diff --git a/sources/filesystem/readers.py b/sources/filesystem/readers.py index c59037d71..3ae03ece8 100644 --- a/sources/filesystem/readers.py +++ b/sources/filesystem/readers.py @@ -1,10 +1,24 @@ -from typing import TYPE_CHECKING, Any, Iterator +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Union +import dlt from dlt.common import json +from dlt.common.storages.configuration import FilesystemConfiguration +from dlt.common.storages.fsspec_filesystem import prepare_fsspec_args from dlt.common.typing import copy_sig from dlt.sources import TDataItems, DltResource, DltSource from dlt.sources.filesystem import FileItemDict +try: + from filesystem.helpers import ( + AbstractFileSystem, + FileSystemCredentials, + ) +except ImportError: + from sources.filesystem.helpers import ( + AbstractFileSystem, + FileSystemCredentials, + ) + def _read_csv( items: Iterator[FileItemDict], chunksize: int = 10000, **pandas_kwargs: Any @@ -74,6 +88,107 @@ def _read_parquet( yield rows.to_pylist() +def _add_columns(columns: List[str], rows: List[List[Any]]): + """Adds column names to the given rows. + + Args: + columns (List[str]): The column names. + rows (List[List[Any]]): The rows. + + Returns: + List[Dict[str, Any]]: The rows with column names. + """ + result = [] + for row in rows: + result.append(dict(zip(columns, row))) + + return result + + +def _fetch_arrow(file_data, chunk_size): + """Fetches data from the given CSV file. + + Args: + file_data (DuckDBPyRelation): The CSV file data. + chunk_size (int): The number of rows to read at once. + + Yields: + Iterable[TDataItem]: Data items, read from the given CSV file. + """ + batcher = file_data.fetch_arrow_reader(batch_size=chunk_size) + yield from batcher + + +def _fetch_json(file_data, chunk_size): + """Fetches data from the given CSV file. + + Args: + file_data (DuckDBPyRelation): The CSV file data. + chunk_size (int): The number of rows to read at once. + + Yields: + Iterable[TDataItem]: Data items, read from the given CSV file. 
+ """ + batch = True + while batch: + batch = file_data.fetchmany(chunk_size) + yield _add_columns(file_data.columns, batch) + + +def _read_csv_duckdb( + items: Iterator[FileItemDict], + bucket: str, + chunk_size: Optional[int] = 5000, + use_pyarrow: bool = False, + credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value, + read_csv_kwargs: Optional[Dict] = {}, +): + """A resource to extract data from the given CSV files. + + Uses DuckDB engine to import and cast CSV data. + + Args: + items (Iterator[FileItemDict]): CSV files to read. + bucket (str): The bucket name. + chunk_size (Optional[int]): + The number of rows to read at once. Defaults to 5000. + use_pyarrow (bool): + Whether to use `pyarrow` to read the data and designate + data schema. If set to False (by default), JSON is used. + credentials (Union[FileSystemCredentials, AbstractFileSystem]): + The credentials to use. Defaults to dlt.secrets.value. + read_csv_kwargs (Optional[Dict]): + Additional keyword arguments passed to `read_csv()`. + + Returns: + Iterable[TDataItem]: Data items, read from the given CSV files. + """ + import duckdb + import fsspec + import pendulum + + config = FilesystemConfiguration(bucket, credentials) + fs_kwargs = prepare_fsspec_args(config) + + state = dlt.current.resource_state() + start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1)) + + connection = duckdb.connect() + + helper = _fetch_arrow if use_pyarrow else _fetch_json + + for item in items: + if item["modification_date"] <= start_from: + continue + + with fsspec.open(item["file_url"], mode="rb", **fs_kwargs) as f: + file_data = connection.read_csv(f, **read_csv_kwargs) + + yield from helper(file_data, chunk_size) + + state["last_modified"] = max(item["modification_date"], state["last_modified"]) + + if TYPE_CHECKING: class ReadersSource(DltSource): @@ -91,5 +206,9 @@ def read_jsonl(self) -> DltResource: def read_parquet(self) -> DltResource: ... + @copy_sig(_read_csv_duckdb) + def read_location(self) -> DltResource: + ... 
+ else: ReadersSource = DltSource diff --git a/tests/csv_reader/test_csv_reader.py b/tests/csv_reader/test_csv_reader.py deleted file mode 100644 index a5dacbd31..000000000 --- a/tests/csv_reader/test_csv_reader.py +++ /dev/null @@ -1,99 +0,0 @@ -import pytest -from unittest import mock - -import pendulum - -import dlt -from sources.csv_reader import csv_reader -from tests.utils import assert_load_info, assert_query_data, load_table_counts - - -TESTS_BUCKET_URLS = [ - ( - "file://tests/filesystem/samples", - ("test*.csv",), - pendulum.datetime(2024, 1, 19, 8, 56, 56), - ), - ( - "s3://dlt-ci-test-bucket/standard_source/csv_reader", - ("test*.csv",), - pendulum.datetime(2024, 1, 22, 12, 27, 53), - ), - # ( - # "gs://ci-test-bucket/standard_source/samples", - # ("*",), - # pendulum.datetime(2024, 1, 19, 8, 56, 56), - # ), - ( - "az://dlt-ci-test-bucket", - ("csv_reader_test*.csv",), - pendulum.datetime(2024, 1, 22, 10, 24, 30), - ), -] - - -@pytest.mark.parametrize("globs", TESTS_BUCKET_URLS) -def test_extract_data(globs): - bucket_url = globs[0] - globs = globs[1] - - pipeline = dlt.pipeline( - pipeline_name="csv_to_duckdb", - destination="postgres", - dataset_name="files", - ) - - res = csv_reader(bucket_url, globs) - load_info = pipeline.run(res) - - assert_load_info(load_info) - - table_names = [t["name"] for t in pipeline.default_schema.data_tables()] - table_counts = load_table_counts(pipeline, *table_names) - - assert table_counts["read_location"] == 3 - - assert_query_data( - pipeline, f"SELECT col1 FROM read_location ORDER BY col1", (1, 2, 3) - ) - - assert_query_data( - pipeline, f"SELECT col2 FROM read_location ORDER BY col1", ("yes", "yes", "no") - ) - - assert_query_data( - pipeline, f"SELECT col3 FROM read_location ORDER BY col1", (3, 66, 8) - ) - - -@pytest.mark.parametrize("globs", TESTS_BUCKET_URLS) -def test_extract_incremental(globs): - bucket_url = globs[0] - date = globs[2] - globs = globs[1] - - pipeline = dlt.pipeline( - pipeline_name="csv_to_duckdb", - destination="postgres", - dataset_name="files", - ) - - res = csv_reader(bucket_url, globs) - - with mock.patch("dlt.current.resource_state", return_value={"last_modified": date}): - load_info = pipeline.run(res) - - assert_load_info(load_info) - - table_names = [t["name"] for t in pipeline.default_schema.data_tables()] - table_counts = load_table_counts(pipeline, *table_names) - - assert table_counts["read_location"] == 2 - - assert_query_data( - pipeline, f"SELECT col2 FROM read_location ORDER BY col1", ("yes", "no") - ) - - assert_query_data( - pipeline, f"SELECT col3 FROM read_location ORDER BY col1", (66, 8) - ) diff --git a/tests/filesystem/samples/test1.csv b/tests/filesystem/samples/test1.csv deleted file mode 100644 index 30f09beb4..000000000 --- a/tests/filesystem/samples/test1.csv +++ /dev/null @@ -1,2 +0,0 @@ -col1,col2,col3 -1,"yes",3 \ No newline at end of file diff --git a/tests/filesystem/samples/test2.csv b/tests/filesystem/samples/test2.csv deleted file mode 100644 index 0024b543e..000000000 --- a/tests/filesystem/samples/test2.csv +++ /dev/null @@ -1,3 +0,0 @@ -col1,col2,col3 -2,"yes",66 -3,"no",8 \ No newline at end of file diff --git a/tests/filesystem/test_filesystem.py b/tests/filesystem/test_filesystem.py index 91f62e838..4e1aaff51 100644 --- a/tests/filesystem/test_filesystem.py +++ b/tests/filesystem/test_filesystem.py @@ -147,6 +147,9 @@ def test_standard_readers(bucket_url: str) -> None: csv_reader = readers(bucket_url, file_glob="**/*.csv").read_csv( float_precision="high" ) + 
csv_duckdb_reader = readers(bucket_url, file_glob="**/*.csv").read_csv_duckdb( + bucket_url + ) # a step that copies files into test storage def _copy(item: FileItemDict): @@ -174,16 +177,23 @@ def _copy(item: FileItemDict): parquet_reader.with_name("parquet_example"), downloader.with_name("listing"), csv_reader.with_name("csv_example"), + csv_duckdb_reader.with_name("csv_duckdb_example"), ] ) assert_load_info(load_info) assert load_table_counts( - pipeline, "jsonl_example", "parquet_example", "listing", "csv_example" + pipeline, + "jsonl_example", + "parquet_example", + "listing", + "csv_example", + "csv_duckdb_example", ) == { "jsonl_example": 1034, "parquet_example": 1034, "listing": 10, "csv_example": 1270, + "csv_duckdb_example": 1272, } # print(pipeline.last_trace.last_normalize_info) # print(pipeline.default_schema.to_pretty_yaml()) From 095cc1bd812e9f26220efec56014c7eaf802fbb4 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 24 Jan 2024 13:56:07 +0400 Subject: [PATCH 09/22] fixes --- sources/csv_reader_pipeline.py | 35 ------------------ sources/filesystem/helpers.py | 49 ++++++++++++++++++++++++- sources/filesystem/readers.py | 67 ++++------------------------------ 3 files changed, 56 insertions(+), 95 deletions(-) delete mode 100644 sources/csv_reader_pipeline.py diff --git a/sources/csv_reader_pipeline.py b/sources/csv_reader_pipeline.py deleted file mode 100644 index 1adcc7f2b..000000000 --- a/sources/csv_reader_pipeline.py +++ /dev/null @@ -1,35 +0,0 @@ -import dlt - -from csv_reader import csv_reader - - -def csv_with_duck_db(): - pipeline = dlt.pipeline( - pipeline_name="csv_to_duckdb", - destination="postgres", - dataset_name="files", - full_refresh=True, - ) - - reader = csv_reader("protocol:///bucket_url", ("*file*.csv",)) - load_info = pipeline.run(reader) - print(load_info) - - -def csv_with_duck_db_hints(): - pipeline = dlt.pipeline( - pipeline_name="csv_to_duckdb", - destination="postgres", - dataset_name="files", - full_refresh=True, - ) - - reader = csv_reader("protocol:///bucket_url", ("*file*.csv",)) - reader.resources["read_location"].apply_hints(primary_key="col1") - load_info = pipeline.run(reader) - print(load_info) - - -if __name__ == "__main__": - # csv_with_duck_db() - csv_with_duck_db_hints() diff --git a/sources/filesystem/helpers.py b/sources/filesystem/helpers.py index 225b171b7..b4971ee8e 100644 --- a/sources/filesystem/helpers.py +++ b/sources/filesystem/helpers.py @@ -1,5 +1,5 @@ """Helpers for the filesystem resource.""" -from typing import Optional, Type, Union, TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, List, Optional, Type, Union from fsspec import AbstractFileSystem # type: ignore from dlt.common.configuration import resolve_type @@ -46,3 +46,50 @@ def _get_fsspec( filesystem_instance.explicit_args.get("bucket_url", None), filesystem_instance.explicit_args.get("credentials", None), ) + + +def add_columns(columns: List[str], rows: List[List[Any]]): + """Adds column names to the given rows. + + Args: + columns (List[str]): The column names. + rows (List[List[Any]]): The rows. + + Returns: + List[Dict[str, Any]]: The rows with column names. + """ + result = [] + for row in rows: + result.append(dict(zip(columns, row))) + + return result + + +def fetch_arrow(file_data, chunk_size): + """Fetches data from the given CSV file. + + Args: + file_data (DuckDBPyRelation): The CSV file data. + chunk_size (int): The number of rows to read at once. + + Yields: + Iterable[TDataItem]: Data items, read from the given CSV file. 
+ """ + batcher = file_data.fetch_arrow_reader(batch_size=chunk_size) + yield from batcher + + +def fetch_json(file_data, chunk_size): + """Fetches data from the given CSV file. + + Args: + file_data (DuckDBPyRelation): The CSV file data. + chunk_size (int): The number of rows to read at once. + + Yields: + Iterable[TDataItem]: Data items, read from the given CSV file. + """ + batch = True + while batch: + batch = file_data.fetchmany(chunk_size) + yield add_columns(file_data.columns, batch) diff --git a/sources/filesystem/readers.py b/sources/filesystem/readers.py index 3ae03ece8..f225e7339 100644 --- a/sources/filesystem/readers.py +++ b/sources/filesystem/readers.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional, Union import dlt from dlt.common import json @@ -8,16 +8,12 @@ from dlt.sources import TDataItems, DltResource, DltSource from dlt.sources.filesystem import FileItemDict -try: - from filesystem.helpers import ( - AbstractFileSystem, - FileSystemCredentials, - ) -except ImportError: - from sources.filesystem.helpers import ( - AbstractFileSystem, - FileSystemCredentials, - ) +from .helpers import ( + fetch_arrow, + fetch_json, + AbstractFileSystem, + FileSystemCredentials, +) def _read_csv( @@ -88,53 +84,6 @@ def _read_parquet( yield rows.to_pylist() -def _add_columns(columns: List[str], rows: List[List[Any]]): - """Adds column names to the given rows. - - Args: - columns (List[str]): The column names. - rows (List[List[Any]]): The rows. - - Returns: - List[Dict[str, Any]]: The rows with column names. - """ - result = [] - for row in rows: - result.append(dict(zip(columns, row))) - - return result - - -def _fetch_arrow(file_data, chunk_size): - """Fetches data from the given CSV file. - - Args: - file_data (DuckDBPyRelation): The CSV file data. - chunk_size (int): The number of rows to read at once. - - Yields: - Iterable[TDataItem]: Data items, read from the given CSV file. - """ - batcher = file_data.fetch_arrow_reader(batch_size=chunk_size) - yield from batcher - - -def _fetch_json(file_data, chunk_size): - """Fetches data from the given CSV file. - - Args: - file_data (DuckDBPyRelation): The CSV file data. - chunk_size (int): The number of rows to read at once. - - Yields: - Iterable[TDataItem]: Data items, read from the given CSV file. 
- """ - batch = True - while batch: - batch = file_data.fetchmany(chunk_size) - yield _add_columns(file_data.columns, batch) - - def _read_csv_duckdb( items: Iterator[FileItemDict], bucket: str, @@ -175,7 +124,7 @@ def _read_csv_duckdb( connection = duckdb.connect() - helper = _fetch_arrow if use_pyarrow else _fetch_json + helper = fetch_arrow if use_pyarrow else fetch_json for item in items: if item["modification_date"] <= start_from: From 490c5e727457811d480054601fa44180ad62ccb6 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 24 Jan 2024 15:32:36 +0400 Subject: [PATCH 10/22] lint fixes --- sources/filesystem/helpers.py | 17 ++++++++++------- sources/filesystem/readers.py | 12 ++++++------ 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/sources/filesystem/helpers.py b/sources/filesystem/helpers.py index b4971ee8e..8bd11705d 100644 --- a/sources/filesystem/helpers.py +++ b/sources/filesystem/helpers.py @@ -1,10 +1,11 @@ """Helpers for the filesystem resource.""" -from typing import TYPE_CHECKING, Any, List, Optional, Type, Union +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Type, Union from fsspec import AbstractFileSystem # type: ignore from dlt.common.configuration import resolve_type +from dlt.common.typing import TDataItem -from dlt.sources import DltResource, DltSource +from dlt.sources import DltResource from dlt.sources.filesystem import fsspec_filesystem from dlt.sources.config import configspec, with_config from dlt.sources.credentials import ( @@ -48,7 +49,7 @@ def _get_fsspec( ) -def add_columns(columns: List[str], rows: List[List[Any]]): +def add_columns(columns: List[str], rows: List[List[Any]]) -> List[Dict[str, Any]]: """Adds column names to the given rows. Args: @@ -65,7 +66,7 @@ def add_columns(columns: List[str], rows: List[List[Any]]): return result -def fetch_arrow(file_data, chunk_size): +def fetch_arrow(file_data, chunk_size: int) -> Iterable[TDataItem]: # type: ignore """Fetches data from the given CSV file. Args: @@ -79,7 +80,7 @@ def fetch_arrow(file_data, chunk_size): yield from batcher -def fetch_json(file_data, chunk_size): +def fetch_json(file_data, chunk_size: int) -> List[Dict[str, Any]]: # type: ignore """Fetches data from the given CSV file. Args: @@ -89,7 +90,9 @@ def fetch_json(file_data, chunk_size): Yields: Iterable[TDataItem]: Data items, read from the given CSV file. """ - batch = True - while batch: + while True: batch = file_data.fetchmany(chunk_size) + if not batch: + break + yield add_columns(file_data.columns, batch) diff --git a/sources/filesystem/readers.py b/sources/filesystem/readers.py index f225e7339..09dde2739 100644 --- a/sources/filesystem/readers.py +++ b/sources/filesystem/readers.py @@ -90,8 +90,8 @@ def _read_csv_duckdb( chunk_size: Optional[int] = 5000, use_pyarrow: bool = False, credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value, - read_csv_kwargs: Optional[Dict] = {}, -): + read_csv_kwargs: Optional[Dict[str, Any]] = {}, +) -> Iterator[TDataItems]: """A resource to extract data from the given CSV files. Uses DuckDB engine to import and cast CSV data. @@ -107,13 +107,13 @@ def _read_csv_duckdb( credentials (Union[FileSystemCredentials, AbstractFileSystem]): The credentials to use. Defaults to dlt.secrets.value. read_csv_kwargs (Optional[Dict]): - Additional keyword arguments passed to `read_csv()`. + Additional keyword arguments to pass to the `read_csv()`. Returns: Iterable[TDataItem]: Data items, read from the given CSV files. 
""" import duckdb - import fsspec + import fsspec # type: ignore import pendulum config = FilesystemConfiguration(bucket, credentials) @@ -122,10 +122,10 @@ def _read_csv_duckdb( state = dlt.current.resource_state() start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1)) - connection = duckdb.connect() - helper = fetch_arrow if use_pyarrow else fetch_json + connection = duckdb.connect() + for item in items: if item["modification_date"] <= start_from: continue From e25d95aaf3c80fc8441fc6e050c78b0f1eb226cd Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 25 Jan 2024 14:38:29 +0400 Subject: [PATCH 11/22] fixes --- sources/filesystem/README.md | 18 ++++++------ sources/filesystem/readers.py | 26 ++++------------- sources/filesystem_pipeline.py | 28 +++++++++++++++---- tests/filesystem/samples/csv/freshman_lbs.csv | 2 +- tests/filesystem/test_filesystem.py | 4 +-- 5 files changed, 40 insertions(+), 38 deletions(-) diff --git a/sources/filesystem/README.md b/sources/filesystem/README.md index 340f708f9..4a57650df 100644 --- a/sources/filesystem/README.md +++ b/sources/filesystem/README.md @@ -1,18 +1,18 @@ # Readers Source & Filesystem -This verified source easily streams files from AWS S3, GCS, Azure, or local filesystem using the -reader source. +This verified source easily streams files from AWS S3, GCS, Azure, or local filesystem using the reader source. Sources and resources that can be used with this verified source are: -| Name | Type | Description | -|--------------|----------------------|---------------------------------------------------------------------------| -| readers | Source | Lists and reads files with resource `filesystem` and readers transformers | -| filesystem | Resource | Lists files in `bucket_url` using `file_glob` pattern | -| read_csv | Resource-transformer | Reads CSV file with "Pandas" chunk by chunk | -| read_jsonl | Resource-transformer | Reads JSONL file content and extracts the data | -| read_parquet | Resource-transformer | Reads Parquet file content and extracts the data with "Pyarrow" | +| Name | Type | Description | +|-----------------|----------------------|---------------------------------------------------------------------------| +| readers | Source | Lists and reads files with resource `filesystem` and readers transformers | +| filesystem | Resource | Lists files in `bucket_url` using `file_glob` pattern | +| read_csv | Resource-transformer | Reads CSV file with "Pandas" chunk by chunk | +| read_csv_duckdb | Resource-transformer | Reads CSV file with DuckDB engine chunk by chunk | +| read_jsonl | Resource-transformer | Reads JSONL file content and extracts the data | +| read_parquet | Resource-transformer | Reads Parquet file content and extracts the data with "Pyarrow" | ## Initialize the source diff --git a/sources/filesystem/readers.py b/sources/filesystem/readers.py index 09dde2739..ca8dedfd6 100644 --- a/sources/filesystem/readers.py +++ b/sources/filesystem/readers.py @@ -1,19 +1,12 @@ -from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional import dlt from dlt.common import json -from dlt.common.storages.configuration import FilesystemConfiguration -from dlt.common.storages.fsspec_filesystem import prepare_fsspec_args from dlt.common.typing import copy_sig from dlt.sources import TDataItems, DltResource, DltSource from dlt.sources.filesystem import FileItemDict -from .helpers import ( - fetch_arrow, - fetch_json, - AbstractFileSystem, - 
FileSystemCredentials, -) +from .helpers import fetch_arrow, fetch_json def _read_csv( @@ -86,11 +79,9 @@ def _read_parquet( def _read_csv_duckdb( items: Iterator[FileItemDict], - bucket: str, chunk_size: Optional[int] = 5000, use_pyarrow: bool = False, - credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value, - read_csv_kwargs: Optional[Dict[str, Any]] = {}, + read_csv_kwargs: Optional[Dict[str, Any]] = None, ) -> Iterator[TDataItems]: """A resource to extract data from the given CSV files. @@ -98,14 +89,11 @@ def _read_csv_duckdb( Args: items (Iterator[FileItemDict]): CSV files to read. - bucket (str): The bucket name. chunk_size (Optional[int]): The number of rows to read at once. Defaults to 5000. use_pyarrow (bool): Whether to use `pyarrow` to read the data and designate data schema. If set to False (by default), JSON is used. - credentials (Union[FileSystemCredentials, AbstractFileSystem]): - The credentials to use. Defaults to dlt.secrets.value. read_csv_kwargs (Optional[Dict]): Additional keyword arguments to pass to the `read_csv()`. @@ -113,11 +101,9 @@ def _read_csv_duckdb( Iterable[TDataItem]: Data items, read from the given CSV files. """ import duckdb - import fsspec # type: ignore import pendulum - config = FilesystemConfiguration(bucket, credentials) - fs_kwargs = prepare_fsspec_args(config) + read_csv_kwargs = read_csv_kwargs or {} state = dlt.current.resource_state() start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1)) @@ -130,8 +116,8 @@ def _read_csv_duckdb( if item["modification_date"] <= start_from: continue - with fsspec.open(item["file_url"], mode="rb", **fs_kwargs) as f: - file_data = connection.read_csv(f, **read_csv_kwargs) + with item.open() as f: + file_data = connection.read_csv(f, **read_csv_kwargs) # type: ignore yield from helper(file_data, chunk_size) diff --git a/sources/filesystem_pipeline.py b/sources/filesystem_pipeline.py index af0923851..0bdb3c720 100644 --- a/sources/filesystem_pipeline.py +++ b/sources/filesystem_pipeline.py @@ -1,21 +1,18 @@ -import json import os import posixpath from typing import Iterator import dlt -from dlt.sources import TDataItem, TDataItems +from dlt.sources import TDataItems try: - from .filesystem import FileItemDict, filesystem, readers, read_csv, read_jsonl, read_parquet # type: ignore + from .filesystem import FileItemDict, filesystem, readers, read_csv # type: ignore except ImportError: from filesystem import ( FileItemDict, filesystem, readers, read_csv, - read_jsonl, - read_parquet, ) @@ -26,7 +23,7 @@ def stream_and_merge_csv() -> None: """Demonstrates how to scan folder with csv files, load them in chunk and merge on date column with the previous load""" pipeline = dlt.pipeline( pipeline_name="standard_filesystem_csv", - destination="duckdb", + destination="postgres", dataset_name="met_data", ) # met_data contains 3 columns, where "date" column contain a date on which we want to merge @@ -54,6 +51,24 @@ def stream_and_merge_csv() -> None: print(pipeline.last_trace.last_normalize_info) +def read_csv_with_duckdb() -> None: + pipeline = dlt.pipeline( + pipeline_name="standard_filesystem", + destination="duckdb", + dataset_name="met_data", + ) + + # load all the CSV data, excluding headers + met_files = readers( + bucket_url=TESTS_BUCKET_URL, file_glob="met_csv/A801/*.csv" + ).read_csv_duckdb(chunk_size=1000, read_csv_kwargs={"header": True}) + + load_info = pipeline.run(met_files) + + print(load_info) + print(pipeline.last_trace.last_normalize_info) + + def 
read_parquet_and_jsonl_chunked() -> None: pipeline = dlt.pipeline( pipeline_name="standard_filesystem", @@ -177,3 +192,4 @@ def read_files_incrementally_mtime() -> None: read_parquet_and_jsonl_chunked() read_custom_file_type_excel() read_files_incrementally_mtime() + read_csv_with_duckdb() diff --git a/tests/filesystem/samples/csv/freshman_lbs.csv b/tests/filesystem/samples/csv/freshman_lbs.csv index 31f4bd738..d5992daff 100644 --- a/tests/filesystem/samples/csv/freshman_lbs.csv +++ b/tests/filesystem/samples/csv/freshman_lbs.csv @@ -1,4 +1,4 @@ -Sex,Weight(lbs,Sep),Weight(lbs,Apr),BMI(Sep),BMI(Apr) +"Sex","Weight(lbs,Sep)","Weight(lbs,Apr)","BMI(Sep)","BMI(Apr)" M,159,130,22.02,18.14 M,214,190,19.70,17.44 M,163,152,24.09,22.43 diff --git a/tests/filesystem/test_filesystem.py b/tests/filesystem/test_filesystem.py index 4e1aaff51..4e841c093 100644 --- a/tests/filesystem/test_filesystem.py +++ b/tests/filesystem/test_filesystem.py @@ -148,7 +148,7 @@ def test_standard_readers(bucket_url: str) -> None: float_precision="high" ) csv_duckdb_reader = readers(bucket_url, file_glob="**/*.csv").read_csv_duckdb( - bucket_url + read_csv_kwargs={"header": True} ) # a step that copies files into test storage @@ -193,7 +193,7 @@ def _copy(item: FileItemDict): "parquet_example": 1034, "listing": 10, "csv_example": 1270, - "csv_duckdb_example": 1272, + "csv_duckdb_example": 1270, } # print(pipeline.last_trace.last_normalize_info) # print(pipeline.default_schema.to_pretty_yaml()) From 755abf09b263042da73372d75a0ce53c15bc79b7 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 25 Jan 2024 14:48:04 +0400 Subject: [PATCH 12/22] a side import fix --- tests/filesystem/test_filesystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/filesystem/test_filesystem.py b/tests/filesystem/test_filesystem.py index 4e841c093..5b0a6ecda 100644 --- a/tests/filesystem/test_filesystem.py +++ b/tests/filesystem/test_filesystem.py @@ -8,10 +8,10 @@ from sources.filesystem import ( filesystem, readers, - fsspec_from_resource, FileItem, FileItemDict, ) +from sources.filesystem.helpers import fsspec_from_resource from tests.utils import ( assert_load_info, load_table_counts, From bca25daa869c22781efe1f75cb0afd6cacdde490 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 25 Jan 2024 15:19:07 +0400 Subject: [PATCH 13/22] fix typo --- sources/filesystem_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sources/filesystem_pipeline.py b/sources/filesystem_pipeline.py index 0bdb3c720..7af7c60d5 100644 --- a/sources/filesystem_pipeline.py +++ b/sources/filesystem_pipeline.py @@ -23,7 +23,7 @@ def stream_and_merge_csv() -> None: """Demonstrates how to scan folder with csv files, load them in chunk and merge on date column with the previous load""" pipeline = dlt.pipeline( pipeline_name="standard_filesystem_csv", - destination="postgres", + destination="duckdb", dataset_name="met_data", ) # met_data contains 3 columns, where "date" column contain a date on which we want to merge From 5bab574b279626f564b82b9e9e2c8de1e91914da Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 26 Jan 2024 15:17:58 +0400 Subject: [PATCH 14/22] add a gz file and an example --- sources/filesystem_pipeline.py | 16 ++++++++++++++++ tests/filesystem/samples/taxi.csv.gz | Bin 0 -> 899 bytes 2 files changed, 16 insertions(+) create mode 100644 tests/filesystem/samples/taxi.csv.gz diff --git a/sources/filesystem_pipeline.py b/sources/filesystem_pipeline.py index 7af7c60d5..263d12641 100644 --- 
a/sources/filesystem_pipeline.py +++ b/sources/filesystem_pipeline.py @@ -186,6 +186,21 @@ def read_files_incrementally_mtime() -> None: print(pipeline.last_trace.last_normalize_info) +def read_csv_with_duck_db_compressed(): + pipeline = dlt.pipeline( + pipeline_name="standard_filesystem", + destination="postgres", + dataset_name="met_data", + full_refresh=True, + ) + + met_files = readers(bucket_url=TESTS_BUCKET_URL, file_glob="*.gz").read_csv_duckdb() + + load_info = pipeline.run(met_files) + + print(load_info) + + if __name__ == "__main__": copy_files_resource("_storage") stream_and_merge_csv() @@ -193,3 +208,4 @@ def read_files_incrementally_mtime() -> None: read_custom_file_type_excel() read_files_incrementally_mtime() read_csv_with_duckdb() + read_csv_with_duck_db_compressed() diff --git a/tests/filesystem/samples/taxi.csv.gz b/tests/filesystem/samples/taxi.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..184c0740d47bfefe12c14ed8728d63fa59a4620a GIT binary patch literal 899 zcmV-}1AP1+iwFo2P_tzK19V|{X)a@Pb^w)CU2EG&6n*cnFnJz{x!*H>YfYCFQdrj8 zUHT{*(TEx;m6ewK`@JK{Nolmu;tyHlk$uj&=Nw%D#+Z~d!Bx1z2vS9>BJ5(^`xEHJ zD)64m-pZ)XIkFOpCX&p7V{NPes+3?^ns-@-WD&spAoH%oY9cuSvrv#gwQvA`57A!` zVgRXAI2^CT`|frczW3F7+#QF9=`MV@zi-FgzwLS29lP81)m7mU!I`8@9{O?>_hKs# zbFhgWuc3+-g#J5fmQXU9(ZVA|WXzB=UU(>*`n4v=3{D_G%@TAb5&dB`g~MNO!|QIm zAI9l&oY4aNRT8(*;>%TleY~K}Ni1_zql5`&JsqU$X=D`H94yx{azd~Io>tMxIGJV4 zTHw3PwR24u&Fid1cBoBf-QPB@?LXb=)O>EkEcALZqet?Y8*ZWJ4NkIWozWLy=+H$a zgJoNQC8C%ZqPiVc^y&ivb;O0xY%B-uW@=1b>q-iANtO}Jbry>#??EC+_xLvaosn(llMqZ(C&iSrrG;DB zq|sa;F6&WC!fU_yDJZgz@Nnw-Y3R2@zYI#r4Uo5hZ>M`0|Hh<#)13eb6gSf5q&KD|d%(-I1RT7(4_x@A zdQriPPRSd3r7Y<2Q(;R2%Sg>PfH$3A4mMNus6Lmot=?Q(cy5Z)!quS}`}T9!?yfzB z;o~rV`Q{(igt;Bu4O<*=BX{}7#z^P|n@YK@gLCgvBqx?jxvhPNZbNTzi?ozt^gm>O z_j8oFoDKfJ?|v;j;Y~C7+T-reX8f`{bkk#XGkkXu-Z%Z7f7SFeSwgQg-HaE%92oK* z^{Gx=_BAHB^tP2ODVin`StbVF;X+EL+3%lozTC6Eg><>e`c1V)&skpS`kAxEh3ZY$ Ze;Q6tW_*2~hOuj2{RgnA?CcB(006}u&7}YU literal 0 HcmV?d00001 From ee3dfbd70dae914384f41743c6e5603404f39825 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 26 Jan 2024 16:04:42 +0400 Subject: [PATCH 15/22] fix --- sources/filesystem/readers.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/sources/filesystem/readers.py b/sources/filesystem/readers.py index ca8dedfd6..1f4e2f640 100644 --- a/sources/filesystem/readers.py +++ b/sources/filesystem/readers.py @@ -1,6 +1,5 @@ from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional -import dlt from dlt.common import json from dlt.common.typing import copy_sig from dlt.sources import TDataItems, DltResource, DltSource @@ -101,28 +100,18 @@ def _read_csv_duckdb( Iterable[TDataItem]: Data items, read from the given CSV files. 
""" import duckdb - import pendulum - read_csv_kwargs = read_csv_kwargs or {} - - state = dlt.current.resource_state() - start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1)) + connection = duckdb.connect() + read_csv_kwargs = read_csv_kwargs or {} helper = fetch_arrow if use_pyarrow else fetch_json - connection = duckdb.connect() - for item in items: - if item["modification_date"] <= start_from: - continue - with item.open() as f: file_data = connection.read_csv(f, **read_csv_kwargs) # type: ignore yield from helper(file_data, chunk_size) - state["last_modified"] = max(item["modification_date"], state["last_modified"]) - if TYPE_CHECKING: From 9831cf8c9dfa88f74f9e5565cd4a1ffa51b51599 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 26 Jan 2024 16:06:28 +0400 Subject: [PATCH 16/22] delete test file --- tests/filesystem/samples/taxi.csv.gz | Bin 899 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 tests/filesystem/samples/taxi.csv.gz diff --git a/tests/filesystem/samples/taxi.csv.gz b/tests/filesystem/samples/taxi.csv.gz deleted file mode 100644 index 184c0740d47bfefe12c14ed8728d63fa59a4620a..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 899 zcmV-}1AP1+iwFo2P_tzK19V|{X)a@Pb^w)CU2EG&6n*cnFnJz{x!*H>YfYCFQdrj8 zUHT{*(TEx;m6ewK`@JK{Nolmu;tyHlk$uj&=Nw%D#+Z~d!Bx1z2vS9>BJ5(^`xEHJ zD)64m-pZ)XIkFOpCX&p7V{NPes+3?^ns-@-WD&spAoH%oY9cuSvrv#gwQvA`57A!` zVgRXAI2^CT`|frczW3F7+#QF9=`MV@zi-FgzwLS29lP81)m7mU!I`8@9{O?>_hKs# zbFhgWuc3+-g#J5fmQXU9(ZVA|WXzB=UU(>*`n4v=3{D_G%@TAb5&dB`g~MNO!|QIm zAI9l&oY4aNRT8(*;>%TleY~K}Ni1_zql5`&JsqU$X=D`H94yx{azd~Io>tMxIGJV4 zTHw3PwR24u&Fid1cBoBf-QPB@?LXb=)O>EkEcALZqet?Y8*ZWJ4NkIWozWLy=+H$a zgJoNQC8C%ZqPiVc^y&ivb;O0xY%B-uW@=1b>q-iANtO}Jbry>#??EC+_xLvaosn(llMqZ(C&iSrrG;DB zq|sa;F6&WC!fU_yDJZgz@Nnw-Y3R2@zYI#r4Uo5hZ>M`0|Hh<#)13eb6gSf5q&KD|d%(-I1RT7(4_x@A zdQriPPRSd3r7Y<2Q(;R2%Sg>PfH$3A4mMNus6Lmot=?Q(cy5Z)!quS}`}T9!?yfzB z;o~rV`Q{(igt;Bu4O<*=BX{}7#z^P|n@YK@gLCgvBqx?jxvhPNZbNTzi?ozt^gm>O z_j8oFoDKfJ?|v;j;Y~C7+T-reX8f`{bkk#XGkkXu-Z%Z7f7SFeSwgQg-HaE%92oK* z^{Gx=_BAHB^tP2ODVin`StbVF;X+EL+3%lozTC6Eg><>e`c1V)&skpS`kAxEh3ZY$ Ze;Q6tW_*2~hOuj2{RgnA?CcB(006}u&7}YU From 96ef2aed91d5e5375073f459521c67e39627204f Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 26 Jan 2024 16:10:32 +0400 Subject: [PATCH 17/22] delete testing example --- sources/filesystem_pipeline.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/sources/filesystem_pipeline.py b/sources/filesystem_pipeline.py index 263d12641..7af7c60d5 100644 --- a/sources/filesystem_pipeline.py +++ b/sources/filesystem_pipeline.py @@ -186,21 +186,6 @@ def read_files_incrementally_mtime() -> None: print(pipeline.last_trace.last_normalize_info) -def read_csv_with_duck_db_compressed(): - pipeline = dlt.pipeline( - pipeline_name="standard_filesystem", - destination="postgres", - dataset_name="met_data", - full_refresh=True, - ) - - met_files = readers(bucket_url=TESTS_BUCKET_URL, file_glob="*.gz").read_csv_duckdb() - - load_info = pipeline.run(met_files) - - print(load_info) - - if __name__ == "__main__": copy_files_resource("_storage") stream_and_merge_csv() @@ -208,4 +193,3 @@ def read_csv_with_duck_db_compressed(): read_custom_file_type_excel() read_files_incrementally_mtime() read_csv_with_duckdb() - read_csv_with_duck_db_compressed() From 8b93e7cb532b9d92503af1b81d0ab87ce9a7840f Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 26 Jan 2024 16:58:24 +0400 Subject: [PATCH 18/22] add a gzip csv read example --- 
sources/filesystem_pipeline.py | 18 ++++++++++++++++++ tests/filesystem/samples/gzip/taxi.csv.gz | Bin 0 -> 899 bytes tests/filesystem/test_filesystem.py | 2 +- 3 files changed, 19 insertions(+), 1 deletion(-) create mode 100644 tests/filesystem/samples/gzip/taxi.csv.gz diff --git a/sources/filesystem_pipeline.py b/sources/filesystem_pipeline.py index 7af7c60d5..b19f32f0b 100644 --- a/sources/filesystem_pipeline.py +++ b/sources/filesystem_pipeline.py @@ -69,6 +69,23 @@ def read_csv_with_duckdb() -> None: print(pipeline.last_trace.last_normalize_info) +def read_csv_duckdb_compressed(): + pipeline = dlt.pipeline( + pipeline_name="standard_filesystem", + destination="postgres", + dataset_name="met_data", + full_refresh=True, + ) + + met_files = readers( + bucket_url=TESTS_BUCKET_URL, + file_glob="gzip/*.gz", + ).read_csv_duckdb() + load_info = pipeline.run(met_files) + + print(load_info) + + def read_parquet_and_jsonl_chunked() -> None: pipeline = dlt.pipeline( pipeline_name="standard_filesystem", @@ -193,3 +210,4 @@ def read_files_incrementally_mtime() -> None: read_custom_file_type_excel() read_files_incrementally_mtime() read_csv_with_duckdb() + read_csv_duckdb_compressed() diff --git a/tests/filesystem/samples/gzip/taxi.csv.gz b/tests/filesystem/samples/gzip/taxi.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..184c0740d47bfefe12c14ed8728d63fa59a4620a GIT binary patch literal 899 zcmV-}1AP1+iwFo2P_tzK19V|{X)a@Pb^w)CU2EG&6n*cnFnJz{x!*H>YfYCFQdrj8 zUHT{*(TEx;m6ewK`@JK{Nolmu;tyHlk$uj&=Nw%D#+Z~d!Bx1z2vS9>BJ5(^`xEHJ zD)64m-pZ)XIkFOpCX&p7V{NPes+3?^ns-@-WD&spAoH%oY9cuSvrv#gwQvA`57A!` zVgRXAI2^CT`|frczW3F7+#QF9=`MV@zi-FgzwLS29lP81)m7mU!I`8@9{O?>_hKs# zbFhgWuc3+-g#J5fmQXU9(ZVA|WXzB=UU(>*`n4v=3{D_G%@TAb5&dB`g~MNO!|QIm zAI9l&oY4aNRT8(*;>%TleY~K}Ni1_zql5`&JsqU$X=D`H94yx{azd~Io>tMxIGJV4 zTHw3PwR24u&Fid1cBoBf-QPB@?LXb=)O>EkEcALZqet?Y8*ZWJ4NkIWozWLy=+H$a zgJoNQC8C%ZqPiVc^y&ivb;O0xY%B-uW@=1b>q-iANtO}Jbry>#??EC+_xLvaosn(llMqZ(C&iSrrG;DB zq|sa;F6&WC!fU_yDJZgz@Nnw-Y3R2@zYI#r4Uo5hZ>M`0|Hh<#)13eb6gSf5q&KD|d%(-I1RT7(4_x@A zdQriPPRSd3r7Y<2Q(;R2%Sg>PfH$3A4mMNus6Lmot=?Q(cy5Z)!quS}`}T9!?yfzB z;o~rV`Q{(igt;Bu4O<*=BX{}7#z^P|n@YK@gLCgvBqx?jxvhPNZbNTzi?ozt^gm>O z_j8oFoDKfJ?|v;j;Y~C7+T-reX8f`{bkk#XGkkXu-Z%Z7f7SFeSwgQg-HaE%92oK* z^{Gx=_BAHB^tP2ODVin`StbVF;X+EL+3%lozTC6Eg><>e`c1V)&skpS`kAxEh3ZY$ Ze;Q6tW_*2~hOuj2{RgnA?CcB(006}u&7}YU literal 0 HcmV?d00001 diff --git a/tests/filesystem/test_filesystem.py b/tests/filesystem/test_filesystem.py index 5b0a6ecda..6b7731814 100644 --- a/tests/filesystem/test_filesystem.py +++ b/tests/filesystem/test_filesystem.py @@ -191,7 +191,7 @@ def _copy(item: FileItemDict): ) == { "jsonl_example": 1034, "parquet_example": 1034, - "listing": 10, + "listing": 11, "csv_example": 1270, "csv_duckdb_example": 1270, } From 34801e27861533d271a2805f102a15044b077312 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 26 Jan 2024 17:07:33 +0400 Subject: [PATCH 19/22] fix --- sources/filesystem_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sources/filesystem_pipeline.py b/sources/filesystem_pipeline.py index b19f32f0b..0a1a429fa 100644 --- a/sources/filesystem_pipeline.py +++ b/sources/filesystem_pipeline.py @@ -69,7 +69,7 @@ def read_csv_with_duckdb() -> None: print(pipeline.last_trace.last_normalize_info) -def read_csv_duckdb_compressed(): +def read_csv_duckdb_compressed() -> None: pipeline = dlt.pipeline( pipeline_name="standard_filesystem", destination="postgres", From 18af577a938511e54791c3c4fdd2e4a8484fc528 Mon Sep 17 00:00:00 2001 From: 
IlyaFaer Date: Tue, 30 Jan 2024 11:37:27 +0400 Subject: [PATCH 20/22] update test assert values --- tests/filesystem/settings.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/filesystem/settings.py b/tests/filesystem/settings.py index 5211b4845..a442052b3 100644 --- a/tests/filesystem/settings.py +++ b/tests/filesystem/settings.py @@ -19,6 +19,7 @@ "csv/freshman_lbs.csv", "csv/mlb_players.csv", "csv/mlb_teams_2012.csv", + "gzip/taxi.csv.gz", "jsonl/mlb_players.jsonl", "parquet/mlb_players.parquet", ], From f90a495e82238ad4d1c6cc67fa620b4995816544 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Tue, 30 Jan 2024 13:59:26 -0500 Subject: [PATCH 21/22] bumps dlt to version 0.4.3a0 --- poetry.lock | 69 +++++++++++++++++++++++++++----------------------- pyproject.toml | 4 +-- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/poetry.lock b/poetry.lock index ee043611f..9fcc809f3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -484,33 +484,18 @@ category = "main" optional = false python-versions = ">=3.5" -[[package]] -name = "deprecated" -version = "1.2.14" -description = "Python @deprecated decorator to deprecate old python classes, functions or methods." -category = "main" -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" - -[package.dependencies] -wrapt = ">=1.10,<2" - -[package.extras] -dev = ["PyTest", "PyTest-Cov", "bump2version (<1)", "sphinx (<2)", "tox"] - [[package]] name = "dlt" -version = "0.3.23" +version = "0.4.3a0" description = "dlt is an open-source python-first scalable data loading library that does not require any backend to run." category = "main" optional = false -python-versions = ">=3.8.1,<4.0" +python-versions = ">=3.8.1,<3.13" [package.dependencies] astunparse = ">=1.6.3" botocore = {version = ">=1.28", optional = true, markers = "extra == \"filesystem\" or extra == \"s3\" or extra == \"athena\""} click = ">=7.1" -deprecated = ">=1.2.9" duckdb = {version = ">=0.6.1,<0.10.0", optional = true, markers = "extra == \"duckdb\" or extra == \"motherduck\""} fsspec = ">=2022.4.0" gcsfs = {version = ">=2022.4.0", optional = true, markers = "extra == \"gcp\" or extra == \"bigquery\" or extra == \"gs\""} @@ -528,7 +513,7 @@ pathvalidate = ">=2.5.2" pendulum = ">=2.1.2" psycopg2-binary = {version = ">=2.9.1", optional = true, markers = "extra == \"postgres\" or extra == \"redshift\""} psycopg2cffi = {version = ">=2.9.0", optional = true, markers = "platform_python_implementation == \"PyPy\" and (extra == \"postgres\" or extra == \"redshift\")"} -pyarrow = {version = ">=8.0.0", optional = true, markers = "extra == \"bigquery\" or extra == \"parquet\" or extra == \"motherduck\" or extra == \"athena\""} +pyarrow = {version = ">=12.0.0", optional = true, markers = "extra == \"bigquery\" or extra == \"parquet\" or extra == \"motherduck\" or extra == \"athena\""} pytz = ">=2022.6" PyYAML = ">=5.4.1" requests = ">=2.26.0" @@ -542,25 +527,26 @@ tenacity = ">=8.0.2" tomlkit = ">=0.11.3" typing-extensions = ">=4.0.0" tzdata = ">=2022.1" +win-precise-time = {version = ">=1.4.2", markers = "os_name == \"nt\""} [package.extras] -athena = ["botocore (>=1.28)", "pyarrow (>=8.0.0)", "pyathena (>=2.9.6)", "s3fs (>=2022.4.0)"] +athena = ["botocore (>=1.28)", "pyarrow (>=12.0.0)", "pyathena (>=2.9.6)", "s3fs (>=2022.4.0)"] az = ["adlfs (>=2022.4.0)"] -bigquery = ["gcsfs (>=2022.4.0)", "google-cloud-bigquery (>=2.26.0)", "grpcio (>=1.50.0)", "pyarrow (>=8.0.0)"] +bigquery = ["gcsfs (>=2022.4.0)", "google-cloud-bigquery (>=2.26.0)", "grpcio 
(>=1.50.0)", "pyarrow (>=12.0.0)"] cli = ["cron-descriptor (>=1.2.32)", "pipdeptree (>=2.9.0,<2.10)"] dbt = ["dbt-athena-community (>=1.2.0)", "dbt-bigquery (>=1.2.0)", "dbt-core (>=1.2.0)", "dbt-duckdb (>=1.2.0)", "dbt-redshift (>=1.2.0)", "dbt-snowflake (>=1.2.0)"] duckdb = ["duckdb (>=0.6.1,<0.10.0)"] filesystem = ["botocore (>=1.28)", "s3fs (>=2022.4.0)"] gcp = ["gcsfs (>=2022.4.0)", "google-cloud-bigquery (>=2.26.0)", "grpcio (>=1.50.0)"] gs = ["gcsfs (>=2022.4.0)"] -motherduck = ["duckdb (>=0.6.1,<0.10.0)", "pyarrow (>=8.0.0)"] +motherduck = ["duckdb (>=0.6.1,<0.10.0)", "pyarrow (>=12.0.0)"] mssql = ["pyodbc (>=4.0.39,<5.0.0)"] -parquet = ["pyarrow (>=8.0.0)"] +parquet = ["pyarrow (>=12.0.0)"] postgres = ["psycopg2-binary (>=2.9.1)", "psycopg2cffi (>=2.9.0)"] -pydantic = ["pydantic (>=1.10,<2.0)"] +qdrant = ["qdrant-client[fastembed] (>=1.6.4,<2.0.0)"] redshift = ["psycopg2-binary (>=2.9.1)", "psycopg2cffi (>=2.9.0)"] s3 = ["botocore (>=1.28)", "s3fs (>=2022.4.0)"] -snowflake = ["snowflake-connector-python[pandas] (>=3.1.1)"] +snowflake = ["snowflake-connector-python (>=3.5.0)"] weaviate = ["weaviate-client (>=3.22)"] [[package]] @@ -2876,6 +2862,14 @@ python-versions = ">=3.7" [package.extras] test = ["pytest (>=6.0.0)", "setuptools (>=65)"] +[[package]] +name = "win-precise-time" +version = "1.4.2" +description = "" +category = "main" +optional = false +python-versions = ">=3.7" + [[package]] name = "wrapt" version = "1.15.0" @@ -2969,8 +2963,8 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "1.1" -python-versions = "^3.8.1" -content-hash = "a7984ce8ed237389d66b79bdaf8da3a37ab4a60499e619bc62d81d0a0bc5995f" +python-versions = ">=3.8.1,<3.13" +content-hash = "c170d74d55253a6582e9ae86189e97afe5be2f4404b4836e744e0b1c4f6b7f22" [metadata.files] adlfs = [ @@ -3439,13 +3433,9 @@ decorator = [ {file = "decorator-5.1.1-py3-none-any.whl", hash = "sha256:b8c3f85900b9dc423225913c5aace94729fe1fa9763b38939a95226f02d37186"}, {file = "decorator-5.1.1.tar.gz", hash = "sha256:637996211036b6385ef91435e4fae22989472f9d571faba8927ba8253acbc330"}, ] -deprecated = [ - {file = "Deprecated-1.2.14-py2.py3-none-any.whl", hash = "sha256:6fac8b097794a90302bdbb17b9b815e732d3c4720583ff1b198499d78470466c"}, - {file = "Deprecated-1.2.14.tar.gz", hash = "sha256:e5323eb936458dccc2582dc6f9c322c852a775a27065ff2b0c4970b9d53d01b3"}, -] dlt = [ - {file = "dlt-0.3.23-py3-none-any.whl", hash = "sha256:94f17d6788679bf7e9c5a718ea44e93eeab09f7fd26c249885b8891ef326f1e8"}, - {file = "dlt-0.3.23.tar.gz", hash = "sha256:4c3173598369a8834c6ac1d4d5d2de0ce1fbc3be540890cd6f601ee80002c84e"}, + {file = "dlt-0.4.3a0-py3-none-any.whl", hash = "sha256:f6ece639d353c6239fa73465d948a623fda7b4448151e97e6a794ad551acb341"}, + {file = "dlt-0.4.3a0.tar.gz", hash = "sha256:1b964243170cd44cffa98bb9e3455ad60aa7b3885f1d587bc76e86c95f5befa5"}, ] dnspython = [ {file = "dnspython-2.4.2-py3-none-any.whl", hash = "sha256:57c6fbaaeaaf39c891292012060beb141791735dbb4004798328fc2c467402d8"}, @@ -5750,6 +5740,21 @@ wheel = [ {file = "wheel-0.41.2-py3-none-any.whl", hash = "sha256:75909db2664838d015e3d9139004ee16711748a52c8f336b52882266540215d8"}, {file = "wheel-0.41.2.tar.gz", hash = "sha256:0c5ac5ff2afb79ac23ab82bab027a0be7b5dbcf2e54dc50efe4bf507de1f7985"}, ] +win-precise-time = [ + {file = "win-precise-time-1.4.2.tar.gz", hash = "sha256:89274785cbc5f2997e01675206da3203835a442c60fd97798415c6b3c179c0b9"}, + {file = "win_precise_time-1.4.2-cp310-cp310-win32.whl", hash = "sha256:7fa13a2247c2ef41cd5e9b930f40716eacc7fc1f079ea72853bd5613fe087a1a"}, + {file = 
"win_precise_time-1.4.2-cp310-cp310-win_amd64.whl", hash = "sha256:bb8e44b0fc35fde268e8a781cdcd9f47d47abcd8089465d2d1d1063976411c8e"}, + {file = "win_precise_time-1.4.2-cp311-cp311-win32.whl", hash = "sha256:59272655ad6f36910d0b585969402386fa627fca3be24acc9a21be1d550e5db8"}, + {file = "win_precise_time-1.4.2-cp311-cp311-win_amd64.whl", hash = "sha256:0897bb055f19f3b4336e2ba6bee0115ac20fd7ec615a6d736632e2df77f8851a"}, + {file = "win_precise_time-1.4.2-cp312-cp312-win32.whl", hash = "sha256:0210dcea88a520c91de1708ae4c881e3c0ddc956daa08b9eabf2b7c35f3109f5"}, + {file = "win_precise_time-1.4.2-cp312-cp312-win_amd64.whl", hash = "sha256:85670f77cc8accd8f1e6d05073999f77561c23012a9ee988cbd44bb7ce655062"}, + {file = "win_precise_time-1.4.2-cp37-cp37m-win32.whl", hash = "sha256:3e23693201a0fc6ca39f016871e2581e20c91123734bd48a69259f8c8724eedb"}, + {file = "win_precise_time-1.4.2-cp37-cp37m-win_amd64.whl", hash = "sha256:07ef644d1bb7705039bc54abfe4b45e99e8dc326dfd1dad5831dab19670508cb"}, + {file = "win_precise_time-1.4.2-cp38-cp38-win32.whl", hash = "sha256:0a953b00772f205602fa712ef68387b8fb213a30b267ae310aa56bf17605e11b"}, + {file = "win_precise_time-1.4.2-cp38-cp38-win_amd64.whl", hash = "sha256:b5d83420925beca302b386b19c3e7414ada84b47b42f0680207f1508917a1731"}, + {file = "win_precise_time-1.4.2-cp39-cp39-win32.whl", hash = "sha256:50d11a6ff92e1be96a8d4bee99ff6dc07a0ea0e2a392b0956bb2192e334f41ba"}, + {file = "win_precise_time-1.4.2-cp39-cp39-win_amd64.whl", hash = "sha256:3f510fa92d9c39ea533c983e1d62c7bc66fdf0a3e3c3bdda48d4ebb634ff7034"}, +] wrapt = [ {file = "wrapt-1.15.0-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:ca1cccf838cd28d5a0883b342474c630ac48cac5df0ee6eacc9c7290f76b11c1"}, {file = "wrapt-1.15.0-cp27-cp27m-manylinux1_i686.whl", hash = "sha256:e826aadda3cae59295b95343db8f3d965fb31059da7de01ee8d1c40a60398b29"}, diff --git a/pyproject.toml b/pyproject.toml index 30480db9c..ad57d497d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,8 +11,8 @@ readme = "README.md" packages = [{include = "sources"}] [tool.poetry.dependencies] -python = "^3.8.1" -dlt = {version = "^0.3.23", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]} +python = ">=3.8.1,<3.13" +dlt = {version = "0.4.3a0", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]} [tool.poetry.group.dev.dependencies] mypy = "1.6.1" From a9425183c5c878c92b3dc75de6c716de84e3e3f0 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Tue, 30 Jan 2024 22:07:28 -0500 Subject: [PATCH 22/22] fixes taxi dataset, duckdb reader signature and tests --- sources/filesystem/readers.py | 11 ++++------- sources/filesystem_pipeline.py | 13 +++++++------ tests/filesystem/samples/gzip/taxi.csv.gz | Bin 899 -> 897 bytes tests/filesystem/test_filesystem.py | 13 +++++++------ 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/sources/filesystem/readers.py b/sources/filesystem/readers.py index 1f4e2f640..bd5f3575c 100644 --- a/sources/filesystem/readers.py +++ b/sources/filesystem/readers.py @@ -80,7 +80,7 @@ def _read_csv_duckdb( items: Iterator[FileItemDict], chunk_size: Optional[int] = 5000, use_pyarrow: bool = False, - read_csv_kwargs: Optional[Dict[str, Any]] = None, + **duckdb_kwargs: Any ) -> Iterator[TDataItems]: """A resource to extract data from the given CSV files. @@ -93,7 +93,7 @@ def _read_csv_duckdb( use_pyarrow (bool): Whether to use `pyarrow` to read the data and designate data schema. If set to False (by default), JSON is used. 
- read_csv_kwargs (Optional[Dict]): + duckdb_kwargs (Dict): Additional keyword arguments to pass to the `read_csv()`. Returns: @@ -101,14 +101,11 @@ def _read_csv_duckdb( """ import duckdb - connection = duckdb.connect() - - read_csv_kwargs = read_csv_kwargs or {} helper = fetch_arrow if use_pyarrow else fetch_json for item in items: with item.open() as f: - file_data = connection.read_csv(f, **read_csv_kwargs) # type: ignore + file_data = duckdb.from_csv_auto(f, **duckdb_kwargs) # type: ignore yield from helper(file_data, chunk_size) @@ -131,7 +128,7 @@ def read_parquet(self) -> DltResource: ... @copy_sig(_read_csv_duckdb) - def read_location(self) -> DltResource: + def read_csv_duckdb(self) -> DltResource: ... else: diff --git a/sources/filesystem_pipeline.py b/sources/filesystem_pipeline.py index 0a1a429fa..69094ea65 100644 --- a/sources/filesystem_pipeline.py +++ b/sources/filesystem_pipeline.py @@ -55,13 +55,13 @@ def read_csv_with_duckdb() -> None: pipeline = dlt.pipeline( pipeline_name="standard_filesystem", destination="duckdb", - dataset_name="met_data", + dataset_name="met_data_duckdb", ) # load all the CSV data, excluding headers met_files = readers( bucket_url=TESTS_BUCKET_URL, file_glob="met_csv/A801/*.csv" - ).read_csv_duckdb(chunk_size=1000, read_csv_kwargs={"header": True}) + ).read_csv_duckdb(chunk_size=1000, header=True) load_info = pipeline.run(met_files) @@ -72,18 +72,19 @@ def read_csv_with_duckdb() -> None: def read_csv_duckdb_compressed() -> None: pipeline = dlt.pipeline( pipeline_name="standard_filesystem", - destination="postgres", - dataset_name="met_data", + destination="duckdb", + dataset_name="taxi_data", full_refresh=True, ) met_files = readers( bucket_url=TESTS_BUCKET_URL, - file_glob="gzip/*.gz", + file_glob="gzip/*", ).read_csv_duckdb() - load_info = pipeline.run(met_files) + load_info = pipeline.run(met_files) print(load_info) + print(pipeline.last_trace.last_normalize_info) def read_parquet_and_jsonl_chunked() -> None: diff --git a/tests/filesystem/samples/gzip/taxi.csv.gz b/tests/filesystem/samples/gzip/taxi.csv.gz index 184c0740d47bfefe12c14ed8728d63fa59a4620a..c74f9c86ea61722fb345def3b1a5cab6d141840f 100644 GIT binary patch literal 897 zcmV-{1AhD;iwFotce!N%19V|{X)a@Pb^w)CU2EJ%6n)RHSo}N^bH8WyEm=}raG=yp zOCCimVv$91WjP`LzIQZQH`FR5*xt2fuk3TqJ?H2OFvg^u39iBwMvx|26X6i!(Z7LS zoB|)&XB(q7=g3MZnn*GSj;*l*s8WJ@v`$%uWD&spP1dQzY9cuSvrv#gt#AN;4bguf z!~oK!aJsz??}oc&`Z_k-`{8zaTJFR9hlhSX{MA1$huh(jgUh>RI>#tRK)Q@_^)nZfY~Xjy`;9HM_&OX2kU+wgjr zAEtS^ysuz^{Vs`nVDaTCz%g!Ma}vuO)hJxpk`PqIjFNhz_mks`}f`mHl%#pSw?eSY_TWR`5u^al<|EyhBNjtXCT@bmXFv z!Ln6ci6|z9sP2VrNPQik4!H1HUFDd48I*GongY7m7o9IEe~&U;Kixe}` z`~Ljccg-*7ZW-ojY~GK9e|Y=%Q#WtcN#wTOv5xD$W1W0Nx@@DM1SK&>tXPQ|g@j|I zyb+QDU&;Ma{bkNBQ`ATii@zYCv&btWEQ=_YWe-2jBiv5cQ}f3>yy;I}a~i%39}lN~ zT>81W8J6YTAAViB^S5d~QiQ!sRGaEhz5*AloRs`op+%B#F~E6nE(kCccSK$cUC1wz-?6m4&9vxu6iw9 zRPc^d@^0P;3wric*pfiqr}+SQ$@%SID^t(uzqM?eFKhph>t5_+|!(req++_n_SdF9e;Th*ak(EHmW)jo{=|JYys z8fC3zga5C(AInX6(=C4X?eJSSe?FXsqE%)Qu}wLa#I3O%}gg7xEGH zQY0?=mJqCsZ6!vErb$HV5O{kFDVb(}Zm!XCzxoJieYpC=v_+3t-q`w;lf}yPW*9$C X=VvFreq5$`=wAH;$=jC20|)>BS)jrU literal 899 zcmV-}1AP1+iwFo2P_tzK19V|{X)a@Pb^w)CU2EG&6n*cnFnJz{x!*H>YfYCFQdrj8 zUHT{*(TEx;m6ewK`@JK{Nolmu;tyHlk$uj&=Nw%D#+Z~d!Bx1z2vS9>BJ5(^`xEHJ zD)64m-pZ)XIkFOpCX&p7V{NPes+3?^ns-@-WD&spAoH%oY9cuSvrv#gwQvA`57A!` zVgRXAI2^CT`|frczW3F7+#QF9=`MV@zi-FgzwLS29lP81)m7mU!I`8@9{O?>_hKs# zbFhgWuc3+-g#J5fmQXU9(ZVA|WXzB=UU(>*`n4v=3{D_G%@TAb5&dB`g~MNO!|QIm zAI9l&oY4aNRT8(*;>%TleY~K}Ni1_zql5`&JsqU$X=D`H94yx{azd~Io>tMxIGJV4 
zTHw3PwR24u&Fid1cBoBf-QPB@?LXb=)O>EkEcALZqet?Y8*ZWJ4NkIWozWLy=+H$a zgJoNQC8C%ZqPiVc^y&ivb;O0xY%B-uW@=1b>q-iANtO}Jbry>#??EC+_xLvaosn(llMqZ(C&iSrrG;DB zq|sa;F6&WC!fU_yDJZgz@Nnw-Y3R2@zYI#r4Uo5hZ>M`0|Hh<#)13eb6gSf5q&KD|d%(-I1RT7(4_x@A zdQriPPRSd3r7Y<2Q(;R2%Sg>PfH$3A4mMNus6Lmot=?Q(cy5Z)!quS}`}T9!?yfzB z;o~rV`Q{(igt;Bu4O<*=BX{}7#z^P|n@YK@gLCgvBqx?jxvhPNZbNTzi?ozt^gm>O z_j8oFoDKfJ?|v;j;Y~C7+T-reX8f`{bkk#XGkkXu-Z%Z7f7SFeSwgQg-HaE%92oK* z^{Gx=_BAHB^tP2ODVin`StbVF;X+EL+3%lozTC6Eg><>e`c1V)&skpS`kAxEh3ZY$ Ze;Q6tW_*2~hOuj2{RgnA?CcB(006}u&7}YU
diff --git a/tests/filesystem/test_filesystem.py b/tests/filesystem/test_filesystem.py
index 8cfcea42f..550f4c29d 100644
--- a/tests/filesystem/test_filesystem.py
+++ b/tests/filesystem/test_filesystem.py
@@ -144,12 +144,11 @@ def test_standard_readers(bucket_url: str) -> None:
     # extract pipes with standard readers
     jsonl_reader = readers(bucket_url, file_glob="**/*.jsonl").read_jsonl()
     parquet_reader = readers(bucket_url, file_glob="**/*.parquet").read_parquet()
-    csv_reader = readers(bucket_url, file_glob="**/*.csv").read_csv(
+    # also read zipped csvs
+    csv_reader = readers(bucket_url, file_glob="**/*.csv*").read_csv(
         float_precision="high"
     )
-    csv_duckdb_reader = readers(bucket_url, file_glob="**/*.csv").read_csv_duckdb(
-        read_csv_kwargs={"header": True}
-    )
+    csv_duckdb_reader = readers(bucket_url, file_glob="**/*.csv*").read_csv_duckdb()

     # a step that copies files into test storage
     def _copy(item: FileItemDict):
@@ -180,6 +179,8 @@ def _copy(item: FileItemDict):
             csv_duckdb_reader.with_name("csv_duckdb_example"),
         ]
     )
+    # pandas incorrectly guesses that taxi dataset has headers so it skips one row
+    # so we have 1 less row in csv_example than in csv_duckdb_example
     assert_load_info(load_info)
     assert load_table_counts(
         pipeline,
@@ -192,8 +193,8 @@ def _copy(item: FileItemDict):
         "jsonl_example": 1034,
         "parquet_example": 1034,
         "listing": 11,
-        "csv_example": 1270,
-        "csv_duckdb_example": 1270,
+        "csv_example": 1279,
+        "csv_duckdb_example": 1280,
     }
     # print(pipeline.last_trace.last_normalize_info)
     # print(pipeline.default_schema.to_pretty_yaml())
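
For quick reference, a minimal usage sketch of the reader as it ends up after PATCH 22/22: DuckDB CSV options such as `header` are now passed as plain keyword arguments and forwarded to `duckdb.from_csv_auto`, instead of being wrapped in a `read_csv_kwargs` dict. The bucket URL, glob, pipeline and dataset names below are placeholders, and the import path assumes the verified-sources layout used in these patches; this is a sketch, not code from the patch series itself.

import dlt

# assumes the `readers` source shipped in sources/filesystem of this repository
from filesystem import readers

# placeholder location; any fsspec-compatible bucket URL or local path is handled the same way
BUCKET_URL = "file://path/to/csv/files"


def load_csv_with_duckdb() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="csv_duckdb_example",
        destination="duckdb",
        dataset_name="csv_data",
    )

    # after PATCH 22/22, keyword arguments are forwarded to duckdb.from_csv_auto,
    # so DuckDB CSV options (header, delim, ...) are passed directly
    csv_files = readers(bucket_url=BUCKET_URL, file_glob="**/*.csv").read_csv_duckdb(
        chunk_size=1000,
        header=True,
    )

    load_info = pipeline.run(csv_files)
    print(load_info)


if __name__ == "__main__":
    load_csv_with_duckdb()

The same call also picks up gzip-compressed files when the glob matches them (for example `file_glob="gzip/*"`), which is what the `read_csv_duckdb_compressed` example and the `taxi.csv.gz` sample added in this series exercise.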