replace duckdb views for iceberg tables
jorritsandbrink committed Dec 14, 2024
1 parent 865e0e9 commit 9a3631e
Showing 2 changed files with 29 additions and 9 deletions.
21 changes: 13 additions & 8 deletions dlt/destinations/impl/filesystem/sql_client.py
@@ -214,14 +214,17 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:
                 # unknown views will not be created
                 continue
 
-            # only create view if it does not exist in the current schema yet
-            existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()]
-            if view_name in existing_tables:
-                continue
-
             # NOTE: if this is staging configuration then `prepare_load_table` will remove some info
             # from table schema, if we ever extend this to handle staging destination, this needs to change
             schema_table = self.fs_client.prepare_load_table(table_name)
+            table_format = schema_table.get("table_format")
+
+            # skip if view already exists and does not need to be replaced each time
+            existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()]
+            needs_replace = table_format == "iceberg"
+            if view_name in existing_tables and not needs_replace:
+                continue
+
             # discover file type
             folder = self.fs_client.get_table_dir(table_name)
             files = self.fs_client.list_table_files(table_name)
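The new gating is worth spelling out: an existing view is reused unless its table format requires a refresh on every call. A minimal, self-contained sketch of the same decision against an in-memory DuckDB connection; the hard-coded `table_format` value is illustrative, whereas the real code reads it from the prepared table schema:

import duckdb

conn = duckdb.connect()  # in-memory database
conn.execute("CREATE VIEW items AS SELECT 1 AS id")

# DuckDB lists views alongside tables in SHOW TABLES, which is what the check relies on
existing_tables = [tname[0] for tname in conn.execute("SHOW TABLES").fetchall()]

table_format = "iceberg"  # hypothetical; taken from the dlt table schema in the real code
needs_replace = table_format == "iceberg"

if "items" in existing_tables and not needs_replace:
    print("view kept as-is")
else:
    print("view will be (re)created")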
@@ -258,9 +261,9 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:
 
             # create from statement
             from_statement = ""
-            if schema_table.get("table_format") == "delta":
+            if table_format == "delta":
                 from_statement = f"delta_scan('{resolved_folder}')"
-            elif schema_table.get("table_format") == "iceberg":
+            elif table_format == "iceberg":
                 from dlt.common.libs.pyiceberg import _get_last_metadata_file
 
                 self._setup_iceberg(self._conn)
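Why only `iceberg` needs the refresh: `delta_scan` is handed the table folder and resolves the current table version whenever the view is queried, while the iceberg branch resolves the latest metadata file up front (via `_get_last_metadata_file`), so the resulting view stays pinned to that snapshot. A rough sketch of the two `FROM` clauses, with made-up paths and simplified `iceberg_scan` arguments, not copied from the dlt source:

# illustrative only: roughly what the two branches expand to
resolved_folder = "s3://bucket/my_table"                                 # hypothetical table location
last_metadata_file = f"{resolved_folder}/metadata/00002.metadata.json"   # hypothetical; normally resolved via _get_last_metadata_file

delta_from = f"delta_scan('{resolved_folder}')"        # folder-based: picks up new table versions at query time
iceberg_from = f"iceberg_scan('{last_metadata_file}')"  # pinned to one metadata file, i.e. one snapshot

# a view built on iceberg_from goes stale after the next load, hence the CREATE OR REPLACE below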
@@ -283,7 +286,9 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:
 
             # create table
             view_name = self.make_qualified_table_name(view_name)
-            create_table_sql_base = f"CREATE VIEW {view_name} AS SELECT * FROM {from_statement}"
+            create_table_sql_base = (
+                f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {from_statement}"
+            )
             self._conn.execute(create_table_sql_base)
 
     @contextmanager
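Taken together with the gating above, switching to `CREATE OR REPLACE VIEW` makes view creation idempotent: the statement can safely run again over an existing view, which a plain `CREATE VIEW` cannot. A quick, self-contained DuckDB check of that behaviour:

import duckdb

conn = duckdb.connect()

conn.execute("CREATE OR REPLACE VIEW v AS SELECT 1 AS n")
conn.execute("CREATE OR REPLACE VIEW v AS SELECT 2 AS n")  # re-points the existing view
print(conn.execute("SELECT * FROM v").fetchall())          # [(2,)]

try:
    conn.execute("CREATE VIEW v AS SELECT 3 AS n")          # plain CREATE VIEW fails if the view exists
except duckdb.Error as exc:
    print(f"plain CREATE VIEW: {exc}")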
17 changes: 16 additions & 1 deletion tests/load/filesystem/test_sql_client.py
@@ -84,7 +84,7 @@ def double_items():
 
     @dlt.resource(table_format=table_format)
     def arrow_all_types():
-        yield arrow_table_all_data_types("arrow-table")[0]
+        yield arrow_table_all_data_types("arrow-table", num_rows=total_records)[0]
 
     return [items, double_items, arrow_all_types]
 
@@ -180,6 +180,21 @@ def _fs_sql_client_for_external_db(
     # views exist
     assert len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) == total_records
     assert len(external_db.sql("SELECT * FROM first.items").fetchall()) == 3
+
+    # test if view reflects source table accurately after it has changed
+    # concretely, this tests if an existing view is replaced with formats that need it, such as
+    # `iceberg` table format
+    with fs_sql_client as sql_client:
+        sql_client.create_views_for_tables({"arrow_all_types": "arrow_all_types"})
+    assert external_db.sql("FROM second.arrow_all_types;").arrow().num_rows == total_records
+    pipeline.run(  # run pipeline again to add rows to source table
+        source().with_resources("arrow_all_types"),
+        loader_file_format=destination_config.file_format,
+    )
+    with fs_sql_client as sql_client:
+        sql_client.create_views_for_tables({"arrow_all_types": "arrow_all_types"})
+    assert external_db.sql("FROM second.arrow_all_types;").arrow().num_rows == (2 * total_records)
+
     external_db.close()
 
     # in case we are not connecting to a bucket that needs secrets, views should still be here after connection reopen
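A side note on the assertions: `FROM second.arrow_all_types;` is DuckDB's shorthand for `SELECT * FROM second.arrow_all_types;`, and the row count is read from the Arrow table returned by `.arrow()`. A tiny standalone example of the same pattern (requires `pyarrow`):

import duckdb

conn = duckdb.connect()
conn.execute("CREATE TABLE t AS SELECT * FROM range(3)")

# "FROM t;" is shorthand for "SELECT * FROM t;"
print(conn.sql("FROM t;").arrow().num_rows)  # 3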
