From 4810baeac3ddde58693916032274aa40c81f8247 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sat, 23 Mar 2024 22:59:11 +0100 Subject: [PATCH] adds tables only tests to drop command --- dlt/common/schema/schema.py | 2 -- dlt/common/storages/live_schema_storage.py | 4 +-- dlt/pipeline/helpers.py | 18 +++++----- dlt/pipeline/state_sync.py | 15 ++++++++ tests/load/pipeline/test_drop.py | 40 +++++++++++++++++++--- 5 files changed, 61 insertions(+), 18 deletions(-) diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 1e2bef6b27..c738f1753e 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -814,8 +814,6 @@ def _bump_version(self) -> Tuple[int, str]: Returns: Tuple[int, str]: Current (``stored_version``, ``stored_version_hash``) tuple """ - prev_version = self._stored_version - prev_version_hash = self._stored_version_hash self._stored_version, self._stored_version_hash, _, _ = utils.bump_version_if_modified( self.to_dict(bump_version=False) ) diff --git a/dlt/common/storages/live_schema_storage.py b/dlt/common/storages/live_schema_storage.py index 6ea69c208a..fb94a21b7a 100644 --- a/dlt/common/storages/live_schema_storage.py +++ b/dlt/common/storages/live_schema_storage.py @@ -70,10 +70,10 @@ def set_live_schema(self, schema: Schema) -> Schema: if live_schema: if id(live_schema) != id(schema): # replace content without replacing instance - print(f"live schema {live_schema} updated in place") + # print(f"live schema {live_schema} updated in place") live_schema.replace_schema_content(schema, link_to_replaced_schema=True) else: - print(f"live schema {schema.name} created from schema") + # print(f"live schema {schema.name} created from schema") live_schema = self.live_schemas[schema.name] = schema return live_schema diff --git a/dlt/pipeline/helpers.py b/dlt/pipeline/helpers.py index 7bba5f84e7..c242a26eaa 100644 --- a/dlt/pipeline/helpers.py +++ b/dlt/pipeline/helpers.py @@ -12,7 +12,6 @@ from dlt.common.schema.typing import TSimpleRegex from dlt.common.typing import REPattern from dlt.common.pipeline import ( - TSourceState, reset_resource_state, _sources_state, _delete_source_state_keys, @@ -26,6 +25,7 @@ PipelineStepFailed, PipelineHasPendingDataException, ) +from dlt.pipeline.state_sync import force_state_extract from dlt.pipeline.typing import TPipelineStep from dlt.pipeline import Pipeline @@ -122,7 +122,7 @@ def __init__( else: self.tables_to_drop = [] self.drop_tables = False # No tables to drop - self.drop_state = not not self.state_paths_to_drop + self.drop_state = not not self.state_paths_to_drop # obtain truth value self.drop_all = drop_all self.info: _DropInfo = dict( @@ -167,10 +167,11 @@ def _drop_destination_tables(self) -> None: with client.with_staging_dataset(): client.drop_tables(*table_names, replace_schema=True) - def _delete_pipeline_tables(self) -> None: + def _delete_schema_tables(self) -> None: for tbl in self.tables_to_drop: del self.schema_tables[tbl["name"]] - self.schema.bump_version() + # bump schema, we'll save later + self.schema._bump_version() def _list_state_paths(self, source_state: Dict[str, Any]) -> List[str]: return resolve_paths(self.state_paths_to_drop, source_state) @@ -197,7 +198,7 @@ def _create_modified_state(self) -> Dict[str, Any]: self.info["state_paths"].extend(f"{source_name}.{p}" for p in resolved_paths) return state # type: ignore[return-value] - def _drop_state_keys(self) -> None: + def _extract_state(self) -> None: state: Dict[str, Any] with self.pipeline.managed_state(extract_state=True) as state: # type: ignore[assignment] state.clear() @@ -216,12 +217,12 @@ def __call__(self) -> None: return # Nothing to drop if self.drop_tables: - self._delete_pipeline_tables() + self._delete_schema_tables() self._drop_destination_tables() if self.drop_tables: self.pipeline.schemas.save_schema(self.schema) if self.drop_state: - self._drop_state_keys() + self._extract_state() # Send updated state to destination self.pipeline.normalize() try: @@ -230,8 +231,7 @@ def __call__(self) -> None: # Clear extracted state on failure so command can run again self.pipeline.drop_pending_packages() with self.pipeline.managed_state() as state: - state["_local"].pop("_last_extracted_at", None) - state["_local"].pop("_last_extracted_hash", None) + force_state_extract(state) raise diff --git a/dlt/pipeline/state_sync.py b/dlt/pipeline/state_sync.py index 8c72a218a4..5366b9c46d 100644 --- a/dlt/pipeline/state_sync.py +++ b/dlt/pipeline/state_sync.py @@ -68,6 +68,21 @@ def bump_pipeline_state_version_if_modified(state: TPipelineState) -> Tuple[int, return bump_state_version_if_modified(state, exclude_attrs=["_local"]) +def mark_state_extracted(state: TPipelineState, hash_: str) -> None: + """Marks state as extracted by setting last extracted hash to hash_ (which is current version_hash) + + `_last_extracted_hash` is kept locally and never synced with the destination + """ + state["_local"]["_last_extracted_at"] = pendulum.now() + state["_local"]["_last_extracted_hash"] = hash_ + + +def force_state_extract(state: TPipelineState) -> None: + """Forces `state` to be extracted by removing local information on the most recent extraction""" + state["_local"].pop("_last_extracted_at", None) + state["_local"].pop("_last_extracted_hash", None) + + def migrate_pipeline_state( pipeline_name: str, state: DictStrAny, from_engine: int, to_engine: int ) -> TPipelineState: diff --git a/tests/load/pipeline/test_drop.py b/tests/load/pipeline/test_drop.py index 8614af4734..afae1c22ca 100644 --- a/tests/load/pipeline/test_drop.py +++ b/tests/load/pipeline/test_drop.py @@ -56,7 +56,11 @@ def droppable_d( dlt.state()["data_from_d"] = {"foo1": {"bar": 1}, "foo2": {"bar": 2}} yield [dict(o=55), dict(o=22)] - return [droppable_a(), droppable_b(), droppable_c(), droppable_d()] + @dlt.resource(selected=True) + def droppable_no_state(): + yield [1, 2, 3] + + return [droppable_a(), droppable_b(), droppable_c(), droppable_d(), droppable_no_state] RESOURCE_TABLES = dict( @@ -64,8 +68,11 @@ def droppable_d( droppable_b=["droppable_b", "droppable_b__items"], droppable_c=["droppable_c", "droppable_c__items", "droppable_c__items__labels"], droppable_d=["droppable_d"], + droppable_no_state=["droppable_no_state"], ) +NO_STATE_RESOURCES = {"droppable_no_state"} + def assert_dropped_resources(pipeline: Pipeline, resources: List[str]) -> None: assert_dropped_resource_tables(pipeline, resources) @@ -95,7 +102,7 @@ def assert_dropped_resource_tables(pipeline: Pipeline, resources: List[str]) -> def assert_dropped_resource_states(pipeline: Pipeline, resources: List[str]) -> None: # Verify only requested resource keys are removed from state - all_resources = set(RESOURCE_TABLES.keys()) + all_resources = set(RESOURCE_TABLES.keys()) - NO_STATE_RESOURCES expected_keys = all_resources - set(resources) sources_state = pipeline.state["sources"] result_keys = set(sources_state["droppable"]["resources"].keys()) @@ -109,6 +116,8 @@ def assert_destination_state_loaded(pipeline: Pipeline) -> None: destination_state = state_sync.load_pipeline_state_from_destination( pipeline.pipeline_name, client ) + # current pipeline schema available in the destination + client.get_stored_schema_by_hash(pipeline.default_schema.version_hash) pipeline_state = dict(pipeline.state) del pipeline_state["_local"] assert pipeline_state == destination_state @@ -144,8 +153,7 @@ def test_drop_command_resources_and_state(destination_config: DestinationTestCon "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) def test_drop_command_only_state(destination_config: DestinationTestConfiguration) -> None: - """Test the drop command with resource and state path options and - verify correct data is deleted from destination and locally""" + """Test drop command that deletes part of the state and syncs with destination""" source = droppable_source() pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) pipeline.run(source) @@ -164,6 +172,28 @@ def test_drop_command_only_state(destination_config: DestinationTestConfiguratio assert_destination_state_loaded(pipeline) +@pytest.mark.parametrize( + "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name +) +def test_drop_command_only_tables(destination_config: DestinationTestConfiguration) -> None: + """Test drop only tables and makes sure that schema and state are synced""" + source = droppable_source() + pipeline = destination_config.setup_pipeline("drop_test_" + uniq_id(), full_refresh=True) + pipeline.run(source) + sources_state = pipeline.state["sources"] + + attached = _attach(pipeline) + helpers.drop(attached, resources=["droppable_no_state"]) + + attached = _attach(pipeline) + + assert_dropped_resources(attached, ["droppable_no_state"]) + # source state didn't change + assert pipeline.state["sources"] == sources_state + + assert_destination_state_loaded(pipeline) + + @pytest.mark.parametrize( "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name ) @@ -202,7 +232,7 @@ def test_fail_after_drop_tables(destination_config: DestinationTestConfiguration attached = _attach(pipeline) with mock.patch.object( - helpers.DropCommand, "_drop_state_keys", side_effect=RuntimeError("Something went wrong") + helpers.DropCommand, "_extract_state", side_effect=RuntimeError("Something went wrong") ): with pytest.raises(RuntimeError): helpers.drop(attached, resources=("droppable_a", "droppable_b"))