diff --git a/dlt/destinations/impl/filesystem/sql_client.py b/dlt/destinations/impl/filesystem/sql_client.py index dd1fc6a330..5a0582defd 100644 --- a/dlt/destinations/impl/filesystem/sql_client.py +++ b/dlt/destinations/impl/filesystem/sql_client.py @@ -214,14 +214,17 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None: # unknown views will not be created continue - # only create view if it does not exist in the current schema yet - existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()] - if view_name in existing_tables: - continue - # NOTE: if this is staging configuration then `prepare_load_table` will remove some info # from table schema, if we ever extend this to handle staging destination, this needs to change schema_table = self.fs_client.prepare_load_table(table_name) + table_format = schema_table.get("table_format") + + # skip if view already exists and does not need to be replaced each time + existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()] + needs_replace = table_format == "iceberg" + if view_name in existing_tables and not needs_replace: + continue + # discover file type folder = self.fs_client.get_table_dir(table_name) files = self.fs_client.list_table_files(table_name) @@ -258,9 +261,9 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None: # create from statement from_statement = "" - if schema_table.get("table_format") == "delta": + if table_format == "delta": from_statement = f"delta_scan('{resolved_folder}')" - elif schema_table.get("table_format") == "iceberg": + elif table_format == "iceberg": from dlt.common.libs.pyiceberg import _get_last_metadata_file self._setup_iceberg(self._conn) @@ -283,7 +286,9 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None: # create table view_name = self.make_qualified_table_name(view_name) - create_table_sql_base = f"CREATE VIEW {view_name} AS SELECT * FROM {from_statement}" + 
create_table_sql_base = ( + f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {from_statement}" + ) self._conn.execute(create_table_sql_base) @contextmanager diff --git a/tests/load/filesystem/test_sql_client.py b/tests/load/filesystem/test_sql_client.py index 1d5eaa9984..7025ef0e18 100644 --- a/tests/load/filesystem/test_sql_client.py +++ b/tests/load/filesystem/test_sql_client.py @@ -84,7 +84,7 @@ def double_items(): @dlt.resource(table_format=table_format) def arrow_all_types(): - yield arrow_table_all_data_types("arrow-table")[0] + yield arrow_table_all_data_types("arrow-table", num_rows=total_records)[0] return [items, double_items, arrow_all_types] @@ -180,6 +180,21 @@ def _fs_sql_client_for_external_db( # views exist assert len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) == total_records assert len(external_db.sql("SELECT * FROM first.items").fetchall()) == 3 + + # test if view reflects source table accurately after it has changed + # concretely, this tests if an existing view is replaced with formats that need it, such as + # `iceberg` table format + with fs_sql_client as sql_client: + sql_client.create_views_for_tables({"arrow_all_types": "arrow_all_types"}) + assert external_db.sql("FROM second.arrow_all_types;").arrow().num_rows == total_records + pipeline.run( # run pipeline again to add rows to source table + source().with_resources("arrow_all_types"), + loader_file_format=destination_config.file_format, + ) + with fs_sql_client as sql_client: + sql_client.create_views_for_tables({"arrow_all_types": "arrow_all_types"}) + assert external_db.sql("FROM second.arrow_all_types;").arrow().num_rows == (2 * total_records) + external_db.close() # in case we are not connecting to a bucket that needs secrets, views should still be here after connection reopen