From 9dacf5c021279b57ec0ba4574fb78b6c23a888ed Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Mon, 9 Oct 2023 15:27:39 +0200 Subject: [PATCH] improves tests and docstring for state reset on replace --- dlt/common/pipeline.py | 4 ++-- dlt/extract/extract.py | 6 +++--- dlt/extract/pipe.py | 1 - dlt/extract/source.py | 2 +- dlt/extract/typing.py | 6 ++++++ dlt/extract/utils.py | 14 +++++++++++--- dlt/pipeline/helpers.py | 4 ++-- tests/extract/test_incremental.py | 30 +++++++++++++++++++++++++++--- 8 files changed, 52 insertions(+), 15 deletions(-) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index ebc33e2513..aeb0bdc68a 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -424,8 +424,8 @@ def resource_state(resource_name: str = None, source_state_: Optional[DictStrAny return state_.setdefault('resources', {}).setdefault(resource_name, {}) # type: ignore -def _reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> None: - """Alpha version of the resource state. 
Resets the resource state +def reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> None: + """Resets the resource state with name `resource_name` by removing it from `source_state` Args: resource_name: The resource key to reset diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index 5a7e2afa30..f5bc5e4888 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -5,7 +5,7 @@ from dlt.common.configuration.container import Container from dlt.common.configuration.resolve import inject_section from dlt.common.configuration.specs.config_section_context import ConfigSectionContext -from dlt.common.pipeline import _reset_resource_state +from dlt.common.pipeline import reset_resource_state from dlt.common.runtime import signals from dlt.common.runtime.collector import Collector, NULL_COLLECTOR @@ -181,11 +181,11 @@ def extract_with_schema( with Container().injectable_context(SourceSchemaInjectableContext(schema)): # inject the config section with the current source name with inject_section(ConfigSectionContext(sections=(known_sections.SOURCES, source.section, source.name), source_state_key=source.name)): - # reset resource states + # reset resource states, the `extracted` list contains all the explicit resources and all their parents for resource in source.resources.extracted.values(): with contextlib.suppress(DataItemRequiredForDynamicTableHints): if resource.write_disposition == "replace": - _reset_resource_state(resource.name) + reset_resource_state(resource.name) extractor = extract(extract_id, source, storage, collector, max_parallel_items=max_parallel_items, workers=workers) # iterate over all items in the pipeline and update the schema if dynamic table hints were present diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index d19a0f9234..fd058b8fa1 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -125,7 +125,6 @@ def is_empty(self) -> bool: @property def has_parent(self) -> bool: - 
"""Checks if pipe is connected to parent pipe from which it takes data items. Connected pipes are created from transformer resources""" return self.parent is not None @property diff --git a/dlt/extract/source.py b/dlt/extract/source.py index c271944dc8..0b910eb089 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -527,7 +527,7 @@ def extracted(self) -> Dict[str, DltResource]: # resource for pipe not found: return mock resource mock_template = DltResourceSchema.new_table_template( pipe.name, - write_disposition=resource._table_schema_template.get("write_disposition") + write_disposition=resource.write_disposition ) resource = DltResource(pipe, mock_template, False, section=resource.section) resource.source_name = resource.source_name diff --git a/dlt/extract/typing.py b/dlt/extract/typing.py index 5f32556f92..ad4e23b84f 100644 --- a/dlt/extract/typing.py +++ b/dlt/extract/typing.py @@ -39,6 +39,12 @@ class SupportsPipe(Protocol): """A protocol with the core Pipe properties and operations""" name: str """Pipe name which is inherited by a resource""" + parent: "SupportsPipe" + """A parent of the current pipe""" + @property + def has_parent(self) -> bool: + """Checks if pipe is connected to parent pipe from which it takes data items. Connected pipes are created from transformer resources""" + ... 
ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny] diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 794c606040..5efe510f33 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -1,15 +1,16 @@ import inspect import makefun -from typing import Union, List, Any, Sequence, cast +from typing import Optional, Union, List, Any, Sequence, cast from collections.abc import Mapping as C_Mapping from dlt.common.exceptions import MissingDependencyException +from dlt.common.pipeline import reset_resource_state from dlt.common.schema.typing import TColumnNames, TAnySchemaColumns, TTableSchemaColumns -from dlt.common.typing import AnyFun, TDataItem, TDataItems +from dlt.common.typing import AnyFun, DictStrAny, TDataItem, TDataItems from dlt.common.utils import get_callable_name from dlt.extract.exceptions import InvalidResourceDataTypeFunctionNotAGenerator -from dlt.extract.typing import TTableHintTemplate, TDataItem, TFunHintTemplate +from dlt.extract.typing import TTableHintTemplate, TDataItem, TFunHintTemplate, SupportsPipe try: from dlt.common.libs import pydantic @@ -62,6 +63,13 @@ def wrapper(item: TDataItem) -> TTableSchemaColumns: return ensure_table_schema_columns(columns) +def reset_pipe_state(pipe: SupportsPipe, source_state_: Optional[DictStrAny] = None) -> None: + """Resets the resource state for a `pipe` and all its parent pipes""" + if pipe.has_parent: + reset_pipe_state(pipe.parent, source_state_) + reset_resource_state(pipe.name, source_state_) + + def simulate_func_call(f: Union[Any, AnyFun], args_to_skip: int, *args: Any, **kwargs: Any) -> inspect.Signature: """Simulates a call to a resource or transformer function before it will be wrapped for later execution in the pipe""" if not callable(f): diff --git a/dlt/pipeline/helpers.py b/dlt/pipeline/helpers.py index 7d1b210fb1..670b2a7887 100644 --- a/dlt/pipeline/helpers.py +++ b/dlt/pipeline/helpers.py @@ -7,7 +7,7 @@ from dlt.common.schema.utils import 
group_tables_by_resource, compile_simple_regexes, compile_simple_regex from dlt.common.schema.typing import TSimpleRegex from dlt.common.typing import REPattern -from dlt.common.pipeline import TSourceState, _reset_resource_state, _sources_state, _delete_source_state_keys, _get_matching_resources +from dlt.common.pipeline import TSourceState, reset_resource_state, _sources_state, _delete_source_state_keys, _get_matching_resources from dlt.common.destination.reference import WithStagingDataset from dlt.destinations.exceptions import DatabaseUndefinedRelation @@ -146,7 +146,7 @@ def _create_modified_state(self) -> Dict[str, Any]: if self.drop_state: for key in _get_matching_resources(self.resource_pattern, source_state): self.info['resource_states'].append(key) - _reset_resource_state(key, source_state) + reset_resource_state(key, source_state) resolved_paths = resolve_paths(self.state_paths_to_drop, source_state) if self.state_paths_to_drop and not resolved_paths: self.info['warnings'].append(f"State paths {self.state_paths_to_drop} did not select any paths in source {source_name}") diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 146ca954bf..cf644aa08d 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -478,23 +478,47 @@ def child(item): # also transformer will not receive new data info = p.run(child) assert len(info.loads_ids) == 0 - # now it will + # now it will (as the parent resource also got reset) info = p.run(child, write_disposition="replace") - print(info.load_packages[0]) + # print(info.load_packages[0]) assert len(info.loads_ids) == 1 + # pipeline applied hints to the child resource + assert child.write_disposition == "replace" + # create a source where we place only child + s = DltSource("comp", "section", Schema("comp"), [child]) + # but extracted resources will include its parent where it derives write disposition from child + extracted = s.resources.extracted + assert 
extracted[child.name].write_disposition == "replace" + assert extracted[child._pipe.parent.name].write_disposition == "replace" + + # create a source where we place parent explicitly s = DltSource("comp", "section", Schema("comp"), [parent_r, child]) + extracted = s.resources.extracted + assert extracted[child.name].write_disposition == "replace" + # now parent exists separately and has its own write disposition + assert extracted[child._pipe.parent.name].write_disposition == "append" p = dlt.pipeline(pipeline_name=uniq_id(), destination="duckdb") info = p.run(s) + # print(s.state) assert len(info.loads_ids) == 1 info = p.run(s) - # state was reset + # print(s.state) + # state was reset (child is replace but parent is append! so it will not generate any more items due to incremental + # so child will reset itself on replace and never set the state...) assert 'child' not in s.state['resources'] # there will be a load package to reset the state but also a load package to update the child table assert len(info.load_packages[0].jobs['completed_jobs']) == 2 assert {job.job_file_info.table_name for job in info.load_packages[0].jobs['completed_jobs'] } == {"_dlt_pipeline_state", "child"} + # now we add child that has parent_r as parent but we add another instance of standalone_some_data explicitly + # so we have a resource with the same name as child parent but the pipe instance is different. NOTE(review): `extracted` is not re-read from this new `s` (no `extracted = s.resources.extracted`), so the asserts below still check the previous source + s = DltSource("comp", "section", Schema("comp"), [standalone_some_data(now), child]) + assert extracted[child.name].write_disposition == "replace" + # now parent exists separately and has its own write disposition - because we search by name to identify matching resource + assert extracted[child._pipe.parent.name].write_disposition == "append" + def test_incremental_as_transform() -> None: