Skip to content

Commit

Permalink
improves tests and docstring for state reset on replace
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Oct 9, 2023
1 parent 2c7e4de commit 9dacf5c
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 15 deletions.
4 changes: 2 additions & 2 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ def resource_state(resource_name: str = None, source_state_: Optional[DictStrAny
return state_.setdefault('resources', {}).setdefault(resource_name, {}) # type: ignore


def _reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> None:
"""Alpha version of the resource state. Resets the resource state
def reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> None:
"""Resets the resource state with name `resource_name` by removing it from `source_state`
Args:
resource_name: The resource key to reset
Expand Down
6 changes: 3 additions & 3 deletions dlt/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dlt.common.configuration.container import Container
from dlt.common.configuration.resolve import inject_section
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
from dlt.common.pipeline import _reset_resource_state
from dlt.common.pipeline import reset_resource_state

from dlt.common.runtime import signals
from dlt.common.runtime.collector import Collector, NULL_COLLECTOR
Expand Down Expand Up @@ -181,11 +181,11 @@ def extract_with_schema(
with Container().injectable_context(SourceSchemaInjectableContext(schema)):
# inject the config section with the current source name
with inject_section(ConfigSectionContext(sections=(known_sections.SOURCES, source.section, source.name), source_state_key=source.name)):
# reset resource states
# reset resource states, the `extracted` list contains all the explicit resources and all their parents
for resource in source.resources.extracted.values():
with contextlib.suppress(DataItemRequiredForDynamicTableHints):
if resource.write_disposition == "replace":
_reset_resource_state(resource.name)
reset_resource_state(resource.name)

extractor = extract(extract_id, source, storage, collector, max_parallel_items=max_parallel_items, workers=workers)
# iterate over all items in the pipeline and update the schema if dynamic table hints were present
Expand Down
1 change: 0 additions & 1 deletion dlt/extract/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ def is_empty(self) -> bool:

@property
def has_parent(self) -> bool:
"""Checks if pipe is connected to parent pipe from which it takes data items. Connected pipes are created from transformer resources"""
return self.parent is not None

@property
Expand Down
2 changes: 1 addition & 1 deletion dlt/extract/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ def extracted(self) -> Dict[str, DltResource]:
# resource for pipe not found: return mock resource
mock_template = DltResourceSchema.new_table_template(
pipe.name,
write_disposition=resource._table_schema_template.get("write_disposition")
write_disposition=resource.write_disposition
)
resource = DltResource(pipe, mock_template, False, section=resource.section)
resource.source_name = resource.source_name
Expand Down
6 changes: 6 additions & 0 deletions dlt/extract/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class SupportsPipe(Protocol):
"""A protocol with the core Pipe properties and operations"""
name: str
"""Pipe name which is inherited by a resource"""
parent: "SupportsPipe"
"""A parent of the current pipe"""
@property
def has_parent(self) -> bool:
"""Checks if pipe is connected to parent pipe from which it takes data items. Connected pipes are created from transformer resources"""
...


ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny]
Expand Down
14 changes: 11 additions & 3 deletions dlt/extract/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import inspect
import makefun
from typing import Union, List, Any, Sequence, cast
from typing import Optional, Union, List, Any, Sequence, cast
from collections.abc import Mapping as C_Mapping

from dlt.common.exceptions import MissingDependencyException
from dlt.common.pipeline import reset_resource_state
from dlt.common.schema.typing import TColumnNames, TAnySchemaColumns, TTableSchemaColumns
from dlt.common.typing import AnyFun, TDataItem, TDataItems
from dlt.common.typing import AnyFun, DictStrAny, TDataItem, TDataItems
from dlt.common.utils import get_callable_name
from dlt.extract.exceptions import InvalidResourceDataTypeFunctionNotAGenerator

from dlt.extract.typing import TTableHintTemplate, TDataItem, TFunHintTemplate
from dlt.extract.typing import TTableHintTemplate, TDataItem, TFunHintTemplate, SupportsPipe

try:
from dlt.common.libs import pydantic
Expand Down Expand Up @@ -62,6 +63,13 @@ def wrapper(item: TDataItem) -> TTableSchemaColumns:
return ensure_table_schema_columns(columns)


def reset_pipe_state(pipe: SupportsPipe, source_state_: Optional[DictStrAny] = None) -> None:
    """Resets the resource state for a `pipe` and all its parent pipes"""
    # walk the pipe chain from the current pipe up to the root parent
    chain = [pipe]
    while chain[-1].has_parent:
        chain.append(chain[-1].parent)
    # reset root-first, matching the recursive original's ordering
    for member in reversed(chain):
        reset_resource_state(member.name, source_state_)


def simulate_func_call(f: Union[Any, AnyFun], args_to_skip: int, *args: Any, **kwargs: Any) -> inspect.Signature:
"""Simulates a call to a resource or transformer function before it will be wrapped for later execution in the pipe"""
if not callable(f):
Expand Down
4 changes: 2 additions & 2 deletions dlt/pipeline/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dlt.common.schema.utils import group_tables_by_resource, compile_simple_regexes, compile_simple_regex
from dlt.common.schema.typing import TSimpleRegex
from dlt.common.typing import REPattern
from dlt.common.pipeline import TSourceState, _reset_resource_state, _sources_state, _delete_source_state_keys, _get_matching_resources
from dlt.common.pipeline import TSourceState, reset_resource_state, _sources_state, _delete_source_state_keys, _get_matching_resources
from dlt.common.destination.reference import WithStagingDataset

from dlt.destinations.exceptions import DatabaseUndefinedRelation
Expand Down Expand Up @@ -146,7 +146,7 @@ def _create_modified_state(self) -> Dict[str, Any]:
if self.drop_state:
for key in _get_matching_resources(self.resource_pattern, source_state):
self.info['resource_states'].append(key)
_reset_resource_state(key, source_state)
reset_resource_state(key, source_state)
resolved_paths = resolve_paths(self.state_paths_to_drop, source_state)
if self.state_paths_to_drop and not resolved_paths:
self.info['warnings'].append(f"State paths {self.state_paths_to_drop} did not select any paths in source {source_name}")
Expand Down
30 changes: 27 additions & 3 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,23 +478,47 @@ def child(item):
# also transformer will not receive new data
info = p.run(child)
assert len(info.loads_ids) == 0
# now it will
# now it will (as the parent resource also got reset)
info = p.run(child, write_disposition="replace")
print(info.load_packages[0])
# print(info.load_packages[0])
assert len(info.loads_ids) == 1
# pipeline applied hints to the child resource
assert child.write_disposition == "replace"

# create a source where we place only child
s = DltSource("comp", "section", Schema("comp"), [child])
# but extracted resources will include its parent where it derives write disposition from child
extracted = s.resources.extracted
assert extracted[child.name].write_disposition == "replace"
assert extracted[child._pipe.parent.name].write_disposition == "replace"

# create a source where we place parent explicitly
s = DltSource("comp", "section", Schema("comp"), [parent_r, child])
extracted = s.resources.extracted
assert extracted[child.name].write_disposition == "replace"
# now parent exists separately and has its own write disposition
assert extracted[child._pipe.parent.name].write_disposition == "append"

p = dlt.pipeline(pipeline_name=uniq_id(), destination="duckdb")
info = p.run(s)
# print(s.state)
assert len(info.loads_ids) == 1
info = p.run(s)
# state was reset
# print(s.state)
# state was reset (child is replace but parent is append! so it will not generate any more items due to incremental
# so child will reset itself on replace and never set the state...)
assert 'child' not in s.state['resources']
# there will be a load package to reset the state but also a load package to update the child table
assert len(info.load_packages[0].jobs['completed_jobs']) == 2
assert {job.job_file_info.table_name for job in info.load_packages[0].jobs['completed_jobs'] } == {"_dlt_pipeline_state", "child"}

# now we add child that has parent_r as parent but we add another instance of standalone_some_data explicitly
# so we have a resource with the same name as child parent but the pipe instance is different
s = DltSource("comp", "section", Schema("comp"), [standalone_some_data(now), child])
assert extracted[child.name].write_disposition == "replace"
# now parent exists separately and has its own write disposition - because we search by name to identify matching resource
assert extracted[child._pipe.parent.name].write_disposition == "append"


def test_incremental_as_transform() -> None:

Expand Down

0 comments on commit 9dacf5c

Please sign in to comment.