diff --git a/dlt/__init__.py b/dlt/__init__.py index f5dde3f204..728343bdd6 100644 --- a/dlt/__init__.py +++ b/dlt/__init__.py @@ -31,6 +31,7 @@ from dlt.extract.decorators import source, resource, transformer, defer from dlt.pipeline import pipeline as _pipeline, run, attach, Pipeline, dbt, current as _current, mark as _mark from dlt.pipeline import progress +from dlt import destinations pipeline = _pipeline current = _current @@ -64,4 +65,5 @@ "TSecretValue", "TCredentials", "sources", + "destinations", ] diff --git a/dlt/cli/deploy_command.py b/dlt/cli/deploy_command.py index 7634f173b3..a7bdf2e0e7 100644 --- a/dlt/cli/deploy_command.py +++ b/dlt/cli/deploy_command.py @@ -16,7 +16,7 @@ from dlt.version import DLT_PKG_NAME -from dlt.common.destination.reference import DestinationReference +from dlt.common.destination.reference import Destination REQUIREMENTS_GITHUB_ACTION = "requirements_github_action.txt" DLT_DEPLOY_DOCS_URL = "https://dlthub.com/docs/walkthroughs/deploy-a-pipeline" @@ -198,7 +198,7 @@ def __init__( def _generate_workflow(self, *args: Optional[Any]) -> None: self.deployment_method = DeploymentMethods.airflow_composer.value - req_dep = f"{DLT_PKG_NAME}[{DestinationReference.to_name(self.state['destination'])}]" + req_dep = f"{DLT_PKG_NAME}[{Destination.to_name(self.state['destination'])}]" req_dep_line = f"{req_dep}>={pkg_version(DLT_PKG_NAME)}" self.artifacts["requirements_txt"] = req_dep_line diff --git a/dlt/cli/init_command.py b/dlt/cli/init_command.py index c246ac87de..4cec1706b9 100644 --- a/dlt/cli/init_command.py +++ b/dlt/cli/init_command.py @@ -12,7 +12,7 @@ from dlt.common.pipeline import get_dlt_repos_dir from dlt.common.source import _SOURCES from dlt.version import DLT_PKG_NAME, __version__ -from dlt.common.destination import DestinationReference +from dlt.common.destination import Destination from dlt.common.reflection.utils import rewrite_python_script from dlt.common.schema.utils import is_valid_schema_name from dlt.common.schema.exceptions import InvalidSchemaName @@ -160,8 +160,8 @@ def list_verified_sources_command(repo_location: str, branch: str = None) -> Non def init_command(source_name: str, destination_name: str, use_generic_template: bool, repo_location: str, branch: str = None) -> None: # try to import the destination and get config spec - destination_reference = DestinationReference.from_name(destination_name) - destination_spec = destination_reference.spec() + destination_reference = Destination.from_reference(destination_name) + destination_spec = destination_reference.spec fmt.echo("Looking up the init scripts in %s..." 
% fmt.bold(repo_location)) clone_storage = git.get_fresh_repo_files(repo_location, get_dlt_repos_dir(), branch=branch) diff --git a/dlt/cli/pipeline_command.py b/dlt/cli/pipeline_command.py index b17981c1b1..2d705dc1a3 100644 --- a/dlt/cli/pipeline_command.py +++ b/dlt/cli/pipeline_command.py @@ -212,7 +212,7 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]: fmt.warning(warning) return - fmt.echo("About to drop the following data in dataset %s in destination %s:" % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.__name__))) + fmt.echo("About to drop the following data in dataset %s in destination %s:" % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.name))) fmt.echo("%s: %s" % (fmt.style("Selected schema", fg="green"), drop.info["schema_name"])) fmt.echo("%s: %s" % (fmt.style("Selected resource(s)", fg="green"), drop.info["resource_names"])) fmt.echo("%s: %s" % (fmt.style("Table(s) to drop", fg="green"), drop.info["tables"])) diff --git a/dlt/common/configuration/inject.py b/dlt/common/configuration/inject.py index 1880727a0f..f50e947011 100644 --- a/dlt/common/configuration/inject.py +++ b/dlt/common/configuration/inject.py @@ -32,7 +32,8 @@ def with_config( sections: Tuple[str, ...] = (), sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming, auto_pipeline_section: bool = False, - include_defaults: bool = True + include_defaults: bool = True, + accept_partial: bool = False, ) -> TFun: ... @@ -45,7 +46,8 @@ def with_config( sections: Tuple[str, ...] = (), sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming, auto_pipeline_section: bool = False, - include_defaults: bool = True + include_defaults: bool = True, + accept_partial: bool = False, ) -> Callable[[TFun], TFun]: ... @@ -57,7 +59,9 @@ def with_config( sections: Tuple[str, ...] = (), sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming, auto_pipeline_section: bool = False, - include_defaults: bool = True + include_defaults: bool = True, + accept_partial: bool = False, + initial_config: Optional[BaseConfiguration] = None, ) -> Callable[[TFun], TFun]: """Injects values into decorated function arguments following the specification in `spec` or by deriving one from function's signature. 
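Both additions loosen the decorator for callers that bring their own configuration: `initial_config` lets a pre-built config object be passed in, and `accept_partial` tolerates unresolved fields instead of raising. A minimal, hypothetical sketch of `accept_partial` — the `ApiConfig` spec and `fetch_data` function are illustrative names, not part of this PR:

```python
from typing import Optional

from dlt.common.configuration import configspec, with_config
from dlt.common.configuration.specs import BaseConfiguration


@configspec
class ApiConfig(BaseConfiguration):
    api_url: Optional[str] = None
    api_key: Optional[str] = None  # a secret that may not be available yet


# with accept_partial=True resolution injects whatever it can find instead of
# raising when a required field such as api_key is still missing
@with_config(spec=ApiConfig, sections=("sources", "api"), accept_partial=True)
def fetch_data(api_url: str = None, api_key: str = None) -> None:
    # api_key may still be None here when the config resolved only partially
    ...
```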
@@ -127,7 +131,9 @@ def _wrap(*args: Any, **kwargs: Any) -> Any: curr_sections = sections # if one of arguments is spec the use it as initial value - if spec_arg: + if initial_config: + config = initial_config + elif spec_arg: config = bound_args.arguments.get(spec_arg.name, None) # resolve SPEC, also provide section_context with pipeline_name if pipeline_name_arg: @@ -139,7 +145,7 @@ def _wrap(*args: Any, **kwargs: Any) -> Any: with _RESOLVE_LOCK: with inject_section(section_context): # print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}") - config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments) + config = resolve_configuration(config or SPEC(), explicit_value=bound_args.arguments, accept_partial=accept_partial) resolved_params = dict(config) # overwrite or add resolved params for p in sig.parameters.values(): diff --git a/dlt/common/destination/__init__.py b/dlt/common/destination/__init__.py index 88b5d5ef06..4857851fa9 100644 --- a/dlt/common/destination/__init__.py +++ b/dlt/common/destination/__init__.py @@ -1,10 +1,11 @@ from dlt.common.destination.capabilities import DestinationCapabilitiesContext, TLoaderFileFormat, ALL_SUPPORTED_FILE_FORMATS -from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg +from dlt.common.destination.reference import TDestinationReferenceArg, Destination, TDestination __all__ = [ "DestinationCapabilitiesContext", "TLoaderFileFormat", "ALL_SUPPORTED_FILE_FORMATS", - "DestinationReference", "TDestinationReferenceArg", + "Destination", + "TDestination", ] diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 13172b41e9..1c3560cbbd 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -1,10 +1,11 @@ from abc import ABC, abstractmethod, abstractproperty from importlib import import_module from types import TracebackType, ModuleType -from typing import ClassVar, Final, Optional, NamedTuple, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast, List, ContextManager, Dict, Any +from typing import ClassVar, Final, Optional, NamedTuple, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast, List, ContextManager, Dict, Any, Callable, TypeVar, Generic from contextlib import contextmanager import datetime # noqa: 251 from copy import deepcopy +import inspect from dlt.common import logger from dlt.common.exceptions import IdentifierTooLongException, InvalidDestinationReference, UnknownDestinationModule @@ -12,7 +13,7 @@ from dlt.common.schema.typing import TWriteDisposition from dlt.common.schema.exceptions import InvalidDatasetName from dlt.common.schema.utils import get_write_disposition, get_table_format -from dlt.common.configuration import configspec +from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration from dlt.common.configuration.accessors import config from dlt.common.destination.capabilities import DestinationCapabilitiesContext @@ -23,7 +24,10 @@ from dlt.common.utils import get_module_name from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults + TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"] +TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration") +TDestinationClient = 
TypeVar("TDestinationClient", bound="JobClientBase") class StorageSchemaInfo(NamedTuple): @@ -344,59 +348,102 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable # the default is to truncate the tables on the staging destination... return True -TDestinationReferenceArg = Union["DestinationReference", ModuleType, None, str] +TDestinationReferenceArg = Union[str, "Destination", None] -class DestinationReference(Protocol): - __name__: str - """Name of the destination""" +class Destination(ABC, Generic[TDestinationConfig, TDestinationClient]): + """A destination factory that can be partially pre-configured + with credentials and other config params. + """ + config_params: Optional[Dict[str, Any]] = None + + def __init__(self, **kwargs: Any) -> None: + # Create initial unresolved destination config + # Argument defaults are filtered out here because we only want arguments passed explicitly + # to supersede config from the environment or pipeline args + sig = inspect.signature(self.__class__) + params = sig.parameters + self.config_params = { + k: v for k, v in kwargs.items() + if k not in params or v != params[k].default + } + + @property + @abstractmethod + def spec(self) -> Type[TDestinationConfig]: + """A spec of destination configuration that also contains destination credentials""" + ... + @abstractmethod def capabilities(self) -> DestinationCapabilitiesContext: """Destination capabilities ie. supported loader file formats, identifier name lengths, naming conventions, escape function etc.""" + ... - def client(self, schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> "JobClientBase": - """A job client responsible for starting and resuming load jobs""" + @property + def name(self) -> str: + return self.__class__.__name__ - def spec(self) -> Type[DestinationClientConfiguration]: - """A spec of destination configuration that also contains destination credentials""" + @property + @abstractmethod + def client_class(self) -> Type[TDestinationClient]: + """A job client class responsible for starting and resuming load jobs""" + ... + + def configuration(self, initial_config: TDestinationConfig) -> TDestinationConfig: + """Get a fully resolved destination config from the initial config + """ + return resolve_configuration( + initial_config, + sections=(known_sections.DESTINATION, self.name), + # Already populated values will supersede resolved env config + explicit_value=self.config_params + ) + + @staticmethod + def to_name(ref: TDestinationReferenceArg) -> str: + if ref is None: + raise InvalidDestinationReference(ref) + if isinstance(ref, str): + return ref.rsplit(".", 1)[-1] + return ref.name @staticmethod - def from_name(destination: TDestinationReferenceArg) -> "DestinationReference": - if destination is None: + def from_reference(ref: TDestinationReferenceArg, credentials: Optional[CredentialsConfiguration] = None, **kwargs: Any) -> Optional["Destination[DestinationClientConfiguration, JobClientBase]"]: + """Instantiate destination from str reference. + The ref can be a destination name or import path pointing to a destination class (e.g. `dlt.destinations.postgres`) + """ + if ref is None: return None + if isinstance(ref, Destination): + return ref + if not isinstance(ref, str): + raise InvalidDestinationReference(ref) + try: + if "." 
in ref: + module_path, attr_name = ref.rsplit(".", 1) + dest_module = import_module(module_path) + else: + from dlt import destinations as dest_module + attr_name = ref + except ModuleNotFoundError as e: + raise UnknownDestinationModule(ref) from e - # if destination is a str, get destination reference by dynamically importing module - if isinstance(destination, str): - try: - if "." in destination: - # this is full module name - destination_ref = cast(DestinationReference, import_module(destination)) - else: - # from known location - destination_ref = cast(DestinationReference, import_module(f"dlt.destinations.{destination}")) - except ImportError: - if "." in destination: - raise UnknownDestinationModule(destination) - else: - # allow local external module imported without dot - try: - destination_ref = cast(DestinationReference, import_module(destination)) - except ImportError: - raise UnknownDestinationModule(destination) - else: - destination_ref = cast(DestinationReference, destination) - - # make sure the reference is correct try: - c = destination_ref.spec() - c.credentials - except Exception: - raise InvalidDestinationReference(destination) + factory: Type[Destination[DestinationClientConfiguration, JobClientBase]] = getattr(dest_module, attr_name) + except AttributeError as e: + raise UnknownDestinationModule(ref) from e + if credentials: + kwargs["credentials"] = credentials + try: + dest = factory(**kwargs) + dest.spec + except Exception as e: + raise InvalidDestinationReference(ref) from e + return dest - return destination_ref + def client(self, schema: Schema, initial_config: TDestinationConfig = config.value) -> TDestinationClient: + """Returns a configured instance of the destination's job client""" + return self.client_class(schema, self.configuration(initial_config)) - @staticmethod - def to_name(destination: TDestinationReferenceArg) -> str: - if isinstance(destination, ModuleType): - return get_module_name(destination) - return destination.split(".")[-1] # type: ignore + +TDestination = Destination[DestinationClientConfiguration, JobClientBase] diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index aeb0bdc68a..ddd9003799 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -14,7 +14,7 @@ from dlt.common.configuration.specs.config_section_context import ConfigSectionContext from dlt.common.configuration.paths import get_dlt_data_dir from dlt.common.configuration.specs import RunConfiguration -from dlt.common.destination import DestinationReference, TDestinationReferenceArg +from dlt.common.destination import Destination, TDestinationReferenceArg, TDestination from dlt.common.exceptions import DestinationHasFailedJobs, PipelineStateNotAvailable, ResourceNameNotAvailable, SourceSectionNotAvailable from dlt.common.schema import Schema from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition @@ -177,7 +177,7 @@ class SupportsPipeline(Protocol): """Name of the pipeline""" default_schema_name: str """Name of the default schema""" - destination: DestinationReference + destination: TDestination - """The destination reference which is ModuleType. 
`destination.__name__` returns the name string""" + """The destination reference, which is a Destination factory instance. `destination.name` returns the name string""" dataset_name: str """Name of the dataset to which pipeline will be loaded to""" diff --git a/dlt/destinations/__init__.py b/dlt/destinations/__init__.py index e69de29bb2..980c4ce7f2 100644 --- a/dlt/destinations/__init__.py +++ b/dlt/destinations/__init__.py @@ -0,0 +1,28 @@ +from dlt.destinations.impl.postgres.factory import postgres +from dlt.destinations.impl.snowflake.factory import snowflake +from dlt.destinations.impl.filesystem.factory import filesystem +from dlt.destinations.impl.duckdb.factory import duckdb +from dlt.destinations.impl.dummy.factory import dummy +from dlt.destinations.impl.mssql.factory import mssql +from dlt.destinations.impl.bigquery.factory import bigquery +from dlt.destinations.impl.athena.factory import athena +from dlt.destinations.impl.redshift.factory import redshift +from dlt.destinations.impl.qdrant.factory import qdrant +from dlt.destinations.impl.motherduck.factory import motherduck +from dlt.destinations.impl.weaviate.factory import weaviate + + +__all__ = [ + "postgres", + "snowflake", + "filesystem", + "duckdb", + "dummy", + "mssql", + "bigquery", + "athena", + "redshift", + "qdrant", + "motherduck", + "weaviate", +] diff --git a/dlt/destinations/filesystem/__init__.py b/dlt/destinations/filesystem/__init__.py deleted file mode 100644 index 3dc6c62480..0000000000 --- a/dlt/destinations/filesystem/__init__.py +++ /dev/null @@ -1,29 +0,0 @@ -from typing import Type - -from dlt.common.schema.schema import Schema -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config -from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.destination.reference import JobClientBase, DestinationClientDwhWithStagingConfiguration - -from dlt.destinations.filesystem.configuration import FilesystemDestinationClientConfiguration - - -@with_config(spec=FilesystemDestinationClientConfiguration, sections=(known_sections.DESTINATION, "filesystem",)) -def _configure(config: FilesystemDestinationClientConfiguration = config.value) -> FilesystemDestinationClientConfiguration: - return config - - -def capabilities() -> DestinationCapabilitiesContext: - return DestinationCapabilitiesContext.generic_capabilities("jsonl") - - -def client(schema: Schema, initial_config: DestinationClientDwhWithStagingConfiguration = config.value) -> JobClientBase: - # import client when creating instance so capabilities and config specs can be accessed without dependencies installed - from dlt.destinations.filesystem.filesystem import FilesystemClient - - return FilesystemClient(schema, _configure(initial_config)) # type: ignore - - -def spec() -> Type[FilesystemDestinationClientConfiguration]: - return FilesystemDestinationClientConfiguration diff --git a/dlt/destinations/impl/__init__.py b/dlt/destinations/impl/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/destinations/athena/__init__.py b/dlt/destinations/impl/athena/__init__.py similarity index 55% rename from dlt/destinations/athena/__init__.py rename to dlt/destinations/impl/athena/__init__.py index 1fd7f14d57..9f0b829819 100644 --- a/dlt/destinations/athena/__init__.py +++ b/dlt/destinations/impl/athena/__init__.py @@ -1,18 +1,7 @@ -from typing import Type - from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config -from dlt.common.schema.schema 
import Schema from dlt.common.data_writers.escape import escape_athena_identifier from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE -from dlt.destinations.athena.configuration import AthenaClientConfiguration -from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration - -@with_config(spec=AthenaClientConfiguration, sections=(known_sections.DESTINATION, "athena",)) -def _configure(config: AthenaClientConfiguration = config.value) -> AthenaClientConfiguration: - return config def capabilities() -> DestinationCapabilitiesContext: caps = DestinationCapabilitiesContext() @@ -37,15 +26,3 @@ def capabilities() -> DestinationCapabilitiesContext: caps.timestamp_precision = 3 caps.supports_truncate_command = False return caps - - -def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase: - # import client when creating instance so capabilities and config specs can be accessed without dependencies installed - from dlt.destinations.athena.athena import AthenaClient - return AthenaClient(schema, _configure(initial_config)) # type: ignore - - -def spec() -> Type[DestinationClientConfiguration]: - return AthenaClientConfiguration - - diff --git a/dlt/destinations/athena/athena.py b/dlt/destinations/impl/athena/athena.py similarity index 99% rename from dlt/destinations/athena/athena.py rename to dlt/destinations/impl/athena/athena.py index 44d020c127..f675e7a496 100644 --- a/dlt/destinations/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -27,11 +27,11 @@ from dlt.destinations.typing import DBApi, DBTransaction from dlt.destinations.exceptions import DatabaseTerminalException, DatabaseTransientException, DatabaseUndefinedRelation, LoadJobTerminalException -from dlt.destinations.athena import capabilities +from dlt.destinations.impl.athena import capabilities from dlt.destinations.sql_client import SqlClientBase, DBApiCursorImpl, raise_database_error, raise_open_connection_error from dlt.destinations.typing import DBApiCursor from dlt.destinations.job_client_impl import SqlJobClientWithStaging -from dlt.destinations.athena.configuration import AthenaClientConfiguration +from dlt.destinations.impl.athena.configuration import AthenaClientConfiguration from dlt.destinations.type_mapping import TypeMapper from dlt.destinations import path_utils diff --git a/dlt/destinations/athena/configuration.py b/dlt/destinations/impl/athena/configuration.py similarity index 100% rename from dlt/destinations/athena/configuration.py rename to dlt/destinations/impl/athena/configuration.py diff --git a/dlt/destinations/impl/athena/factory.py b/dlt/destinations/impl/athena/factory.py new file mode 100644 index 0000000000..cc2b027695 --- /dev/null +++ b/dlt/destinations/impl/athena/factory.py @@ -0,0 +1,53 @@ +import typing as t + +from dlt.common.destination import Destination, DestinationCapabilitiesContext +from dlt.destinations.impl.athena.configuration import AthenaClientConfiguration +from dlt.common.configuration.specs import AwsCredentials +from dlt.destinations.impl.athena import capabilities + +if t.TYPE_CHECKING: + from dlt.destinations.impl.athena.athena import AthenaClient + + +class athena(Destination[AthenaClientConfiguration, "AthenaClient"]): + + spec = AthenaClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["AthenaClient"]: + from dlt.destinations.impl.athena.athena import 
AthenaClient + + return AthenaClient + + def __init__( + self, + query_result_bucket: t.Optional[str] = None, + credentials: t.Union[AwsCredentials, t.Dict[str, t.Any], t.Any] = None, + athena_work_group: t.Optional[str] = None, + aws_data_catalog: t.Optional[str] = "awsdatacatalog", + force_iceberg: bool = False, + **kwargs: t.Any, + ) -> None: + """Configure the Athena destination to use in a pipeline. + + All arguments provided here supersede other configuration sources such as environment variables and dlt config files. + + Args: + query_result_bucket: S3 bucket to store query results in + credentials: AWS credentials to connect to the Athena database. + athena_work_group: Athena work group to use + aws_data_catalog: Athena data catalog to use + force_iceberg: Force iceberg tables + **kwargs: Additional arguments passed to the destination config + """ + super().__init__( + query_result_bucket=query_result_bucket, + credentials=credentials, + athena_work_group=athena_work_group, + aws_data_catalog=aws_data_catalog, + force_iceberg=force_iceberg, + **kwargs, + ) diff --git a/dlt/destinations/bigquery/README.md b/dlt/destinations/impl/bigquery/README.md similarity index 100% rename from dlt/destinations/bigquery/README.md rename to dlt/destinations/impl/bigquery/README.md diff --git a/dlt/destinations/bigquery/__init__.py b/dlt/destinations/impl/bigquery/__init__.py similarity index 50% rename from dlt/destinations/bigquery/__init__.py rename to dlt/destinations/impl/bigquery/__init__.py index 3d97e9a929..1304bd72bb 100644 --- a/dlt/destinations/bigquery/__init__.py +++ b/dlt/destinations/impl/bigquery/__init__.py @@ -1,20 +1,7 @@ -from typing import Type from dlt.common.data_writers.escape import escape_bigquery_identifier - -from dlt.common.schema.schema import Schema -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE -from dlt.destinations.bigquery.configuration import BigQueryClientConfiguration - - -@with_config(spec=BigQueryClientConfiguration, sections=(known_sections.DESTINATION, "bigquery",)) -def _configure(config: BigQueryClientConfiguration = config.value) -> BigQueryClientConfiguration: - return config - def capabilities() -> DestinationCapabilitiesContext: caps = DestinationCapabilitiesContext() @@ -35,14 +22,3 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supports_ddl_transactions = False return caps - - -def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase: - # import client when creating instance so capabilities and config specs can be accessed without dependencies installed - from dlt.destinations.bigquery.bigquery import BigQueryClient - - return BigQueryClient(schema, _configure(initial_config)) # type: ignore - - -def spec() -> Type[DestinationClientConfiguration]: - return BigQueryClientConfiguration \ No newline at end of file diff --git a/dlt/destinations/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py similarity index 98% rename from dlt/destinations/bigquery/bigquery.py rename to dlt/destinations/impl/bigquery/bigquery.py index 9cc7591f57..440123e46d 100644 --- a/dlt/destinations/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -17,9 +17,9 @@ from 
dlt.destinations.job_client_impl import SqlJobClientWithStaging from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate, DestinationTransientException, LoadJobNotExistsException, LoadJobTerminalException -from dlt.destinations.bigquery import capabilities -from dlt.destinations.bigquery.configuration import BigQueryClientConfiguration -from dlt.destinations.bigquery.sql_client import BigQuerySqlClient, BQ_TERMINAL_REASONS +from dlt.destinations.impl.bigquery import capabilities +from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration +from dlt.destinations.impl.bigquery.sql_client import BigQuerySqlClient, BQ_TERMINAL_REASONS from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob, SqlJobParams from dlt.destinations.job_impl import NewReferenceJob from dlt.destinations.sql_client import SqlClientBase diff --git a/dlt/destinations/bigquery/configuration.py b/dlt/destinations/impl/bigquery/configuration.py similarity index 100% rename from dlt/destinations/bigquery/configuration.py rename to dlt/destinations/impl/bigquery/configuration.py diff --git a/dlt/destinations/impl/bigquery/factory.py b/dlt/destinations/impl/bigquery/factory.py new file mode 100644 index 0000000000..ce6ace3bf7 --- /dev/null +++ b/dlt/destinations/impl/bigquery/factory.py @@ -0,0 +1,35 @@ +import typing as t + +from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration +from dlt.common.configuration.specs import GcpServiceAccountCredentials +from dlt.destinations.impl.bigquery import capabilities +from dlt.common.destination import Destination, DestinationCapabilitiesContext + +if t.TYPE_CHECKING: + from dlt.destinations.impl.bigquery.bigquery import BigQueryClient + + +class bigquery(Destination[BigQueryClientConfiguration, "BigQueryClient"]): + + spec = BigQueryClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["BigQueryClient"]: + from dlt.destinations.impl.bigquery.bigquery import BigQueryClient + + return BigQueryClient + + def __init__( + self, + credentials: t.Optional[GcpServiceAccountCredentials] = None, + location: t.Optional[str] = None, + **kwargs: t.Any, + ) -> None: + super().__init__( + credentials=credentials, + location=location, + **kwargs + ) diff --git a/dlt/destinations/bigquery/sql_client.py b/dlt/destinations/impl/bigquery/sql_client.py similarity index 99% rename from dlt/destinations/bigquery/sql_client.py rename to dlt/destinations/impl/bigquery/sql_client.py index 3d6eb19833..4939add0da 100644 --- a/dlt/destinations/bigquery/sql_client.py +++ b/dlt/destinations/impl/bigquery/sql_client.py @@ -17,7 +17,7 @@ from dlt.destinations.exceptions import DatabaseTerminalException, DatabaseTransientException, DatabaseUndefinedRelation from dlt.destinations.sql_client import DBApiCursorImpl, SqlClientBase, raise_database_error, raise_open_connection_error -from dlt.destinations.bigquery import capabilities +from dlt.destinations.impl.bigquery import capabilities # terminal reasons as returned in BQ gRPC error response # https://cloud.google.com/bigquery/docs/error-messages diff --git a/dlt/destinations/duckdb/__init__.py b/dlt/destinations/impl/duckdb/__init__.py similarity index 54% rename from dlt/destinations/duckdb/__init__.py rename to dlt/destinations/impl/duckdb/__init__.py index d9882cc0eb..5cbc8dea53 100644 --- a/dlt/destinations/duckdb/__init__.py +++ b/dlt/destinations/impl/duckdb/__init__.py @@ -1,20 +1,7 
@@ -from typing import Type - -from dlt.common.schema.schema import Schema -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config from dlt.common.data_writers.escape import escape_postgres_identifier, escape_duckdb_literal from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE -from dlt.destinations.duckdb.configuration import DuckDbClientConfiguration - - -@with_config(spec=DuckDbClientConfiguration, sections=(known_sections.DESTINATION, "duckdb",)) -def _configure(config: DuckDbClientConfiguration = config.value) -> DuckDbClientConfiguration: - return config - def capabilities() -> DestinationCapabilitiesContext: caps = DestinationCapabilitiesContext() @@ -37,14 +24,3 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supports_truncate_command = False return caps - - -def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase: - # import client when creating instance so capabilities and config specs can be accessed without dependencies installed - from dlt.destinations.duckdb.duck import DuckDbClient - - return DuckDbClient(schema, _configure(initial_config)) # type: ignore - - -def spec() -> Type[DestinationClientConfiguration]: - return DuckDbClientConfiguration diff --git a/dlt/destinations/duckdb/configuration.py b/dlt/destinations/impl/duckdb/configuration.py similarity index 94% rename from dlt/destinations/duckdb/configuration.py rename to dlt/destinations/impl/duckdb/configuration.py index 82ee325ed3..a5f77be8fd 100644 --- a/dlt/destinations/duckdb/configuration.py +++ b/dlt/destinations/impl/duckdb/configuration.py @@ -25,6 +25,7 @@ class DuckDbBaseCredentials(ConnectionStringCredentials): read_only: bool = False # open database read/write def borrow_conn(self, read_only: bool) -> Any: + # TODO: Can this be done in sql client instead? 
import duckdb if not hasattr(self, "_conn_lock"): @@ -95,6 +96,13 @@ class DuckDbCredentials(DuckDbBaseCredentials): __config_gen_annotations__: ClassVar[List[str]] = [] + def is_partial(self) -> bool: + partial = super().is_partial() + if partial: + return True + # Wait until pipeline context is set up before resolving + return self.database == ":pipeline:" + def on_resolved(self) -> None: # do not set any paths for external database if self.database == ":external:": @@ -126,8 +134,7 @@ def _path_in_pipeline(self, rel_path: str) -> str: if context.is_active(): # pipeline is active, get the working directory return os.path.join(context.pipeline().working_dir, rel_path) - return None - + raise RuntimeError("Attempting to use special duckdb database :pipeline: outside of pipeline context.") def _path_to_pipeline(self, abspath: str) -> None: from dlt.common.configuration.container import Container @@ -173,6 +180,9 @@ def _path_from_pipeline(self, default_path: str) -> Tuple[str, bool]: return default_path, True + def _conn_str(self) -> str: + return self.database + @configspec class DuckDbClientConfiguration(DestinationClientDwhWithStagingConfiguration): diff --git a/dlt/destinations/duckdb/duck.py b/dlt/destinations/impl/duckdb/duck.py similarity index 96% rename from dlt/destinations/duckdb/duck.py rename to dlt/destinations/impl/duckdb/duck.py index 4a2e54f2b6..6e6ec359fe 100644 --- a/dlt/destinations/duckdb/duck.py +++ b/dlt/destinations/impl/duckdb/duck.py @@ -12,9 +12,9 @@ from dlt.destinations.insert_job_client import InsertValuesJobClient -from dlt.destinations.duckdb import capabilities -from dlt.destinations.duckdb.sql_client import DuckDbSqlClient -from dlt.destinations.duckdb.configuration import DuckDbClientConfiguration +from dlt.destinations.impl.duckdb import capabilities +from dlt.destinations.impl.duckdb.sql_client import DuckDbSqlClient +from dlt.destinations.impl.duckdb.configuration import DuckDbClientConfiguration from dlt.destinations.type_mapping import TypeMapper diff --git a/dlt/destinations/impl/duckdb/factory.py b/dlt/destinations/impl/duckdb/factory.py new file mode 100644 index 0000000000..1b882c52a1 --- /dev/null +++ b/dlt/destinations/impl/duckdb/factory.py @@ -0,0 +1,41 @@ +import typing as t + +from dlt.common.destination import Destination, DestinationCapabilitiesContext +from dlt.destinations.impl.duckdb.configuration import DuckDbCredentials, DuckDbClientConfiguration +from dlt.destinations.impl.duckdb import capabilities + +if t.TYPE_CHECKING: + from duckdb import DuckDBPyConnection + from dlt.destinations.impl.duckdb.duck import DuckDbClient + + +class duckdb(Destination[DuckDbClientConfiguration, "DuckDbClient"]): + + spec = DuckDbClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["DuckDbClient"]: + from dlt.destinations.impl.duckdb.duck import DuckDbClient + + return DuckDbClient + + def __init__( + self, + credentials: t.Union[DuckDbCredentials, t.Dict[str, t.Any], str, "DuckDBPyConnection"] = None, + create_indexes: bool = False, + **kwargs: t.Any, + ) -> None: + """Configure the DuckDB destination to use in a pipeline. + + All arguments provided here supersede other configuration sources such as environment variables and dlt config files. + + Args: + credentials: Credentials to connect to the duckdb database. Can be an instance of `DuckDbCredentials` or + a path to a database file. Use `:memory:` to create an in-memory database. 
+ create_indexes: Should unique indexes be created + **kwargs: Additional arguments passed to the destination config + """ + super().__init__(credentials=credentials, create_indexes=create_indexes, **kwargs) diff --git a/dlt/destinations/duckdb/sql_client.py b/dlt/destinations/impl/duckdb/sql_client.py similarity index 98% rename from dlt/destinations/duckdb/sql_client.py rename to dlt/destinations/impl/duckdb/sql_client.py index cd2160f676..cb4e1678a2 100644 --- a/dlt/destinations/duckdb/sql_client.py +++ b/dlt/destinations/impl/duckdb/sql_client.py @@ -8,8 +8,8 @@ from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame from dlt.destinations.sql_client import SqlClientBase, DBApiCursorImpl, raise_database_error, raise_open_connection_error -from dlt.destinations.duckdb import capabilities -from dlt.destinations.duckdb.configuration import DuckDbBaseCredentials +from dlt.destinations.impl.duckdb import capabilities +from dlt.destinations.impl.duckdb.configuration import DuckDbBaseCredentials class DuckDBDBApiCursorImpl(DBApiCursorImpl): diff --git a/dlt/destinations/dummy/__init__.py b/dlt/destinations/impl/dummy/__init__.py similarity index 60% rename from dlt/destinations/dummy/__init__.py rename to dlt/destinations/impl/dummy/__init__.py index 7131f0109a..476523cb8f 100644 --- a/dlt/destinations/dummy/__init__.py +++ b/dlt/destinations/impl/dummy/__init__.py @@ -1,12 +1,8 @@ -from typing import Type - -from dlt.common.schema.schema import Schema from dlt.common.configuration import with_config, known_sections from dlt.common.configuration.accessors import config from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration -from dlt.destinations.dummy.configuration import DummyClientConfiguration +from dlt.destinations.impl.dummy.configuration import DummyClientConfiguration @with_config(spec=DummyClientConfiguration, sections=(known_sections.DESTINATION, "dummy",)) @@ -30,14 +26,3 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supports_ddl_transactions = False return caps - - -def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase: - # import client when creating instance so capabilities and config specs can be accessed without dependencies installed - from dlt.destinations.dummy.dummy import DummyClient - - return DummyClient(schema, _configure(initial_config)) # type: ignore - - -def spec() -> Type[DestinationClientConfiguration]: - return DummyClientConfiguration diff --git a/dlt/destinations/dummy/configuration.py b/dlt/destinations/impl/dummy/configuration.py similarity index 100% rename from dlt/destinations/dummy/configuration.py rename to dlt/destinations/impl/dummy/configuration.py diff --git a/dlt/destinations/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py similarity index 97% rename from dlt/destinations/dummy/dummy.py rename to dlt/destinations/impl/dummy/dummy.py index 92827405ca..0bc061a7dd 100644 --- a/dlt/destinations/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -13,8 +13,8 @@ from dlt.destinations.exceptions import (LoadJobNotExistsException, LoadJobInvalidStateTransitionException, DestinationTerminalException, DestinationTransientException) -from dlt.destinations.dummy import capabilities -from dlt.destinations.dummy.configuration import DummyClientConfiguration +from dlt.destinations.impl.dummy import capabilities +from 
dlt.destinations.impl.dummy.configuration import DummyClientConfiguration class LoadDummyJob(LoadJob, FollowupJob): diff --git a/dlt/destinations/impl/dummy/factory.py b/dlt/destinations/impl/dummy/factory.py new file mode 100644 index 0000000000..265c77b0f4 --- /dev/null +++ b/dlt/destinations/impl/dummy/factory.py @@ -0,0 +1,30 @@ +import typing as t + +from dlt.common.destination import Destination, DestinationCapabilitiesContext + +from dlt.destinations.impl.dummy.configuration import DummyClientConfiguration, DummyClientCredentials +from dlt.destinations.impl.dummy import capabilities + +if t.TYPE_CHECKING: + from dlt.destinations.impl.dummy.dummy import DummyClient + + +class dummy(Destination[DummyClientConfiguration, "DummyClient"]): + + spec = DummyClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["DummyClient"]: + from dlt.destinations.impl.dummy.dummy import DummyClient + + return DummyClient + + def __init__( + self, + credentials: DummyClientCredentials = None, + **kwargs: t.Any, + ) -> None: + super().__init__(credentials=credentials, **kwargs) diff --git a/dlt/destinations/impl/filesystem/__init__.py b/dlt/destinations/impl/filesystem/__init__.py new file mode 100644 index 0000000000..12e83216cf --- /dev/null +++ b/dlt/destinations/impl/filesystem/__init__.py @@ -0,0 +1,5 @@ +from dlt.common.destination import DestinationCapabilitiesContext + + +def capabilities() -> DestinationCapabilitiesContext: + return DestinationCapabilitiesContext.generic_capabilities("jsonl") diff --git a/dlt/destinations/filesystem/configuration.py b/dlt/destinations/impl/filesystem/configuration.py similarity index 100% rename from dlt/destinations/filesystem/configuration.py rename to dlt/destinations/impl/filesystem/configuration.py diff --git a/dlt/destinations/impl/filesystem/factory.py b/dlt/destinations/impl/filesystem/factory.py new file mode 100644 index 0000000000..4e2a716d79 --- /dev/null +++ b/dlt/destinations/impl/filesystem/factory.py @@ -0,0 +1,50 @@ +import typing as t + +from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration +from dlt.destinations.impl.filesystem import capabilities +from dlt.common.destination import Destination, DestinationCapabilitiesContext +from dlt.common.storages.configuration import FileSystemCredentials + +if t.TYPE_CHECKING: + from dlt.destinations.impl.filesystem.filesystem import FilesystemClient + + +class filesystem(Destination[FilesystemDestinationClientConfiguration, "FilesystemClient"]): + + spec = FilesystemDestinationClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["FilesystemClient"]: + from dlt.destinations.impl.filesystem.filesystem import FilesystemClient + + return FilesystemClient + + def __init__( + self, + bucket_url: str = None, + credentials: t.Union[FileSystemCredentials, t.Dict[str, t.Any], t.Any] = None, + **kwargs: t.Any, + ) -> None: + """Configure the filesystem destination to use in a pipeline and load data to local or remote filesystem. + + All arguments provided here supersede other configuration sources such as environment variables and dlt config files. 
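The "supersede" sentence that recurs in every factory docstring is backed by `Destination.__init__` keeping only the explicitly passed, non-default kwargs in `config_params`, which `configuration()` later forwards to `resolve_configuration` as `explicit_value`. A hypothetical sketch of the resulting precedence (the env var name follows dlt's `DESTINATION__<SECTION>__...` convention; all values are illustrative):

```python
import os

import dlt

# a bucket_url coming from the environment provider...
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "s3://bucket-from-env"

# ...loses to the explicit factory argument: the kwarg is stored in
# config_params and later passed to resolve_configuration as explicit_value
fs = dlt.destinations.filesystem(bucket_url="file:///tmp/local-data")

# credentials were not passed explicitly, so they still resolve from the
# environment or secrets.toml when the pipeline configures the destination
```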
+ + The `bucket_url` determines the protocol to be used: + + - Local folder: `file:///path/to/directory` + - AWS S3 (and S3 compatible storages): `s3://bucket-name` + - Azure Blob Storage: `az://container-name` + - Google Cloud Storage: `gs://bucket-name` + - Memory fs: `memory://m` + + Args: + bucket_url: The fsspec compatible bucket url to use for the destination. + credentials: Credentials to connect to the filesystem. The type of credentials should correspond to + the bucket protocol. For example, for AWS S3, the credentials should be an instance of `AwsCredentials`. + A dictionary with the credentials parameters can also be provided. + **kwargs: Additional arguments passed to the destination config + """ + super().__init__(bucket_url=bucket_url, credentials=credentials, **kwargs) diff --git a/dlt/destinations/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py similarity index 98% rename from dlt/destinations/filesystem/filesystem.py rename to dlt/destinations/impl/filesystem/filesystem.py index 766f384024..fe349aac6b 100644 --- a/dlt/destinations/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -12,8 +12,8 @@ from dlt.common.destination.reference import NewLoadJob, TLoadJobState, LoadJob, JobClientBase, FollowupJob, WithStagingDataset from dlt.destinations.job_impl import EmptyLoadJob -from dlt.destinations.filesystem import capabilities -from dlt.destinations.filesystem.configuration import FilesystemDestinationClientConfiguration +from dlt.destinations.impl.filesystem import capabilities +from dlt.destinations.impl.filesystem.configuration import FilesystemDestinationClientConfiguration from dlt.destinations.job_impl import NewReferenceJob from dlt.destinations import path_utils diff --git a/dlt/destinations/motherduck/__init__.py b/dlt/destinations/impl/motherduck/__init__.py similarity index 51% rename from dlt/destinations/motherduck/__init__.py rename to dlt/destinations/impl/motherduck/__init__.py index eae67eaa74..74c0e36ef3 100644 --- a/dlt/destinations/motherduck/__init__.py +++ b/dlt/destinations/impl/motherduck/__init__.py @@ -1,20 +1,7 @@ -from typing import Type - -from dlt.common.schema.schema import Schema -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config from dlt.common.data_writers.escape import escape_postgres_identifier, escape_duckdb_literal from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE -from dlt.destinations.motherduck.configuration import MotherDuckClientConfiguration - - -@with_config(spec=MotherDuckClientConfiguration, sections=(known_sections.DESTINATION, "motherduck",)) -def _configure(config: MotherDuckClientConfiguration = config.value) -> MotherDuckClientConfiguration: - return config - def capabilities() -> DestinationCapabilitiesContext: caps = DestinationCapabilitiesContext() @@ -35,14 +22,3 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supports_truncate_command = False return caps - - -def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase: - # import client when creating instance so capabilities and config specs can be accessed without dependencies installed - from dlt.destinations.motherduck.motherduck import MotherDuckClient - - return MotherDuckClient(schema, 
_configure(initial_config)) # type: ignore - - -def spec() -> Type[DestinationClientConfiguration]: - return MotherDuckClientConfiguration diff --git a/dlt/destinations/motherduck/configuration.py b/dlt/destinations/impl/motherduck/configuration.py similarity index 97% rename from dlt/destinations/motherduck/configuration.py rename to dlt/destinations/impl/motherduck/configuration.py index 18d480c945..a376f1a5aa 100644 --- a/dlt/destinations/motherduck/configuration.py +++ b/dlt/destinations/impl/motherduck/configuration.py @@ -7,7 +7,7 @@ from dlt.common.utils import digest128 from dlt.common.configuration.exceptions import ConfigurationValueError -from dlt.destinations.duckdb.configuration import DuckDbBaseCredentials +from dlt.destinations.impl.duckdb.configuration import DuckDbBaseCredentials MOTHERDUCK_DRIVERNAME = "md" diff --git a/dlt/destinations/impl/motherduck/factory.py b/dlt/destinations/impl/motherduck/factory.py new file mode 100644 index 0000000000..17cf4a76b4 --- /dev/null +++ b/dlt/destinations/impl/motherduck/factory.py @@ -0,0 +1,41 @@ +import typing as t + +from dlt.common.destination import Destination, DestinationCapabilitiesContext +from dlt.destinations.impl.motherduck.configuration import MotherDuckCredentials, MotherDuckClientConfiguration +from dlt.destinations.impl.motherduck import capabilities + +if t.TYPE_CHECKING: + from duckdb import DuckDBPyConnection + from dlt.destinations.impl.motherduck.motherduck import MotherDuckClient + + +class motherduck(Destination[MotherDuckClientConfiguration, "MotherDuckClient"]): + + spec = MotherDuckClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["MotherDuckClient"]: + from dlt.destinations.impl.motherduck.motherduck import MotherDuckClient + + return MotherDuckClient + + def __init__( + self, + credentials: t.Union[MotherDuckCredentials, str, t.Dict[str, t.Any], "DuckDBPyConnection"] = None, + create_indexes: bool = False, + **kwargs: t.Any, + ) -> None: + """Configure the MotherDuck destination to use in a pipeline. + + All arguments provided here supersede other configuration sources such as environment variables and dlt config files. + + Args: + credentials: Credentials to connect to the MotherDuck database. 
Can be an instance of `MotherDuckCredentials` or + a connection string in the format `md:///<database_name>?token=<service token>` + create_indexes: Should unique indexes be created + **kwargs: Additional arguments passed to the destination config + """ + super().__init__(credentials=credentials, create_indexes=create_indexes, **kwargs) diff --git a/dlt/destinations/motherduck/motherduck.py b/dlt/destinations/impl/motherduck/motherduck.py similarity index 70% rename from dlt/destinations/motherduck/motherduck.py rename to dlt/destinations/impl/motherduck/motherduck.py index 93c0ed163b..9822f2b7b6 100644 --- a/dlt/destinations/motherduck/motherduck.py +++ b/dlt/destinations/impl/motherduck/motherduck.py @@ -4,10 +4,10 @@ from dlt.common.schema import Schema -from dlt.destinations.duckdb.duck import DuckDbClient -from dlt.destinations.motherduck import capabilities -from dlt.destinations.motherduck.sql_client import MotherDuckSqlClient -from dlt.destinations.motherduck.configuration import MotherDuckClientConfiguration +from dlt.destinations.impl.duckdb.duck import DuckDbClient +from dlt.destinations.impl.motherduck import capabilities +from dlt.destinations.impl.motherduck.sql_client import MotherDuckSqlClient +from dlt.destinations.impl.motherduck.configuration import MotherDuckClientConfiguration class MotherDuckClient(DuckDbClient): diff --git a/dlt/destinations/motherduck/sql_client.py b/dlt/destinations/impl/motherduck/sql_client.py similarity index 83% rename from dlt/destinations/motherduck/sql_client.py rename to dlt/destinations/impl/motherduck/sql_client.py index 2fc664a2e8..672c377fd9 100644 --- a/dlt/destinations/motherduck/sql_client.py +++ b/dlt/destinations/impl/motherduck/sql_client.py @@ -8,9 +8,9 @@ from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame from dlt.destinations.sql_client import SqlClientBase, DBApiCursorImpl, raise_database_error, raise_open_connection_error -from dlt.destinations.duckdb.sql_client import DuckDbSqlClient, DuckDBDBApiCursorImpl -from dlt.destinations.motherduck import capabilities -from dlt.destinations.motherduck.configuration import MotherDuckCredentials +from dlt.destinations.impl.duckdb.sql_client import DuckDbSqlClient, DuckDBDBApiCursorImpl +from dlt.destinations.impl.motherduck import capabilities +from dlt.destinations.impl.motherduck.configuration import MotherDuckCredentials class MotherDuckSqlClient(DuckDbSqlClient): diff --git a/dlt/destinations/mssql/README.md b/dlt/destinations/impl/mssql/README.md similarity index 100% rename from dlt/destinations/mssql/README.md rename to dlt/destinations/impl/mssql/README.md diff --git a/dlt/destinations/mssql/__init__.py b/dlt/destinations/impl/mssql/__init__.py similarity index 57% rename from dlt/destinations/mssql/__init__.py rename to dlt/destinations/impl/mssql/__init__.py index 56051a324e..40e971cacf 100644 --- a/dlt/destinations/mssql/__init__.py +++ b/dlt/destinations/impl/mssql/__init__.py @@ -1,21 +1,8 @@ -from typing import Type - -from dlt.common.schema.schema import Schema -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config from dlt.common.data_writers.escape import escape_postgres_identifier, escape_mssql_literal from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE from dlt.common.wei import EVM_DECIMAL_PRECISION -from 
dlt.destinations.mssql.configuration import MsSqlClientConfiguration - - -@with_config(spec=MsSqlClientConfiguration, sections=(known_sections.DESTINATION, "mssql",)) -def _configure(config: MsSqlClientConfiguration = config.value) -> MsSqlClientConfiguration: - return config - def capabilities() -> DestinationCapabilitiesContext: caps = DestinationCapabilitiesContext() @@ -39,14 +26,3 @@ def capabilities() -> DestinationCapabilitiesContext: caps.timestamp_precision = 7 return caps - - -def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase: - # import client when creating instance so capabilities and config specs can be accessed without dependencies installed - from dlt.destinations.mssql.mssql import MsSqlClient - - return MsSqlClient(schema, _configure(initial_config)) # type: ignore[arg-type] - - -def spec() -> Type[DestinationClientConfiguration]: - return MsSqlClientConfiguration diff --git a/dlt/destinations/mssql/configuration.py b/dlt/destinations/impl/mssql/configuration.py similarity index 100% rename from dlt/destinations/mssql/configuration.py rename to dlt/destinations/impl/mssql/configuration.py diff --git a/dlt/destinations/impl/mssql/factory.py b/dlt/destinations/impl/mssql/factory.py new file mode 100644 index 0000000000..c98531ca79 --- /dev/null +++ b/dlt/destinations/impl/mssql/factory.py @@ -0,0 +1,41 @@ +import typing as t + +from dlt.common.destination import Destination, DestinationCapabilitiesContext + +from dlt.destinations.impl.mssql.configuration import MsSqlCredentials, MsSqlClientConfiguration +from dlt.destinations.impl.mssql import capabilities + +if t.TYPE_CHECKING: + from dlt.destinations.impl.mssql.mssql import MsSqlClient + + +class mssql(Destination[MsSqlClientConfiguration, "MsSqlClient"]): + + spec = MsSqlClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["MsSqlClient"]: + from dlt.destinations.impl.mssql.mssql import MsSqlClient + + return MsSqlClient + + def __init__( + self, + credentials: t.Union[MsSqlCredentials, t.Dict[str, t.Any], str] = None, + create_indexes: bool = True, + **kwargs: t.Any, + ) -> None: + """Configure the MsSql destination to use in a pipeline. + + All arguments provided here supersede other configuration sources such as environment variables and dlt config files. + + Args: + credentials: Credentials to connect to the mssql database. 
Can be an instance of `MsSqlCredentials` or + a connection string in the format `mssql://user:password@host:port/database` + create_indexes: Should unique indexes be created + **kwargs: Additional arguments passed to the destination config + """ + super().__init__(credentials=credentials, create_indexes=create_indexes, **kwargs) diff --git a/dlt/destinations/mssql/mssql.py b/dlt/destinations/impl/mssql/mssql.py similarity index 97% rename from dlt/destinations/mssql/mssql.py rename to dlt/destinations/impl/mssql/mssql.py index cd999441ff..851122f20c 100644 --- a/dlt/destinations/mssql/mssql.py +++ b/dlt/destinations/impl/mssql/mssql.py @@ -12,9 +12,9 @@ from dlt.destinations.insert_job_client import InsertValuesJobClient -from dlt.destinations.mssql import capabilities -from dlt.destinations.mssql.sql_client import PyOdbcMsSqlClient -from dlt.destinations.mssql.configuration import MsSqlClientConfiguration +from dlt.destinations.impl.mssql import capabilities +from dlt.destinations.impl.mssql.sql_client import PyOdbcMsSqlClient +from dlt.destinations.impl.mssql.configuration import MsSqlClientConfiguration from dlt.destinations.sql_client import SqlClientBase from dlt.destinations.type_mapping import TypeMapper diff --git a/dlt/destinations/mssql/sql_client.py b/dlt/destinations/impl/mssql/sql_client.py similarity index 97% rename from dlt/destinations/mssql/sql_client.py rename to dlt/destinations/impl/mssql/sql_client.py index 4dd983a334..5372fa3626 100644 --- a/dlt/destinations/mssql/sql_client.py +++ b/dlt/destinations/impl/mssql/sql_client.py @@ -13,8 +13,8 @@ from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction from dlt.destinations.sql_client import DBApiCursorImpl, SqlClientBase, raise_database_error, raise_open_connection_error -from dlt.destinations.mssql.configuration import MsSqlCredentials -from dlt.destinations.mssql import capabilities +from dlt.destinations.impl.mssql.configuration import MsSqlCredentials +from dlt.destinations.impl.mssql import capabilities def handle_datetimeoffset(dto_value: bytes) -> datetime: diff --git a/dlt/destinations/postgres/README.md b/dlt/destinations/impl/postgres/README.md similarity index 100% rename from dlt/destinations/postgres/README.md rename to dlt/destinations/impl/postgres/README.md diff --git a/dlt/destinations/postgres/__init__.py b/dlt/destinations/impl/postgres/__init__.py similarity index 58% rename from dlt/destinations/postgres/__init__.py rename to dlt/destinations/impl/postgres/__init__.py index e8904c075f..009174ecc9 100644 --- a/dlt/destinations/postgres/__init__.py +++ b/dlt/destinations/impl/postgres/__init__.py @@ -1,20 +1,9 @@ -from typing import Type - -from dlt.common.schema.schema import Schema -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config from dlt.common.data_writers.escape import escape_postgres_identifier, escape_postgres_literal from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE from dlt.common.wei import EVM_DECIMAL_PRECISION -from dlt.destinations.postgres.configuration import PostgresClientConfiguration - - -@with_config(spec=PostgresClientConfiguration, sections=(known_sections.DESTINATION, "postgres",)) -def _configure(config: PostgresClientConfiguration = config.value) -> PostgresClientConfiguration: - return config def 
capabilities() -> DestinationCapabilitiesContext: @@ -39,12 +28,3 @@ def capabilities() -> DestinationCapabilitiesContext: return caps -def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase: - # import client when creating instance so capabilities and config specs can be accessed without dependencies installed - from dlt.destinations.postgres.postgres import PostgresClient - - return PostgresClient(schema, _configure(initial_config)) # type: ignore - - -def spec() -> Type[DestinationClientConfiguration]: - return PostgresClientConfiguration diff --git a/dlt/destinations/postgres/configuration.py b/dlt/destinations/impl/postgres/configuration.py similarity index 100% rename from dlt/destinations/postgres/configuration.py rename to dlt/destinations/impl/postgres/configuration.py diff --git a/dlt/destinations/impl/postgres/factory.py b/dlt/destinations/impl/postgres/factory.py new file mode 100644 index 0000000000..33971eb642 --- /dev/null +++ b/dlt/destinations/impl/postgres/factory.py @@ -0,0 +1,41 @@ +import typing as t + +from dlt.common.destination import Destination, DestinationCapabilitiesContext + +from dlt.destinations.impl.postgres.configuration import PostgresCredentials, PostgresClientConfiguration +from dlt.destinations.impl.postgres import capabilities + +if t.TYPE_CHECKING: + from dlt.destinations.impl.postgres.postgres import PostgresClient + + +class postgres(Destination[PostgresClientConfiguration, "PostgresClient"]): + + spec = PostgresClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["PostgresClient"]: + from dlt.destinations.impl.postgres.postgres import PostgresClient + + return PostgresClient + + def __init__( + self, + credentials: t.Union[PostgresCredentials, t.Dict[str, t.Any], str] = None, + create_indexes: bool = True, + **kwargs: t.Any, + ) -> None: + """Configure the Postgres destination to use in a pipeline. + + All arguments provided here supersede other configuration sources such as environment variables and dlt config files. + + Args: + credentials: Credentials to connect to the postgres database. 
Can be an instance of `PostgresCredentials` or + a connection string in the format `postgres://user:password@host:port/database` + create_indexes: Should unique indexes be created + **kwargs: Additional arguments passed to the destination config + """ + super().__init__(credentials=credentials, create_indexes=create_indexes, **kwargs) diff --git a/dlt/destinations/postgres/postgres.py b/dlt/destinations/impl/postgres/postgres.py similarity index 95% rename from dlt/destinations/postgres/postgres.py rename to dlt/destinations/impl/postgres/postgres.py index 2812d1d4c4..03c42f4d75 100644 --- a/dlt/destinations/postgres/postgres.py +++ b/dlt/destinations/impl/postgres/postgres.py @@ -11,9 +11,9 @@ from dlt.destinations.insert_job_client import InsertValuesJobClient -from dlt.destinations.postgres import capabilities -from dlt.destinations.postgres.sql_client import Psycopg2SqlClient -from dlt.destinations.postgres.configuration import PostgresClientConfiguration +from dlt.destinations.impl.postgres import capabilities +from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient +from dlt.destinations.impl.postgres.configuration import PostgresClientConfiguration from dlt.destinations.sql_client import SqlClientBase from dlt.destinations.type_mapping import TypeMapper diff --git a/dlt/destinations/postgres/sql_client.py b/dlt/destinations/impl/postgres/sql_client.py similarity index 97% rename from dlt/destinations/postgres/sql_client.py rename to dlt/destinations/impl/postgres/sql_client.py index 079a0ae477..b6c4c1a1be 100644 --- a/dlt/destinations/postgres/sql_client.py +++ b/dlt/destinations/impl/postgres/sql_client.py @@ -16,8 +16,8 @@ from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction from dlt.destinations.sql_client import DBApiCursorImpl, SqlClientBase, raise_database_error, raise_open_connection_error -from dlt.destinations.postgres.configuration import PostgresCredentials -from dlt.destinations.postgres import capabilities +from dlt.destinations.impl.postgres.configuration import PostgresCredentials +from dlt.destinations.impl.postgres import capabilities class Psycopg2SqlClient(SqlClientBase["psycopg2.connection"], DBTransaction): diff --git a/dlt/destinations/impl/qdrant/__init__.py b/dlt/destinations/impl/qdrant/__init__.py new file mode 100644 index 0000000000..1a2c466b14 --- /dev/null +++ b/dlt/destinations/impl/qdrant/__init__.py @@ -0,0 +1,18 @@ +from dlt.common.destination import DestinationCapabilitiesContext +from dlt.destinations.impl.qdrant.qdrant_adapter import qdrant_adapter + + +def capabilities() -> DestinationCapabilitiesContext: + caps = DestinationCapabilitiesContext() + caps.preferred_loader_file_format = "jsonl" + caps.supported_loader_file_formats = ["jsonl"] + + caps.max_identifier_length = 200 + caps.max_column_identifier_length = 1024 + caps.max_query_length = 8 * 1024 * 1024 + caps.is_max_query_length_in_bytes = False + caps.max_text_data_type_length = 8 * 1024 * 1024 + caps.is_max_text_data_type_length_in_bytes = False + caps.supports_ddl_transactions = False + + return caps diff --git a/dlt/destinations/qdrant/configuration.py b/dlt/destinations/impl/qdrant/configuration.py similarity index 100% rename from dlt/destinations/qdrant/configuration.py rename to dlt/destinations/impl/qdrant/configuration.py diff --git a/dlt/destinations/impl/qdrant/factory.py b/dlt/destinations/impl/qdrant/factory.py new file mode 100644 index 0000000000..316b5ae434 --- /dev/null +++ b/dlt/destinations/impl/qdrant/factory.py @@ -0,0 +1,30 @@ 
+import typing as t + +from dlt.common.destination import Destination, DestinationCapabilitiesContext + +from dlt.destinations.impl.qdrant.configuration import QdrantCredentials, QdrantClientConfiguration +from dlt.destinations.impl.qdrant import capabilities + +if t.TYPE_CHECKING: + from dlt.destinations.impl.qdrant.qdrant_client import QdrantClient + + +class qdrant(Destination[QdrantClientConfiguration, "QdrantClient"]): + + spec = QdrantClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["QdrantClient"]: + from dlt.destinations.impl.qdrant.qdrant_client import QdrantClient + + return QdrantClient + + def __init__( + self, + credentials: t.Union[QdrantCredentials, t.Dict[str, t.Any]] = None, + **kwargs: t.Any, + ) -> None: + super().__init__(credentials=credentials, **kwargs) diff --git a/dlt/destinations/qdrant/qdrant_adapter.py b/dlt/destinations/impl/qdrant/qdrant_adapter.py similarity index 100% rename from dlt/destinations/qdrant/qdrant_adapter.py rename to dlt/destinations/impl/qdrant/qdrant_adapter.py diff --git a/dlt/destinations/qdrant/qdrant_client.py b/dlt/destinations/impl/qdrant/qdrant_client.py similarity index 98% rename from dlt/destinations/qdrant/qdrant_client.py rename to dlt/destinations/impl/qdrant/qdrant_client.py index cba87e9528..029530d624 100644 --- a/dlt/destinations/qdrant/qdrant_client.py +++ b/dlt/destinations/impl/qdrant/qdrant_client.py @@ -11,9 +11,9 @@ from dlt.destinations.job_impl import EmptyLoadJob from dlt.destinations.job_client_impl import StorageSchemaInfo, StateInfo -from dlt.destinations.qdrant import capabilities -from dlt.destinations.qdrant.configuration import QdrantClientConfiguration -from dlt.destinations.qdrant.qdrant_adapter import VECTORIZE_HINT +from dlt.destinations.impl.qdrant import capabilities +from dlt.destinations.impl.qdrant.configuration import QdrantClientConfiguration +from dlt.destinations.impl.qdrant.qdrant_adapter import VECTORIZE_HINT from qdrant_client import QdrantClient as QC, models from qdrant_client.qdrant_fastembed import uuid @@ -406,4 +406,4 @@ def _collection_exists(self, table_name: str, qualify_table_name: bool = True) - except UnexpectedResponse as e: if e.status_code == 404: return False - raise e \ No newline at end of file + raise e diff --git a/dlt/destinations/redshift/README.md b/dlt/destinations/impl/redshift/README.md similarity index 100% rename from dlt/destinations/redshift/README.md rename to dlt/destinations/impl/redshift/README.md diff --git a/dlt/destinations/redshift/__init__.py b/dlt/destinations/impl/redshift/__init__.py similarity index 52% rename from dlt/destinations/redshift/__init__.py rename to dlt/destinations/impl/redshift/__init__.py index 96741e86cd..8a8cae84b4 100644 --- a/dlt/destinations/redshift/__init__.py +++ b/dlt/destinations/impl/redshift/__init__.py @@ -1,20 +1,7 @@ -from typing import Type - -from dlt.common.schema.schema import Schema -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config from dlt.common.data_writers.escape import escape_redshift_identifier, escape_redshift_literal from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE -from dlt.destinations.redshift.configuration import RedshiftClientConfiguration - 
- -@with_config(spec=RedshiftClientConfiguration, sections=(known_sections.DESTINATION, "redshift",)) -def _configure(config: RedshiftClientConfiguration = config.value) -> RedshiftClientConfiguration: - return config - def capabilities() -> DestinationCapabilitiesContext: caps = DestinationCapabilitiesContext() @@ -36,14 +23,3 @@ def capabilities() -> DestinationCapabilitiesContext: caps.alter_add_multi_column = False return caps - - -def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase: - # import client when creating instance so capabilities and config specs can be accessed without dependencies installed - from dlt.destinations.redshift.redshift import RedshiftClient - - return RedshiftClient(schema, _configure(initial_config)) # type: ignore - - -def spec() -> Type[DestinationClientConfiguration]: - return RedshiftClientConfiguration diff --git a/dlt/destinations/redshift/configuration.py b/dlt/destinations/impl/redshift/configuration.py similarity index 88% rename from dlt/destinations/redshift/configuration.py rename to dlt/destinations/impl/redshift/configuration.py index 7cb13b996f..7018445773 100644 --- a/dlt/destinations/redshift/configuration.py +++ b/dlt/destinations/impl/redshift/configuration.py @@ -4,7 +4,7 @@ from dlt.common.configuration import configspec from dlt.common.utils import digest128 -from dlt.destinations.postgres.configuration import PostgresCredentials, PostgresClientConfiguration +from dlt.destinations.impl.postgres.configuration import PostgresCredentials, PostgresClientConfiguration @configspec diff --git a/dlt/destinations/impl/redshift/factory.py b/dlt/destinations/impl/redshift/factory.py new file mode 100644 index 0000000000..7648b35851 --- /dev/null +++ b/dlt/destinations/impl/redshift/factory.py @@ -0,0 +1,45 @@ +import typing as t + +from dlt.common.destination import Destination, DestinationCapabilitiesContext + +from dlt.destinations.impl.redshift.configuration import RedshiftCredentials, RedshiftClientConfiguration +from dlt.destinations.impl.redshift import capabilities + +if t.TYPE_CHECKING: + from dlt.destinations.impl.redshift.redshift import RedshiftClient + + +class redshift(Destination[RedshiftClientConfiguration, "RedshiftClient"]): + + spec = RedshiftClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["RedshiftClient"]: + from dlt.destinations.impl.redshift.redshift import RedshiftClient + + return RedshiftClient + + def __init__( + self, + credentials: t.Union[RedshiftCredentials, t.Dict[str, t.Any], str] = None, + create_indexes: bool = True, + staging_iam_role: t.Optional[str] = None, + **kwargs: t.Any, + ) -> None: + """Configure the Redshift destination to use in a pipeline. + + All arguments provided here supersede other configuration sources such as environment variables and dlt config files. + + Args: + credentials: Credentials to connect to the redshift database. 
Can be an instance of `RedshiftCredentials` or + a connection string in the format `redshift://user:password@host:port/database` + create_indexes: Should unique indexes be created + staging_iam_role: IAM role to use for staging data in S3 + **kwargs: Additional arguments passed to the destination config + """ + super().__init__( + credentials=credentials, create_indexes=create_indexes, staging_iam_role=staging_iam_role, **kwargs + ) diff --git a/dlt/destinations/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py similarity index 97% rename from dlt/destinations/redshift/redshift.py rename to dlt/destinations/impl/redshift/redshift.py index 888f27ae7c..2124807bc1 100644 --- a/dlt/destinations/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -1,7 +1,7 @@ import platform import os -from dlt.destinations.postgres.sql_client import Psycopg2SqlClient +from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient from dlt.common.schema.utils import table_schema_has_type, table_schema_has_type_with_precision if platform.python_implementation() == "PyPy": @@ -25,8 +25,8 @@ from dlt.destinations.exceptions import DatabaseTerminalException, LoadJobTerminalException from dlt.destinations.job_client_impl import CopyRemoteFileLoadJob, LoadJob -from dlt.destinations.redshift import capabilities -from dlt.destinations.redshift.configuration import RedshiftClientConfiguration +from dlt.destinations.impl.redshift import capabilities +from dlt.destinations.impl.redshift.configuration import RedshiftClientConfiguration from dlt.destinations.job_impl import NewReferenceJob from dlt.destinations.sql_client import SqlClientBase from dlt.destinations.type_mapping import TypeMapper diff --git a/dlt/destinations/snowflake/__init__.py b/dlt/destinations/impl/snowflake/__init__.py similarity index 52% rename from dlt/destinations/snowflake/__init__.py rename to dlt/destinations/impl/snowflake/__init__.py index 5d32bc41fd..12e118eeab 100644 --- a/dlt/destinations/snowflake/__init__.py +++ b/dlt/destinations/impl/snowflake/__init__.py @@ -1,20 +1,8 @@ -from typing import Type from dlt.common.data_writers.escape import escape_bigquery_identifier - -from dlt.common.schema.schema import Schema -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.destination.reference import JobClientBase, DestinationClientConfiguration from dlt.common.data_writers.escape import escape_snowflake_identifier from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE -from dlt.destinations.snowflake.configuration import SnowflakeClientConfiguration - - -@with_config(spec=SnowflakeClientConfiguration, sections=(known_sections.DESTINATION, "snowflake",)) -def _configure(config: SnowflakeClientConfiguration = config.value) -> SnowflakeClientConfiguration: - return config def capabilities() -> DestinationCapabilitiesContext: @@ -35,14 +23,3 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supports_ddl_transactions = True caps.alter_add_multi_column = True return caps - - -def client(schema: Schema, initial_config: DestinationClientConfiguration = config.value) -> JobClientBase: - # import client when creating instance so capabilities and config specs can be accessed without dependencies installed - from dlt.destinations.snowflake.snowflake import SnowflakeClient - - return SnowflakeClient(schema, 
_configure(initial_config)) # type: ignore - - -def spec() -> Type[DestinationClientConfiguration]: - return SnowflakeClientConfiguration diff --git a/dlt/destinations/snowflake/configuration.py b/dlt/destinations/impl/snowflake/configuration.py similarity index 100% rename from dlt/destinations/snowflake/configuration.py rename to dlt/destinations/impl/snowflake/configuration.py diff --git a/dlt/destinations/impl/snowflake/factory.py b/dlt/destinations/impl/snowflake/factory.py new file mode 100644 index 0000000000..1201f406b0 --- /dev/null +++ b/dlt/destinations/impl/snowflake/factory.py @@ -0,0 +1,41 @@ +import typing as t + +from dlt.destinations.impl.snowflake.configuration import SnowflakeCredentials, SnowflakeClientConfiguration +from dlt.destinations.impl.snowflake import capabilities +from dlt.common.destination import Destination, DestinationCapabilitiesContext + +if t.TYPE_CHECKING: + from dlt.destinations.impl.snowflake.snowflake import SnowflakeClient + + +class snowflake(Destination[SnowflakeClientConfiguration, "SnowflakeClient"]): + + spec = SnowflakeClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["SnowflakeClient"]: + from dlt.destinations.impl.snowflake.snowflake import SnowflakeClient + + return SnowflakeClient + + def __init__( + self, + credentials: t.Union[SnowflakeCredentials, t.Dict[str, t.Any], str] = None, + stage_name: t.Optional[str] = None, + keep_staged_files: bool = True, + **kwargs: t.Any, + ) -> None: + """Configure the Snowflake destination to use in a pipeline. + + All arguments provided here supersede other configuration sources such as environment variables and dlt config files. + + Args: + credentials: Credentials to connect to the snowflake database. Can be an instance of `SnowflakeCredentials` or + a connection string in the format `snowflake://user:password@host:port/database` + stage_name: Name of an existing stage to use for loading data. 
Default uses implicit stage per table + keep_staged_files: Whether to delete or keep staged files after loading + """ + super().__init__(credentials=credentials, stage_name=stage_name, keep_staged_files=keep_staged_files, **kwargs) diff --git a/dlt/destinations/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py similarity index 97% rename from dlt/destinations/snowflake/snowflake.py rename to dlt/destinations/impl/snowflake/snowflake.py index f433ec7e7d..ead3e810d2 100644 --- a/dlt/destinations/snowflake/snowflake.py +++ b/dlt/destinations/impl/snowflake/snowflake.py @@ -14,11 +14,11 @@ from dlt.destinations.job_impl import EmptyLoadJob from dlt.destinations.exceptions import LoadJobTerminalException -from dlt.destinations.snowflake import capabilities -from dlt.destinations.snowflake.configuration import SnowflakeClientConfiguration -from dlt.destinations.snowflake.sql_client import SnowflakeSqlClient +from dlt.destinations.impl.snowflake import capabilities +from dlt.destinations.impl.snowflake.configuration import SnowflakeClientConfiguration +from dlt.destinations.impl.snowflake.sql_client import SnowflakeSqlClient from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlJobParams -from dlt.destinations.snowflake.sql_client import SnowflakeSqlClient +from dlt.destinations.impl.snowflake.sql_client import SnowflakeSqlClient from dlt.destinations.job_impl import NewReferenceJob from dlt.destinations.sql_client import SqlClientBase from dlt.destinations.type_mapping import TypeMapper diff --git a/dlt/destinations/snowflake/sql_client.py b/dlt/destinations/impl/snowflake/sql_client.py similarity index 98% rename from dlt/destinations/snowflake/sql_client.py rename to dlt/destinations/impl/snowflake/sql_client.py index 40cdc990a0..139a5ebb7a 100644 --- a/dlt/destinations/snowflake/sql_client.py +++ b/dlt/destinations/impl/snowflake/sql_client.py @@ -7,8 +7,8 @@ from dlt.destinations.exceptions import DatabaseTerminalException, DatabaseTransientException, DatabaseUndefinedRelation from dlt.destinations.sql_client import DBApiCursorImpl, SqlClientBase, raise_database_error, raise_open_connection_error from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame -from dlt.destinations.snowflake.configuration import SnowflakeCredentials -from dlt.destinations.snowflake import capabilities +from dlt.destinations.impl.snowflake.configuration import SnowflakeCredentials +from dlt.destinations.impl.snowflake import capabilities class SnowflakeCursorImpl(DBApiCursorImpl): native_cursor: snowflake_lib.cursor.SnowflakeCursor # type: ignore[assignment] diff --git a/dlt/destinations/weaviate/README.md b/dlt/destinations/impl/weaviate/README.md similarity index 100% rename from dlt/destinations/weaviate/README.md rename to dlt/destinations/impl/weaviate/README.md diff --git a/dlt/destinations/impl/weaviate/__init__.py b/dlt/destinations/impl/weaviate/__init__.py new file mode 100644 index 0000000000..143e0260d2 --- /dev/null +++ b/dlt/destinations/impl/weaviate/__init__.py @@ -0,0 +1,19 @@ +from dlt.common.destination import DestinationCapabilitiesContext +from dlt.destinations.impl.weaviate.weaviate_adapter import weaviate_adapter + + +def capabilities() -> DestinationCapabilitiesContext: + caps = DestinationCapabilitiesContext() + caps.preferred_loader_file_format = "jsonl" + caps.supported_loader_file_formats = ["jsonl"] + + caps.max_identifier_length = 200 + caps.max_column_identifier_length = 1024 + caps.max_query_length = 8 * 1024 * 1024 + 
caps.is_max_query_length_in_bytes = False + caps.max_text_data_type_length = 8 * 1024 * 1024 + caps.is_max_text_data_type_length_in_bytes = False + caps.supports_ddl_transactions = False + caps.naming_convention = "dlt.destinations.impl.weaviate.naming" + + return caps diff --git a/dlt/destinations/weaviate/ci_naming.py b/dlt/destinations/impl/weaviate/ci_naming.py similarity index 100% rename from dlt/destinations/weaviate/ci_naming.py rename to dlt/destinations/impl/weaviate/ci_naming.py diff --git a/dlt/destinations/weaviate/configuration.py b/dlt/destinations/impl/weaviate/configuration.py similarity index 100% rename from dlt/destinations/weaviate/configuration.py rename to dlt/destinations/impl/weaviate/configuration.py diff --git a/dlt/destinations/weaviate/exceptions.py b/dlt/destinations/impl/weaviate/exceptions.py similarity index 100% rename from dlt/destinations/weaviate/exceptions.py rename to dlt/destinations/impl/weaviate/exceptions.py diff --git a/dlt/destinations/impl/weaviate/factory.py b/dlt/destinations/impl/weaviate/factory.py new file mode 100644 index 0000000000..b29d02b1a7 --- /dev/null +++ b/dlt/destinations/impl/weaviate/factory.py @@ -0,0 +1,47 @@ +import typing as t + +from dlt.common.destination import Destination, DestinationCapabilitiesContext + +from dlt.destinations.impl.weaviate.configuration import WeaviateCredentials, WeaviateClientConfiguration +from dlt.destinations.impl.weaviate import capabilities + +if t.TYPE_CHECKING: + from dlt.destinations.impl.weaviate.weaviate_client import WeaviateClient + + +class weaviate(Destination[WeaviateClientConfiguration, "WeaviateClient"]): + + spec = WeaviateClientConfiguration + + def capabilities(self) -> DestinationCapabilitiesContext: + return capabilities() + + @property + def client_class(self) -> t.Type["WeaviateClient"]: + from dlt.destinations.impl.weaviate.weaviate_client import WeaviateClient + + return WeaviateClient + + def __init__( + self, + credentials: t.Union[WeaviateCredentials, t.Dict[str, t.Any]] = None, + vectorizer: str = None, + module_config: t.Dict[str, t.Dict[str, str]] = None, + **kwargs: t.Any, + ) -> None: + """Configure the Weaviate destination to use in a pipeline. + + All destination config parameters can be provided as arguments here and will supersede other config sources (such as dlt config files and environment variables). 
+ + Args: + credentials: Weaviate credentials containing URL, API key and optional headers + vectorizer: The name of the Weaviate vectorizer to use + module_config: The configuration for the Weaviate modules + **kwargs: Additional arguments forwarded to the destination config + """ + super().__init__( + credentials=credentials, + vectorizer=vectorizer, + module_config=module_config, + **kwargs + ) diff --git a/dlt/destinations/weaviate/naming.py b/dlt/destinations/impl/weaviate/naming.py similarity index 100% rename from dlt/destinations/weaviate/naming.py rename to dlt/destinations/impl/weaviate/naming.py diff --git a/dlt/destinations/weaviate/weaviate_adapter.py b/dlt/destinations/impl/weaviate/weaviate_adapter.py similarity index 100% rename from dlt/destinations/weaviate/weaviate_adapter.py rename to dlt/destinations/impl/weaviate/weaviate_adapter.py diff --git a/dlt/destinations/weaviate/weaviate_client.py b/dlt/destinations/impl/weaviate/weaviate_client.py similarity index 98% rename from dlt/destinations/weaviate/weaviate_client.py rename to dlt/destinations/impl/weaviate/weaviate_client.py index d47f08ab59..099cdc7368 100644 --- a/dlt/destinations/weaviate/weaviate_client.py +++ b/dlt/destinations/impl/weaviate/weaviate_client.py @@ -41,13 +41,13 @@ from dlt.common.data_types import TDataType from dlt.common.storages import FileStorage -from dlt.destinations.weaviate.weaviate_adapter import VECTORIZE_HINT, TOKENIZATION_HINT +from dlt.destinations.impl.weaviate.weaviate_adapter import VECTORIZE_HINT, TOKENIZATION_HINT from dlt.destinations.job_impl import EmptyLoadJob from dlt.destinations.job_client_impl import StorageSchemaInfo, StateInfo -from dlt.destinations.weaviate import capabilities -from dlt.destinations.weaviate.configuration import WeaviateClientConfiguration -from dlt.destinations.weaviate.exceptions import PropertyNameConflict, WeaviateBatchError +from dlt.destinations.impl.weaviate import capabilities +from dlt.destinations.impl.weaviate.configuration import WeaviateClientConfiguration +from dlt.destinations.impl.weaviate.exceptions import PropertyNameConflict, WeaviateBatchError from dlt.destinations.type_mapping import TypeMapper diff --git a/dlt/destinations/qdrant/__init__.py b/dlt/destinations/qdrant/__init__.py deleted file mode 100644 index 7a8619ffcd..0000000000 --- a/dlt/destinations/qdrant/__init__.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import Type - -from dlt.common.schema.schema import Schema -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config -from dlt.common.destination.reference import ( - JobClientBase, - DestinationClientConfiguration, -) -from dlt.common.destination import DestinationCapabilitiesContext -from dlt.destinations.qdrant.qdrant_adapter import qdrant_adapter - -from dlt.destinations.qdrant.configuration import QdrantClientConfiguration - - -@with_config( - spec=QdrantClientConfiguration, - sections=( - known_sections.DESTINATION, - "qdrant", - ), -) -def _configure( - config: QdrantClientConfiguration = config.value, -) -> QdrantClientConfiguration: - return config - - -def capabilities() -> DestinationCapabilitiesContext: - caps = DestinationCapabilitiesContext() - caps.preferred_loader_file_format = "jsonl" - caps.supported_loader_file_formats = ["jsonl"] - - caps.max_identifier_length = 200 - caps.max_column_identifier_length = 1024 - caps.max_query_length = 8 * 1024 * 1024 - caps.is_max_query_length_in_bytes = False - caps.max_text_data_type_length = 8 * 1024 
* 1024 - caps.is_max_text_data_type_length_in_bytes = False - caps.supports_ddl_transactions = False - - return caps - - -def client( - schema: Schema, initial_config: DestinationClientConfiguration = config.value -) -> JobClientBase: - from dlt.destinations.qdrant.qdrant_client import QdrantClient - return QdrantClient(schema, _configure(initial_config)) # type: ignore - - -def spec() -> Type[QdrantClientConfiguration]: - return QdrantClientConfiguration diff --git a/dlt/destinations/weaviate/__init__.py b/dlt/destinations/weaviate/__init__.py deleted file mode 100644 index ebd87aea0c..0000000000 --- a/dlt/destinations/weaviate/__init__.py +++ /dev/null @@ -1,55 +0,0 @@ -from typing import Type - -from dlt.common.schema.schema import Schema -from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.accessors import config -from dlt.common.destination.reference import ( - JobClientBase, - DestinationClientConfiguration, -) -from dlt.common.destination import DestinationCapabilitiesContext - -from dlt.destinations.weaviate.weaviate_adapter import weaviate_adapter -from dlt.destinations.weaviate.configuration import WeaviateClientConfiguration - - -@with_config( - spec=WeaviateClientConfiguration, - sections=( - known_sections.DESTINATION, - "weaviate", - ), -) -def _configure( - config: WeaviateClientConfiguration = config.value, -) -> WeaviateClientConfiguration: - return config - - -def capabilities() -> DestinationCapabilitiesContext: - caps = DestinationCapabilitiesContext() - caps.preferred_loader_file_format = "jsonl" - caps.supported_loader_file_formats = ["jsonl"] - - caps.max_identifier_length = 200 - caps.max_column_identifier_length = 1024 - caps.max_query_length = 8 * 1024 * 1024 - caps.is_max_query_length_in_bytes = False - caps.max_text_data_type_length = 8 * 1024 * 1024 - caps.is_max_text_data_type_length_in_bytes = False - caps.supports_ddl_transactions = False - caps.naming_convention = "dlt.destinations.weaviate.naming" - - return caps - - -def client( - schema: Schema, initial_config: DestinationClientConfiguration = config.value -) -> JobClientBase: - from dlt.destinations.weaviate.weaviate_client import WeaviateClient - - return WeaviateClient(schema, _configure(initial_config)) # type: ignore - - -def spec() -> Type[WeaviateClientConfiguration]: - return WeaviateClientConfiguration diff --git a/dlt/helpers/streamlit_helper.py b/dlt/helpers/streamlit_helper.py index 7921e4e2e1..e43e794bf6 100644 --- a/dlt/helpers/streamlit_helper.py +++ b/dlt/helpers/streamlit_helper.py @@ -120,7 +120,7 @@ def _query_data_live(query: str, schema_name: str = None) -> pd.DataFrame: schema_names = ", ".join(sorted(pipeline.schema_names)) st.markdown(f""" * pipeline name: **{pipeline.pipeline_name}** - * destination: **{str(credentials)}** in **{pipeline.destination.__name__}** + * destination: **{str(credentials)}** in **{pipeline.destination.name}** * dataset name: **{pipeline.dataset_name}** * default schema name: **{pipeline.default_schema_name}** * all schema names: **{schema_names}** diff --git a/dlt/load/load.py b/dlt/load/load.py index beae130789..725f8589f5 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -20,7 +20,7 @@ from dlt.common.schema import Schema, TSchemaTables from dlt.common.schema.typing import TTableSchema, TWriteDisposition from dlt.common.storages import LoadStorage -from dlt.common.destination.reference import DestinationClientDwhConfiguration, FollowupJob, JobClientBase, WithStagingDataset, DestinationReference, 
LoadJob, NewLoadJob, TLoadJobState, DestinationClientConfiguration, SupportsStagingDestination
+from dlt.common.destination.reference import DestinationClientDwhConfiguration, FollowupJob, JobClientBase, WithStagingDataset, Destination, LoadJob, NewLoadJob, TLoadJobState, DestinationClientConfiguration, SupportsStagingDestination, TDestination
 from dlt.destinations.job_impl import EmptyLoadJob
@@ -34,8 +34,8 @@ class Load(Runnable[Executor]):
     @with_config(spec=LoaderConfiguration, sections=(known_sections.LOAD,))
     def __init__(
         self,
-        destination: DestinationReference,
-        staging_destination: DestinationReference = None,
+        destination: TDestination,
+        staging_destination: TDestination = None,
         collector: Collector = NULL_COLLECTOR,
         is_storage_owner: bool = False,
         config: LoaderConfiguration = config.value,
@@ -54,7 +54,6 @@ def __init__(
         self._processed_load_ids: Dict[str, str] = {}
         """Load ids to dataset name"""
-
     def create_storage(self, is_storage_owner: bool) -> LoadStorage:
         supported_file_formats = self.capabilities.supported_loader_file_formats
         if self.staging_destination:
diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py
index 71c37c40ba..af7dd12294 100644
--- a/dlt/pipeline/__init__.py
+++ b/dlt/pipeline/__init__.py
@@ -7,12 +7,13 @@
 from dlt.common.configuration import with_config
 from dlt.common.configuration.container import Container
 from dlt.common.configuration.inject import get_orig_args, last_config
-from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
+from dlt.common.destination import Destination, TDestinationReferenceArg
 from dlt.common.pipeline import LoadInfo, PipelineContext, get_dlt_pipelines_dir
 from dlt.pipeline.configuration import PipelineConfiguration, ensure_correct_pipeline_kwargs
 from dlt.pipeline.pipeline import Pipeline
 from dlt.pipeline.progress import _from_name as collector_from_name, TCollectorArg, _NULL_COLLECTOR
+from dlt.pipeline.deprecations import credentials_argument_deprecated

 @overload
@@ -104,6 +105,8 @@ def pipeline(
     # is any of the arguments different from defaults
     has_arguments = bool(orig_args[0]) or any(orig_args[1].values())

+    credentials_argument_deprecated("pipeline", credentials, destination)
+
     if not has_arguments:
         context = Container()[PipelineContext]
         # if pipeline instance is already active then return it, otherwise create a new one
@@ -116,8 +119,8 @@ def pipeline(
     if not pipelines_dir:
         pipelines_dir = get_dlt_pipelines_dir()
-    destination = DestinationReference.from_name(destination or kwargs["destination_name"])
-    staging = DestinationReference.from_name(staging or kwargs.get("staging_name", None)) if staging is not None else None
+    destination = Destination.from_reference(destination or kwargs["destination_name"])
+    staging = Destination.from_reference(staging or kwargs.get("staging_name", None)) if staging is not None else None

     progress = collector_from_name(progress)
     # create new pipeline instance
@@ -224,7 +227,7 @@ def run(
     Returns:
         LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please note that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo.
     """
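The one-line change below is the crux of the `run()` migration: an explicit `credentials` value now rides along into the destination factory instead of being resolved separately. A minimal sketch of the new resolution behavior, assuming the duckdb extra is installed (the credential value is illustrative):

```py
import dlt
from dlt.common.destination import Destination

# A short destination name resolves to a factory instance, not a module.
dest = Destination.from_reference("duckdb")
assert dest.name == "duckdb"

# Explicit credentials are folded into the factory, mirroring the change below.
dest = Destination.from_reference("duckdb", credentials="quack.duckdb")
```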
""" - destination = DestinationReference.from_name(destination) + destination = Destination.from_reference(destination, credentials=credentials) return pipeline().run( data, destination=destination, diff --git a/dlt/pipeline/deprecations.py b/dlt/pipeline/deprecations.py new file mode 100644 index 0000000000..138167c8d3 --- /dev/null +++ b/dlt/pipeline/deprecations.py @@ -0,0 +1,20 @@ +import typing as t +import warnings + +from dlt.common.destination import Destination, TDestinationReferenceArg + + +def credentials_argument_deprecated( + caller_name: str, credentials: t.Optional[t.Any], destination: TDestinationReferenceArg = None +) -> None: + if credentials is None: + return + + dest_name = Destination.to_name(destination) if destination else "postgres" + + warnings.warn( + f"The `credentials argument` to {caller_name} is deprecated and will be removed in a future version. " + f"Pass the same credentials to the `destination` instance instead, e.g. {caller_name}(destination=dlt.destinations.{dest_name}(credentials=...))", + DeprecationWarning, + stacklevel=2, + ) diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index b948ad8040..465eccfdb6 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -24,8 +24,8 @@ from dlt.common.typing import TFun, TSecretValue, is_optional_type from dlt.common.runners import pool_runner as runner from dlt.common.storages import LiveSchemaStorage, NormalizeStorage, LoadStorage, SchemaStorage, FileStorage, NormalizeStorageConfiguration, SchemaStorageConfiguration, LoadStorageConfiguration -from dlt.common.destination import DestinationCapabilitiesContext -from dlt.common.destination.reference import (DestinationClientDwhConfiguration, WithStateSync, DestinationReference, JobClientBase, DestinationClientConfiguration, +from dlt.common.destination import DestinationCapabilitiesContext, TDestination +from dlt.common.destination.reference import (DestinationClientDwhConfiguration, WithStateSync, Destination, JobClientBase, DestinationClientConfiguration, TDestinationReferenceArg, DestinationClientStagingConfiguration, DestinationClientStagingConfiguration, DestinationClientDwhWithStagingConfiguration) from dlt.common.destination.capabilities import INTERNAL_LOADER_FILE_FORMATS @@ -52,6 +52,7 @@ from dlt.pipeline.state_sync import STATE_ENGINE_VERSION, load_state_from_destination, merge_state_if_changed, migrate_state, state_resource, json_encode_state, json_decode_state from dlt.common.schema.utils import normalize_schema_name +from dlt.pipeline.deprecations import credentials_argument_deprecated def with_state_sync(may_extract_state: bool = False) -> Callable[[TFun], TFun]: @@ -166,9 +167,9 @@ class Pipeline(SupportsPipeline): """A directory where the pipelines' working directories are created""" working_dir: str """A working directory of the pipeline""" - destination: DestinationReference = None - staging: DestinationReference = None - """The destination reference which is ModuleType. `destination.__name__` returns the name string""" + destination: TDestination = None + staging: TDestination = None + """The destination reference which is ModuleType. 
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py
index b948ad8040..465eccfdb6 100644
--- a/dlt/pipeline/pipeline.py
+++ b/dlt/pipeline/pipeline.py
@@ -24,8 +24,8 @@
 from dlt.common.typing import TFun, TSecretValue, is_optional_type
 from dlt.common.runners import pool_runner as runner
 from dlt.common.storages import LiveSchemaStorage, NormalizeStorage, LoadStorage, SchemaStorage, FileStorage, NormalizeStorageConfiguration, SchemaStorageConfiguration, LoadStorageConfiguration
-from dlt.common.destination import DestinationCapabilitiesContext
-from dlt.common.destination.reference import (DestinationClientDwhConfiguration, WithStateSync, DestinationReference, JobClientBase, DestinationClientConfiguration,
+from dlt.common.destination import DestinationCapabilitiesContext, TDestination
+from dlt.common.destination.reference import (DestinationClientDwhConfiguration, WithStateSync, Destination, JobClientBase, DestinationClientConfiguration,
                                               TDestinationReferenceArg, DestinationClientStagingConfiguration, DestinationClientStagingConfiguration, DestinationClientDwhWithStagingConfiguration)
 from dlt.common.destination.capabilities import INTERNAL_LOADER_FILE_FORMATS
@@ -52,6 +52,7 @@
 from dlt.pipeline.state_sync import STATE_ENGINE_VERSION, load_state_from_destination, merge_state_if_changed, migrate_state, state_resource, json_encode_state, json_decode_state
 from dlt.common.schema.utils import normalize_schema_name
+from dlt.pipeline.deprecations import credentials_argument_deprecated

 def with_state_sync(may_extract_state: bool = False) -> Callable[[TFun], TFun]:
@@ -166,9 +167,9 @@ class Pipeline(SupportsPipeline):
     """A directory where the pipelines' working directories are created"""
     working_dir: str
     """A working directory of the pipeline"""
-    destination: DestinationReference = None
-    staging: DestinationReference = None
-    """The destination reference which is ModuleType. `destination.__name__` returns the name string"""
+    destination: TDestination = None
+    staging: TDestination = None
+    """The destination reference, a `Destination` factory instance. `destination.name` returns the name string"""
     dataset_name: str = None
     """Name of the dataset to which pipeline will be loaded to"""
     credentials: Any = None
@@ -183,8 +184,8 @@ def __init__(
         self,
         pipeline_name: str,
         pipelines_dir: str,
         pipeline_salt: TSecretValue,
-        destination: DestinationReference,
-        staging: DestinationReference,
+        destination: TDestination,
+        staging: TDestination,
         dataset_name: str,
         credentials: Any,
         import_schema_path: str,
@@ -342,6 +343,9 @@ def load(
         # set destination and default dataset if provided
         self._set_destinations(destination, None)
         self._set_dataset_name(dataset_name)
+
+        credentials_argument_deprecated("pipeline.load", credentials, destination)
+
         self.credentials = credentials or self.credentials

         # check if any schema is present, if not then no data was extracted
@@ -449,6 +453,8 @@ def run(
         self._set_destinations(destination, staging)
         self._set_dataset_name(dataset_name)

+        credentials_argument_deprecated("pipeline.run", credentials, self.destination)
+
         # sync state with destination
         if self.config.restore_from_destination and not self.full_refresh and not self._state_restored and (self.destination or destination):
             self.sync_destination(destination, staging, dataset_name)
@@ -732,7 +738,7 @@ def _sql_job_client(self, schema: Schema, credentials: Any = None) -> SqlJobClie
         if isinstance(client, SqlJobClientBase):
             return client
         else:
-            raise SqlClientNotAvailable(self.pipeline_name, self.destination.__name__)
+            raise SqlClientNotAvailable(self.pipeline_name, self.destination.name)

     def _get_normalize_storage(self) -> NormalizeStorage:
         return NormalizeStorage(True, self._normalize_storage_config)
@@ -893,7 +899,7 @@ def _extract_source(self, storage: ExtractorStorage, source: DltSource, max_para
         return extract_id

-    def _get_destination_client_initial_config(self, destination: DestinationReference = None, credentials: Any = None, as_staging: bool = False) -> DestinationClientConfiguration:
+    def _get_destination_client_initial_config(self, destination: TDestination = None, credentials: Any = None, as_staging: bool = False) -> DestinationClientConfiguration:
         destination = destination or self.destination
         if not destination:
             raise PipelineConfigMissing(
@@ -903,7 +909,7 @@ def _get_destination_client_initial_config(self, destination: DestinationReferen
                 "Please provide `destination` argument to `pipeline`, `run` or `load` method directly or via .dlt config.toml file or environment variable."
             )
         # create initial destination client config
-        client_spec = destination.spec()
+        client_spec = destination.spec
         # initialize explicit credentials
         if not as_staging:
             # explicit credentials passed to dlt.pipeline should not be applied to staging
@@ -999,17 +1005,19 @@ def _set_context(self, is_active: bool) -> None:
                 del self._container[DestinationCapabilitiesContext]

     def _set_destinations(self, destination: TDestinationReferenceArg, staging: TDestinationReferenceArg) -> None:
-        destination_mod = DestinationReference.from_name(destination)
-        self.destination = destination_mod or self.destination
+        # destination_mod = DestinationReference.from_name(destination)
+        if destination:
+            self.destination = Destination.from_reference(destination)

         if destination and not self.destination.capabilities().supported_loader_file_formats and not staging:
-            logger.warning(f"The destination {destination_mod.__name__} requires the filesystem staging destination to be set, but it was not provided.
Setting it to 'filesystem'.") + logger.warning(f"The destination {self.destination.name} requires the filesystem staging destination to be set, but it was not provided. Setting it to 'filesystem'.") staging = "filesystem" if staging: - staging_module = DestinationReference.from_name(staging) - if staging_module and not issubclass(staging_module.spec(), DestinationClientStagingConfiguration): - raise DestinationNoStagingMode(staging_module.__name__) + # staging_module = DestinationReference.from_name(staging) + staging_module = Destination.from_reference(staging) + if staging_module and not issubclass(staging_module.spec, DestinationClientStagingConfiguration): + raise DestinationNoStagingMode(staging_module.name) self.staging = staging_module or self.staging with self._maybe_destination_capabilities(): @@ -1028,8 +1036,10 @@ def _maybe_destination_capabilities(self, loader_file_format: TLoaderFileFormat caps = injected_caps.__enter__() caps.preferred_loader_file_format = self._resolve_loader_file_format( - DestinationReference.to_name(self.destination), - DestinationReference.to_name(self.staging) if self.staging else None, + self.destination.name, + # DestinationReference.to_name(self.destination), + self.staging.name if self.staging else None, + # DestinationReference.to_name(self.staging) if self.staging else None, destination_caps, stage_caps, loader_file_format) caps.supported_loader_file_formats = ( destination_caps.supported_staging_file_formats if stage_caps else None @@ -1157,12 +1167,12 @@ def _restore_state_from_destination(self) -> Optional[TPipelineState]: if isinstance(job_client, WithStateSync): state = load_state_from_destination(self.pipeline_name, job_client) if state is None: - logger.info(f"The state was not found in the destination {self.destination.__name__}:{dataset_name}") + logger.info(f"The state was not found in the destination {self.destination.name}:{dataset_name}") else: - logger.info(f"The state was restored from the destination {self.destination.__name__}:{dataset_name}") + logger.info(f"The state was restored from the destination {self.destination.name}:{dataset_name}") else: state = None - logger.info(f"Destination does not support metadata storage {self.destination.__name__}:{dataset_name}") + logger.info(f"Destination does not support metadata storage {self.destination.name}:{dataset_name}") return state finally: # restore the use_single_dataset option @@ -1177,17 +1187,17 @@ def _get_schemas_from_destination(self, schema_names: Sequence[str], always_down if not self._schema_storage.has_schema(schema.name) or always_download: with self._get_destination_clients(schema)[0] as job_client: if not isinstance(job_client, WithStateSync): - logger.info(f"Destination does not support metadata storage {self.destination.__name__}") + logger.info(f"Destination does not support metadata storage {self.destination.name}") return restored_schemas schema_info = job_client.get_stored_schema() if schema_info is None: - logger.info(f"The schema {schema.name} was not found in the destination {self.destination.__name__}:{self.dataset_name}") + logger.info(f"The schema {schema.name} was not found in the destination {self.destination.name}:{self.dataset_name}") # try to import schema with contextlib.suppress(FileNotFoundError): self._schema_storage.load_schema(schema.name) else: schema = Schema.from_dict(json.loads(schema_info.schema)) - logger.info(f"The schema {schema.name} version {schema.version} hash {schema.stored_version_hash} was restored from the destination 
{self.destination.__name__}:{self.dataset_name}")
+                    logger.info(f"The schema {schema.name} version {schema.version} hash {schema.stored_version_hash} was restored from the destination {self.destination.name}:{self.dataset_name}")
                     restored_schemas.append(schema)
         return restored_schemas
@@ -1244,7 +1254,7 @@ def _state_to_props(self, state: TPipelineState) -> None:
             if prop in state["_local"] and not prop.startswith("_"):
                 setattr(self, prop, state["_local"][prop])  # type: ignore
         if "destination" in state:
-            self._set_destinations(DestinationReference.from_name(self.destination), DestinationReference.from_name(self.staging) if "staging" in state else None )
+            self._set_destinations(self.destination, self.staging if "staging" in state else None)

     def _props_to_state(self, state: TPipelineState) -> None:
         """Write pipeline props to `state`"""
@@ -1255,9 +1265,9 @@
             if not prop.startswith("_"):
                 state["_local"][prop] = getattr(self, prop)  # type: ignore
         if self.destination:
-            state["destination"] = self.destination.__name__
+            state["destination"] = self.destination.name
         if self.staging:
-            state["staging"] = self.staging.__name__
+            state["staging"] = self.staging.name
         state["schema_names"] = self._schema_storage.list_schemas()

     def _save_state(self, state: TPipelineState) -> None:
diff --git a/dlt/pipeline/track.py b/dlt/pipeline/track.py
index ec42bc788f..07e9a2d137 100644
--- a/dlt/pipeline/track.py
+++ b/dlt/pipeline/track.py
@@ -9,7 +9,7 @@
 from dlt.common.runtime.segment import track as dlthub_telemetry_track
 from dlt.common.runtime.slack import send_slack_message
 from dlt.common.pipeline import LoadInfo, ExtractInfo, SupportsPipeline
-from dlt.common.destination import DestinationReference
+from dlt.common.destination import Destination
 from dlt.pipeline.typing import TPipelineStep
 from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace
@@ -21,7 +21,7 @@ def _add_sentry_tags(span: Span, pipeline: SupportsPipeline) -> None:
         span.set_tag("pipeline_name", pipeline.pipeline_name)
         if pipeline.destination:
-            span.set_tag("destination", pipeline.destination.__name__)
+            span.set_tag("destination", pipeline.destination.name)
         if pipeline.dataset_name:
             span.set_tag("dataset_name", pipeline.dataset_name)
     except ImportError:
@@ -87,7 +87,7 @@ def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: S
     props = {
         "elapsed": (step.finished_at - trace.started_at).total_seconds(),
         "success": step.step_exception is None,
-        "destination_name": DestinationReference.to_name(pipeline.destination) if pipeline.destination else None,
+        "destination_name": pipeline.destination.name if pipeline.destination else None,
         "pipeline_name_hash": digest128(pipeline.pipeline_name),
         "dataset_name_hash": digest128(pipeline.dataset_name) if pipeline.dataset_name else None,
         "default_schema_name_hash": digest128(pipeline.default_schema_name) if pipeline.default_schema_name else None,
@@ -107,4 +107,4 @@ def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline) -> None:
     if pipeline.runtime_config.sentry_dsn:
         # print(f"---END SENTRY TX: {trace.transaction_id} SCOPE: {Hub.current.scope}")
         with contextlib.suppress(Exception):
-            Hub.current.scope.span.__exit__(None, None, None)
\ No newline at end of file
+            Hub.current.scope.span.__exit__(None, None, None)
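The pattern repeated through `pipeline.py` and `track.py` above is the same: the module-level `destination.__name__` gives way to the `name` property on the factory instance. A sketch of what call sites now observe, with duckdb used for illustration:

```py
import dlt

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb")
# `pipeline.destination` is a factory instance after this change, not a module:
assert pipeline.destination.name == "duckdb"
```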
diff --git a/docs/website/docs/dlt-ecosystem/destinations/weaviate.md b/docs/website/docs/dlt-ecosystem/destinations/weaviate.md
index 8c626266a4..fe7dafc243 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/weaviate.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/weaviate.md
@@ -252,7 +252,7 @@ it will be normalized to:
 so your best course of action is to clean up the data yourself before loading and use default naming convention. Nevertheless you can configure the alternative in `config.toml`:
 ```toml
 [schema]
-naming="dlt.destinations.weaviate.ci_naming"
+naming="dlt.destinations.impl.weaviate.ci_naming"
 ```
 ## Additional destination options
diff --git a/docs/website/docs/getting-started-snippets.py b/docs/website/docs/getting-started-snippets.py
index c4bd789834..be21a7f757 100644
--- a/docs/website/docs/getting-started-snippets.py
+++ b/docs/website/docs/getting-started-snippets.py
@@ -290,7 +290,7 @@ def pdf_to_weaviate_snippet() -> None:
     import os

     import dlt
-    from dlt.destinations.weaviate import weaviate_adapter
+    from dlt.destinations.impl.weaviate import weaviate_adapter
     from PyPDF2 import PdfReader
diff --git a/tests/cli/test_pipeline_command.py b/tests/cli/test_pipeline_command.py
index 401517f3c5..19bb5fa277 100644
--- a/tests/cli/test_pipeline_command.py
+++ b/tests/cli/test_pipeline_command.py
@@ -45,7 +45,7 @@ def test_pipeline_command_operations(repo_dir: str, project_files: FileStorage)
         pipeline_command.pipeline_command("info", "chess_pipeline", None, 0)
         _out = buf.getvalue()
         # do we have duckdb destination
-        assert "dlt.destinations.duckdb" in _out
+        assert "destination: duckdb" in _out
     print(_out)

     with io.StringIO() as buf, contextlib.redirect_stdout(buf):
diff --git a/tests/common/data_writers/test_data_writers.py b/tests/common/data_writers/test_data_writers.py
index 66b8f765c7..9d655bc4db 100644
--- a/tests/common/data_writers/test_data_writers.py
+++ b/tests/common/data_writers/test_data_writers.py
@@ -5,7 +5,7 @@
 from dlt.common import pendulum, json
 from dlt.common.typing import AnyFun
 # from dlt.destinations.postgres import capabilities
-from dlt.destinations.redshift import capabilities as redshift_caps
+from dlt.destinations.impl.redshift import capabilities as redshift_caps
 from dlt.common.data_writers.escape import escape_redshift_identifier, escape_bigquery_identifier, escape_redshift_literal, escape_postgres_literal, escape_duckdb_literal
 from dlt.common.data_writers.writers import DataWriter, InsertValuesWriter, JsonlWriter, ParquetDataWriter
diff --git a/tests/common/test_destination.py b/tests/common/test_destination.py
index 7afa10ed68..5483a95f45 100644
--- a/tests/common/test_destination.py
+++ b/tests/common/test_destination.py
@@ -1,6 +1,7 @@
 import pytest

-from dlt.common.destination.reference import DestinationClientDwhConfiguration, DestinationReference
+from dlt.common.destination.reference import DestinationClientDwhConfiguration, Destination
+from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.exceptions import InvalidDestinationReference, UnknownDestinationModule
 from dlt.common.schema import Schema
 from dlt.common.schema.exceptions import InvalidDatasetName
@@ -11,24 +12,24 @@
 def test_import_unknown_destination() -> None:
     # standard destination
     with pytest.raises(UnknownDestinationModule):
-        DestinationReference.from_name("meltdb")
+        Destination.from_reference("meltdb")
     # custom module
     with pytest.raises(UnknownDestinationModule):
-        DestinationReference.from_name("melt.db")
+        Destination.from_reference("melt.db")

 def test_invalid_destination_reference() -> None:
     with pytest.raises(InvalidDestinationReference):
-
DestinationReference.from_name("tests.load.cases.fake_destination") + Destination.from_reference("tests.load.cases.fake_destination.not_a_destination") def test_import_all_destinations() -> None: # this must pass without the client dependencies being imported - for module in ACTIVE_DESTINATIONS: - dest = DestinationReference.from_name(module) - assert dest.__name__ == "dlt.destinations." + module + for dest_name in ACTIVE_DESTINATIONS: + dest = Destination.from_reference(dest_name) + assert dest.name == dest_name dest.spec() - dest.capabilities() + assert isinstance(dest.capabilities(), DestinationCapabilitiesContext) def test_normalize_dataset_name() -> None: diff --git a/tests/helpers/dbt_tests/local/test_dbt_utils.py b/tests/helpers/dbt_tests/local/test_dbt_utils.py index 71e570bd69..133ecf1617 100644 --- a/tests/helpers/dbt_tests/local/test_dbt_utils.py +++ b/tests/helpers/dbt_tests/local/test_dbt_utils.py @@ -7,7 +7,7 @@ from dlt.common.storages import FileStorage from dlt.common.utils import uniq_id -from dlt.destinations.postgres.configuration import PostgresCredentials +from dlt.destinations.impl.postgres.configuration import PostgresCredentials from dlt.helpers.dbt.dbt_utils import DBTProcessingError, initialize_dbt_logging, run_dbt_command, is_incremental_schema_out_of_sync_error from tests.utils import test_storage, preserve_environ diff --git a/tests/helpers/dbt_tests/test_runner_dbt_versions.py b/tests/helpers/dbt_tests/test_runner_dbt_versions.py index b418bf15b6..1037908e59 100644 --- a/tests/helpers/dbt_tests/test_runner_dbt_versions.py +++ b/tests/helpers/dbt_tests/test_runner_dbt_versions.py @@ -14,8 +14,8 @@ from dlt.common.runners.synth_pickle import decode_obj, encode_obj from dlt.common.typing import AnyFun -from dlt.destinations.postgres.postgres import PostgresClient -from dlt.destinations.bigquery import BigQueryClientConfiguration +from dlt.destinations.impl.postgres.postgres import PostgresClient +from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration from dlt.helpers.dbt.configuration import DBTRunnerConfiguration from dlt.helpers.dbt.exceptions import PrerequisitesException, DBTProcessingError from dlt.helpers.dbt import package_runner, create_venv, _create_dbt_deps, _default_profile_name, DEFAULT_DBT_VERSION diff --git a/tests/load/bigquery/test_bigquery_client.py b/tests/load/bigquery/test_bigquery_client.py index 145898cde3..abbaf8d414 100644 --- a/tests/load/bigquery/test_bigquery_client.py +++ b/tests/load/bigquery/test_bigquery_client.py @@ -14,7 +14,7 @@ from dlt.common.storages import FileStorage from dlt.common.utils import digest128, uniq_id, custom_environ -from dlt.destinations.bigquery.bigquery import BigQueryClient, BigQueryClientConfiguration +from dlt.destinations.impl.bigquery.bigquery import BigQueryClient, BigQueryClientConfiguration from dlt.destinations.exceptions import LoadJobNotExistsException, LoadJobTerminalException from tests.utils import TEST_STORAGE_ROOT, delete_test_storage, preserve_environ @@ -242,7 +242,7 @@ def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage) @pytest.mark.parametrize('location', ["US", "EU"]) def test_bigquery_location(location: str, file_storage: FileStorage) -> None: - with cm_yield_client_with_storage("bigquery", default_config_values={"location": location}) as client: + with cm_yield_client_with_storage("bigquery", default_config_values={"credentials": {"location": location}}) as client: user_table_name = prepare_table(client) load_json = { 
"_dlt_id": uniq_id(), diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py index a3222ba020..0d8ab1c8c2 100644 --- a/tests/load/bigquery/test_bigquery_table_builder.py +++ b/tests/load/bigquery/test_bigquery_table_builder.py @@ -8,8 +8,8 @@ from dlt.common.configuration import resolve_configuration from dlt.common.configuration.specs import GcpServiceAccountCredentialsWithoutDefaults -from dlt.destinations.bigquery.bigquery import BigQueryClient -from dlt.destinations.bigquery.configuration import BigQueryClientConfiguration +from dlt.destinations.impl.bigquery.bigquery import BigQueryClient +from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate from tests.load.utils import TABLE_UPDATE diff --git a/tests/load/cases/fake_destination.py b/tests/load/cases/fake_destination.py index 152b2db918..016cc19020 100644 --- a/tests/load/cases/fake_destination.py +++ b/tests/load/cases/fake_destination.py @@ -1 +1,6 @@ -# module that is used to test wrong destination references \ No newline at end of file +# module that is used to test wrong destination references + + +class not_a_destination: + def __init__(self, **kwargs) -> None: + pass diff --git a/tests/load/duckdb/test_duckdb_client.py b/tests/load/duckdb/test_duckdb_client.py index 6c362a6b76..ddfc681a84 100644 --- a/tests/load/duckdb/test_duckdb_client.py +++ b/tests/load/duckdb/test_duckdb_client.py @@ -6,7 +6,8 @@ from dlt.common.configuration.resolve import resolve_configuration from dlt.common.configuration.utils import get_resolved_traces -from dlt.destinations.duckdb.configuration import DUCK_DB_NAME, DuckDbClientConfiguration, DuckDbCredentials, DEFAULT_DUCK_DB_NAME +from dlt.destinations.impl.duckdb.configuration import DUCK_DB_NAME, DuckDbClientConfiguration, DuckDbCredentials, DEFAULT_DUCK_DB_NAME +from dlt.destinations import duckdb from tests.load.pipeline.utils import drop_pipeline, assert_table from tests.utils import patch_home_dir, autouse_test_storage, preserve_environ, TEST_STORAGE_ROOT @@ -46,13 +47,13 @@ def test_duckdb_open_conn_default() -> None: def test_duckdb_database_path() -> None: # resolve without any path provided c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset")) - assert c.credentials.database.lower() == os.path.abspath("quack.duckdb").lower() + assert c.credentials._conn_str().lower() == os.path.abspath("quack.duckdb").lower() # resolve without any path but with pipeline context p = dlt.pipeline(pipeline_name="quack_pipeline") c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset")) # still cwd db_path = os.path.abspath(os.path.join(".", "quack_pipeline.duckdb")) - assert c.credentials.database.lower() == db_path.lower() + assert c.credentials._conn_str().lower() == db_path.lower() # we do not keep default duckdb path in the local state with pytest.raises(KeyError): p.get_local_state_val("duckdb_database") @@ -69,7 +70,7 @@ def test_duckdb_database_path() -> None: # test special :pipeline: path to create in pipeline folder c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials=":pipeline:")) db_path = os.path.abspath(os.path.join(p.working_dir, DEFAULT_DUCK_DB_NAME)) - assert c.credentials.database.lower() == db_path.lower() + assert c.credentials._conn_str().lower() == db_path.lower() # connect conn = c.credentials.borrow_conn(read_only=False) 
diff --git a/tests/load/duckdb/test_duckdb_client.py b/tests/load/duckdb/test_duckdb_client.py
index 6c362a6b76..ddfc681a84 100644
--- a/tests/load/duckdb/test_duckdb_client.py
+++ b/tests/load/duckdb/test_duckdb_client.py
@@ -6,7 +6,8 @@
 from dlt.common.configuration.resolve import resolve_configuration
 from dlt.common.configuration.utils import get_resolved_traces
-from dlt.destinations.duckdb.configuration import DUCK_DB_NAME, DuckDbClientConfiguration, DuckDbCredentials, DEFAULT_DUCK_DB_NAME
+from dlt.destinations.impl.duckdb.configuration import DUCK_DB_NAME, DuckDbClientConfiguration, DuckDbCredentials, DEFAULT_DUCK_DB_NAME
+from dlt.destinations import duckdb
 from tests.load.pipeline.utils import drop_pipeline, assert_table
 from tests.utils import patch_home_dir, autouse_test_storage, preserve_environ, TEST_STORAGE_ROOT
@@ -46,13 +47,13 @@ def test_duckdb_open_conn_default() -> None:
 def test_duckdb_database_path() -> None:
     # resolve without any path provided
     c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset"))
-    assert c.credentials.database.lower() == os.path.abspath("quack.duckdb").lower()
+    assert c.credentials._conn_str().lower() == os.path.abspath("quack.duckdb").lower()
     # resolve without any path but with pipeline context
     p = dlt.pipeline(pipeline_name="quack_pipeline")
     c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset"))
     # still cwd
     db_path = os.path.abspath(os.path.join(".", "quack_pipeline.duckdb"))
-    assert c.credentials.database.lower() == db_path.lower()
+    assert c.credentials._conn_str().lower() == db_path.lower()
     # we do not keep default duckdb path in the local state
     with pytest.raises(KeyError):
         p.get_local_state_val("duckdb_database")
@@ -69,7 +70,7 @@ def test_duckdb_database_path() -> None:
     # test special :pipeline: path to create in pipeline folder
     c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials=":pipeline:"))
     db_path = os.path.abspath(os.path.join(p.working_dir, DEFAULT_DUCK_DB_NAME))
-    assert c.credentials.database.lower() == db_path.lower()
+    assert c.credentials._conn_str().lower() == db_path.lower()
     # connect
     conn = c.credentials.borrow_conn(read_only=False)
     c.credentials.return_conn(conn)
@@ -80,7 +81,7 @@ def test_duckdb_database_path() -> None:
     # provide relative path
     db_path = "_storage/test_quack.duckdb"
     c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials="duckdb:///_storage/test_quack.duckdb"))
-    assert c.credentials.database.lower() == os.path.abspath(db_path).lower()
+    assert c.credentials._conn_str().lower() == os.path.abspath(db_path).lower()
     conn = c.credentials.borrow_conn(read_only=False)
     c.credentials.return_conn(conn)
     assert os.path.isfile(db_path)
@@ -90,7 +91,7 @@ def test_duckdb_database_path() -> None:
     db_path = os.path.abspath("_storage/abs_test_quack.duckdb")
     c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials=f"duckdb:///{db_path}"))
     assert os.path.isabs(c.credentials.database)
-    assert c.credentials.database.lower() == db_path.lower()
+    assert c.credentials._conn_str().lower() == db_path.lower()
     conn = c.credentials.borrow_conn(read_only=False)
     c.credentials.return_conn(conn)
     assert os.path.isfile(db_path)
@@ -99,7 +100,7 @@ def test_duckdb_database_path() -> None:
     # set just path as credentials
     db_path = "_storage/path_test_quack.duckdb"
     c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials=db_path))
-    assert c.credentials.database.lower() == os.path.abspath(db_path).lower()
+    assert c.credentials._conn_str().lower() == os.path.abspath(db_path).lower()
     conn = c.credentials.borrow_conn(read_only=False)
     c.credentials.return_conn(conn)
     assert os.path.isfile(db_path)
@@ -108,7 +109,7 @@ def test_duckdb_database_path() -> None:
     db_path = os.path.abspath("_storage/abs_path_test_quack.duckdb")
     c = resolve_configuration(DuckDbClientConfiguration(dataset_name="test_dataset", credentials=db_path))
     assert os.path.isabs(c.credentials.database)
-    assert c.credentials.database.lower() == db_path.lower()
+    assert c.credentials._conn_str().lower() == db_path.lower()
     conn = c.credentials.borrow_conn(read_only=False)
     c.credentials.return_conn(conn)
     assert os.path.isfile(db_path)
@@ -128,7 +129,7 @@ def test_keeps_initial_db_path() -> None:
     print(p.pipelines_dir)
     with p.sql_client() as conn:
         # still cwd
-        assert conn.credentials.database.lower() == os.path.abspath(db_path).lower()
+        assert conn.credentials._conn_str().lower() == os.path.abspath(db_path).lower()
         # but it is kept in the local state
         assert p.get_local_state_val("duckdb_database").lower() == os.path.abspath(db_path).lower()
@@ -138,7 +139,7 @@ def test_keeps_initial_db_path() -> None:
     with p.sql_client() as conn:
         # still cwd
         assert p.get_local_state_val("duckdb_database").lower() == os.path.abspath(db_path).lower()
-        assert conn.credentials.database.lower() == os.path.abspath(db_path).lower()
+        assert conn.credentials._conn_str().lower() == os.path.abspath(db_path).lower()
     # now create a new pipeline
     dlt.pipeline(pipeline_name="not_quack", destination="dummy")
@@ -147,12 +148,12 @@
     assert p.get_local_state_val("duckdb_database").lower() == os.path.abspath(db_path).lower()
     # new pipeline context took over
     # TODO: restore pipeline context on each call
-    assert conn.credentials.database.lower() != os.path.abspath(db_path).lower()
+    assert conn.credentials._conn_str().lower() != os.path.abspath(db_path).lower()


 def test_duckdb_database_delete() -> None:
     db_path = "_storage/path_test_quack.duckdb"
-    p = dlt.pipeline(pipeline_name="quack_pipeline", credentials=db_path, destination="duckdb")
+    p = dlt.pipeline(pipeline_name="quack_pipeline", destination=duckdb(credentials=db_path))
     p.run([1, 2, 3], table_name="table", dataset_name="dataset")
     # attach the pipeline
     p = dlt.attach(pipeline_name="quack_pipeline")
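# --- editor's note (not part of the diff): sketch of the factory-style wiring the
# duckdb tests above now use - credentials go to the duckdb() factory rather than
# to dlt.pipeline(). Assumes a writable _storage directory in the cwd.
import dlt
from dlt.destinations import duckdb

p = dlt.pipeline(
    pipeline_name="quack_pipeline",
    destination=duckdb(credentials="_storage/path_test_quack.duckdb"),
)
p.run([1, 2, 3], table_name="table", dataset_name="dataset")
# ---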
dlt.pipeline(pipeline_name="quack_pipeline", destination=duckdb(credentials=db_path)) p.run([1, 2, 3], table_name="table", dataset_name="dataset") # attach the pipeline p = dlt.attach(pipeline_name="quack_pipeline") diff --git a/tests/load/duckdb/test_duckdb_table_builder.py b/tests/load/duckdb/test_duckdb_table_builder.py index 247d134b06..a5870763fc 100644 --- a/tests/load/duckdb/test_duckdb_table_builder.py +++ b/tests/load/duckdb/test_duckdb_table_builder.py @@ -5,8 +5,8 @@ from dlt.common.utils import uniq_id from dlt.common.schema import Schema -from dlt.destinations.duckdb.duck import DuckDbClient -from dlt.destinations.duckdb.configuration import DuckDbClientConfiguration +from dlt.destinations.impl.duckdb.duck import DuckDbClient +from dlt.destinations.impl.duckdb.configuration import DuckDbClientConfiguration from tests.load.utils import TABLE_UPDATE diff --git a/tests/load/duckdb/test_motherduck_client.py b/tests/load/duckdb/test_motherduck_client.py index 4a167fa016..582847bfa2 100644 --- a/tests/load/duckdb/test_motherduck_client.py +++ b/tests/load/duckdb/test_motherduck_client.py @@ -3,7 +3,7 @@ from dlt.common.configuration.resolve import resolve_configuration -from dlt.destinations.motherduck.configuration import MotherDuckCredentials, MotherDuckClientConfiguration +from dlt.destinations.impl.motherduck.configuration import MotherDuckCredentials, MotherDuckClientConfiguration from tests.utils import patch_home_dir, preserve_environ, skip_if_not_active diff --git a/tests/load/filesystem/test_filesystem_client.py b/tests/load/filesystem/test_filesystem_client.py index f290892e18..0055f37716 100644 --- a/tests/load/filesystem/test_filesystem_client.py +++ b/tests/load/filesystem/test_filesystem_client.py @@ -6,7 +6,7 @@ from dlt.common.utils import digest128, uniq_id from dlt.common.storages import LoadStorage, FileStorage -from dlt.destinations.filesystem.filesystem import LoadFilesystemJob, FilesystemDestinationClientConfiguration +from dlt.destinations.impl.filesystem.filesystem import LoadFilesystemJob, FilesystemDestinationClientConfiguration from tests.load.filesystem.utils import perform_load from tests.utils import clean_test_storage, init_test_logging diff --git a/tests/load/filesystem/utils.py b/tests/load/filesystem/utils.py index eebfa6e87c..8186e82c3b 100644 --- a/tests/load/filesystem/utils.py +++ b/tests/load/filesystem/utils.py @@ -5,16 +5,16 @@ from dlt.load import Load from dlt.common.configuration.container import Container from dlt.common.configuration.specs.config_section_context import ConfigSectionContext -from dlt.common.destination.reference import DestinationReference, LoadJob +from dlt.common.destination.reference import Destination, LoadJob, TDestination from dlt.destinations import filesystem -from dlt.destinations.filesystem.filesystem import FilesystemClient +from dlt.destinations.impl.filesystem.filesystem import FilesystemClient from dlt.destinations.job_impl import EmptyLoadJob from tests.load.utils import prepare_load_package def setup_loader(dataset_name: str) -> Load: - destination: DestinationReference = filesystem # type: ignore[assignment] - config = filesystem.spec()(dataset_name=dataset_name) + destination: TDestination = filesystem() # type: ignore[assignment] + config = filesystem.spec(dataset_name=dataset_name) # setup loader with Container().injectable_context(ConfigSectionContext(sections=('filesystem',))): return Load( diff --git a/tests/load/mssql/test_mssql_credentials.py b/tests/load/mssql/test_mssql_credentials.py index 
diff --git a/tests/load/mssql/test_mssql_credentials.py b/tests/load/mssql/test_mssql_credentials.py
index 9b57692bb2..5428246247 100644
--- a/tests/load/mssql/test_mssql_credentials.py
+++ b/tests/load/mssql/test_mssql_credentials.py
@@ -1,6 +1,6 @@
 from dlt.common.configuration import resolve_configuration
-from dlt.destinations.mssql.configuration import MsSqlCredentials
+from dlt.destinations.impl.mssql.configuration import MsSqlCredentials
diff --git a/tests/load/mssql/test_mssql_table_builder.py b/tests/load/mssql/test_mssql_table_builder.py
index 4f5a6637d6..114d94a20f 100644
--- a/tests/load/mssql/test_mssql_table_builder.py
+++ b/tests/load/mssql/test_mssql_table_builder.py
@@ -7,8 +7,8 @@
 pytest.importorskip("dlt.destinations.mssql.mssql", reason="MSSQL ODBC driver not installed")
-from dlt.destinations.mssql.mssql import MsSqlClient
-from dlt.destinations.mssql.configuration import MsSqlClientConfiguration, MsSqlCredentials
+from dlt.destinations.impl.mssql.mssql import MsSqlClient
+from dlt.destinations.impl.mssql.configuration import MsSqlClientConfiguration, MsSqlCredentials
 from tests.load.utils import TABLE_UPDATE
diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py
index 8e810015f2..dce65bc8d7 100644
--- a/tests/load/pipeline/test_filesystem_pipeline.py
+++ b/tests/load/pipeline/test_filesystem_pipeline.py
@@ -4,7 +4,7 @@
 import dlt, os
 from dlt.common.utils import uniq_id
 from dlt.common.storages.load_storage import LoadJobInfo
-from dlt.destinations.filesystem.filesystem import FilesystemClient, LoadFilesystemJob
+from dlt.destinations.impl.filesystem.filesystem import FilesystemClient, LoadFilesystemJob
 from dlt.common.schema.typing import LOADS_TABLE_NAME
 from tests.utils import skip_if_not_active
diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py
index 99071a7ac6..2fc4aad1a8 100644
--- a/tests/load/pipeline/test_pipelines.py
+++ b/tests/load/pipeline/test_pipelines.py
@@ -8,7 +8,7 @@
 from dlt.common.pipeline import SupportsPipeline
 from dlt.common import json, sleep
-from dlt.common.destination.reference import DestinationReference
+from dlt.common.destination import Destination
 from dlt.common.schema.schema import Schema
 from dlt.common.schema.typing import VERSION_TABLE_NAME
 from dlt.common.typing import TDataItem
@@ -66,8 +66,8 @@ def data_fun() -> Iterator[Any]:
     # mock the correct destinations (never do that in normal code)
     with p.managed_state():
         p._set_destinations(
-            DestinationReference.from_name(destination_config.destination),
-            DestinationReference.from_name(destination_config.staging) if destination_config.staging else None
+            Destination.from_reference(destination_config.destination),
+            Destination.from_reference(destination_config.staging) if destination_config.staging else None
         )
     # does not reset the dataset name
     assert p.dataset_name in possible_dataset_names
diff --git a/tests/load/pipeline/utils.py b/tests/load/pipeline/utils.py
index 752571591c..113585f669 100644
--- a/tests/load/pipeline/utils.py
+++ b/tests/load/pipeline/utils.py
@@ -16,7 +16,7 @@
 from tests.load.utils import DestinationTestConfiguration, destinations_configs

 if TYPE_CHECKING:
-    from dlt.destinations.filesystem.filesystem import FilesystemClient
+    from dlt.destinations.impl.filesystem.filesystem import FilesystemClient

 @pytest.fixture(autouse=True)
 def drop_pipeline(request) -> Iterator[None]:
@@ -67,7 +67,7 @@ def _drop_dataset(schema_name: str) -> None:
 def _is_filesystem(p: dlt.Pipeline) -> bool:
     if not p.destination:
         return False
-    return p.destination.__name__.rsplit('.', 1)[-1] == 'filesystem'
+    return p.destination.name == 'filesystem'


 def assert_table(p: dlt.Pipeline, table_name: str, table_data: List[Any], schema_name: str = None, info: LoadInfo = None) -> None:
diff --git a/tests/load/postgres/test_postgres_client.py b/tests/load/postgres/test_postgres_client.py
index dcc242cf50..65ac61cfd4 100644
--- a/tests/load/postgres/test_postgres_client.py
+++ b/tests/load/postgres/test_postgres_client.py
@@ -7,9 +7,9 @@
 from dlt.common.storages import FileStorage
 from dlt.common.utils import uniq_id
-from dlt.destinations.postgres.configuration import PostgresCredentials
-from dlt.destinations.postgres.postgres import PostgresClient
-from dlt.destinations.postgres.sql_client import psycopg2
+from dlt.destinations.impl.postgres.configuration import PostgresCredentials
+from dlt.destinations.impl.postgres.postgres import PostgresClient
+from dlt.destinations.impl.postgres.sql_client import psycopg2
 from tests.utils import TEST_STORAGE_ROOT, delete_test_storage, skipifpypy, preserve_environ
 from tests.load.utils import expect_load_file, prepare_table, yield_client_with_storage
diff --git a/tests/load/postgres/test_postgres_table_builder.py b/tests/load/postgres/test_postgres_table_builder.py
index 165c62a468..1d6965c0c0 100644
--- a/tests/load/postgres/test_postgres_table_builder.py
+++ b/tests/load/postgres/test_postgres_table_builder.py
@@ -5,8 +5,8 @@
 from dlt.common.utils import uniq_id
 from dlt.common.schema import Schema
-from dlt.destinations.postgres.postgres import PostgresClient
-from dlt.destinations.postgres.configuration import PostgresClientConfiguration, PostgresCredentials
+from dlt.destinations.impl.postgres.postgres import PostgresClient
+from dlt.destinations.impl.postgres.configuration import PostgresClientConfiguration, PostgresCredentials
 from tests.load.utils import TABLE_UPDATE
diff --git a/tests/load/qdrant/test_pipeline.py b/tests/load/qdrant/test_pipeline.py
index 303a5de69f..760eec4631 100644
--- a/tests/load/qdrant/test_pipeline.py
+++ b/tests/load/qdrant/test_pipeline.py
@@ -5,8 +5,8 @@
 from dlt.common import json
 from dlt.common.utils import uniq_id
-from dlt.destinations.qdrant.qdrant_adapter import qdrant_adapter, VECTORIZE_HINT
-from dlt.destinations.qdrant.qdrant_client import QdrantClient
+from dlt.destinations.impl.qdrant.qdrant_adapter import qdrant_adapter, VECTORIZE_HINT
+from dlt.destinations.impl.qdrant.qdrant_client import QdrantClient
 from tests.pipeline.utils import assert_load_info
 from tests.load.qdrant.utils import drop_active_pipeline_data, assert_collection
diff --git a/tests/load/qdrant/utils.py b/tests/load/qdrant/utils.py
index 96b582a28e..1dfacbee7f 100644
--- a/tests/load/qdrant/utils.py
+++ b/tests/load/qdrant/utils.py
@@ -5,7 +5,7 @@
 from dlt.common.pipeline import PipelineContext
 from dlt.common.configuration.container import Container
-from dlt.destinations.qdrant.qdrant_client import QdrantClient
+from dlt.destinations.impl.qdrant.qdrant_client import QdrantClient

 def assert_unordered_list_equal(list1: List[Any], list2: List[Any]) -> None:
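# --- editor's note (not part of the diff): the _is_filesystem change above relies
# on factories exposing a plain .name; a sketch of old vs new spelling.
# old: p.destination.__name__.rsplit('.', 1)[-1] == 'filesystem'   (module object)
import dlt

def is_filesystem(p: dlt.Pipeline) -> bool:
    # new: factory instances carry a short destination name directly
    return bool(p.destination) and p.destination.name == "filesystem"
# ---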
diff --git a/tests/load/redshift/test_redshift_client.py b/tests/load/redshift/test_redshift_client.py
index 9839965b70..7f617024df 100644
--- a/tests/load/redshift/test_redshift_client.py
+++ b/tests/load/redshift/test_redshift_client.py
@@ -12,8 +12,8 @@
 from dlt.common.utils import uniq_id
 from dlt.destinations.exceptions import DatabaseTerminalException
-from dlt.destinations.redshift.configuration import RedshiftCredentials
-from dlt.destinations.redshift.redshift import RedshiftClient, psycopg2
+from dlt.destinations.impl.redshift.configuration import RedshiftCredentials
+from dlt.destinations.impl.redshift.redshift import RedshiftClient, psycopg2
 from tests.common.utils import COMMON_TEST_CASES_PATH
 from tests.utils import TEST_STORAGE_ROOT, autouse_test_storage, skipifpypy
diff --git a/tests/load/redshift/test_redshift_table_builder.py b/tests/load/redshift/test_redshift_table_builder.py
index 8c61ccc1f2..2e0feb44e7 100644
--- a/tests/load/redshift/test_redshift_table_builder.py
+++ b/tests/load/redshift/test_redshift_table_builder.py
@@ -6,8 +6,8 @@
 from dlt.common.schema import Schema
 from dlt.common.configuration import resolve_configuration
-from dlt.destinations.redshift.redshift import RedshiftClient
-from dlt.destinations.redshift.configuration import RedshiftClientConfiguration, RedshiftCredentials
+from dlt.destinations.impl.redshift.redshift import RedshiftClient
+from dlt.destinations.impl.redshift.configuration import RedshiftClientConfiguration, RedshiftCredentials
 from tests.load.utils import TABLE_UPDATE
diff --git a/tests/load/snowflake/test_snowflake_configuration.py b/tests/load/snowflake/test_snowflake_configuration.py
index 7108ad06e5..abf80a1241 100644
--- a/tests/load/snowflake/test_snowflake_configuration.py
+++ b/tests/load/snowflake/test_snowflake_configuration.py
@@ -9,7 +9,7 @@
 from dlt.common.configuration.exceptions import ConfigurationValueError
 from dlt.common.utils import digest128
-from dlt.destinations.snowflake.configuration import SnowflakeClientConfiguration, SnowflakeCredentials
+from dlt.destinations.impl.snowflake.configuration import SnowflakeClientConfiguration, SnowflakeCredentials
 from tests.common.configuration.utils import environment
diff --git a/tests/load/snowflake/test_snowflake_table_builder.py b/tests/load/snowflake/test_snowflake_table_builder.py
index 81164625f9..9ede1c8d13 100644
--- a/tests/load/snowflake/test_snowflake_table_builder.py
+++ b/tests/load/snowflake/test_snowflake_table_builder.py
@@ -5,8 +5,8 @@
 from dlt.common.utils import uniq_id
 from dlt.common.schema import Schema
-from dlt.destinations.snowflake.snowflake import SnowflakeClient
-from dlt.destinations.snowflake.configuration import SnowflakeClientConfiguration, SnowflakeCredentials
+from dlt.destinations.impl.snowflake.snowflake import SnowflakeClient
+from dlt.destinations.impl.snowflake.configuration import SnowflakeClientConfiguration, SnowflakeCredentials
 from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate
 from tests.load.utils import TABLE_UPDATE
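# --- editor's note (not part of the diff): the rule behind the many import rewrites
# above, stated once. Concrete clients, configurations and credentials move under
# dlt.destinations.impl.<name>; the snowflake example assumes that extra is
# installed and that a factory is still exported at the top level.
# old: from dlt.destinations.snowflake.configuration import SnowflakeCredentials
from dlt.destinations.impl.snowflake.configuration import SnowflakeCredentials  # new path
# ---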
diff --git a/tests/load/test_dummy_client.py b/tests/load/test_dummy_client.py
index 1216906967..9edc49a607 100644
--- a/tests/load/test_dummy_client.py
+++ b/tests/load/test_dummy_client.py
@@ -11,14 +11,14 @@
 from dlt.common.storages import FileStorage, LoadStorage
 from dlt.common.storages.load_storage import JobWithUnsupportedWriterException
 from dlt.common.utils import uniq_id
-from dlt.common.destination.reference import DestinationReference, LoadJob
+from dlt.common.destination.reference import Destination, LoadJob, TDestination
 from dlt.load import Load
 from dlt.destinations.job_impl import EmptyLoadJob
 from dlt.destinations import dummy
-from dlt.destinations.dummy import dummy as dummy_impl
-from dlt.destinations.dummy.configuration import DummyClientConfiguration
+from dlt.destinations.impl.dummy import dummy as dummy_impl
+from dlt.destinations.impl.dummy.configuration import DummyClientConfiguration
 from dlt.load.exceptions import LoadClientJobFailed, LoadClientJobRetry
 from dlt.common.schema.utils import get_top_level_table
@@ -445,7 +445,7 @@ def run_all(load: Load) -> None:
 def setup_loader(delete_completed_jobs: bool = False, client_config: DummyClientConfiguration = None) -> Load:
     # reset jobs for a test
     dummy_impl.JOBS = {}
-    destination: DestinationReference = dummy  # type: ignore[assignment]
+    destination: TDestination = dummy()  # type: ignore[assignment]
     client_config = client_config or DummyClientConfiguration(loader_file_format="jsonl")
     # patch destination to provide client_config
     # destination.client = lambda schema: dummy_impl.DummyClient(schema, client_config)
diff --git a/tests/load/test_insert_job_client.py b/tests/load/test_insert_job_client.py
index 95e63a79f2..86049b035a 100644
--- a/tests/load/test_insert_job_client.py
+++ b/tests/load/test_insert_job_client.py
@@ -52,7 +52,7 @@ def test_simple_load(client: InsertValuesJobClient, file_storage: FileStorage) -
 def test_loading_errors(client: InsertValuesJobClient, file_storage: FileStorage) -> None:
     # test expected dbiapi exceptions for supported destinations
     import duckdb
-    from dlt.destinations.postgres.sql_client import psycopg2
+    from dlt.destinations.impl.postgres.sql_client import psycopg2
     TNotNullViolation = psycopg2.errors.NotNullViolation
     TNumericValueOutOfRange = psycopg2.errors.NumericValueOutOfRange
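# --- editor's note (not part of the diff): the setup_loader change above, isolated.
# `dummy` is now instantiated as a factory, and the instance is typed TDestination.
from dlt.common.destination.reference import TDestination
from dlt.destinations import dummy
from dlt.destinations.impl.dummy.configuration import DummyClientConfiguration

destination: TDestination = dummy()  # factory instance instead of the module object
client_config = DummyClientConfiguration(loader_file_format="jsonl")
# ---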
diff --git a/tests/load/utils.py b/tests/load/utils.py
index be2097c879..f591f51585 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -12,8 +12,8 @@
 from dlt.common.configuration import resolve_configuration
 from dlt.common.configuration.container import Container
 from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
-from dlt.common.destination.reference import DestinationClientDwhConfiguration, DestinationReference, JobClientBase, LoadJob, DestinationClientStagingConfiguration, WithStagingDataset, TDestinationReferenceArg
-from dlt.common.destination import TLoaderFileFormat
+from dlt.common.destination.reference import DestinationClientDwhConfiguration, JobClientBase, LoadJob, DestinationClientStagingConfiguration, WithStagingDataset, TDestinationReferenceArg
+from dlt.common.destination import TLoaderFileFormat, Destination
 from dlt.common.data_writers import DataWriter
 from dlt.common.schema import TColumnSchema, TTableSchemaColumns, Schema
 from dlt.common.storages import SchemaStorage, FileStorage, SchemaStorageConfiguration
@@ -229,15 +229,15 @@ def yield_client(
 ) -> Iterator[SqlJobClientBase]:
     os.environ.pop("DATASET_NAME", None)
     # import destination reference by name
-    destination = import_module(f"dlt.destinations.{destination_name}")
+    destination = Destination.from_reference(destination_name)
     # create initial config
     dest_config: DestinationClientDwhConfiguration = None
-    dest_config = destination.spec()()
+    dest_config = destination.spec()  # type: ignore[assignment]
     dest_config.dataset_name = dataset_name  # type: ignore[misc]  # TODO: Why is dataset_name final?
     if default_config_values is not None:
         # apply the values to credentials, if dict is provided it will be used as default
-        dest_config.credentials = default_config_values  # type: ignore[assignment]
+        # dest_config.credentials = default_config_values  # type: ignore[assignment]
         # also apply to config
         dest_config.update(default_config_values)
     # get event default schema
@@ -261,7 +261,7 @@ def yield_client(
     # lookup for credentials in the section that is destination name
     with Container().injectable_context(ConfigSectionContext(sections=("destination", destination_name,))):
-        with destination.client(schema, dest_config) as client:
+        with destination.client(schema, dest_config) as client:  # type: ignore[assignment]
             yield client

 @contextlib.contextmanager
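# --- editor's note (not part of the diff): the new client-construction path from
# yield_client above, condensed; "postgres" and the dataset name are illustrative,
# and real credentials would still have to resolve before the client connects.
from dlt.common.destination import Destination
from dlt.common.schema import Schema

destination = Destination.from_reference("postgres")  # replaces import_module(...)
dest_config = destination.spec()                      # config instance built from the spec
dest_config.dataset_name = "test_dataset"             # type: ignore[misc]
client = destination.client(Schema("event"), dest_config)  # used as a context manager in the tests
# ---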
diff --git a/tests/load/weaviate/test_naming.py b/tests/load/weaviate/test_naming.py
index a965201425..25258a2479 100644
--- a/tests/load/weaviate/test_naming.py
+++ b/tests/load/weaviate/test_naming.py
@@ -1,7 +1,7 @@
 import dlt, pytest
-from dlt.destinations.weaviate.naming import NamingConvention
-from dlt.destinations.weaviate.ci_naming import NamingConvention as CINamingConvention
+from dlt.destinations.impl.weaviate.naming import NamingConvention
+from dlt.destinations.impl.weaviate.ci_naming import NamingConvention as CINamingConvention
 from tests.common.utils import load_yml_case
diff --git a/tests/load/weaviate/test_pipeline.py b/tests/load/weaviate/test_pipeline.py
index 339c94575e..691281c63e 100644
--- a/tests/load/weaviate/test_pipeline.py
+++ b/tests/load/weaviate/test_pipeline.py
@@ -6,10 +6,10 @@
 from dlt.common.schema import Schema
 from dlt.common.utils import uniq_id
-from dlt.destinations.weaviate import weaviate_adapter
-from dlt.destinations.weaviate.exceptions import PropertyNameConflict
-from dlt.destinations.weaviate.weaviate_adapter import VECTORIZE_HINT, TOKENIZATION_HINT
-from dlt.destinations.weaviate.weaviate_client import WeaviateClient
+from dlt.destinations.impl.weaviate import weaviate_adapter
+from dlt.destinations.impl.weaviate.exceptions import PropertyNameConflict
+from dlt.destinations.impl.weaviate.weaviate_adapter import VECTORIZE_HINT, TOKENIZATION_HINT
+from dlt.destinations.impl.weaviate.weaviate_client import WeaviateClient
 from dlt.pipeline.exceptions import PipelineStepFailed
 from tests.pipeline.utils import assert_load_info
@@ -374,7 +374,7 @@ def test_vectorize_property_without_data() -> None:
     # set the naming convention to case insensitive
     # os.environ["SCHEMA__NAMING"] = "direct"
-    dlt.config["schema.naming"] = "dlt.destinations.weaviate.ci_naming"
+    dlt.config["schema.naming"] = "dlt.destinations.impl.weaviate.ci_naming"
     # create new schema with changed naming convention
     p = p.drop()
     info = p.run(weaviate_adapter(["there are", "no stop", "words in here"], vectorize="vAlue"), primary_key="vALue", columns={"vAlue": {"data_type": "text"}})
diff --git a/tests/load/weaviate/test_weaviate_client.py b/tests/load/weaviate/test_weaviate_client.py
index d102610f68..ca9d853d98 100644
--- a/tests/load/weaviate/test_weaviate_client.py
+++ b/tests/load/weaviate/test_weaviate_client.py
@@ -9,8 +9,8 @@
 from dlt.common.schema.typing import TWriteDisposition, TColumnSchema, TTableSchemaColumns
 from dlt.destinations import weaviate
-from dlt.destinations.weaviate.exceptions import PropertyNameConflict
-from dlt.destinations.weaviate.weaviate_client import WeaviateClient
+from dlt.destinations.impl.weaviate.exceptions import PropertyNameConflict
+from dlt.destinations.impl.weaviate.weaviate_client import WeaviateClient
 from dlt.common.storages.file_storage import FileStorage
 from dlt.common.schema.utils import new_table
@@ -27,9 +27,10 @@ def drop_weaviate_schema() -> Iterator[None]:
 def get_client_instance(schema: Schema) -> WeaviateClient:
-    config = weaviate.spec()(dataset_name="ClientTest" + uniq_id())
-    with Container().injectable_context(ConfigSectionContext(sections=('destination', 'weaviate'))):
-        return weaviate.client(schema, config)  # type: ignore[return-value]
+    dest = weaviate(dataset_name="ClientTest" + uniq_id())
+    return dest.client(schema, dest.spec())
+    # with Container().injectable_context(ConfigSectionContext(sections=('destination', 'weaviate'))):
+    #     return dest.client(schema, config)

 @pytest.fixture(scope='function')
@@ -44,7 +45,7 @@ def ci_client() -> Iterator[WeaviateClient]:
 def make_client(naming_convention: str) -> Iterator[WeaviateClient]:
     schema = Schema('test_schema', {
-        'names': f"dlt.destinations.weaviate.{naming_convention}",
+        'names': f"dlt.destinations.impl.weaviate.{naming_convention}",
         'json': None
     })
     _client = get_client_instance(schema)
diff --git a/tests/load/weaviate/utils.py b/tests/load/weaviate/utils.py
index d5568b0598..ed378191e6 100644
--- a/tests/load/weaviate/utils.py
+++ b/tests/load/weaviate/utils.py
@@ -6,8 +6,8 @@
 from dlt.common.configuration.container import Container
 from dlt.common.schema.utils import get_columns_names_with_prop
-from dlt.destinations.weaviate.weaviate_client import WeaviateClient
-from dlt.destinations.weaviate.weaviate_adapter import VECTORIZE_HINT, TOKENIZATION_HINT
+from dlt.destinations.impl.weaviate.weaviate_client import WeaviateClient
+from dlt.destinations.impl.weaviate.weaviate_adapter import VECTORIZE_HINT, TOKENIZATION_HINT

 def assert_unordered_list_equal(list1: List[Any], list2: List[Any]) -> None:
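# --- editor's note (not part of the diff): get_client_instance above, condensed -
# configuration now binds at factory creation, so the injected config section is no
# longer needed. Assumes the weaviate extra is installed; the dataset name is
# illustrative.
from dlt.common.schema import Schema
from dlt.destinations import weaviate

dest = weaviate(dataset_name="ClientTest123")
client = dest.client(Schema("test_schema"), dest.spec())
# ---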
diff --git a/tests/normalize/utils.py b/tests/normalize/utils.py
index 3ee14948c1..0ce099d4b6 100644
--- a/tests/normalize/utils.py
+++ b/tests/normalize/utils.py
@@ -1,10 +1,10 @@
 from typing import Mapping, cast
-from dlt.destinations.duckdb import capabilities as duck_insert_caps
-from dlt.destinations.redshift import capabilities as rd_insert_caps
-from dlt.destinations.postgres import capabilities as pg_insert_caps
-from dlt.destinations.bigquery import capabilities as jsonl_caps
-from dlt.destinations.filesystem import capabilities as filesystem_caps
+from dlt.destinations.impl.duckdb import capabilities as duck_insert_caps
+from dlt.destinations.impl.redshift import capabilities as rd_insert_caps
+from dlt.destinations.impl.postgres import capabilities as pg_insert_caps
+from dlt.destinations.impl.bigquery import capabilities as jsonl_caps
+from dlt.destinations.impl.filesystem import capabilities as filesystem_caps

 DEFAULT_CAPS = pg_insert_caps
diff --git a/tests/pipeline/test_dlt_versions.py b/tests/pipeline/test_dlt_versions.py
index 09d8e98d82..2f383c1c0a 100644
--- a/tests/pipeline/test_dlt_versions.py
+++ b/tests/pipeline/test_dlt_versions.py
@@ -10,8 +10,8 @@
 from dlt.common.storages import FileStorage
 from dlt.common.schema.typing import LOADS_TABLE_NAME, VERSION_TABLE_NAME, TStoredSchema
 from dlt.common.configuration.resolve import resolve_configuration
-from dlt.destinations.duckdb.configuration import DuckDbClientConfiguration
-from dlt.destinations.duckdb.sql_client import DuckDbSqlClient
+from dlt.destinations.impl.duckdb.configuration import DuckDbClientConfiguration
+from dlt.destinations.impl.duckdb.sql_client import DuckDbSqlClient
 from tests.utils import TEST_STORAGE_ROOT, test_storage
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index c778e47cd6..3fcb38d915 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -21,7 +21,9 @@
 from dlt.common.runtime.collector import AliveCollector, EnlightenCollector, LogCollector, TqdmCollector
 from dlt.common.schema.utils import new_column, new_table
 from dlt.common.utils import uniq_id
+from dlt.common.schema import Schema

+from dlt.destinations import filesystem, redshift, dummy
 from dlt.extract.exceptions import InvalidResourceDataTypeBasic, PipeGenInvalid, SourceExhausted
 from dlt.extract.extract import ExtractorStorage
 from dlt.extract.source import DltResource, DltSource
@@ -173,7 +175,7 @@ def test_configured_destination(environment) -> None:
     p = dlt.pipeline()
     assert p.destination is not None
-    assert p.destination.__name__.endswith("postgres")
+    assert p.destination.name.endswith("postgres")
     assert p.pipeline_name == "postgres_pipe"
@@ -228,6 +230,56 @@ def test_destination_explicit_credentials(environment: Any) -> None:
     assert config.credentials.is_resolved()


+def test_destination_staging_config(environment: Any) -> None:
+    fs_dest = filesystem("file:///testing-bucket")
+    p = dlt.pipeline(
+        pipeline_name="staging_pipeline",
+        destination=redshift(credentials="redshift://loader:loader@localhost:5432/dlt_data"),
+        staging=fs_dest
+    )
+    schema = Schema("foo")
+    p._inject_schema(schema)
+    initial_config = p._get_destination_client_initial_config(p.staging, as_staging=True)
+    staging_config = fs_dest.configuration(initial_config)  # type: ignore[arg-type]
+
+    # Ensure that the as_staging flag is set in the final resolved config
+    assert staging_config.as_staging is True
+
+
+def test_destination_factory_defaults_resolve_from_config(environment: Any) -> None:
+    """Params passed explicitly to the destination factory supersede config values.
+    Env config values supersede default values.
+    """
+    environment["FAIL_PROB"] = "0.3"
+    environment["RETRY_PROB"] = "0.8"
+    p = dlt.pipeline(pipeline_name="dummy_pipeline", destination=dummy(retry_prob=0.5))
+
+    client = p.destination_client()
+
+    assert client.config.fail_prob == 0.3  # type: ignore[attr-defined]
+    assert client.config.retry_prob == 0.5  # type: ignore[attr-defined]
+
+
+def test_destination_credentials_in_factory(environment: Any) -> None:
+    os.environ['DESTINATION__REDSHIFT__CREDENTIALS'] = "redshift://abc:123@localhost:5432/some_db"
+
+    redshift_dest = redshift("redshift://abc:123@localhost:5432/other_db")
+
+    p = dlt.pipeline(pipeline_name="dummy_pipeline", destination=redshift_dest)
+
+    initial_config = p._get_destination_client_initial_config(p.destination)
+    dest_config = redshift_dest.configuration(initial_config)  # type: ignore[arg-type]
+    # The explicit factory arg supersedes config
+    assert dest_config.credentials.database == "other_db"
+
+    redshift_dest = redshift()
+    p = dlt.pipeline(pipeline_name="dummy_pipeline", destination=redshift_dest)
+
+    initial_config = p._get_destination_client_initial_config(p.destination)
+    dest_config = redshift_dest.configuration(initial_config)  # type: ignore[arg-type]
+    assert dest_config.credentials.database == "some_db"
+
+
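# --- editor's note (not part of the diff): the precedence rule the new tests above
# pin down, in one sketch - explicit factory args beat env/config values, which
# beat built-in defaults. Uses the dummy destination from the diff.
import os
import dlt
from dlt.destinations import dummy

os.environ["FAIL_PROB"] = "0.3"  # env-provided value overrides the built-in default
p = dlt.pipeline(pipeline_name="dummy_pipeline", destination=dummy(retry_prob=0.5))
client = p.destination_client()
assert client.config.fail_prob == 0.3   # type: ignore[attr-defined]  # from env
assert client.config.retry_prob == 0.5  # type: ignore[attr-defined]  # explicit arg wins
# ---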
 @pytest.mark.skip(reason="does not work on CI. probably takes right credentials from somewhere....")
 def test_destination_explicit_invalid_credentials_filesystem(environment: Any) -> None:
     # if string cannot be parsed
diff --git a/tests/pipeline/test_pipeline_state.py b/tests/pipeline/test_pipeline_state.py
index 14b881eedc..0e8dea2145 100644
--- a/tests/pipeline/test_pipeline_state.py
+++ b/tests/pipeline/test_pipeline_state.py
@@ -48,8 +48,8 @@ def test_restore_state_props() -> None:
     assert state["destination"].endswith("redshift")
     assert state["staging"].endswith("filesystem")
     # also instances are restored
-    assert p.destination.__name__.endswith("redshift")
-    assert p.staging.__name__.endswith("filesystem")
+    assert p.destination.name.endswith("redshift")
+    assert p.staging.name.endswith("filesystem")


 def test_managed_state() -> None:
diff --git a/tests/tools/clean_redshift.py b/tests/tools/clean_redshift.py
index 7444d69685..27680b26cd 100644
--- a/tests/tools/clean_redshift.py
+++ b/tests/tools/clean_redshift.py
@@ -1,5 +1,5 @@
-from dlt.destinations.postgres.postgres import PostgresClient
-from dlt.destinations.postgres.sql_client import psycopg2
+from dlt.destinations.impl.postgres.postgres import PostgresClient
+from dlt.destinations.impl.postgres.sql_client import psycopg2
 from psycopg2.errors import InsufficientPrivilege, InternalError_, SyntaxError

 CONNECTION_STRING = ""