Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds destination registry #2183

Draft
wants to merge 4 commits into
base: devel
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions dlt/common/configuration/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def with_config(
/,
spec: Type[BaseConfiguration] = None,
sections: Union[str, Tuple[str, ...]] = (),
section_arg_name: str = None,
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: BaseConfiguration = None,
Expand All @@ -48,8 +48,8 @@ def with_config(
/,
spec: Type[BaseConfiguration] = None,
sections: Union[str, Tuple[str, ...]] = (),
section_arg_name: str = None,
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
Expand All @@ -63,8 +63,8 @@ def with_config(
/,
spec: Type[BaseConfiguration] = None,
sections: Union[str, Tuple[str, ...]] = (),
section_arg_name: str = None,
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
Expand All @@ -80,8 +80,9 @@ def with_config(
func (Optional[AnyFun], optional): A function with arguments to be injected. Defaults to None.
spec (Type[BaseConfiguration], optional): A specification of injectable arguments. Defaults to None.
sections (Tuple[str, ...], optional): A set of config sections in which to look for arguments values. Defaults to ().
section_arg_name (bool, optional): Name of the argument in the signature of the decorated function which will be used to extend `sections` tuple.
A top level pipeline section will be added if argument name is `pipeline_name`
prefer_existing_sections: (bool, optional): When joining existing section context, the existing context will be preferred to the one in `sections`. Default: False
auto_pipeline_section (bool, optional): If True, a top level pipeline section will be added if `pipeline_name` argument is present . Defaults to False.
include_defaults (bool, optional): If True then arguments with default values will be included in synthesized spec. If False only the required arguments marked with `dlt.secrets.value` and `dlt.config.value` are included
base (Type[BaseConfiguration], optional): A base class for synthesized spec. Defaults to BaseConfiguration.
lock_context_on_injection (bool, optional): If True, the thread context will be locked during injection to prevent race conditions. Defaults to True.
Expand Down Expand Up @@ -115,7 +116,7 @@ def decorator(f: TFun) -> TFun:
return f

spec_arg: Parameter = None
pipeline_name_arg: Parameter = None
section_name_arg: Parameter = None

for p in sig.parameters.values():
# for all positional parameters that do not have default value, set default
Expand All @@ -124,10 +125,10 @@ def decorator(f: TFun) -> TFun:
if p.annotation is SPEC:
# if any argument has type SPEC then us it to take initial value
spec_arg = p
if p.name == "pipeline_name" and auto_pipeline_section:
# if argument has name pipeline_name and auto_section is used, use it to generate section context
pipeline_name_arg = p
pipeline_name_arg_default = None if p.default == Parameter.empty else p.default
if p.name == section_arg_name:
# add value of section_name_arg to
section_name_arg = p
section_name_arg_default = None if p.default == Parameter.empty else p.default

def resolve_config(
bound_args: inspect.BoundArguments, accept_partial_: bool
Expand All @@ -146,16 +147,27 @@ def resolve_config(
# sections may be a string
if isinstance(curr_sections, str):
curr_sections = (curr_sections,)
# extend sections with section_name_arg
if section_name_arg:
section_extension = bound_args.arguments.get(
section_name_arg.name, section_name_arg_default
)
if section_extension:
curr_sections = (
curr_sections + (section_extension,)
if curr_sections
else (section_extension,)
)

# if one of arguments is spec the use it as initial value
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
if section_name_arg and section_name_arg.name == "pipeline_name":
curr_pipeline_name = bound_args.arguments.get(
pipeline_name_arg.name, pipeline_name_arg_default
section_name_arg.name, section_name_arg_default
)
else:
curr_pipeline_name = None
Expand Down
13 changes: 12 additions & 1 deletion dlt/common/configuration/specs/known_sections.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
# TODO: register layout types and specs for each top level sections so we can generate full config typing

SOURCES = "sources"
"""a top section holding source and resource configs often within their own sections named after modules they are in"""

DESTINATION = "destination"
"""a top section holding sections named after particular destinations with configurations and credentials."""
"""a top section holding sections named after particular destinations with configurations and credentials. NOTE: will be deprecated"""

DESTINATIONS = "destinations"
"""a top section holding sections named after particular destinations with configurations and credentials. NOTE: not yet supported"""

PIPELINES = "pipelines"
"""a top section holding pipeline configurations"""

DATASETS = "datasets"
"""a top section holding dataset configurations"""

LOAD = "load"
"""load and load storage configuration"""
Expand Down
Loading
Loading