Skip to content

Commit

Permalink
Don't modify schema when refresh='drop_data'
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed May 22, 2024
1 parent 999982e commit 61cbaf9
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
3 changes: 2 additions & 1 deletion dlt/pipeline/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ def refresh_source(
if drop_result.dropped_tables:
key = "dropped_tables" if refresh != "drop_data" else "truncated_tables"
load_package_state[key] = drop_result.dropped_tables
-source.schema = drop_result.schema
+if refresh != "drop_data": # drop_data is only data wipe, keep original schema
+    source.schema = drop_result.schema
if "sources" in drop_result.state:
pipeline_state["sources"] = drop_result.state["sources"]
return load_package_state
7 changes: 7 additions & 0 deletions tests/pipeline/test_refresh_modes.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ def test_refresh_drop_data_only():
info = pipeline.run(refresh_source(first_run=True), write_disposition="append")
assert_load_info(info)

+first_schema_hash = pipeline.default_schema.version_hash

# Second run of pipeline with only selected resources
# Mock wrap sql client to capture all queries executed
with mock.patch.object(
Expand All @@ -258,6 +260,11 @@ def test_refresh_drop_data_only():
write_disposition="append",
)

+assert_load_info(info)
+
+# Schema should not be mutated
+assert pipeline.default_schema.version_hash == first_schema_hash

all_queries = [k[0][1] for k in mock_execute_query.call_args_list]
assert all_queries
for q in all_queries:
Expand Down

0 comments on commit 61cbaf9

Please sign in to comment.