Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parametrized destinations #746

Merged
merged 30 commits into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3cdab60
Move destination modules to subfolder
steinitzu Nov 11, 2023
3b53861
Mockup destination factory
steinitzu Nov 9, 2023
4b449e3
Destination factory replacing reference and dest __init__
steinitzu Nov 14, 2023
f195fa7
Update factories
steinitzu Nov 11, 2023
340f637
Defer duckdb credentials resolving in pipeline context
steinitzu Nov 12, 2023
4ae4ceb
Simplify destination config resolution
steinitzu Nov 14, 2023
bccfa9d
capabilities are callable
steinitzu Nov 14, 2023
3bde2f4
bigquery, athena factories
steinitzu Nov 14, 2023
0d59d51
Add rest of factories
steinitzu Nov 14, 2023
3ee262d
Cleanup
steinitzu Nov 14, 2023
9243b2f
Destination type vars
steinitzu Nov 14, 2023
9ad561c
Cleanup
steinitzu Nov 14, 2023
007d7a3
Fix test
steinitzu Nov 14, 2023
62b5a57
Create initial config from non-defaults only
steinitzu Nov 14, 2023
d77b54a
Update naming convention path
steinitzu Nov 14, 2023
0d4ad49
Fix config in bigquery location test
steinitzu Nov 14, 2023
f657071
Only keep non-default config args in factory
steinitzu Nov 14, 2023
56e922a
Resolve duckdb credentials in pipeline context
steinitzu Nov 15, 2023
08a8fdc
Cleanup
steinitzu Nov 15, 2023
2567ca0
Union credentials arguments
steinitzu Nov 15, 2023
3537c03
Common tests without dest dependencies
steinitzu Nov 15, 2023
25a937a
Forward all athena arguments
steinitzu Nov 16, 2023
97f1afc
Delete commented code
steinitzu Nov 16, 2023
fe24e14
Reference docstrings
steinitzu Nov 16, 2023
481a7cb
Add deprecation warning for credentials argument
steinitzu Nov 16, 2023
d91402c
Init docstrings for destination factories
steinitzu Nov 16, 2023
fc92929
Fix tests
steinitzu Nov 17, 2023
13ec6fb
Destination name in output
steinitzu Nov 17, 2023
1f16c8f
Correct exception in unknown destination test
steinitzu Nov 17, 2023
2541d49
Merge branch 'devel' into sthor/parametrized-destination
rudolfix Nov 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from dlt.extract.decorators import source, resource, transformer, defer
from dlt.pipeline import pipeline as _pipeline, run, attach, Pipeline, dbt, current as _current, mark as _mark
from dlt.pipeline import progress
from dlt import destinations

pipeline = _pipeline
current = _current
Expand Down Expand Up @@ -64,4 +65,5 @@
"TSecretValue",
"TCredentials",
"sources",
"destinations",
]
4 changes: 2 additions & 2 deletions dlt/cli/deploy_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from dlt.version import DLT_PKG_NAME

from dlt.common.destination.reference import DestinationReference
from dlt.common.destination.reference import Destination

REQUIREMENTS_GITHUB_ACTION = "requirements_github_action.txt"
DLT_DEPLOY_DOCS_URL = "https://dlthub.com/docs/walkthroughs/deploy-a-pipeline"
Expand Down Expand Up @@ -198,7 +198,7 @@ def __init__(
def _generate_workflow(self, *args: Optional[Any]) -> None:
self.deployment_method = DeploymentMethods.airflow_composer.value

req_dep = f"{DLT_PKG_NAME}[{DestinationReference.to_name(self.state['destination'])}]"
req_dep = f"{DLT_PKG_NAME}[{Destination.to_name(self.state['destination'])}]"
req_dep_line = f"{req_dep}>={pkg_version(DLT_PKG_NAME)}"

self.artifacts["requirements_txt"] = req_dep_line
Expand Down
6 changes: 3 additions & 3 deletions dlt/cli/init_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dlt.common.pipeline import get_dlt_repos_dir
from dlt.common.source import _SOURCES
from dlt.version import DLT_PKG_NAME, __version__
from dlt.common.destination import DestinationReference
from dlt.common.destination import Destination
from dlt.common.reflection.utils import rewrite_python_script
from dlt.common.schema.utils import is_valid_schema_name
from dlt.common.schema.exceptions import InvalidSchemaName
Expand Down Expand Up @@ -160,8 +160,8 @@ def list_verified_sources_command(repo_location: str, branch: str = None) -> Non

def init_command(source_name: str, destination_name: str, use_generic_template: bool, repo_location: str, branch: str = None) -> None:
# try to import the destination and get config spec
destination_reference = DestinationReference.from_name(destination_name)
destination_spec = destination_reference.spec()
destination_reference = Destination.from_reference(destination_name)
destination_spec = destination_reference.spec

fmt.echo("Looking up the init scripts in %s..." % fmt.bold(repo_location))
clone_storage = git.get_fresh_repo_files(repo_location, get_dlt_repos_dir(), branch=branch)
Expand Down
2 changes: 1 addition & 1 deletion dlt/cli/pipeline_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
fmt.warning(warning)
return

fmt.echo("About to drop the following data in dataset %s in destination %s:" % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.__name__)))
fmt.echo("About to drop the following data in dataset %s in destination %s:" % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.name)))
fmt.echo("%s: %s" % (fmt.style("Selected schema", fg="green"), drop.info["schema_name"]))
fmt.echo("%s: %s" % (fmt.style("Selected resource(s)", fg="green"), drop.info["resource_names"]))
fmt.echo("%s: %s" % (fmt.style("Table(s) to drop", fg="green"), drop.info["tables"]))
Expand Down
16 changes: 11 additions & 5 deletions dlt/common/configuration/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def with_config(
sections: Tuple[str, ...] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True
include_defaults: bool = True,
accept_partial: bool = False,
) -> TFun:
...

Expand All @@ -45,7 +46,8 @@ def with_config(
sections: Tuple[str, ...] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True
include_defaults: bool = True,
accept_partial: bool = False,
) -> Callable[[TFun], TFun]:
...

Expand All @@ -57,7 +59,9 @@ def with_config(
sections: Tuple[str, ...] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
) -> Callable[[TFun], TFun]:
"""Injects values into decorated function arguments following the specification in `spec` or by deriving one from function's signature.

Expand Down Expand Up @@ -127,7 +131,9 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
curr_sections = sections

# if one of arguments is spec the use it as initial value
if spec_arg:
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
Expand All @@ -139,7 +145,7 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
with _RESOLVE_LOCK:
with inject_section(section_context):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments)
config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments, accept_partial=accept_partial)
resolved_params = dict(config)
# overwrite or add resolved params
for p in sig.parameters.values():
Expand Down
5 changes: 3 additions & 2 deletions dlt/common/destination/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from dlt.common.destination.capabilities import DestinationCapabilitiesContext, TLoaderFileFormat, ALL_SUPPORTED_FILE_FORMATS
from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
from dlt.common.destination.reference import TDestinationReferenceArg, Destination, TDestination

__all__ = [
"DestinationCapabilitiesContext",
"TLoaderFileFormat",
"ALL_SUPPORTED_FILE_FORMATS",
"DestinationReference",
rudolfix marked this conversation as resolved.
Show resolved Hide resolved
"TDestinationReferenceArg",
"Destination",
"TDestination",
]
135 changes: 91 additions & 44 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
from abc import ABC, abstractmethod, abstractproperty
from importlib import import_module
from types import TracebackType, ModuleType
from typing import ClassVar, Final, Optional, NamedTuple, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast, List, ContextManager, Dict, Any
from typing import ClassVar, Final, Optional, NamedTuple, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast, List, ContextManager, Dict, Any, Callable, TypeVar, Generic
from contextlib import contextmanager
import datetime # noqa: 251
from copy import deepcopy
import inspect

from dlt.common import logger
from dlt.common.exceptions import IdentifierTooLongException, InvalidDestinationReference, UnknownDestinationModule
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.schema.utils import get_write_disposition, get_table_format
from dlt.common.configuration import configspec
from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
Expand All @@ -23,7 +24,10 @@
from dlt.common.utils import get_module_name
from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults


TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")


class StorageSchemaInfo(NamedTuple):
Expand Down Expand Up @@ -344,59 +348,102 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable
# the default is to truncate the tables on the staging destination...
return True

TDestinationReferenceArg = Union["DestinationReference", ModuleType, None, str]
TDestinationReferenceArg = Union[str, "Destination", None]


class DestinationReference(Protocol):
__name__: str
"""Name of the destination"""
class Destination(ABC, Generic[TDestinationConfig, TDestinationClient]):
"""A destination factory that can be partially pre-configured
with credentials and other config params.
"""
config_params: Optional[Dict[str, Any]] = None

def __init__(self, **kwargs: Any) -> None:
# Create initial unresolved destination config
# Argument defaults are filtered out here because we only want arguments passed explicitly
# to supersede config from the environment or pipeline args
sig = inspect.signature(self.__class__)
params = sig.parameters
self.config_params = {
k: v for k, v in kwargs.items()
if k not in params or v != params[k].default
}

@property
@abstractmethod
def spec(self) -> Type[TDestinationConfig]:
"""A spec of destination configuration that also contains destination credentials"""
...

@abstractmethod
def capabilities(self) -> DestinationCapabilitiesContext:
"""Destination capabilities ie. supported loader file formats, identifier name lengths, naming conventions, escape function etc."""
...

def client(self, schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> "JobClientBase":
"""A job client responsible for starting and resuming load jobs"""
@property
def name(self) -> str:
return self.__class__.__name__

def spec(self) -> Type[DestinationClientConfiguration]:
"""A spec of destination configuration that also contains destination credentials"""
@property
@abstractmethod
def client_class(self) -> Type[TDestinationClient]:
"""A job client class responsible for starting and resuming load jobs"""
...

def configuration(self, initial_config: TDestinationConfig) -> TDestinationConfig:
"""Get a fully resolved destination config from the initial config
"""
return resolve_configuration(
initial_config,
sections=(known_sections.DESTINATION, self.name),
# Already populated values will supersede resolved env config
explicit_value=self.config_params
)

@staticmethod
def to_name(ref: TDestinationReferenceArg) -> str:
if ref is None:
raise InvalidDestinationReference(ref)
if isinstance(ref, str):
return ref.rsplit(".", 1)[-1]
return ref.name

@staticmethod
def from_name(destination: TDestinationReferenceArg) -> "DestinationReference":
if destination is None:
def from_reference(ref: TDestinationReferenceArg, credentials: Optional[CredentialsConfiguration] = None, **kwargs: Any) -> Optional["Destination[DestinationClientConfiguration, JobClientBase]"]:
"""Instantiate destination from str reference.
The ref can be a destination name or import path pointing to a destination class (e.g. `dlt.destinations.postgres`)
"""
if ref is None:
return None
if isinstance(ref, Destination):
return ref
if not isinstance(ref, str):
raise InvalidDestinationReference(ref)
try:
if "." in ref:
module_path, attr_name = ref.rsplit(".", 1)
dest_module = import_module(module_path)
else:
from dlt import destinations as dest_module
attr_name = ref
except ModuleNotFoundError as e:
raise UnknownDestinationModule(ref) from e

# if destination is a str, get destination reference by dynamically importing module
if isinstance(destination, str):
try:
if "." in destination:
# this is full module name
destination_ref = cast(DestinationReference, import_module(destination))
else:
# from known location
destination_ref = cast(DestinationReference, import_module(f"dlt.destinations.{destination}"))
except ImportError:
if "." in destination:
raise UnknownDestinationModule(destination)
else:
# allow local external module imported without dot
try:
destination_ref = cast(DestinationReference, import_module(destination))
except ImportError:
raise UnknownDestinationModule(destination)
else:
destination_ref = cast(DestinationReference, destination)

# make sure the reference is correct
try:
c = destination_ref.spec()
c.credentials
except Exception:
raise InvalidDestinationReference(destination)
factory: Type[Destination[DestinationClientConfiguration, JobClientBase]] = getattr(dest_module, attr_name)
except AttributeError as e:
raise UnknownDestinationModule(ref) from e
if credentials:
kwargs["credentials"] = credentials
try:
dest = factory(**kwargs)
dest.spec
except Exception as e:
raise InvalidDestinationReference(ref) from e
return dest

return destination_ref
def client(self, schema: Schema, initial_config: TDestinationConfig = config.value) -> TDestinationClient:
"""Returns a configured instance of the destination's job client"""
return self.client_class(schema, self.configuration(initial_config))

@staticmethod
def to_name(destination: TDestinationReferenceArg) -> str:
if isinstance(destination, ModuleType):
return get_module_name(destination)
return destination.split(".")[-1] # type: ignore

TDestination = Destination[DestinationClientConfiguration, JobClientBase]
4 changes: 2 additions & 2 deletions dlt/common/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
from dlt.common.configuration.paths import get_dlt_data_dir
from dlt.common.configuration.specs import RunConfiguration
from dlt.common.destination import DestinationReference, TDestinationReferenceArg
from dlt.common.destination import Destination, TDestinationReferenceArg, TDestination
from dlt.common.exceptions import DestinationHasFailedJobs, PipelineStateNotAvailable, ResourceNameNotAvailable, SourceSectionNotAvailable
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition
Expand Down Expand Up @@ -177,7 +177,7 @@ class SupportsPipeline(Protocol):
"""Name of the pipeline"""
default_schema_name: str
"""Name of the default schema"""
destination: DestinationReference
destination: TDestination
"""The destination reference which is ModuleType. `destination.__name__` returns the name string"""
dataset_name: str
"""Name of the dataset to which pipeline will be loaded to"""
Expand Down
28 changes: 28 additions & 0 deletions dlt/destinations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from dlt.destinations.impl.postgres.factory import postgres
from dlt.destinations.impl.snowflake.factory import snowflake
from dlt.destinations.impl.filesystem.factory import filesystem
from dlt.destinations.impl.duckdb.factory import duckdb
from dlt.destinations.impl.dummy.factory import dummy
from dlt.destinations.impl.mssql.factory import mssql
from dlt.destinations.impl.bigquery.factory import bigquery
from dlt.destinations.impl.athena.factory import athena
from dlt.destinations.impl.redshift.factory import redshift
from dlt.destinations.impl.qdrant.factory import qdrant
from dlt.destinations.impl.motherduck.factory import motherduck
from dlt.destinations.impl.weaviate.factory import weaviate


__all__ = [
"postgres",
"snowflake",
"filesystem",
"duckdb",
"dummy",
"mssql",
"bigquery",
"athena",
"redshift",
"qdrant",
"motherduck",
"weaviate",
]
29 changes: 0 additions & 29 deletions dlt/destinations/filesystem/__init__.py

This file was deleted.

Empty file.
Loading