Create database schema before running the pipeline
sultaniman committed May 16, 2024
1 parent dc5fd8d commit c278878
Showing 2 changed files with 9 additions and 12 deletions.
2 changes: 1 addition & 1 deletion dlt/common/schema/schema.py
@@ -297,7 +297,7 @@ def apply_schema_contract(
existing_table: TTableSchema = self._schema_tables.get(table_name, None)

# table is new when not yet exist or
- is_dlt_table = table_name.startswith("_dlt")
+ is_dlt_table = table_name.startswith(self._dlt_tables_prefix)
should_raise = raise_on_freeze and not is_dlt_table
is_new_table = not existing_table or self.is_new_table(table_name)
# check case where we have a new table
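For context, a minimal sketch of the behavior the one-line change above targets, assuming the schema's internal-table prefix defaults to "_dlt" (the SchemaSketch class below is illustrative, not dlt's actual Schema implementation): tables whose names start with the configured prefix are treated as dlt-internal and are exempt from raise-on-freeze contract checks.

# Illustrative sketch only; dlt's real Schema class is far richer.
class SchemaSketch:
    def __init__(self, dlt_tables_prefix: str = "_dlt") -> None:
        # "_dlt" mirrors the conventional default; a schema may configure another prefix
        self._dlt_tables_prefix = dlt_tables_prefix

    def is_dlt_table(self, table_name: str) -> bool:
        # the commit switches the contract check to this instance attribute
        # instead of the hard-coded "_dlt" literal
        return table_name.startswith(self._dlt_tables_prefix)


schema = SchemaSketch()
assert schema.is_dlt_table("_dlt_loads")       # internal table, exempt from raise_on_freeze
assert not schema.is_dlt_table("test_items")   # user table, subject to the contract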
19 changes: 8 additions & 11 deletions tests/pipeline/test_pipeline.py
@@ -2280,22 +2280,19 @@ def test_pipeline_with_frozen_schema_contract() -> None:
destination="duckdb",
)

+ # Create a database schema with table
+ with pipeline.sql_client() as c:
+     dataset = c.fully_qualified_dataset_name()
+     table = f"{c.fully_qualified_dataset_name()}.test_items"
+     conn = c.open_connection()
+     conn.sql(f"CREATE SCHEMA {dataset}")
+     conn.sql(f"CREATE TABLE {table} (id INTEGER PRIMARY KEY, name VARCHAR)")
+
data = [
{"id": 101, "name": "sub item 101"},
{"id": 101, "name": "sub item 102"},
]

- pipeline.run(
-     data,
-     table_name="test_items",
- )
-
- with pipeline.sql_client() as c:
-     c.execute_sql("DROP TABLE _dlt_loads")
-     c.execute_sql("DROP TABLE _dlt_version")
-     c.execute_sql("DROP TABLE _dlt_pipeline_state")
-     c.execute_sql("TRUNCATE TABLE test_items")
-
pipeline.run(
data,
table_name="test_items",
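A hedged usage sketch of the pattern the updated test now follows: pre-create the dataset schema and the target table in DuckDB, then run the pipeline against it. Pipeline and dataset names are illustrative, distinct ids replace the duplicate ids in the test data, and passing schema_contract="freeze" to run() is assumed from the test's name rather than shown in the visible part of the diff.

import dlt

pipeline = dlt.pipeline(
    pipeline_name="frozen_contract_example",  # illustrative name
    destination="duckdb",
    dataset_name="frozen_dataset",  # illustrative name
)

# Pre-create the schema and the target table, as the updated test does,
# so the destination already contains the table before the first run.
with pipeline.sql_client() as client:
    dataset = client.fully_qualified_dataset_name()
    conn = client.open_connection()
    conn.sql(f"CREATE SCHEMA IF NOT EXISTS {dataset}")
    conn.sql(
        f"CREATE TABLE IF NOT EXISTS {dataset}.test_items"
        " (id INTEGER PRIMARY KEY, name VARCHAR)"
    )

data = [
    {"id": 101, "name": "sub item 101"},
    {"id": 102, "name": "sub item 102"},
]

# With a frozen contract, the load is expected to succeed only while the
# incoming rows fit the existing test_items schema.
pipeline.run(data, table_name="test_items", schema_contract="freeze")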
