Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filesystem): implement a csv reader with duckdb engine #319

Merged
merged 23 commits into from
Jan 31, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix
IlyaFaer committed Jan 26, 2024
commit ee3dfbd70dae914384f41743c6e5603404f39825
15 changes: 2 additions & 13 deletions sources/filesystem/readers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional

import dlt
from dlt.common import json
from dlt.common.typing import copy_sig
from dlt.sources import TDataItems, DltResource, DltSource
@@ -101,28 +100,18 @@ def _read_csv_duckdb(
Iterable[TDataItem]: Data items, read from the given CSV files.
"""
import duckdb
import pendulum

read_csv_kwargs = read_csv_kwargs or {}

state = dlt.current.resource_state()
start_from = state.setdefault("last_modified", pendulum.datetime(1970, 1, 1))
connection = duckdb.connect()

read_csv_kwargs = read_csv_kwargs or {}
helper = fetch_arrow if use_pyarrow else fetch_json

connection = duckdb.connect()

for item in items:
if item["modification_date"] <= start_from:
continue

with item.open() as f:
file_data = connection.read_csv(f, **read_csv_kwargs) # type: ignore

yield from helper(file_data, chunk_size)

state["last_modified"] = max(item["modification_date"], state["last_modified"])


if TYPE_CHECKING: