From cd4dd2378beeae203808fe80057c53147837106b Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 17 Apr 2024 12:16:39 +0200 Subject: [PATCH] ensure pipeline state is only saved to load package if it has changed --- dlt/extract/extract.py | 7 ++++--- dlt/pipeline/pipeline.py | 16 ++++++++++------ dlt/pipeline/state_sync.py | 16 ++++++++++------ tests/load/pipeline/test_restore_state.py | 2 +- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index 159a5e7c23..d4298f2f6b 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -406,9 +406,10 @@ def commit_packages(self, pipline_state_doc: TPipelineStateDoc = None) -> None: """Commits all extracted packages to normalize storage, and adds the pipeline state to the load package""" # commit load packages for load_id, metrics in self._load_id_metrics.items(): - package_state = self.extract_storage.new_packages.get_load_package_state(load_id) - package_state["pipeline_state"] = pipline_state_doc - self.extract_storage.new_packages.save_load_package_state(load_id, package_state) + if pipline_state_doc: + package_state = self.extract_storage.new_packages.get_load_package_state(load_id) + package_state["pipeline_state"] = {**pipline_state_doc, "dlt_load_id": load_id} + self.extract_storage.new_packages.save_load_package_state(load_id, package_state) self.extract_storage.commit_new_load_package( load_id, self.schema_storage[metrics[0]["schema_name"]] ) diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 2b6d5c7a85..1ba7896e1c 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -63,6 +63,7 @@ LoadJobInfo, LoadPackageInfo, ) +from dlt.common.storages.load_package import TPipelineStateDoc from dlt.common.destination import ( DestinationCapabilitiesContext, merge_caps_file_formats, @@ -428,13 +429,14 @@ def extract( raise SourceExhausted(source.name) self._extract_source(extract_step, source, max_parallel_items, workers) # extract state - state = None + state: TPipelineStateDoc = None if self.config.restore_from_destination: # this will update state version hash so it will not be extracted again by with_state_sync - state = self._container[StateInjectableContext].state - self._bump_version_and_extract_state(state, True, extract_step) + state = self._bump_version_and_extract_state( + self._container[StateInjectableContext].state, True, extract_step + ) # commit load packages with state - extract_step.commit_packages(state_doc(state) if state else None) + extract_step.commit_packages(state) return self._get_step_info(extract_step) except Exception as exc: # emit step info @@ -1513,7 +1515,7 @@ def _props_to_state(self, state: TPipelineState) -> TPipelineState: def _bump_version_and_extract_state( self, state: TPipelineState, extract_state: bool, extract: Extract = None - ) -> None: + ) -> TPipelineStateDoc: """Merges existing state into `state` and extracts state using `storage` if extract_state is True. Storage will be created on demand. In that case the extracted package will be immediately committed. @@ -1521,7 +1523,7 @@ def _bump_version_and_extract_state( _, hash_, _ = bump_pipeline_state_version_if_modified(self._props_to_state(state)) should_extract = hash_ != state["_local"].get("_last_extracted_hash") if should_extract and extract_state: - data = state_resource(state) + data, doc = state_resource(state) extract_ = extract or Extract( self._schema_storage, self._normalize_storage_config(), original_data=data ) @@ -1533,6 +1535,8 @@ def _bump_version_and_extract_state( # commit only if we created storage if not extract: extract_.commit_packages() + return doc + return None def _list_schemas_sorted(self) -> List[str]: """Lists schema names sorted to have deterministic state""" diff --git a/dlt/pipeline/state_sync.py b/dlt/pipeline/state_sync.py index e5d937d05a..41009f2909 100644 --- a/dlt/pipeline/state_sync.py +++ b/dlt/pipeline/state_sync.py @@ -111,12 +111,16 @@ def state_doc(state: TPipelineState, load_id: str = None) -> TPipelineStateDoc: return doc -def state_resource(state: TPipelineState) -> DltResource: - return dlt.resource( - [state_doc(state)], - name=STATE_TABLE_NAME, - write_disposition="append", - columns=STATE_TABLE_COLUMNS, +def state_resource(state: TPipelineState) -> Tuple[DltResource, TPipelineStateDoc]: + doc = state_doc(state) + return ( + dlt.resource( + [doc], + name=STATE_TABLE_NAME, + write_disposition="append", + columns=STATE_TABLE_COLUMNS, + ), + doc, ) diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index ffd921f8a8..6518ca46ae 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -76,7 +76,7 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - initial_state["_local"]["_last_extracted_at"] = pendulum.now() initial_state["_local"]["_last_extracted_hash"] = initial_state["_version_hash"] # add _dlt_id and _dlt_load_id - resource = state_resource(initial_state) + resource, _ = state_resource(initial_state) resource.apply_hints( columns={ "_dlt_id": {"name": "_dlt_id", "data_type": "text", "nullable": False},