
Commit

ensure pipeline state is only saved to load package if sync is enabled
sh-rp committed Apr 17, 2024
1 parent b2b5913 commit 2857a5c
Showing 4 changed files with 25 additions and 16 deletions.
7 changes: 4 additions & 3 deletions dlt/extract/extract.py
@@ -406,9 +406,10 @@ def commit_packages(self, pipline_state_doc: TPipelineStateDoc = None) -> None:
"""Commits all extracted packages to normalize storage, and adds the pipeline state to the load package"""
# commit load packages
for load_id, metrics in self._load_id_metrics.items():
package_state = self.extract_storage.new_packages.get_load_package_state(load_id)
package_state["pipeline_state"] = pipline_state_doc
self.extract_storage.new_packages.save_load_package_state(load_id, package_state)
if pipline_state_doc:
package_state = self.extract_storage.new_packages.get_load_package_state(load_id)
package_state["pipeline_state"] = {**pipline_state_doc, "dlt_load_id": load_id}
self.extract_storage.new_packages.save_load_package_state(load_id, package_state)
self.extract_storage.commit_new_load_package(
load_id, self.schema_storage[metrics[0]["schema_name"]]
)
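A minimal sketch of what the new guard does (illustrative only; `attach_pipeline_state` is a hypothetical standalone helper, not dlt's internal API): the pipeline state doc is written into the load package state only when a state doc was actually extracted, and it is stamped with the package's load id.

# Hypothetical helper mirroring the new guard in commit_packages; not dlt's internal API.
from typing import Any, Dict, Optional

def attach_pipeline_state(
    package_state: Dict[str, Any],
    pipeline_state_doc: Optional[Dict[str, Any]],
    load_id: str,
) -> Dict[str, Any]:
    # Only attach state when a doc was extracted (i.e. state sync produced one);
    # otherwise the load package state is left untouched.
    if pipeline_state_doc:
        package_state["pipeline_state"] = {**pipeline_state_doc, "dlt_load_id": load_id}
    return package_state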
16 changes: 10 additions & 6 deletions dlt/pipeline/pipeline.py
@@ -63,6 +63,7 @@
     LoadJobInfo,
     LoadPackageInfo,
 )
+from dlt.common.storages.load_package import TPipelineStateDoc
 from dlt.common.destination import (
     DestinationCapabilitiesContext,
     merge_caps_file_formats,
@@ -428,13 +429,14 @@ def extract(
                         raise SourceExhausted(source.name)
                     self._extract_source(extract_step, source, max_parallel_items, workers)
                 # extract state
-                state = None
+                state: TPipelineStateDoc = None
                 if self.config.restore_from_destination:
                     # this will update state version hash so it will not be extracted again by with_state_sync
-                    state = self._container[StateInjectableContext].state
-                    self._bump_version_and_extract_state(state, True, extract_step)
+                    state = self._bump_version_and_extract_state(
+                        self._container[StateInjectableContext].state, True, extract_step
+                    )
                 # commit load packages with state
-                extract_step.commit_packages(state_doc(state) if state else None)
+                extract_step.commit_packages(state)
                 return self._get_step_info(extract_step)
         except Exception as exc:
             # emit step info
@@ -1513,15 +1515,15 @@ def _props_to_state(self, state: TPipelineState) -> TPipelineState:

     def _bump_version_and_extract_state(
         self, state: TPipelineState, extract_state: bool, extract: Extract = None
-    ) -> None:
+    ) -> TPipelineStateDoc:
         """Merges existing state into `state` and extracts state using `storage` if extract_state is True.
         Storage will be created on demand. In that case the extracted package will be immediately committed.
         """
         _, hash_, _ = bump_pipeline_state_version_if_modified(self._props_to_state(state))
         should_extract = hash_ != state["_local"].get("_last_extracted_hash")
         if should_extract and extract_state:
-            data = state_resource(state)
+            data, doc = state_resource(state)
             extract_ = extract or Extract(
                 self._schema_storage, self._normalize_storage_config(), original_data=data
             )
@@ -1533,6 +1535,8 @@
             # commit only if we created storage
             if not extract:
                 extract_.commit_packages()
+            return doc
+        return None

     def _list_schemas_sorted(self) -> List[str]:
         """Lists schema names sorted to have deterministic state"""
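The changed contract in a minimal sketch (simplified types; the real method takes a `TPipelineState` and an optional `Extract` step): `_bump_version_and_extract_state` now returns the extracted state doc when the state hash changed and extraction was requested, and `None` otherwise, and `extract()` forwards exactly that value to `commit_packages`.

# Illustrative sketch of the new return contract; simplified, not dlt's actual signatures.
from typing import Any, Dict, Optional

def bump_version_and_extract_state(
    state: Dict[str, Any], extract_state: bool, last_extracted_hash: Optional[str]
) -> Optional[Dict[str, Any]]:
    should_extract = state.get("_version_hash") != last_extracted_hash
    if should_extract and extract_state:
        # stand-in for state_resource(state): build the state doc and extract it
        doc = {"version_hash": state.get("_version_hash")}
        return doc
    return None

# extract() then simply calls extract_step.commit_packages(state),
# where `state` is the doc above or None when nothing was extracted this run.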
16 changes: 10 additions & 6 deletions dlt/pipeline/state_sync.py
@@ -111,12 +111,16 @@ def state_doc(state: TPipelineState, load_id: str = None) -> TPipelineStateDoc:
     return doc


-def state_resource(state: TPipelineState) -> DltResource:
-    return dlt.resource(
-        [state_doc(state)],
-        name=STATE_TABLE_NAME,
-        write_disposition="append",
-        columns=STATE_TABLE_COLUMNS,
+def state_resource(state: TPipelineState) -> Tuple[DltResource, TPipelineStateDoc]:
+    doc = state_doc(state)
+    return (
+        dlt.resource(
+            [doc],
+            name=STATE_TABLE_NAME,
+            write_disposition="append",
+            columns=STATE_TABLE_COLUMNS,
+        ),
+        doc,
     )


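Callers of `state_resource` now unpack a tuple; a short usage sketch (assuming `pipeline_state` is a populated `TPipelineState` mapping):

# Usage sketch, assuming `pipeline_state` is a valid TPipelineState mapping.
resource, doc = state_resource(pipeline_state)
# `resource` is the DltResource that appends the state row to STATE_TABLE_NAME;
# `doc` is the underlying TPipelineStateDoc, returned alongside so the caller
# can also place it into the load package state (see extract.py above).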
2 changes: 1 addition & 1 deletion tests/load/pipeline/test_restore_state.py
@@ -76,7 +76,7 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) -
     initial_state["_local"]["_last_extracted_at"] = pendulum.now()
     initial_state["_local"]["_last_extracted_hash"] = initial_state["_version_hash"]
     # add _dlt_id and _dlt_load_id
-    resource = state_resource(initial_state)
+    resource, _ = state_resource(initial_state)
     resource.apply_hints(
         columns={
             "_dlt_id": {"name": "_dlt_id", "data_type": "text", "nullable": False},