
Commit

Merge pull request #274 from dlt-hub/rfix/adds-incremental-to-apply-hints

adds incremental to apply hints
rudolfix authored Apr 16, 2023
2 parents b46238a + 95bc3d0 commit 2f49a01
Showing 31 changed files with 476 additions and 196 deletions.
2 changes: 1 addition & 1 deletion dlt/__init__.py
@@ -25,7 +25,7 @@
 from dlt.common.configuration.accessors import config, secrets
 from dlt.common.typing import TSecretValue as _TSecretValue
 from dlt.common.configuration.specs import CredentialsConfiguration as _CredentialsConfiguration
-from dlt.common.pipeline import state
+from dlt.common.pipeline import source_state as state
 from dlt.common.schema import Schema
 
 from dlt import sources
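The public `dlt.state` name is kept as an alias for the renamed `source_state`, so existing user code keeps working. A minimal sketch of what that alias means in practice (the resource and its state key are invented for illustration):

import dlt

@dlt.resource
def tracked_items():
    # dlt.state() now resolves to source_state(); the dict is persisted across runs
    state = dlt.state()
    last_id = state.get("last_id", 0)  # "last_id" is a made-up key for this sketch
    for item_id in range(last_id + 1, last_id + 4):
        yield {"id": item_id}
    state["last_id"] = last_id + 3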
2 changes: 1 addition & 1 deletion dlt/common/configuration/exceptions.py
@@ -44,7 +44,7 @@ def __str__(self) -> str:
             msg += f'\tfor field "{f}" config providers and keys were tried in following order:\n'
             for tr in field_traces:
                 msg += f'\t\tIn {tr.provider} key {tr.key} was not found.\n'
-        msg += "Please refer to https://dlthub.com/docs/customization/credentials for more information\n"
+        msg += "Please refer to https://dlthub.com/docs/general-usage/credentials for more information\n"
         return msg
 
 
4 changes: 3 additions & 1 deletion dlt/common/configuration/inject.py
@@ -134,7 +134,6 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
                # print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
                config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments)
            resolved_params = dict(config)
-            bound_args.apply_defaults()
            # overwrite or add resolved params
            for p in sig.parameters.values():
                if p.name in resolved_params:
@@ -143,6 +142,9 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
                    bound_args.arguments[p.name] = config
            # pass all other config parameters into kwargs if present
            if kwargs_arg is not None:
+                if kwargs_arg.name not in bound_args.arguments:
+                    # add variadic keyword argument
+                    bound_args.arguments[kwargs_arg.name] = {}
                bound_args.arguments[kwargs_arg.name].update(resolved_params)
                bound_args.arguments[kwargs_arg.name][_LAST_DLT_CONFIG] = config
                bound_args.arguments[kwargs_arg.name][_ORIGINAL_ARGS] = (args, kwargs)
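The guard added above is needed because `inspect.BoundArguments` only materializes the variadic keyword entry when the caller actually passed extra keyword arguments. A small, self-contained illustration of that behavior (the function and key names are made up):

import inspect

def takes_config(a, **injected):
    return injected

sig = inspect.signature(takes_config)

# no extra kwargs passed: the VAR_KEYWORD slot is absent from bound arguments
bound = sig.bind(1)
assert "injected" not in bound.arguments

# the fix: create the dict before writing resolved config values into it
bound.arguments.setdefault("injected", {})["api_key"] = "resolved-value"
print(bound.kwargs)  # {'api_key': 'resolved-value'}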
22 changes: 12 additions & 10 deletions dlt/common/pipeline.py
@@ -237,7 +237,7 @@ def __init__(self, state: TPipelineState = None) -> None:
        ...
 
 
-def state_value(container: Container, initial_default: TPipelineState = None) -> Tuple[TPipelineState, bool]:
+def pipeline_state(container: Container, initial_default: TPipelineState = None) -> Tuple[TPipelineState, bool]:
    """Gets value of the state from context or active pipeline, if none found returns `initial_default`
 
    Injected state is called "writable": it is injected by the `Pipeline` class and all the changes will be persisted.
@@ -259,16 +259,18 @@ def state_value(container: Container, initial_default: TPipelineState = None) ->
    return proxy.pipeline().state, False
 
 
-def state() -> DictStrAny:
-    """Returns a dictionary with the source/resource state. Such state is preserved across pipeline runs and may be used to implement incremental loads.
+def source_state() -> DictStrAny:
+    """Returns a dictionary with the source state. Such state is preserved across pipeline runs and may be used to implement incremental loads.
 
    ### Summary
    The state is a python dictionary-like object that is available within the `@dlt.source` and `@dlt.resource` decorated functions and may be read and written to.
    The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time.
    When using the state:
-    * Any JSON-serializable values can be written and the read from the state.
-    * The state available in the `dlt source` is read only and any changes will be discarded. Still it may be used to initialize the resources.
-    * The state available in the `dlt resource` is writable and written values will be available only once
+    * The source state is scoped to a section of the source. The source section is set by default to the module name in which source function is defined.
+    * If the `section` argument when decorating source function is not specified, all sources in the module will share the state
+    * Any JSON-serializable values can be written and the read from the state. `dlt` dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
+    * The state available in the source decorated function is read only and any changes will be discarded.
+    * The state available in the resource decorated function is writable and written values will be available on the next pipeline run
 
    ### Example
    The most typical use case for the state is to implement incremental load.
@@ -305,7 +307,7 @@ def state() -> DictStrAny:
    if not source_section:
        raise SourceSectionNotAvailable()
 
-    state, _ = state_value(container)
+    state, _ = pipeline_state(container)
    if state is None:
        raise PipelineStateNotAvailable(source_section)
 
@@ -324,12 +326,12 @@ def _resource_state(resource_name: str) -> DictStrAny:
    """Alpha version of the resource state, the signature will change.
 
    Returns resource-scoped state.
    """
-    return state().setdefault('resources', {}).setdefault(resource_name, {})  # type: ignore
+    return source_state().setdefault('resources', {}).setdefault(resource_name, {})  # type: ignore
 
 
 def _reset_resource_state(resource_name: str) -> None:
-    """Alpha version of the resource state. Resets the resource states"""
-    state_ = state()
+    """Alpha version of the resource state. Resets the resource state"""
+    state_ = source_state()
    if "resources" in state_ and resource_name in state_["resources"]:
        state_["resources"].pop(resource_name)
 
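A short sketch of how the renamed `source_state` is typically used from a resource to implement an incremental load. It assumes the resource runs inside a pipeline so the source section and state are available; the state key and data are invented:

import dlt
from dlt.common.pipeline import source_state

@dlt.resource(write_disposition="append")
def recent_rows():
    # source state is a plain dict; values must be JSON-serializable
    last_seen = source_state().setdefault("last_seen", 0)  # "last_seen" is a made-up key
    new_rows = [{"id": i} for i in range(last_seen + 1, last_seen + 4)]
    yield from new_rows
    # written values become available on the next pipeline run
    source_state()["last_seen"] = new_rows[-1]["id"]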
14 changes: 10 additions & 4 deletions dlt/extract/decorators.py
@@ -91,7 +91,9 @@ def source(
 
    ### Summary
    A `dlt source` is a logical grouping of resources that are often extracted and loaded together. A source is associated with a schema, which describes the structure of the loaded data and provides instructions how to load it.
-    Such schema contains table schemas that describe the structure of the data coming from the resources. See https://dlthub.com/docs/glossary for more basic term definitions.
+    Such schema contains table schemas that describe the structure of the data coming from the resources.
+
+    Please refer to https://dlthub.com/docs/general-usage/source for a complete documentation.
 
    ### Passing credentials
    Another important function of the source decorator is to provide credentials and other configuration to the code that extracts data. The decorator may automatically bind the source function arguments to the secret and config values.
@@ -102,7 +104,7 @@ def source(
    >>> list(chess("magnuscarlsen"))
 
    Here `username` is a required, explicit python argument, `chess_url` is a required argument, that if not explicitly passed will be taken from configuration ie. `config.toml`, `api_secret` is a required argument, that if not explicitly passed will be taken from dlt secrets ie. `secrets.toml`.
-    See https://dlthub.com/docs/customization/credentials for details.
+    See https://dlthub.com/docs/general-usage/credentials for details.
 
    ### Args:
    func: A function that returns a dlt resource or a list of those or a list of any data items that can be loaded by `dlt`.
@@ -255,7 +257,9 @@ def resource(
    ### Summary
    A `resource`is a location within a `source` that holds the data with specific structure (schema) or coming from specific origin. A resource may be a rest API endpoint, table in the database or a tab in Google Sheets.
    A `dlt resource` is python representation of a `resource` that combines both data and metadata (table schema) that describes the structure and instructs the loading of the data.
-    A `dlt resource` is also an `Iterable` and can used like any other similar object ie. list or tuple. See https://dlthub.com/docs/glossary for more on basic term definitions.
+    A `dlt resource` is also an `Iterable` and can used like any other iterable object ie. list or tuple.
+
+    Please refer to https://dlthub.com/docs/general-usage/resource for a complete documentation.
 
    ### Passing credentials
    If used as a decorator (`data` argument is a `Generator`), it may automatically bind the source function arguments to the secret and config values.
@@ -266,7 +270,7 @@ def resource(
    >>> list(user_games("magnuscarlsen"))
 
    Here `username` is a required, explicit python argument, `chess_url` is a required argument, that if not explicitly passed will be taken from configuration ie. `config.toml`, `api_secret` is a required argument, that if not explicitly passed will be taken from dlt secrets ie. `secrets.toml`.
-    See https://dlthub.com/docs/customization/credentials for details.
+    See https://dlthub.com/docs/general-usage/credentials for details.
    Note that if decorated function is an inner function, passing of the credentials will be disabled.
 
    ### Args:
@@ -285,8 +289,10 @@ def resource(
            This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
        primary_key (str | Sequence[str]): A column name or a list of column names that comprise a private key. Typically used with "merge" write disposition to deduplicate loaded data.
            This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
+        merge_key (str | Sequence[str]): A column name or a list of column names that define a merge key. Typically used with "merge" write disposition to remove overlapping data ranges ie. to keep a single record for a given day.
+            This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
 
        selected (bool, optional): When `True` `dlt pipeline` will extract and load this resource, if `False`, the resource will be ignored.
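The newly documented `merge_key` hint complements `primary_key` for the "merge" write disposition. A hedged sketch of how the two hints might be combined (the table, columns and values are invented):

import dlt

@dlt.resource(
    write_disposition="merge",
    primary_key="event_id",   # deduplicates individual records
    merge_key="event_day",    # replaces whole overlapping day ranges
)
def daily_events():
    yield {"event_id": 1, "event_day": "2023-04-16", "payload": "..."}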
62 changes: 34 additions & 28 deletions dlt/extract/incremental.py
@@ -13,6 +13,7 @@
 from dlt.common.pipeline import _resource_state
 from dlt.common.utils import digest128
 from dlt.extract.exceptions import PipeException
+from dlt.extract.pipe import Pipe
 from dlt.extract.utils import resolve_column_value
 from dlt.extract.typing import FilterItem, SupportsPipe, TTableHintTemplate

@@ -89,6 +90,9 @@ def __init__(
        self.cursor_path_p: JSONPath = jsonpath_parse(cursor_path)
        self.last_value_func = last_value_func
        self.initial_value = initial_value
+        """Initial value of last_value"""
+        self.start_value: Any = initial_value
+        """Value of last_value at the beginning of current pipeline run"""
        self.resource_name: Optional[str] = None
        self.primary_key: Optional[TTableHintTemplate[TColumnKey]] = primary_key
        self._cached_state: IncrementalColumnState = None
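With the new attribute, three values coexist on an Incremental: `initial_value` (never changes), `start_value` (the `last_value` frozen when the run starts), and `last_value` (advances as rows stream through). A hedged usage sketch; the resource, column and values are invented:

import dlt

@dlt.resource(primary_key="id")
def events(created_at=dlt.sources.incremental("created_at", initial_value="2023-01-01")):
    # start_value stays fixed for the whole run; last_value moves with the data
    print(created_at.initial_value, created_at.start_value, created_at.last_value)
    yield {"id": 1, "created_at": "2023-04-16"}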
@@ -98,13 +102,13 @@ def __init__(
    @classmethod
    def from_existing_state(cls, resource_name: str, cursor_path: str) -> "Incremental[TCursorValue]":
        """Create Incremental instance from existing state."""
-        state = Incremental.get_state(resource_name, cursor_path)
+        state = Incremental._get_state(resource_name, cursor_path)
        i = cls(cursor_path, state["initial_value"])
        i.resource_name = resource_name
        return i
 
    def copy(self) -> "Incremental[TCursorValue]":
-        return self.__class__(self.cursor_path, initial_value=self.initial_value, last_value_func=self.last_value_func)
+        return self.__class__(self.cursor_path, initial_value=self.initial_value, last_value_func=self.last_value_func, primary_key=self.primary_key)
 
    def on_resolved(self) -> None:
        self.cursor_path_p = jsonpath_parse(self.cursor_path)
@@ -121,12 +125,9 @@ def parse_native_representation(self, native_value: Any) -> None:
            self.initial_value = native_value
        self.__is_resolved__ = not self.is_partial()
 
-
-    def get_cached_state(self) -> IncrementalColumnState:
-        """Given resource state, returns a Incremental state for particular cursor column"""
-        if self._cached_state:
-            return self._cached_state
-        self._cached_state = Incremental.get_state(self.resource_name, self.cursor_path)
+    def get_state(self) -> IncrementalColumnState:
+        """Returns an Incremental state for a particular cursor column"""
+        self._cached_state = Incremental._get_state(self.resource_name, self.cursor_path)
        if len(self._cached_state) == 0:
            # set the default like this, setdefault evaluates the default no matter if it is needed or not. and our default is heavy
            self._cached_state.update(
@@ -139,14 +140,14 @@ def get_cached_state(self) -> IncrementalColumnState:
        return self._cached_state
 
    @staticmethod
-    def get_state(resource_name: str, cursor_path: str) -> IncrementalColumnState:
+    def _get_state(resource_name: str, cursor_path: str) -> IncrementalColumnState:
        state: IncrementalColumnState = _resource_state(resource_name).setdefault('incremental', {}).setdefault(cursor_path, {})
        # if state params is empty
        return state
 
    @property
    def last_value(self) -> Optional[TCursorValue]:
-        s = self.get_cached_state()
+        s = self.get_state()
        return s['last_value']  # type: ignore
 
    def unique_value(self, row: TDataItem) -> str:
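For orientation, `_get_state` nests the incremental state inside the resource state, keyed by cursor path. The persisted structure looks roughly like this (a sketch; all values are invented):

# sketch of the source state after a run of a resource named "events"
state = {
    "resources": {
        "events": {
            "incremental": {
                "created_at": {
                    "initial_value": "2023-01-01",
                    "last_value": "2023-04-16",
                    "unique_hashes": ["digest128-of-rows-at-last_value"],
                }
            }
        }
    }
}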
@@ -169,21 +170,21 @@ def transform(self, row: TDataItem) -> bool:
        incremental_state = self._cached_state
        last_value = incremental_state['last_value']
        row_value = row_values[0].value  # For now the value needs to match deserialized presentation from state
-        check_values = [row_value] + ([last_value] if last_value is not None else [])
+        check_values = (row_value,) + ((last_value, ) if last_value is not None else ())
        new_value = self.last_value_func(check_values)
        if last_value == new_value:
            # we store row id for all records with the current "last_value" in state and use it to deduplicate
-            if self.last_value_func([row_value]) == last_value:
+            if self.last_value_func((row_value, )) == last_value:
                unique_value = self.unique_value(row)
                if unique_value in incremental_state['unique_hashes']:
                    return False
                # add new hash only if the record row id is same as current last value
                incremental_state['unique_hashes'].append(unique_value)
                return True
            # skip the record that is not a last_value or new_value: that record was already processed
-            check_values = ([self.initial_value] if self.initial_value is not None else []) + [row_value]
+            check_values = (row_value,) + ((self.start_value,) if self.start_value is not None else ())
            new_value = self.last_value_func(check_values)
-            if new_value == self.initial_value:
+            if new_value == self.start_value:
                return False
            else:
                return True
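In plain terms: a row that ties with the stored `last_value` is deduplicated by its unique hash, a row that trails behind is kept only if it is past `start_value` (the window of the current run), and a row that advances `last_value` is always kept. A standalone sketch of that decision, assuming `last_value_func=max` and invented values:

def keep_row(row_value, last_value, start_value, seen_hashes, row_hash):
    # a sketch of the Incremental.transform decision, assuming last_value_func=max
    new_value = max((row_value,) + ((last_value,) if last_value is not None else ()))
    if last_value == new_value:
        if max((row_value,)) == last_value:
            # row ties with the stored last_value: deduplicate by unique hash
            return row_hash not in seen_hashes
        # row is behind last_value: keep it only if it is past start_value
        if start_value is not None and max((row_value, start_value)) == start_value:
            return False
        return True
    # row advances last_value: always keep
    return True

print(keep_row(5, 5, 3, {"h1"}, "h1"))  # False: duplicate at last_value
print(keep_row(7, 5, 3, set(), "h2"))   # True: advances the window
print(keep_row(2, 5, 3, set(), "h3"))   # False: at or below start_value, processed in a prior run
print(keep_row(4, 5, 3, set(), "h4"))   # True: late row inside the current run's window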
@@ -195,12 +196,13 @@ def transform(self, row: TDataItem) -> bool:
    def bind(self, pipe: SupportsPipe) -> "Incremental[TCursorValue]":
        "Called by pipe just before evaluation"
        # bind the resource/pipe name
-        if self.resource_name is None:
-            self.resource_name = pipe.name
        if self.is_partial():
            raise IncrementalCursorPathMissing(pipe.name, None, None)
-        # fill cache
-        self.get_cached_state()
+        self.resource_name = pipe.name
+        # set initial value from last value, in case of a new state those are equal
+        self.start_value = self.last_value
+        # cache state
+        self._cached_state = self.get_state()
        return self
 
    def __str__(self) -> str:
@@ -242,10 +244,7 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
            p = incremental_param
            assert p is not None
            new_incremental: Incremental[Any] = None
-
            bound_args = sig.bind(*args, **kwargs)
-            if isinstance(p.default, Incremental):
-                new_incremental = p.default.copy()
 
            if p.name in bound_args.arguments:
                explicit_value = bound_args.arguments[p.name]
@@ -264,18 +263,25 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
                    bound_args.arguments[p.name] = None  # Remove partial spec
                    return func(*bound_args.args, **bound_args.kwargs)
                raise ValueError(f"{p.name} Incremental has no default")
-            new_incremental.resource_name = self.resource_name
-            # set initial value from last value, in case of a new state those are equal
-            # this will also cache state in incremental
-            new_incremental.initial_value = new_incremental.last_value
-            self._incremental = new_incremental
-            bound_args.arguments[p.name] = new_incremental
+            # set the incremental only if not yet set or if it was passed explicitly
+            # NOTE: the _incremental may be also set by applying hints to the resource see `set_template` in `DltResource`
+            if p.name in bound_args.arguments or not self._incremental:
+                self._incremental = new_incremental
+            # in case of transformers the bind will be called before this wrapper is set: because transformer is called for a first time late in the pipe
+            self._incremental.bind(Pipe(self.resource_name))
+            bound_args.arguments[p.name] = self._incremental
            return func(*bound_args.args, **bound_args.kwargs)
 
        return _wrap  # type: ignore
 
+    def bind(self, pipe: SupportsPipe) -> "IncrementalResourceWrapper":
+        if self._incremental:
+            self._incremental.bind(pipe)
+        return self
+
    def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
        if not self._incremental:
            return item
-        self._incremental.primary_key = self.primary_key
+        if not self._incremental.primary_key:
+            self._incremental.primary_key = self.primary_key
        return self._incremental(item, meta)
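The wrapper now tolerates an Incremental installed from outside the function signature, which is what the PR title refers to: apply_hints can set it on the resource and the wrapper respects it unless an explicit value is passed. A hedged sketch of that flow; the resource, column, values, and the exact call shape are assumptions based on the NOTE about `set_template` in `DltResource`:

import dlt

@dlt.resource(primary_key="id")
def events(created_at=dlt.sources.incremental("created_at")):
    yield {"id": 1, "created_at": "2023-04-16"}

# override the incremental cursor after the resource is created;
# set_template in DltResource routes this into the wrapper's _incremental
r = events()
r.apply_hints(incremental=dlt.sources.incremental("created_at", initial_value="2023-04-01"))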