some work on filesystem destination
sh-rp committed Jun 24, 2024
1 parent 5a8ea54 commit 36e94af
Showing 3 changed files with 91 additions and 19 deletions.
2 changes: 1 addition & 1 deletion dlt/dataset.py
@@ -37,7 +37,7 @@ def _client(self) -> Generator[SupportsDataAccess, None, None]:
             yield client
             return
 
-        raise Exception("Destination does not support data access")
+        raise Exception("Destination does not support data access.")
 
     def df(self, *, sql: str = None, table: str = None, batch_size: int = 1000) -> DataFrame:
         """Get first batch of table as dataframe"""
29 changes: 27 additions & 2 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -561,8 +561,33 @@ def get_table_jobs(
 
     def iter_df(
         self, *, sql: str = None, table: str = None, batch_size: int = 1000
-    ) -> Generator[DataFrame, None, None]: ...
+    ) -> Generator[DataFrame, None, None]:
+        """Provide dataframes via duckdb"""
+        import duckdb
+
+        duckdb.register_filesystem(self.fs_client)
+
+        # create in memory table, for now we read all available files
+        db = duckdb.connect(":memory:")
+        files = self.list_table_files(table)
+        protocol = "" if self.is_local_filesystem else f"{self.config.protocol}://"
+        files_string = ",".join([f"'{protocol}{f}'" for f in files])
+        db.sql(f"CREATE TABLE {table} AS SELECT * FROM read_json([{files_string}]);")
+
+        # yield in batches
+        offset = 0
+        while True:
+            df = db.sql(f"SELECT * FROM {table} OFFSET {offset} LIMIT {batch_size}").df()
+            if len(df.index) == 0:
+                break
+            yield df
+            offset += batch_size
 
     def iter_arrow(
         self, *, sql: str = None, table: str = None, batch_size: int = 1000
-    ) -> Generator[ArrowTable, None, None]: ...
+    ) -> Generator[ArrowTable, None, None]:
+        """Default implementation converts df to arrow"""
+
+        # TODO: duckdb supports iterating in batches natively..
+        for df in self.iter_df(sql=sql, table=table, batch_size=batch_size):
+            yield ArrowTable.from_pandas(df)
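
As a hedged illustration of the pattern introduced in iter_df / iter_arrow above: the sketch below pages through JSONL files with DuckDB and converts each batch to Arrow. It runs against plain local files, so the register_filesystem step from the diff is omitted, and the file paths and table name are made up for the example; this is not the destination's actual code path, just the same LIMIT/OFFSET batching idea in isolation.

import duckdb
import pyarrow as pa

# Illustrative local JSONL files standing in for a filesystem destination's load files.
files = ["data/items/part_1.jsonl", "data/items/part_2.jsonl"]
files_string = ",".join(f"'{f}'" for f in files)

# Load everything into an in-memory DuckDB table, as the diff does with read_json.
con = duckdb.connect(":memory:")
con.sql(f"CREATE TABLE items AS SELECT * FROM read_json([{files_string}]);")

# Page through the table with LIMIT/OFFSET until an empty frame comes back.
batch_size, offset = 1000, 0
while True:
    df = con.sql(f"SELECT * FROM items LIMIT {batch_size} OFFSET {offset}").df()
    if len(df.index) == 0:
        break
    arrow_batch = pa.Table.from_pandas(df)  # the per-batch conversion iter_arrow performs
    print(f"batch of {arrow_batch.num_rows} rows")
    offset += batch_size
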
79 changes: 63 additions & 16 deletions tests/load/test_read_interfaces.py
@@ -1,5 +1,6 @@
 import pytest
 import dlt
+import os
 
 from typing import List
 from functools import reduce
@@ -8,35 +9,81 @@
 from pandas import DataFrame
 
 
+@dlt.source()
+def source():
+    @dlt.resource()
+    def items():
+        yield from [{"id": i, "children": [{"id": i + 100}, {"id": i + 1000}]} for i in range(300)]
+
+    @dlt.resource()
+    def items2():
+        yield from [{"id": i, "children": [{"id": i + 100}, {"id": i + 1000}]} for i in range(150)]
+
+    return [items, items2]
+
+
 @pytest.mark.essential
 @pytest.mark.parametrize(
     "destination_config",
     destinations_configs(default_sql_configs=True),
     ids=lambda x: x.name,
 )
-def test_read_interfaces(destination_config: DestinationTestConfiguration) -> None:
-    # we load a table with child table and check wether ibis works
+def test_read_interfaces_sql(destination_config: DestinationTestConfiguration) -> None:
     pipeline = destination_config.setup_pipeline(
         "read_pipeline", dataset_name="read_test", dev_mode=True
     )
 
-    @dlt.source()
-    def source():
-        @dlt.resource()
-        def items():
-            yield from [
-                {"id": i, "children": [{"id": i + 100}, {"id": i + 1000}]} for i in range(300)
-            ]
+    # run source
+    s = source()
+    pipeline.run(
+        s,
+    )
+
+    # get one df
+    df = pipeline.dataset.df(table="items", batch_size=5)
+    assert len(df.index) == 5
+    assert set(df.columns.values) == {"id", "_dlt_load_id", "_dlt_id"}
+
+    # iterate all dataframes
+    frames = []
+    for df in pipeline.dataset.iter_df(table="items", batch_size=70):
+        frames.append(df)
+
+    # check frame amount and items counts
+    assert len(frames) == 5
+    assert [len(df.index) for df in frames] == [70, 70, 70, 70, 20]
+
+    # check all items are present
+    ids = reduce(lambda a, b: a + b, [f["id"].to_list() for f in frames])
+    assert set(ids) == set(range(300))
+
+    # basic check of arrow table
+    table = pipeline.dataset.arrow(table="items", batch_size=5)
+    assert set(table.column_names) == {"id", "_dlt_load_id", "_dlt_id"}
+    table.num_rows == 5
+
+    # access via resource
+    len(s.items.dataset.df().index) == 300
+    len(s.items2.dataset.df().index) == 150
+
 
-        @dlt.resource()
-        def items2():
-            yield from [
-                {"id": i, "children": [{"id": i + 100}, {"id": i + 1000}]} for i in range(150)
-            ]
+@pytest.mark.essential
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(
+        local_filesystem_configs=True, all_buckets_filesystem_configs=True
+    ),  # TODO: test all buckets
+    ids=lambda x: x.name,
+)
+def test_read_interfaces_filesystem(destination_config: DestinationTestConfiguration) -> None:
+    # we force multiple files per table, they may only hold 50 items
+    os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "50"
 
-        return [items, items2]
+    pipeline = destination_config.setup_pipeline(
+        "read_pipeline", dataset_name="read_test", dev_mode=True
+    )
 
-    # create 300 entries in "items" table
     # run source
     s = source()
     pipeline.run(
         s,
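For orientation, a rough sketch of how the read interface exercised by these tests might be used in a standalone script. It assumes the pipeline.dataset.df / iter_df / arrow accessors from this work-in-progress branch; the pipeline name, resource name, and the choice of duckdb as destination are illustrative, not taken from the commit.

import dlt

@dlt.resource()
def numbers():
    yield from [{"id": i} for i in range(300)]

# hypothetical pipeline just to have something to read back
pipeline = dlt.pipeline("read_sketch", destination="duckdb", dataset_name="read_sketch_data")
pipeline.run(numbers())

# one batch as a pandas DataFrame
first = pipeline.dataset.df(table="numbers", batch_size=50)
assert len(first.index) == 50

# stream the whole table in fixed-size batches
total = sum(len(frame.index) for frame in pipeline.dataset.iter_df(table="numbers", batch_size=50))
assert total == 300

# the same data as a pyarrow Table
arrow_table = pipeline.dataset.arrow(table="numbers", batch_size=50)
assert "id" in arrow_table.column_names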

0 comments on commit 36e94af
