Skip to content

Commit

Permalink
Parametrized destinations (#746)
Browse files Browse the repository at this point in the history
* Move destination modules to subfolder

* Mockup destination factory

* Destination factory replacing reference and dest __init__

* Update factories

* Defer duckdb credentials resolving in pipeline context

* Simplify destination config resolution

* capabilities are callable

* bigquery, athena factories

* Add rest of factories

* Cleanup

* Destination type vars

* Cleanup

* Fix test

* Create initial config from non-defaults only

* Update naming convention path

* Fix config in bigquery location test

* Only keep non-default config args in factory

* Resolve duckdb credentials in pipeline context

* Cleanup

* Union credentials arguments

* Common tests without dest dependencies

* Forward all athena arguments

* Delete commented code

* Reference docstrings

* Add deprecation warning for credentials argument

* Init docstrings for destination factories

* Fix tests

* Destination name in output

* Correct exception in unknown destination test

---------

Co-authored-by: Marcin Rudolf <[email protected]>
  • Loading branch information
steinitzu and rudolfix authored Nov 18, 2023
1 parent 582448c commit 105795c
Show file tree
Hide file tree
Showing 123 changed files with 978 additions and 593 deletions.
2 changes: 2 additions & 0 deletions dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from dlt.extract.decorators import source, resource, transformer, defer
from dlt.pipeline import pipeline as _pipeline, run, attach, Pipeline, dbt, current as _current, mark as _mark
from dlt.pipeline import progress
from dlt import destinations

pipeline = _pipeline
current = _current
Expand Down Expand Up @@ -64,4 +65,5 @@
"TSecretValue",
"TCredentials",
"sources",
"destinations",
]
4 changes: 2 additions & 2 deletions dlt/cli/deploy_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from dlt.version import DLT_PKG_NAME

from dlt.common.destination.reference import DestinationReference
from dlt.common.destination.reference import Destination

REQUIREMENTS_GITHUB_ACTION = "requirements_github_action.txt"
DLT_DEPLOY_DOCS_URL = "https://dlthub.com/docs/walkthroughs/deploy-a-pipeline"
Expand Down Expand Up @@ -198,7 +198,7 @@ def __init__(
def _generate_workflow(self, *args: Optional[Any]) -> None:
self.deployment_method = DeploymentMethods.airflow_composer.value

req_dep = f"{DLT_PKG_NAME}[{DestinationReference.to_name(self.state['destination'])}]"
req_dep = f"{DLT_PKG_NAME}[{Destination.to_name(self.state['destination'])}]"
req_dep_line = f"{req_dep}>={pkg_version(DLT_PKG_NAME)}"

self.artifacts["requirements_txt"] = req_dep_line
Expand Down
6 changes: 3 additions & 3 deletions dlt/cli/init_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dlt.common.pipeline import get_dlt_repos_dir
from dlt.common.source import _SOURCES
from dlt.version import DLT_PKG_NAME, __version__
from dlt.common.destination import DestinationReference
from dlt.common.destination import Destination
from dlt.common.reflection.utils import rewrite_python_script
from dlt.common.schema.utils import is_valid_schema_name
from dlt.common.schema.exceptions import InvalidSchemaName
Expand Down Expand Up @@ -160,8 +160,8 @@ def list_verified_sources_command(repo_location: str, branch: str = None) -> Non

def init_command(source_name: str, destination_name: str, use_generic_template: bool, repo_location: str, branch: str = None) -> None:
# try to import the destination and get config spec
destination_reference = DestinationReference.from_name(destination_name)
destination_spec = destination_reference.spec()
destination_reference = Destination.from_reference(destination_name)
destination_spec = destination_reference.spec

fmt.echo("Looking up the init scripts in %s..." % fmt.bold(repo_location))
clone_storage = git.get_fresh_repo_files(repo_location, get_dlt_repos_dir(), branch=branch)
Expand Down
2 changes: 1 addition & 1 deletion dlt/cli/pipeline_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
fmt.warning(warning)
return

fmt.echo("About to drop the following data in dataset %s in destination %s:" % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.__name__)))
fmt.echo("About to drop the following data in dataset %s in destination %s:" % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.name)))
fmt.echo("%s: %s" % (fmt.style("Selected schema", fg="green"), drop.info["schema_name"]))
fmt.echo("%s: %s" % (fmt.style("Selected resource(s)", fg="green"), drop.info["resource_names"]))
fmt.echo("%s: %s" % (fmt.style("Table(s) to drop", fg="green"), drop.info["tables"]))
Expand Down
16 changes: 11 additions & 5 deletions dlt/common/configuration/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def with_config(
sections: Tuple[str, ...] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True
include_defaults: bool = True,
accept_partial: bool = False,
) -> TFun:
...

Expand All @@ -45,7 +46,8 @@ def with_config(
sections: Tuple[str, ...] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True
include_defaults: bool = True,
accept_partial: bool = False,
) -> Callable[[TFun], TFun]:
...

Expand All @@ -57,7 +59,9 @@ def with_config(
sections: Tuple[str, ...] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
) -> Callable[[TFun], TFun]:
"""Injects values into decorated function arguments following the specification in `spec` or by deriving one from function's signature.
Expand Down Expand Up @@ -127,7 +131,9 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
curr_sections = sections

# if one of arguments is spec the use it as initial value
if spec_arg:
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
Expand All @@ -139,7 +145,7 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
with _RESOLVE_LOCK:
with inject_section(section_context):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments)
config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments, accept_partial=accept_partial)
resolved_params = dict(config)
# overwrite or add resolved params
for p in sig.parameters.values():
Expand Down
5 changes: 3 additions & 2 deletions dlt/common/destination/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from dlt.common.destination.capabilities import DestinationCapabilitiesContext, TLoaderFileFormat, ALL_SUPPORTED_FILE_FORMATS
from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
from dlt.common.destination.reference import TDestinationReferenceArg, Destination, TDestination

__all__ = [
"DestinationCapabilitiesContext",
"TLoaderFileFormat",
"ALL_SUPPORTED_FILE_FORMATS",
"DestinationReference",
"TDestinationReferenceArg",
"Destination",
"TDestination",
]
135 changes: 91 additions & 44 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
from abc import ABC, abstractmethod, abstractproperty
from importlib import import_module
from types import TracebackType, ModuleType
from typing import ClassVar, Final, Optional, NamedTuple, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast, List, ContextManager, Dict, Any
from typing import ClassVar, Final, Optional, NamedTuple, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast, List, ContextManager, Dict, Any, Callable, TypeVar, Generic
from contextlib import contextmanager
import datetime # noqa: 251
from copy import deepcopy
import inspect

from dlt.common import logger
from dlt.common.exceptions import IdentifierTooLongException, InvalidDestinationReference, UnknownDestinationModule
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.schema.utils import get_write_disposition, get_table_format
from dlt.common.configuration import configspec
from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
Expand All @@ -23,7 +24,10 @@
from dlt.common.utils import get_module_name
from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults


TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")


class StorageSchemaInfo(NamedTuple):
Expand Down Expand Up @@ -344,59 +348,102 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable
# the default is to truncate the tables on the staging destination...
return True

TDestinationReferenceArg = Union["DestinationReference", ModuleType, None, str]
TDestinationReferenceArg = Union[str, "Destination", None]


class DestinationReference(Protocol):
__name__: str
"""Name of the destination"""
class Destination(ABC, Generic[TDestinationConfig, TDestinationClient]):
"""A destination factory that can be partially pre-configured
with credentials and other config params.
"""
config_params: Optional[Dict[str, Any]] = None

def __init__(self, **kwargs: Any) -> None:
# Create initial unresolved destination config
# Argument defaults are filtered out here because we only want arguments passed explicitly
# to supersede config from the environment or pipeline args
sig = inspect.signature(self.__class__)
params = sig.parameters
self.config_params = {
k: v for k, v in kwargs.items()
if k not in params or v != params[k].default
}

@property
@abstractmethod
def spec(self) -> Type[TDestinationConfig]:
"""A spec of destination configuration that also contains destination credentials"""
...

@abstractmethod
def capabilities(self) -> DestinationCapabilitiesContext:
"""Destination capabilities ie. supported loader file formats, identifier name lengths, naming conventions, escape function etc."""
...

def client(self, schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> "JobClientBase":
"""A job client responsible for starting and resuming load jobs"""
@property
def name(self) -> str:
return self.__class__.__name__

def spec(self) -> Type[DestinationClientConfiguration]:
"""A spec of destination configuration that also contains destination credentials"""
@property
@abstractmethod
def client_class(self) -> Type[TDestinationClient]:
"""A job client class responsible for starting and resuming load jobs"""
...

def configuration(self, initial_config: TDestinationConfig) -> TDestinationConfig:
"""Get a fully resolved destination config from the initial config
"""
return resolve_configuration(
initial_config,
sections=(known_sections.DESTINATION, self.name),
# Already populated values will supersede resolved env config
explicit_value=self.config_params
)

@staticmethod
def to_name(ref: TDestinationReferenceArg) -> str:
if ref is None:
raise InvalidDestinationReference(ref)
if isinstance(ref, str):
return ref.rsplit(".", 1)[-1]
return ref.name

@staticmethod
def from_name(destination: TDestinationReferenceArg) -> "DestinationReference":
if destination is None:
def from_reference(ref: TDestinationReferenceArg, credentials: Optional[CredentialsConfiguration] = None, **kwargs: Any) -> Optional["Destination[DestinationClientConfiguration, JobClientBase]"]:
"""Instantiate destination from str reference.
The ref can be a destination name or import path pointing to a destination class (e.g. `dlt.destinations.postgres`)
"""
if ref is None:
return None
if isinstance(ref, Destination):
return ref
if not isinstance(ref, str):
raise InvalidDestinationReference(ref)
try:
if "." in ref:
module_path, attr_name = ref.rsplit(".", 1)
dest_module = import_module(module_path)
else:
from dlt import destinations as dest_module
attr_name = ref
except ModuleNotFoundError as e:
raise UnknownDestinationModule(ref) from e

# if destination is a str, get destination reference by dynamically importing module
if isinstance(destination, str):
try:
if "." in destination:
# this is full module name
destination_ref = cast(DestinationReference, import_module(destination))
else:
# from known location
destination_ref = cast(DestinationReference, import_module(f"dlt.destinations.{destination}"))
except ImportError:
if "." in destination:
raise UnknownDestinationModule(destination)
else:
# allow local external module imported without dot
try:
destination_ref = cast(DestinationReference, import_module(destination))
except ImportError:
raise UnknownDestinationModule(destination)
else:
destination_ref = cast(DestinationReference, destination)

# make sure the reference is correct
try:
c = destination_ref.spec()
c.credentials
except Exception:
raise InvalidDestinationReference(destination)
factory: Type[Destination[DestinationClientConfiguration, JobClientBase]] = getattr(dest_module, attr_name)
except AttributeError as e:
raise UnknownDestinationModule(ref) from e
if credentials:
kwargs["credentials"] = credentials
try:
dest = factory(**kwargs)
dest.spec
except Exception as e:
raise InvalidDestinationReference(ref) from e
return dest

return destination_ref
def client(self, schema: Schema, initial_config: TDestinationConfig = config.value) -> TDestinationClient:
"""Returns a configured instance of the destination's job client"""
return self.client_class(schema, self.configuration(initial_config))

@staticmethod
def to_name(destination: TDestinationReferenceArg) -> str:
if isinstance(destination, ModuleType):
return get_module_name(destination)
return destination.split(".")[-1] # type: ignore

TDestination = Destination[DestinationClientConfiguration, JobClientBase]
4 changes: 2 additions & 2 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
from dlt.common.configuration.paths import get_dlt_data_dir
from dlt.common.configuration.specs import RunConfiguration
from dlt.common.destination import DestinationReference, TDestinationReferenceArg
from dlt.common.destination import Destination, TDestinationReferenceArg, TDestination
from dlt.common.exceptions import DestinationHasFailedJobs, PipelineStateNotAvailable, ResourceNameNotAvailable, SourceSectionNotAvailable
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition
Expand Down Expand Up @@ -177,7 +177,7 @@ class SupportsPipeline(Protocol):
"""Name of the pipeline"""
default_schema_name: str
"""Name of the default schema"""
destination: DestinationReference
destination: TDestination
"""The destination reference which is ModuleType. `destination.__name__` returns the name string"""
dataset_name: str
"""Name of the dataset to which pipeline will be loaded to"""
Expand Down
28 changes: 28 additions & 0 deletions dlt/destinations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from dlt.destinations.impl.postgres.factory import postgres
from dlt.destinations.impl.snowflake.factory import snowflake
from dlt.destinations.impl.filesystem.factory import filesystem
from dlt.destinations.impl.duckdb.factory import duckdb
from dlt.destinations.impl.dummy.factory import dummy
from dlt.destinations.impl.mssql.factory import mssql
from dlt.destinations.impl.bigquery.factory import bigquery
from dlt.destinations.impl.athena.factory import athena
from dlt.destinations.impl.redshift.factory import redshift
from dlt.destinations.impl.qdrant.factory import qdrant
from dlt.destinations.impl.motherduck.factory import motherduck
from dlt.destinations.impl.weaviate.factory import weaviate


__all__ = [
"postgres",
"snowflake",
"filesystem",
"duckdb",
"dummy",
"mssql",
"bigquery",
"athena",
"redshift",
"qdrant",
"motherduck",
"weaviate",
]
29 changes: 0 additions & 29 deletions dlt/destinations/filesystem/__init__.py

This file was deleted.

Empty file.
Loading

0 comments on commit 105795c

Please sign in to comment.