Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom implementation for sources and typing improvement #687

Merged
merged 6 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dlt/common/data_writers/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
[pyarrow.field(
name,
get_py_arrow_datatype(schema_item, self._caps, self.timestamp_timezone),
nullable=schema_item["nullable"]
nullable=schema_item.get("nullable", True)
) for name, schema_item in columns_schema.items()]
)
# find row items that are of the complex type (could be abstracted out for use in other writers?)
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema:
def add_column_defaults(column: TColumnSchemaBase) -> TColumnSchema:
"""Adds default boolean hints to column"""
return {
**{ # type:ignore
**{
"nullable": True,
"partition": False,
"cluster": False,
Expand Down
13 changes: 13 additions & 0 deletions dlt/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,16 @@ def get_generic_type_argument_from_instance(instance: Any, sample_value: Optiona
if orig_param_type is Any and sample_value is not None:
orig_param_type = type(sample_value)
return orig_param_type # type: ignore


TInputArgs = ParamSpec("TInputArgs")
TReturnVal = TypeVar("TReturnVal")

def copy_sig(wrapper: Callable[TInputArgs, Any]) -> Callable[[Callable[..., TReturnVal]], Callable[TInputArgs, TReturnVal]]:
"""Copies docstring and signature from wrapper to func but keeps the func return value type"""

def decorator(func: Callable[..., TReturnVal]) -> Callable[TInputArgs, TReturnVal]:
func.__doc__ = wrapper.__doc__
return func

return decorator
6 changes: 1 addition & 5 deletions dlt/destinations/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@

from dlt.common.schema.utils import table_schema_has_type

BQT_TO_SCT: Dict[str, TDataType] = {

}


class BigQueryTypeMapper(TypeMapper):
sct_to_unbound_dbt = {
Expand Down Expand Up @@ -270,7 +266,7 @@ def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]
"foreign_key": False,
"cluster": c.name in (table.clustering_fields or []),
"partition": c.name == partition_field,
**self._from_db_type(c.field_type, c.precision, c.scale) # type: ignore[misc]
**self._from_db_type(c.field_type, c.precision, c.scale)
}
schema_table[c.name] = schema_c
return True, schema_table
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/job_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def _null_to_bool(v: str) -> bool:
schema_c: TColumnSchemaBase = {
"name": c[0],
"nullable": _null_to_bool(c[2]),
**self._from_db_type(c[1], numeric_precision, numeric_scale), # type: ignore[misc]
**self._from_db_type(c[1], numeric_precision, numeric_scale)
}
schema_table[c[0]] = schema_c # type: ignore
return True, schema_table
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/weaviate/weaviate_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]
for prop in class_schema["properties"]:
schema_c: TColumnSchema = {
"name": self.schema.naming.normalize_identifier(prop["name"]),
**self._from_db_type(prop["dataType"][0], None, None), # type: ignore[misc]
**self._from_db_type(prop["dataType"][0], None, None),
}
table_schema[prop["name"]] = schema_c
return True, table_schema
Expand Down
70 changes: 48 additions & 22 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import os
import inspect
import makefun
from types import ModuleType
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterator, List, Literal, Optional, Tuple, Type, TypeVar, Union, cast, overload
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterator, List, Literal, Optional, Tuple, Type, Union, cast, overload
from typing_extensions import TypeVar

from dlt.common.configuration import with_config, get_fun_spec, known_sections, configspec
from dlt.common.configuration.container import Container
Expand All @@ -21,14 +21,13 @@
from dlt.common.storages.schema_storage import SchemaStorage
from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems
from dlt.common.utils import get_callable_name, get_module_name, is_inner_callable
from dlt.extract.exceptions import InvalidTransformerDataTypeGeneratorFunctionRequired, ResourceFunctionExpected, ResourceInnerCallableConfigWrapDisallowed, SourceDataIsNone, SourceIsAClassTypeError, ExplicitSourceNameInvalid, SourceNotAFunction, SourceSchemaNotAvailable
from dlt.extract.exceptions import DynamicNameNotStandaloneResource, InvalidTransformerDataTypeGeneratorFunctionRequired, ResourceFunctionExpected, ResourceInnerCallableConfigWrapDisallowed, SourceDataIsNone, SourceIsAClassTypeError, ExplicitSourceNameInvalid, SourceNotAFunction, SourceSchemaNotAvailable
from dlt.extract.incremental import IncrementalResourceWrapper

from dlt.extract.typing import TTableHintTemplate
from dlt.extract.source import DltResource, DltSource, TUnboundDltResource



@configspec
class SourceSchemaInjectableContext(ContainerInjectableContext):
"""A context containing the source schema, present when decorated function is executed"""
Expand All @@ -42,6 +41,7 @@ def __init__(self, schema: Schema = None) -> None:

TSourceFunParams = ParamSpec("TSourceFunParams")
TResourceFunParams = ParamSpec("TResourceFunParams")
TDltSourceImpl = TypeVar("TDltSourceImpl", bound=DltSource, default=DltSource)


@overload
Expand All @@ -53,8 +53,9 @@ def source(
max_table_nesting: int = None,
root_key: bool = False,
schema: Schema = None,
spec: Type[BaseConfiguration] = None
) -> Callable[TSourceFunParams, DltSource]:
spec: Type[BaseConfiguration] = None,
_impl_cls: Type[TDltSourceImpl] = DltSource # type: ignore[assignment]
) -> Callable[TSourceFunParams, TDltSourceImpl]:
...

@overload
Expand All @@ -66,8 +67,9 @@ def source(
max_table_nesting: int = None,
root_key: bool = False,
schema: Schema = None,
spec: Type[BaseConfiguration] = None
) -> Callable[[Callable[TSourceFunParams, Any]], Callable[TSourceFunParams, DltSource]]:
spec: Type[BaseConfiguration] = None,
_impl_cls: Type[TDltSourceImpl] = DltSource # type: ignore[assignment]
) -> Callable[[Callable[TSourceFunParams, Any]], Callable[TSourceFunParams, TDltSourceImpl]]:
...

def source(
Expand All @@ -78,7 +80,8 @@ def source(
max_table_nesting: int = None,
root_key: bool = False,
schema: Schema = None,
spec: Type[BaseConfiguration] = None
spec: Type[BaseConfiguration] = None,
_impl_cls: Type[TDltSourceImpl] = DltSource # type: ignore[assignment]
) -> Any:
"""A decorator that transforms a function returning one or more `dlt resources` into a `dlt source` in order to load it with `dlt`.

Expand Down Expand Up @@ -114,14 +117,16 @@ def source(

spec (Type[BaseConfiguration], optional): A specification of configuration and secret values required by the source.

_impl_cls (Type[TDltSourceImpl], optional): A custom implementation of DltSource; may also be used to provide just a typing stub

Returns:
`DltSource` instance
"""

if name and schema:
raise ArgumentsOverloadException("'name' has no effect when `schema` argument is present", source.__name__)

def decorator(f: Callable[TSourceFunParams, Any]) -> Callable[TSourceFunParams, DltSource]:
def decorator(f: Callable[TSourceFunParams, Any]) -> Callable[TSourceFunParams, TDltSourceImpl]:
nonlocal schema, name

if not callable(f) or isinstance(f, DltResource):
Expand Down Expand Up @@ -151,7 +156,7 @@ def decorator(f: Callable[TSourceFunParams, Any]) -> Callable[TSourceFunParams,
conf_f = with_config(f, spec=spec, sections=source_sections)

@wraps(conf_f)
def _wrap(*args: Any, **kwargs: Any) -> DltSource:
def _wrap(*args: Any, **kwargs: Any) -> TDltSourceImpl:
# make schema available to the source
with Container().injectable_context(SourceSchemaInjectableContext(schema)):
# configurations will be accessed in this section in the source
Expand All @@ -166,7 +171,7 @@ def _wrap(*args: Any, **kwargs: Any) -> DltSource:
rv = list(rv)

# convert to source
s = DltSource.from_data(name, source_section, schema.clone(update_normalizers=True), rv)
s = _impl_cls.from_data(name, source_section, schema.clone(update_normalizers=True), rv)
# apply hints
if max_table_nesting is not None:
s.max_table_nesting = max_table_nesting
Expand Down Expand Up @@ -225,7 +230,7 @@ def resource(
def resource(
data: None = ...,
/,
name: str = None,
name: TTableHintTemplate[str] = None,
table_name: TTableHintTemplate[str] = None,
write_disposition: TTableHintTemplate[TWriteDisposition] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
Expand Down Expand Up @@ -257,7 +262,7 @@ def resource(
def resource(
data: Optional[Any] = None,
/,
name: str = None,
name: TTableHintTemplate[str] = None,
table_name: TTableHintTemplate[str] = None,
write_disposition: TTableHintTemplate[TWriteDisposition] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
Expand Down Expand Up @@ -344,8 +349,11 @@ def decorator(f: Callable[TResourceFunParams, Any]) -> Callable[TResourceFunPara
# raise more descriptive exception if we construct transformer
raise InvalidTransformerDataTypeGeneratorFunctionRequired(name or "<no name>", f, type(f))
raise ResourceFunctionExpected(name or "<no name>", f, type(f))
if not standalone and callable(name):
raise DynamicNameNotStandaloneResource(get_callable_name(f))

resource_name = name or get_callable_name(f)
# resource_section = name if name and not callable(name) else get_callable_name(f)
resource_name = name if name and not callable(name) else get_callable_name(f)

# do not inject config values for inner functions, we assume that they are part of the source
SPEC: Type[BaseConfiguration] = None
Expand Down Expand Up @@ -385,12 +393,13 @@ def decorator(f: Callable[TResourceFunParams, Any]) -> Callable[TResourceFunPara

@wraps(conf_f)
def _wrap(*args: Any, **kwargs: Any) -> DltResource:
sig = simulate_func_call(conf_f, skip_args, *args, **kwargs)
r = make_resource(resource_name, source_section, compat_wrapper(resource_name, conf_f, sig, *args, **kwargs), incremental)
_, mod_sig, bound_args = simulate_func_call(conf_f, skip_args, *args, **kwargs)
actual_resource_name = name(bound_args.arguments) if callable(name) else resource_name
r = make_resource(actual_resource_name, source_section, compat_wrapper(actual_resource_name, conf_f, sig, *args, **kwargs), incremental)
# consider transformer arguments bound
r._args_bound = True
# keep explicit args passed
r._set_explicit_args(conf_f, sig, *args, **kwargs)
r._set_explicit_args(conf_f, mod_sig, *args, **kwargs)
return r
return _wrap
else:
Expand All @@ -410,7 +419,7 @@ def _wrap(*args: Any, **kwargs: Any) -> DltResource:
name = name or get_callable_name(data) # type: ignore
func_module = inspect.getmodule(data.gi_frame)
source_section = _get_source_section_name(func_module)

assert not callable(name)
return make_resource(name, source_section, data)


Expand All @@ -435,7 +444,7 @@ def transformer(
f: None = ...,
/,
data_from: TUnboundDltResource = DltResource.Empty,
name: str = None,
name: TTableHintTemplate[str] = None,
table_name: TTableHintTemplate[str] = None,
write_disposition: TTableHintTemplate[TWriteDisposition] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
Expand Down Expand Up @@ -463,11 +472,28 @@ def transformer(
) -> DltResource:
...

@overload
def transformer(
f: Callable[Concatenate[TDataItem, TResourceFunParams], Any],
/,
data_from: TUnboundDltResource = DltResource.Empty,
name: TTableHintTemplate[str] = None,
table_name: TTableHintTemplate[str] = None,
write_disposition: TTableHintTemplate[TWriteDisposition] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
standalone: Literal[True] = True
) -> Callable[..., DltResource]: # TODO: change back to Callable[TResourceFunParams, DltResource] when mypy 1.6 is fixed
...

def transformer(
f: Optional[Callable[Concatenate[TDataItem, TResourceFunParams], Any]] = None,
/,
data_from: TUnboundDltResource = DltResource.Empty,
name: str = None,
name: TTableHintTemplate[str] = None,
table_name: TTableHintTemplate[str] = None,
write_disposition: TTableHintTemplate[TWriteDisposition] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
Expand All @@ -476,7 +502,7 @@ def transformer(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
standalone: bool = False
) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], DltResource]:
) -> Any:
"""A form of `dlt resource` that takes input from other resources via `data_from` argument in order to enrich or transform the data.

The decorated function `f` must take at least one argument of type TDataItems (a single item or list of items depending on the resource `data_from`). `dlt` will pass
Expand Down
4 changes: 4 additions & 0 deletions dlt/extract/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ def __init__(self) -> None:
Please note that for resources created from functions or generators, the name is the function name by default.""")


class DynamicNameNotStandaloneResource(DltResourceException):
    """Raised when a resource is given a callable (dynamic) name but is not marked as standalone."""

    def __init__(self, resource_name: str) -> None:
        # the name can only be computed from call arguments, which are available for standalone resources
        msg = "You must set the resource as standalone to be able to dynamically set its name based on call arguments"
        super().__init__(resource_name, msg)

# class DependentResourceIsNotCallable(DltResourceException):
# def __init__(self, resource_name: str) -> None:
# super().__init__(resource_name, f"Attempted to call the dependent resource {resource_name}. Do not call the dependent resources. They will be called only when iterated.")
Expand Down
40 changes: 23 additions & 17 deletions dlt/extract/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
InvalidResourceDataTypeFunctionNotAGenerator, InvalidTransformerGeneratorFunction, ParametrizedResourceUnbound,
PipeException, PipeGenInvalid, PipeItemProcessingError, PipeNotBoundToData, ResourceExtractionError)
from dlt.extract.typing import DataItemWithMeta, ItemTransform, SupportsPipe, TPipedDataItems
from dlt.extract.utils import simulate_func_call, wrap_compat_transformer, wrap_resource_gen
from dlt.extract.utils import check_compat_transformer, simulate_func_call, wrap_compat_transformer, wrap_resource_gen

if TYPE_CHECKING:
TItemFuture = Future[Union[TDataItems, DataItemWithMeta]]
Expand Down Expand Up @@ -296,7 +296,7 @@ def _wrap_gen(self, *args: Any, **kwargs: Any) -> Any:
# skip the data item argument for transformers
args_to_skip = 1 if self.has_parent else 0
# simulate function call
sig = simulate_func_call(head, args_to_skip, *args, **kwargs)
sig, _, _ = simulate_func_call(head, args_to_skip, *args, **kwargs)
assert callable(head)

# create wrappers with partial
Expand Down Expand Up @@ -324,28 +324,34 @@ def _wrap_transform_step_meta(self, step_no: int, step: TPipeStep) -> TPipeStep:
else:
# check the signature
sig = inspect.signature(step)
sig_arg_count = len(sig.parameters)
callable_name = get_callable_name(step)
if sig_arg_count == 0:
raise InvalidStepFunctionArguments(self.name, callable_name, sig, "Function takes no arguments")
# see if meta is present in kwargs
meta_arg = next((p for p in sig.parameters.values() if p.name == "meta"), None)
if meta_arg is not None:
if meta_arg.kind not in (meta_arg.KEYWORD_ONLY, meta_arg.POSITIONAL_OR_KEYWORD):
raise InvalidStepFunctionArguments(self.name, callable_name, sig, "'meta' cannot be pos only argument '")
elif meta_arg is None:
meta_arg = check_compat_transformer(self.name, step, sig)
# sig_arg_count = len(sig.parameters)
# callable_name = get_callable_name(step)
# if sig_arg_count == 0:
# raise InvalidStepFunctionArguments(self.name, callable_name, sig, "Function takes no arguments")
# # see if meta is present in kwargs
# meta_arg = next((p for p in sig.parameters.values() if p.name == "meta"), None)
# if meta_arg is not None:
# if meta_arg.kind not in (meta_arg.KEYWORD_ONLY, meta_arg.POSITIONAL_OR_KEYWORD):
# raise InvalidStepFunctionArguments(self.name, callable_name, sig, "'meta' cannot be pos only argument '")
if meta_arg is None:
# add meta parameter when not present
orig_step = step

def _partial(*args: Any, **kwargs: Any) -> Any:
# orig step does not have meta
del kwargs["meta"]
kwargs.pop("meta", None)
# del kwargs["meta"]
return orig_step(*args, **kwargs)

step = makefun.wraps(
step,
append_args=inspect.Parameter("meta", inspect._ParameterKind.KEYWORD_ONLY, default=None)
)(_partial)
meta_arg = inspect.Parameter("meta", inspect._ParameterKind.KEYWORD_ONLY, default=None)
kwargs_arg = next((p for p in sig.parameters.values() if p.kind == inspect.Parameter.VAR_KEYWORD), None)
if kwargs_arg:
# pass meta in variadic
new_sig = sig
else:
new_sig = makefun.add_signature_parameters(sig, last=(meta_arg,))
step = makefun.wraps(step, new_sig=new_sig)(_partial)

# verify the step callable, gen may be parametrized and will be evaluated at run time
if not self.is_empty:
Expand Down
Loading