diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index f9f62a23ff..d38c7125b2 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -2,7 +2,7 @@ import inspect from types import ModuleType from functools import wraps -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterator, List, Optional, Tuple, Type, TypeVar, Union, cast, overload +from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterator, List, Literal, Optional, Tuple, Type, TypeVar, Union, cast, overload from dlt.common.configuration import with_config, get_fun_spec, known_sections, configspec from dlt.common.configuration.container import Container @@ -220,6 +220,23 @@ def resource( ) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: ... +@overload +def resource( + data: None = ..., + /, + name: str = None, + table_name: TTableHintTemplate[str] = None, + write_disposition: TTableHintTemplate[TWriteDisposition] = None, + columns: TTableHintTemplate[TAnySchemaColumns] = None, + primary_key: TTableHintTemplate[TColumnNames] = None, + merge_key: TTableHintTemplate[TColumnNames] = None, + selected: bool = True, + spec: Type[BaseConfiguration] = None, + standalone: Literal[True] = True +) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, DltResource]]: + ... + + @overload def resource( data: Union[List[Any], Tuple[Any], Iterator[Any]], @@ -247,6 +264,7 @@ def resource( merge_key: TTableHintTemplate[TColumnNames] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, + standalone: bool = False, data_from: TUnboundDltResource = None, ) -> Any: """When used as a decorator, transforms any generator (yielding) function into a `dlt resource`. When used as a function, it transforms data in `data` argument into a `dlt resource`. @@ -297,6 +315,8 @@ def resource( spec (Type[BaseConfiguration], optional): A specification of configuration and secret values required by the source. + standalone (bool, optional): Returns a wrapped decorated function that creates DltResource instance. Must be called before use. Cannot be part of a source. + data_from (TUnboundDltResource, optional): Allows to pipe data from one resource to another to build multi-step pipelines. Raises: @@ -355,7 +375,14 @@ def decorator(f: Callable[TResourceFunParams, Any]) -> Callable[TResourceFunPara if not is_inner_resource: _SOURCES[f.__qualname__] = SourceInfo(SPEC, f, func_module) - return make_resource(resource_name, source_section, conf_f, incremental) + @wraps(conf_f) + def _wrap(*args: Any, **kwargs: Any) -> DltResource: + return make_resource(resource_name, source_section, conf_f(*args, **kwargs), incremental) + + if standalone: + return _wrap + else: + return make_resource(resource_name, source_section, conf_f, incremental) # if data is callable or none use decorator if data is None: diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md index 2d45efae52..a22141046a 100644 --- a/docs/website/docs/general-usage/resource.md +++ b/docs/website/docs/general-usage/resource.md @@ -212,6 +212,20 @@ In the example above, `user_details` will receive data from default instance of pipeline.run(users(limit=100) | user_details) ``` +### Declare a standalone resource +A standalone resource is defined on a function that is top level in a module (not inner function) that accepts config and secrets values. Additionally +if `standalone` flag is specified, the decorated function signature and docstring will be preserved. `dlt.resource` will just wrap the +function decorated function and user must call the wrapper to get the actual resource. Below we declare a `filesystem` resource that must be called before use. +```python +@dlt.resource(standalone=True) +def filesystem(bucket_url=dlt.config.value): + """list and yield files in `bucket_url`""" + ... + +# `filesystem` must be called before it is extracted or used in any other way +pipeline.run(filesystem("s3://my-bucket/reports"), table_name="reports") +``` + ## Customize resources ### Filter, transform and pivot data @@ -306,8 +320,8 @@ tables.users.table_name = "other_users" ### Duplicate and rename resources There are cases when you your resources are generic (ie. bucket filesystem) and you want to load several instances of it (ie. files from different folders) to separate tables. In example below we use `filesystem` source to load csvs from two different folders into separate tables: ```python -@dlt.resource -def filesystem(bucket_url) +@dlt.resource(standalone=True) +def filesystem(bucket_url): # list and yield files in bucket_url ... diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index aa3cd27619..9c065b2dc5 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -7,6 +7,7 @@ import dlt from dlt.common.configuration import known_sections from dlt.common.configuration.container import Container +from dlt.common.configuration.exceptions import ConfigFieldMissingException from dlt.common.configuration.inject import get_fun_spec from dlt.common.configuration.resolve import inject_section from dlt.common.configuration.specs.config_section_context import ConfigSectionContext @@ -18,7 +19,7 @@ from dlt.common.schema.typing import TTableSchemaColumns from dlt.cli.source_detection import detect_source_configs -from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints, ExplicitSourceNameInvalid, InconsistentTableTemplate, InvalidResourceDataTypeFunctionNotAGenerator, InvalidResourceDataTypeIsNone, ParametrizedResourceUnbound, PipeNotBoundToData, ResourceFunctionExpected, ResourceInnerCallableConfigWrapDisallowed, SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, SourceSchemaNotAvailable +from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints, ExplicitSourceNameInvalid, InconsistentTableTemplate, InvalidResourceDataTypeFunctionNotAGenerator, InvalidResourceDataTypeIsNone, InvalidResourceDataTypeMultiplePipes, ParametrizedResourceUnbound, PipeGenInvalid, PipeNotBoundToData, ResourceFunctionExpected, ResourceInnerCallableConfigWrapDisallowed, SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, SourceSchemaNotAvailable from dlt.extract.source import DltResource, DltSource from dlt.common.schema.exceptions import InvalidSchemaName @@ -586,6 +587,53 @@ def invalid_disposition(): assert "write_disposition" in str(py_ex.value) +# wrapped flag will not create the resource but just simple function wrapper that must be called before use +@dlt.resource(standalone=True) +def standalone_signature(init: int, secret_end: int = dlt.secrets.value): + """Has fine docstring""" + yield from range(init, secret_end) + + +def test_standalone_resource() -> None: + + # wrapped flag will not create the resource but just simple function wrapper that must be called before use + @dlt.resource(standalone=True) + def nice_signature(init: int): + """Has nice signature""" + yield from range(init, 10) + + assert not isinstance(nice_signature, DltResource) + assert callable(nice_signature) + assert nice_signature.__doc__ == """Has nice signature""" + + assert list(nice_signature(7)) == [7, 8, 9] + + # can't work in a source + + @dlt.source + def nice_source(): + return nice_signature + + source = nice_source() + source.nice_signature.bind(7) + with pytest.raises(PipeGenInvalid): + assert list(source) == [7, 8, 9] + + @dlt.source + def many_instances(): + return nice_signature(9), nice_signature(7) + + with pytest.raises(InvalidResourceDataTypeMultiplePipes): + source = many_instances() + + with pytest.raises(ConfigFieldMissingException): + list(standalone_signature(1)) + + # make sure that config sections work + os.environ["SOURCES__TEST_DECORATORS__STANDALONE_SIGNATURE__SECRET_END"] = "5" + assert list(standalone_signature(1)) == [1, 2, 3, 4] + + def test_class_source() -> None: class _Source: diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 506cefa355..7739abeb0f 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -660,6 +660,9 @@ def test_illegal_double_bind() -> None: def _r1(): yield ["a", "b", "c"] + assert _r1._bound is False + assert _r1()._bound is True + with pytest.raises(TypeError) as py_ex: _r1()() assert "Bound DltResource" in str(py_ex.value) @@ -668,6 +671,15 @@ def _r1(): _r1.bind().bind() assert "Bound DltResource" in str(py_ex.value) + bound_r = dlt.resource([1, 2, 3], name="rx") + assert bound_r._bound is True + with pytest.raises(TypeError): + _r1() + + def _gen(): + yield from [1, 2, 3] + + assert dlt.resource(_gen())._bound is True @dlt.resource @@ -949,6 +961,49 @@ def number_source(): source.resources["unknown"] +def test_clone_resource_on_call(): + @dlt.resource(name="gene") + def number_gen(init): + yield from range(init, init + 5) + + @dlt.transformer() + def multiplier(number, mul): + return number * mul + + gene_clone = number_gen(10) + assert gene_clone is not number_gen + assert gene_clone._pipe is not number_gen._pipe + assert gene_clone.name == number_gen.name + + pipe = number_gen | multiplier + pipe_clone = pipe(4) + assert pipe_clone._pipe is not pipe._pipe + assert pipe._pipe is multiplier._pipe + # but parents are the same + assert pipe_clone._pipe.parent is number_gen._pipe + with pytest.raises(ParametrizedResourceUnbound): + list(pipe_clone) + # bind the original directly via pipe + pipe_clone._pipe.parent.bind_gen(10) + assert list(pipe_clone) == [40, 44, 48, 52, 56] + + +def test_clone_resource_on_bind(): + @dlt.resource(name="gene") + def number_gen(): + yield from range(1, 5) + + @dlt.transformer + def multiplier(number, mul): + return number * mul + + pipe = number_gen | multiplier + bound_pipe = pipe.bind(3) + assert bound_pipe is pipe is multiplier + assert bound_pipe._pipe is pipe._pipe + assert bound_pipe._pipe.parent is pipe._pipe.parent + + def test_source_multiple_iterations() -> None: def some_data():