Skip to content

Commit

Permalink
adds explicit standalone resources
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Oct 6, 2023
1 parent cd530bf commit ad8bf0f
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 5 deletions.
31 changes: 29 additions & 2 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import inspect
from types import ModuleType
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterator, List, Optional, Tuple, Type, TypeVar, Union, cast, overload
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterator, List, Literal, Optional, Tuple, Type, TypeVar, Union, cast, overload

from dlt.common.configuration import with_config, get_fun_spec, known_sections, configspec
from dlt.common.configuration.container import Container
Expand Down Expand Up @@ -220,6 +220,23 @@ def resource(
) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]:
...

@overload
def resource(
data: None = ...,
/,
name: str = None,
table_name: TTableHintTemplate[str] = None,
write_disposition: TTableHintTemplate[TWriteDisposition] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
standalone: Literal[True] = True
) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, DltResource]]:
...


@overload
def resource(
data: Union[List[Any], Tuple[Any], Iterator[Any]],
Expand Down Expand Up @@ -247,6 +264,7 @@ def resource(
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
standalone: bool = False,
data_from: TUnboundDltResource = None,
) -> Any:
"""When used as a decorator, transforms any generator (yielding) function into a `dlt resource`. When used as a function, it transforms data in `data` argument into a `dlt resource`.
Expand Down Expand Up @@ -297,6 +315,8 @@ def resource(
spec (Type[BaseConfiguration], optional): A specification of configuration and secret values required by the source.
standalone (bool, optional): Returns a wrapped decorated function that creates DltResource instance. Must be called before use. Cannot be part of a source.
data_from (TUnboundDltResource, optional): Allows to pipe data from one resource to another to build multi-step pipelines.
Raises:
Expand Down Expand Up @@ -355,7 +375,14 @@ def decorator(f: Callable[TResourceFunParams, Any]) -> Callable[TResourceFunPara
if not is_inner_resource:
_SOURCES[f.__qualname__] = SourceInfo(SPEC, f, func_module)

return make_resource(resource_name, source_section, conf_f, incremental)
@wraps(conf_f)
def _wrap(*args: Any, **kwargs: Any) -> DltResource:
return make_resource(resource_name, source_section, conf_f(*args, **kwargs), incremental)

if standalone:
return _wrap
else:
return make_resource(resource_name, source_section, conf_f, incremental)

# if data is callable or none use decorator
if data is None:
Expand Down
18 changes: 16 additions & 2 deletions docs/website/docs/general-usage/resource.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,20 @@ In the example above, `user_details` will receive data from default instance of
pipeline.run(users(limit=100) | user_details)
```

### Declare a standalone resource
A standalone resource is defined on a function that is top level in a module (not inner function) that accepts config and secrets values. Additionally
if `standalone` flag is specified, the decorated function signature and docstring will be preserved. `dlt.resource` will just wrap the
function decorated function and user must call the wrapper to get the actual resource. Below we declare a `filesystem` resource that must be called before use.
```python
@dlt.resource(standalone=True)
def filesystem(bucket_url=dlt.config.value):
"""list and yield files in `bucket_url`"""
...

# `filesystem` must be called before it is extracted or used in any other way
pipeline.run(filesystem("s3://my-bucket/reports"), table_name="reports")
```

## Customize resources

### Filter, transform and pivot data
Expand Down Expand Up @@ -306,8 +320,8 @@ tables.users.table_name = "other_users"
### Duplicate and rename resources
There are cases when you your resources are generic (ie. bucket filesystem) and you want to load several instances of it (ie. files from different folders) to separate tables. In example below we use `filesystem` source to load csvs from two different folders into separate tables:
```python
@dlt.resource
def filesystem(bucket_url)
@dlt.resource(standalone=True)
def filesystem(bucket_url):
# list and yield files in bucket_url
...

Expand Down
50 changes: 49 additions & 1 deletion tests/extract/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import dlt
from dlt.common.configuration import known_sections
from dlt.common.configuration.container import Container
from dlt.common.configuration.exceptions import ConfigFieldMissingException
from dlt.common.configuration.inject import get_fun_spec
from dlt.common.configuration.resolve import inject_section
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
Expand All @@ -18,7 +19,7 @@
from dlt.common.schema.typing import TTableSchemaColumns

from dlt.cli.source_detection import detect_source_configs
from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints, ExplicitSourceNameInvalid, InconsistentTableTemplate, InvalidResourceDataTypeFunctionNotAGenerator, InvalidResourceDataTypeIsNone, ParametrizedResourceUnbound, PipeNotBoundToData, ResourceFunctionExpected, ResourceInnerCallableConfigWrapDisallowed, SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, SourceSchemaNotAvailable
from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints, ExplicitSourceNameInvalid, InconsistentTableTemplate, InvalidResourceDataTypeFunctionNotAGenerator, InvalidResourceDataTypeIsNone, InvalidResourceDataTypeMultiplePipes, ParametrizedResourceUnbound, PipeGenInvalid, PipeNotBoundToData, ResourceFunctionExpected, ResourceInnerCallableConfigWrapDisallowed, SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, SourceSchemaNotAvailable
from dlt.extract.source import DltResource, DltSource
from dlt.common.schema.exceptions import InvalidSchemaName

Expand Down Expand Up @@ -586,6 +587,53 @@ def invalid_disposition():
assert "write_disposition" in str(py_ex.value)


# wrapped flag will not create the resource but just simple function wrapper that must be called before use
@dlt.resource(standalone=True)
def standalone_signature(init: int, secret_end: int = dlt.secrets.value):
"""Has fine docstring"""
yield from range(init, secret_end)


def test_standalone_resource() -> None:

# wrapped flag will not create the resource but just simple function wrapper that must be called before use
@dlt.resource(standalone=True)
def nice_signature(init: int):
"""Has nice signature"""
yield from range(init, 10)

assert not isinstance(nice_signature, DltResource)
assert callable(nice_signature)
assert nice_signature.__doc__ == """Has nice signature"""

assert list(nice_signature(7)) == [7, 8, 9]

# can't work in a source

@dlt.source
def nice_source():
return nice_signature

source = nice_source()
source.nice_signature.bind(7)
with pytest.raises(PipeGenInvalid):
assert list(source) == [7, 8, 9]

@dlt.source
def many_instances():
return nice_signature(9), nice_signature(7)

with pytest.raises(InvalidResourceDataTypeMultiplePipes):
source = many_instances()

with pytest.raises(ConfigFieldMissingException):
list(standalone_signature(1))

# make sure that config sections work
os.environ["SOURCES__TEST_DECORATORS__STANDALONE_SIGNATURE__SECRET_END"] = "5"
assert list(standalone_signature(1)) == [1, 2, 3, 4]


def test_class_source() -> None:

class _Source:
Expand Down
55 changes: 55 additions & 0 deletions tests/extract/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,9 @@ def test_illegal_double_bind() -> None:
def _r1():
yield ["a", "b", "c"]

assert _r1._bound is False
assert _r1()._bound is True

with pytest.raises(TypeError) as py_ex:
_r1()()
assert "Bound DltResource" in str(py_ex.value)
Expand All @@ -668,6 +671,15 @@ def _r1():
_r1.bind().bind()
assert "Bound DltResource" in str(py_ex.value)

bound_r = dlt.resource([1, 2, 3], name="rx")
assert bound_r._bound is True
with pytest.raises(TypeError):
_r1()

def _gen():
yield from [1, 2, 3]

assert dlt.resource(_gen())._bound is True


@dlt.resource
Expand Down Expand Up @@ -949,6 +961,49 @@ def number_source():
source.resources["unknown"]


def test_clone_resource_on_call():
@dlt.resource(name="gene")
def number_gen(init):
yield from range(init, init + 5)

@dlt.transformer()
def multiplier(number, mul):
return number * mul

gene_clone = number_gen(10)
assert gene_clone is not number_gen
assert gene_clone._pipe is not number_gen._pipe
assert gene_clone.name == number_gen.name

pipe = number_gen | multiplier
pipe_clone = pipe(4)
assert pipe_clone._pipe is not pipe._pipe
assert pipe._pipe is multiplier._pipe
# but parents are the same
assert pipe_clone._pipe.parent is number_gen._pipe
with pytest.raises(ParametrizedResourceUnbound):
list(pipe_clone)
# bind the original directly via pipe
pipe_clone._pipe.parent.bind_gen(10)
assert list(pipe_clone) == [40, 44, 48, 52, 56]


def test_clone_resource_on_bind():
@dlt.resource(name="gene")
def number_gen():
yield from range(1, 5)

@dlt.transformer
def multiplier(number, mul):
return number * mul

pipe = number_gen | multiplier
bound_pipe = pipe.bind(3)
assert bound_pipe is pipe is multiplier
assert bound_pipe._pipe is pipe._pipe
assert bound_pipe._pipe.parent is pipe._pipe.parent


def test_source_multiple_iterations() -> None:

def some_data():
Expand Down

0 comments on commit ad8bf0f

Please sign in to comment.