make duckdb handle Iceberg table with nested types #2141

Merged · 6 commits · Dec 15, 2024
25 changes: 16 additions & 9 deletions dlt/destinations/impl/filesystem/sql_client.py
@@ -214,14 +214,17 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:
# unknown views will not be created
continue

# only create view if it does not exist in the current schema yet
existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()]
if view_name in existing_tables:
continue

# NOTE: if this is staging configuration then `prepare_load_table` will remove some info
# from table schema, if we ever extend this to handle staging destination, this needs to change
schema_table = self.fs_client.prepare_load_table(table_name)
table_format = schema_table.get("table_format")

# skip if view already exists and does not need to be replaced each time
existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()]
needs_replace = table_format == "iceberg" or self.fs_client.config.protocol == "abfss"
if view_name in existing_tables and not needs_replace:
continue

# discover file type
folder = self.fs_client.get_table_dir(table_name)
files = self.fs_client.list_table_files(table_name)
@@ -258,15 +261,17 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:

# create from statement
from_statement = ""
if schema_table.get("table_format") == "delta":
if table_format == "delta":
from_statement = f"delta_scan('{resolved_folder}')"
elif schema_table.get("table_format") == "iceberg":
elif table_format == "iceberg":
from dlt.common.libs.pyiceberg import _get_last_metadata_file

self._setup_iceberg(self._conn)
metadata_path = f"{resolved_folder}/metadata"
last_metadata_file = _get_last_metadata_file(metadata_path, self.fs_client)
from_statement = f"iceberg_scan('{last_metadata_file}')"
# skip schema inference to make nested data types work
# https://github.com/duckdb/duckdb_iceberg/issues/47
from_statement = f"iceberg_scan('{last_metadata_file}', skip_schema_inference=True)"
elif first_file_type == "parquet":
from_statement = f"read_parquet([{resolved_files_string}])"
elif first_file_type == "jsonl":
@@ -281,7 +286,9 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:

# create table
view_name = self.make_qualified_table_name(view_name)
create_table_sql_base = f"CREATE VIEW {view_name} AS SELECT * FROM {from_statement}"
create_table_sql_base = (
f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {from_statement}"
)
self._conn.execute(create_table_sql_base)

@contextmanager
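The core of the change is visible in the hunks above: the Iceberg view is now built with `skip_schema_inference=True` and created via `CREATE OR REPLACE VIEW`, so it keeps pointing at the latest metadata file. A minimal standalone sketch of the same technique against DuckDB directly — the metadata path and view name are illustrative, and it assumes the `iceberg` DuckDB extension is available:

```python
import duckdb

conn = duckdb.connect()
conn.execute("INSTALL iceberg")
conn.execute("LOAD iceberg")

# hypothetical path to the latest Iceberg metadata file of a table with nested columns
last_metadata_file = "s3://my-bucket/read_test/nested_table/metadata/v2.metadata.json"

# skip_schema_inference=True sidesteps duckdb_iceberg's schema inference, which
# currently breaks on nested types (https://github.com/duckdb/duckdb_iceberg/issues/47)
from_statement = f"iceberg_scan('{last_metadata_file}', skip_schema_inference=True)"

# CREATE OR REPLACE keeps the view aligned with the newest metadata file on every
# (re)creation instead of silently reusing a view built from an older snapshot
conn.execute(f"CREATE OR REPLACE VIEW nested_table AS SELECT * FROM {from_statement}")
print(conn.sql("SELECT * FROM nested_table LIMIT 5"))
```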
42 changes: 34 additions & 8 deletions tests/load/filesystem/test_sql_client.py
@@ -22,6 +22,7 @@
)
from dlt.destinations import filesystem
from tests.utils import TEST_STORAGE_ROOT
from tests.cases import arrow_table_all_data_types
from dlt.destinations.exceptions import DatabaseUndefinedRelation


@@ -81,12 +82,17 @@ def double_items():
for i in range(total_records)
]

return [items, double_items]
@dlt.resource(table_format=table_format)
def arrow_all_types():
yield arrow_table_all_data_types("arrow-table", num_rows=total_records)[0]

return [items, double_items, arrow_all_types]

# run source
pipeline.run(source(), loader_file_format=destination_config.file_format)

if alternate_access_pipeline:
orig_dest = pipeline.destination
pipeline.destination = alternate_access_pipeline.destination

import duckdb
@@ -96,8 +102,11 @@ def double_items():
DuckDbCredentials,
)

# check we can create new tables from the views
with pipeline.sql_client() as c:
# check if all data types are handled properly
c.execute_sql("SELECT * FROM arrow_all_types;")

# check we can create new tables from the views
c.execute_sql(
"CREATE TABLE items_joined AS (SELECT i.id, di.double_id FROM items as i JOIN"
" double_items as di ON (i.id = di.id));"
@@ -109,16 +118,14 @@ def double_items():
assert list(joined_table[5]) == [5, 10]
assert list(joined_table[10]) == [10, 20]

# inserting values into a view should fail gracefully
with pipeline.sql_client() as c:
# inserting values into a view should fail gracefully
try:
c.execute_sql("INSERT INTO double_items VALUES (1, 2)")
except Exception as exc:
assert "double_items is not an table" in str(exc)

# check that no automated views are created for a schema different than
# the known one
with pipeline.sql_client() as c:
# check that no automated views are created for a schema different than
# the known one
c.execute_sql("CREATE SCHEMA other_schema;")
with pytest.raises(DatabaseUndefinedRelation):
with c.execute_query("SELECT * FROM other_schema.items ORDER BY id ASC;") as cursor:
@@ -172,6 +179,24 @@ def _fs_sql_client_for_external_db(
# views exist
assert len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) == total_records
assert len(external_db.sql("SELECT * FROM first.items").fetchall()) == 3

# test if view reflects source table accurately after it has changed
# concretely, this tests that an existing view is replaced for formats that need it, such as the
# `iceberg` table format
with fs_sql_client as sql_client:
sql_client.create_views_for_tables({"arrow_all_types": "arrow_all_types"})
assert external_db.sql("FROM second.arrow_all_types;").arrow().num_rows == total_records
if alternate_access_pipeline:
# switch back for the write path
pipeline.destination = orig_dest
pipeline.run( # run pipeline again to add rows to source table
source().with_resources("arrow_all_types"),
loader_file_format=destination_config.file_format,
)
with fs_sql_client as sql_client:
sql_client.create_views_for_tables({"arrow_all_types": "arrow_all_types"})
assert external_db.sql("FROM second.arrow_all_types;").arrow().num_rows == (2 * total_records)

external_db.close()

# in case we are not connecting to a bucket that needs secrets, views should still be here after connection reopen
@@ -298,6 +323,7 @@ def test_table_formats(
pipeline = destination_config.setup_pipeline(
"read_pipeline",
dataset_name="read_test",
dev_mode=True,
)

# in case of gcs we use the s3 compat layer for reading
@@ -310,7 +336,7 @@
GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp"
)
access_pipeline = destination_config.setup_pipeline(
"read_pipeline", dataset_name="read_test", destination=gcp_bucket
"read_pipeline", dataset_name="read_test", dev_mode=True, destination=gcp_bucket
)

_run_dataset_checks(
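From a user's perspective, the new `arrow_all_types` resource exercises this path end to end: loading nested Arrow data as an Iceberg table and reading it back through the filesystem destination's DuckDB views. A hedged sketch with an illustrative bucket URL and payload — the real test uses `tests.cases.arrow_table_all_data_types` and the parametrized bucket fixtures:

```python
import dlt
import pyarrow as pa
from dlt.destinations import filesystem

# illustrative nested payload; the test uses tests.cases.arrow_table_all_data_types
nested = pa.table(
    {"id": [1, 2], "payload": [{"a": 1, "b": [1, 2]}, {"a": 2, "b": [3]}]}
)

@dlt.resource(table_format="iceberg")
def arrow_nested():
    yield nested

pipeline = dlt.pipeline(
    pipeline_name="read_pipeline",
    destination=filesystem("s3://my-bucket"),  # illustrative bucket URL
    dataset_name="read_test",
)
pipeline.run(arrow_nested())

# views are created with skip_schema_inference=True and replaced when needed,
# so nested columns round-trip and a second run picks up the newly loaded rows
with pipeline.sql_client() as client:
    print(client.execute_sql("SELECT * FROM arrow_nested"))
```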