diff --git a/dlt/pipeline/helpers.py b/dlt/pipeline/helpers.py
index 811fe31733..2abc04363a 100644
--- a/dlt/pipeline/helpers.py
+++ b/dlt/pipeline/helpers.py
@@ -204,7 +204,8 @@ def refresh_source(
     if drop_result.dropped_tables:
         key = "dropped_tables" if refresh != "drop_data" else "truncated_tables"
         load_package_state[key] = drop_result.dropped_tables
-    source.schema = drop_result.schema
+    if refresh != "drop_data":  # drop_data is only data wipe, keep original schema
+        source.schema = drop_result.schema
     if "sources" in drop_result.state:
         pipeline_state["sources"] = drop_result.state["sources"]
     return load_package_state
diff --git a/tests/pipeline/test_refresh_modes.py b/tests/pipeline/test_refresh_modes.py
index a967a36877..c47664aeae 100644
--- a/tests/pipeline/test_refresh_modes.py
+++ b/tests/pipeline/test_refresh_modes.py
@@ -248,6 +248,8 @@ def test_refresh_drop_data_only():
     info = pipeline.run(refresh_source(first_run=True), write_disposition="append")
     assert_load_info(info)
 
+    first_schema_hash = pipeline.default_schema.version_hash
+
     # Second run of pipeline with only selected resources
     # Mock wrap sql client to capture all queries executed
     with mock.patch.object(
@@ -258,6 +260,11 @@ def test_refresh_drop_data_only():
             write_disposition="append",
         )
 
+    assert_load_info(info)
+
+    # Schema should not be mutated
+    assert pipeline.default_schema.version_hash == first_schema_hash
+
     all_queries = [k[0][1] for k in mock_execute_query.call_args_list]
     assert all_queries
     for q in all_queries: