Skip to content

Commit

Permalink
Merge pull request #228 from dlt-hub/rfix/moves-state-common
Browse files Browse the repository at this point in the history
moves state to common
  • Loading branch information
rudolfix authored Apr 5, 2023
2 parents c9c5f5c + 0b8fccc commit 08eabd1
Show file tree
Hide file tree
Showing 21 changed files with 420 additions and 252 deletions.
6 changes: 3 additions & 3 deletions dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
from dlt.common.configuration.accessors import config, secrets
from dlt.common.typing import TSecretValue as _TSecretValue
from dlt.common.configuration.specs import CredentialsConfiguration as _CredentialsConfiguration
from dlt.common.pipeline import state
from dlt.common.schema import Schema

from dlt.extract.decorators import source, resource, transformer, defer
from dlt.extract.source import with_table_name
from dlt.pipeline import pipeline as _pipeline, run, attach, Pipeline, dbt, current as _current, mark as _mark
from dlt.pipeline.state import state
from dlt.common.schema import Schema

pipeline = _pipeline
current = _current
Expand Down
3 changes: 1 addition & 2 deletions dlt/cli/pipeline_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
from dlt.cli.exceptions import CliCommandException

from dlt.common import json
from dlt.common.pipeline import get_dlt_pipelines_dir
from dlt.common.pipeline import get_dlt_pipelines_dir, TSourceState
from dlt.common.runners import Venv
from dlt.common.runners.stdout import iter_stdout
from dlt.common.schema.utils import remove_defaults
from dlt.common.storages.file_storage import FileStorage

from dlt.cli import echo as fmt
from dlt.pipeline.state import TSourceState


def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, verbosity: int, load_id: str = None) -> None:
Expand Down
11 changes: 11 additions & 0 deletions dlt/common/configuration/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@


class Container:
"""A singleton injection container holding several injection contexts. Implements basic dictionary interface.
Injection context is identified by its type and available via dict indexer. The common pattern is to instantiate default context value
if it is not yet present in container.
The indexer is settable and allows to explicitly set the value. This is required for contexts that need to be explicitly instantiated.
The `injectable_context` allows to set a context with a `with` keyword and then restore the previous one after it gets out of scope.
"""

_INSTANCE: "Container" = None

Expand Down Expand Up @@ -52,6 +62,7 @@ def __contains__(self, spec: Type[TConfiguration]) -> bool:

@contextmanager
def injectable_context(self, config: TConfiguration) -> Iterator[TConfiguration]:
"""A context manager that will insert `config` into the container and restore the previous value when it gets out of scope."""
spec = type(config)
previous_config: ContainerInjectableContext = None
if spec in self.contexts:
Expand Down
4 changes: 2 additions & 2 deletions dlt/common/configuration/specs/base_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ def default_credentials(self) -> Any:

@configspec
class ContainerInjectableContext(BaseConfiguration):
"""Base class for all configurations that may be injected from Container. Injectable configurations are called contexts"""
"""Base class for all configurations that may be injected from a Container. Injectable configuration is called a context"""

# If True, `Container` is allowed to create default context instance, if none exists
can_create_default: ClassVar[bool] = True
"""If True, `Container` is allowed to create default context instance, if none exists"""


_F_ContainerInjectableContext = ContainerInjectableContext
11 changes: 11 additions & 0 deletions dlt/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,14 @@ def __init__(self, pipeline_name: str, msg: str) -> None:
"""Base class for all pipeline exceptions. Should not be raised."""
self.pipeline_name = pipeline_name
super().__init__(msg)


class PipelineStateNotAvailable(PipelineException):
    """Signals that pipeline state was requested while no pipeline was active."""

    def __init__(self, source_name: str) -> None:
        # keep the name of the requesting source (may be empty/None) for callers that inspect the exception
        self.source_name = source_name
        # the wording depends on whether the requesting source name is known
        if not source_name:
            msg = "The resource you called requested the access to pipeline state but no pipeline is active right now."
        else:
            msg = f"The source {source_name} requested the access to pipeline state but no pipeline is active right now."
        hint = " Call dlt.pipeline(...) before you call the @dlt.source or @dlt.resource decorated function."
        # there is no pipeline at this point so no pipeline name is passed to the base class
        super().__init__(None, msg + hint)
106 changes: 103 additions & 3 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import os
import datetime # noqa: 251
import humanize
from typing import Any, Callable, ClassVar, Dict, List, NamedTuple, Optional, Protocol, Sequence, Tuple, TypedDict
import contextlib
from typing import Any, Callable, ClassVar, Dict, List, NamedTuple, Optional, Protocol, Sequence, TYPE_CHECKING, Tuple, TypedDict

from dlt.common import pendulum
from dlt.common.configuration import configspec
from dlt.common.configuration.container import ContainerInjectableContext
from dlt.common.configuration import known_sections
from dlt.common.configuration.container import Container
from dlt.common.configuration.exceptions import ContextDefaultCannotBeCreated
from dlt.common.configuration.specs import ContainerInjectableContext
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
from dlt.common.configuration.paths import get_dlt_home_dir
from dlt.common.configuration.specs import RunConfiguration
from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
from dlt.common.exceptions import DestinationHasFailedJobs
from dlt.common.exceptions import DestinationHasFailedJobs, PipelineStateNotAvailable
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnKey, TColumnSchema, TWriteDisposition
from dlt.common.storages.load_storage import LoadPackageInfo
Expand Down Expand Up @@ -127,6 +132,11 @@ class TPipelineState(TypedDict, total=False):
"""A section of state that is not synchronized with the destination and does not participate in change merging and version control"""


class TSourceState(TPipelineState):
    """Pipeline state extended with a `sources` section."""
    # NOTE(review): presumably keyed by source section name with each value holding that source's state dict - confirm against the code that writes it
    sources: Dict[str, Dict[str, Any]]



class SupportsPipeline(Protocol):
"""A protocol with core pipeline operations that lets high-level abstractions, i.e. sources, access pipeline methods and properties"""
pipeline_name: str
Expand Down Expand Up @@ -217,6 +227,96 @@ def __init__(self, deferred_pipeline: Callable[..., SupportsPipeline]) -> None:
self._deferred_pipeline = deferred_pipeline


@configspec(init=True)
class StateInjectableContext(ContainerInjectableContext):
    """An injectable context that carries a pipeline state dictionary."""

    # the state dictionary exposed to code that reads this context from the container
    state: TPipelineState

    # this context must be injected explicitly: creating a default instance is not allowed
    can_create_default: ClassVar[bool] = False

    if TYPE_CHECKING:
        # constructor signature declared for type checkers only; never executed at runtime
        def __init__(self, state: TPipelineState = None) -> None:
            ...


def state_value(container: Container, initial_default: TPipelineState = None) -> Tuple[TPipelineState, bool]:
    """Resolves the state to work with and tells whether it is writable.

    The state is looked up in the following order:
    1. state injected via `StateInjectableContext` - called "writable": it is injected by the `Pipeline` class and all the changes will be persisted
    2. state of the active pipeline taken from `PipelineContext` - "read only", all the changes to it will be discarded
    3. `initial_default` when there is no active pipeline - also "read only"

    Returns tuple (state, writable)
    """
    try:
        # injected state is typically "managed" so changes will be persisted
        injected = container[StateInjectableContext]
        return injected.state, True
    except ContextDefaultCannotBeCreated:
        # nothing was injected: fall back to the pipeline context
        pipeline_context = container[PipelineContext]
        if pipeline_context.is_active():
            # unmanaged state that is read only
            # TODO: make sure that state is up to date by syncing the pipeline earlier
            return pipeline_context.pipeline().state, False
        return initial_default, False


def state() -> DictStrAny:
    """Returns a dictionary with the source/resource state. Such state is preserved across pipeline runs and may be used to implement incremental loads.

    ### Summary
    The state is a python dictionary-like object that is available within the `@dlt.source` and `@dlt.resource` decorated functions and may be read and written to.
    The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time.
    When using the state:
    * Any JSON-serializable values can be written and then read from the state.
    * The state available in the `dlt source` is read only and any changes will be discarded. Still it may be used to initialize the resources.
    * The state available in the `dlt resource` is writable and written values will be available only once

    ### Example
    The most typical use case for the state is to implement incremental load.
    >>> @dlt.resource(write_disposition="append")
    >>> def players_games(chess_url, players, start_month=None, end_month=None):
    >>>     checked_archives = dlt.current.state().setdefault("archives", [])
    >>>     archives = players_archives(chess_url, players)
    >>>     for url in archives:
    >>>         if url in checked_archives:
    >>>             print(f"skipping archive {url}")
    >>>             continue
    >>>         else:
    >>>             print(f"getting archive {url}")
    >>>             checked_archives.append(url)
    >>>         # get the filtered archive
    >>>         r = requests.get(url)
    >>>         r.raise_for_status()
    >>>         yield r.json().get("games", [])

    Here we store all the urls with game archives in the state and we skip loading them on next run. The archives are immutable. The state will grow with the coming months (and more players).
    Up to few thousand archives we should be good though.

    ### Raises
    PipelineStateNotAvailable: when no state could be obtained, neither an injected one nor one from an active pipeline.
    """
    global _last_full_state

    container = Container()

    # take the source name from the section context; it stays None when the context or the section is not available
    source_section: str = None
    try:
        section_context = container[ConfigSectionContext]
        try:
            source_section = section_context.source_section()
        except ValueError:
            pass
    except ContextDefaultCannotBeCreated:
        pass

    full_state, _ = state_value(container)
    if full_state is None:
        raise PipelineStateNotAvailable(source_section)

    # state of all sources lives under a common key, state of a particular source is nested under its section name
    sources_state: DictStrAny = full_state.setdefault(known_sections.SOURCES, {})  # type: ignore
    scoped_state = sources_state.setdefault(source_section, {}) if source_section else sources_state

    # allow inspection of last returned full state
    _last_full_state = full_state
    return scoped_state

# the full pipeline state most recently resolved by `state()`, kept at module level to allow its inspection
_last_full_state: TPipelineState = None


def get_dlt_pipelines_dir() -> str:
""" Gets default directory where pipelines' data will be stored
1. in user home directory ~/.dlt/pipelines/
Expand Down
27 changes: 24 additions & 3 deletions dlt/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from functools import wraps
from os import environ

from typing import Any, Dict, Iterator, Optional, Sequence, TypeVar, Mapping, List, Union, Counter
from typing import Any, ContextManager, Dict, Iterator, Optional, Sequence, TypeVar, Mapping, List, Union, Counter
from collections.abc import Mapping as C_Mapping

from dlt.common.typing import AnyFun, StrAny, DictStrAny, StrStr, TAny, TFun
Expand All @@ -34,12 +34,12 @@ def uniq_id_base64(len_: int = 16) -> str:


def digest128(v: str, len_: int = 15) -> str:
    """Hashes str `v` with shake128 and returns the digest of `len_` bytes as base64 string without padding (default: 15 bytes = 20 characters length)"""
    raw_digest = hashlib.shake_128(v.encode("utf-8")).digest(len_)
    encoded = base64.b64encode(raw_digest).decode('ascii')
    # strip the base64 padding characters
    return encoded.rstrip("=")


def digest128b(v: bytes, len_: int = 15) -> str:
    """Hashes bytes `v` with shake128 and returns the digest of `len_` bytes as base64 string without padding (default: 15 bytes = 20 characters length)"""
    return base64.b64encode(hashlib.shake_128(v).digest(len_)).decode('ascii').rstrip("=")

Expand Down Expand Up @@ -265,6 +265,27 @@ def set_working_dir(path: str) -> Iterator[str]:
os.chdir(curr_dir)


@contextmanager
def multi_context_manager(managers: Sequence[ContextManager[Any]]) -> Iterator[Any]:
    """A context manager holding several other context managers.

    Enters all `managers` in the given order and exits them in reverse order. Yields the value returned by
    `__enter__` of the last manager in the list (None if the list is empty).

    If an exception is raised while entering the managers or inside the `with` body, only the managers that
    were successfully entered are exited and the exception is always re-raised: values returned from
    `__exit__` are ignored so the held managers cannot suppress it. A `StopIteration` is treated as a regular
    end of iteration and the managers are exited as if no error happened.
    """
    # managers whose __enter__ completed, those are the only ones that may be exited
    entered: List[ContextManager[Any]] = []
    rv: Any = None
    try:
        for manager in managers:
            rv = manager.__enter__()
            entered.append(manager)
        yield rv
    except Exception as ex:
        if isinstance(ex, StopIteration):
            # end of iteration is not an error from the perspective of the held managers
            exc_info: Any = (None, None, None)
        else:
            exc_info = (type(ex), ex, ex.__traceback__)
        # release context managers in the reverse order of entering
        for manager in reversed(entered):
            manager.__exit__(*exc_info)
        raise
    else:
        for manager in reversed(entered):
            manager.__exit__(None, None, None)


def get_callable_name(f: AnyFun, name_attr: str = "__name__") -> Optional[str]:
# check first if __name__ is present directly (function), if not then look for type name
name: str = getattr(f, name_attr, None)
Expand Down
7 changes: 6 additions & 1 deletion dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def source(
func: Callable[TSourceFunParams, Any],
/,
name: str = None,
section: str = None,
max_table_nesting: int = None,
root_key: bool = False,
schema: Schema = None,
Expand All @@ -68,6 +69,7 @@ def source(
func: None = ...,
/,
name: str = None,
section: str = None,
max_table_nesting: int = None,
root_key: bool = False,
schema: Schema = None,
Expand All @@ -79,6 +81,7 @@ def source(
func: Optional[AnyFun] = None,
/,
name: str = None,
section: str = None,
max_table_nesting: int = None,
root_key: bool = False,
schema: Schema = None,
Expand Down Expand Up @@ -106,6 +109,8 @@ def source(
name (str, optional): A name of the source which is also the name of the associated schema. If not present, the function name will be used.
section (str, optional): A name of configuration and state sections. If not present, the current python module name will be used.
max_table_nesting (int, optional): A schema hint that sets the maximum depth of nested table beyond which the remaining nodes are loaded as string.
root_key (bool): Enables merging on all resources by propagating root foreign key to child tables. This option is most useful if you plan to change write disposition of a resource to disable/enable merge. Defaults to False.
Expand Down Expand Up @@ -158,7 +163,7 @@ def decorator(f: Callable[TSourceFunParams, Any]) -> Callable[TSourceFunParams,

# wrap source extraction function in configuration with section
func_module = inspect.getmodule(f)
source_section = _get_source_section_name(func_module)
source_section = section or _get_source_section_name(func_module)
source_sections = (known_sections.SOURCES, source_section, name)
conf_f = with_config(f, spec=spec, sections=source_sections)

Expand Down
46 changes: 36 additions & 10 deletions dlt/extract/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import makefun
import inspect
from collections.abc import Mapping as C_Mapping
from typing import AsyncIterable, AsyncIterator, ClassVar, Callable, Dict, Iterable, Iterator, List, Sequence, Union, cast, Any
from typing import AsyncIterable, AsyncIterator, ClassVar, Callable, ContextManager, Dict, Iterable, Iterator, List, Sequence, Union, cast, Any
from dlt.common.configuration.resolve import inject_section
from dlt.common.configuration.specs import known_sections
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
Expand All @@ -12,10 +12,10 @@
from dlt.common.schema import Schema
from dlt.common.schema.utils import merge_columns, new_column, new_table
from dlt.common.schema.typing import TColumnProp, TColumnSchema, TPartialTableSchema, TTableSchemaColumns, TWriteDisposition
from dlt.common.typing import AnyFun, TDataItem, TDataItems, NoneType, TFun
from dlt.common.typing import AnyFun, TDataItem, TDataItems, NoneType
from dlt.common.configuration.container import Container
from dlt.common.pipeline import PipelineContext, SupportsPipelineRun
from dlt.common.utils import flatten_list_or_items, get_callable_name
from dlt.common.pipeline import PipelineContext, StateInjectableContext, SupportsPipelineRun, state_value
from dlt.common.utils import flatten_list_or_items, get_callable_name, multi_context_manager

from dlt.extract.typing import DataItemWithMeta, ItemTransformFunc, TColumnKey, TableNameMeta, TFunHintTemplate, TTableHintTemplate, TTableSchemaTemplate, FilterItem, MapItem, YieldMapItem
from dlt.extract.pipe import Pipe, ManagedPipeIterator
Expand Down Expand Up @@ -417,7 +417,20 @@ def __or__(self, transform: Union["DltResource", AnyFun]) -> "DltResource":
return self.add_map(transform)

def __iter__(self) -> Iterator[TDataItem]:
return flatten_list_or_items(map(lambda item: item.item, ManagedPipeIterator.from_pipe(self._pipe)))
"""Opens iterator that yields the data items from the resources in the same order as in Pipeline class.
A read-only state is provided, initialized from active pipeline state. The state is discarded after the iterator is closed.
"""
# use the same state dict when opening iterator and when iterator is iterated
state, _ = state_value(Container(), {})

# managed pipe iterator will remove injected contexts when closing
with Container().injectable_context(StateInjectableContext(state=state)):
pipe_iterator: ManagedPipeIterator = ManagedPipeIterator.from_pipe(self._pipe) # type: ignore

pipe_iterator.set_context_manager(Container().injectable_context(StateInjectableContext(state=state)))
_iter = map(lambda item: item.item, pipe_iterator)
return flatten_list_or_items(_iter)

def __str__(self) -> str:
info = f"DltResource {self.name}:"
Expand Down Expand Up @@ -638,12 +651,25 @@ def __setattr__(self, name: str, value: Any) -> None:
super().__setattr__(name, value)

def __iter__(self) -> Iterator[TDataItem]:
with inject_section(ConfigSectionContext(sections=(known_sections.SOURCES, self.section, self.name))):
# evaluate the pipes in the section context
"""Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class.
A read-only state is provided, initialized from active pipeline state. The state is discarded after the iterator is closed.
A source config section is injected to allow secrets/config injection as during regular extraction.
"""
# use the same state dict when opening iterator and when iterator is iterated
mock_state, _ = state_value(Container(), {})

def _get_context() -> List[ContextManager[Any]]:
return [
inject_section(ConfigSectionContext(sections=(known_sections.SOURCES, self.section, self.name))),
Container().injectable_context(StateInjectableContext(state=mock_state))
]

# managed pipe iterator will remove injected contexts when closing
with multi_context_manager(_get_context()):
pipe_iterator: ManagedPipeIterator = ManagedPipeIterator.from_pipes(self._resources.selected_pipes) # type: ignore
# keep same context during evaluation
context = inject_section(ConfigSectionContext(sections=(known_sections.SOURCES, self.section, self.name)))
pipe_iterator.set_context_manager(context)
pipe_iterator.set_context_manager(multi_context_manager(_get_context()))
_iter = map(lambda item: item.item, pipe_iterator)
self.exhausted = True
return flatten_list_or_items(_iter)
Expand Down
2 changes: 1 addition & 1 deletion dlt/helpers/streamlit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dlt.helpers.pandas import pd
from dlt.pipeline import Pipeline
from dlt.pipeline.exceptions import CannotRestorePipelineException
from dlt.pipeline.state import load_state_from_destination
from dlt.pipeline.state_sync import load_state_from_destination

try:
import streamlit as st
Expand Down
Loading

0 comments on commit 08eabd1

Please sign in to comment.