Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sqlalchemy staging dataset support and docs #1841

Merged
merged 5 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dlt/common/destination/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
TTableSchema,
TLoaderMergeStrategy,
TTableFormat,
TLoaderReplaceStrategy,
)
from dlt.common.wei import EVM_DECIMAL_PRECISION

Expand Down Expand Up @@ -169,7 +170,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):

supported_merge_strategies: Sequence[TLoaderMergeStrategy] = None
merge_strategies_selector: MergeStrategySelector = None
# TODO: also add `supported_replace_strategies` capability
supported_replace_strategies: Sequence[TLoaderReplaceStrategy] = None

max_parallel_load_jobs: Optional[int] = None
"""The destination can set the maximum amount of parallel load jobs being executed"""
Expand Down
8 changes: 6 additions & 2 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
from dlt.common.metrics import LoadJobMetrics
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.schema import Schema, TSchemaTables
from dlt.common.schema.typing import C_DLT_LOAD_ID, _TTableSchemaBase, TWriteDisposition
from dlt.common.schema.typing import (
C_DLT_LOAD_ID,
_TTableSchemaBase,
TWriteDisposition,
TLoaderReplaceStrategy,
)
from dlt.common.schema.utils import fill_hints_from_parent_and_clone_table
from dlt.common.configuration import configspec, resolve_configuration, known_sections, NotResolved
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
Expand All @@ -48,7 +53,6 @@
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.storages.load_package import LoadJobInfo, TPipelineStateDoc

TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")
TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration")
Expand Down
1 change: 1 addition & 0 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class NormalizerInfo(TypedDict, total=True):

TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TLoaderMergeStrategy = Literal["delete-insert", "scd2", "upsert"]
TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]


WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/athena/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.timestamp_precision = 3
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
caps.merge_strategies_selector = athena_merge_strategies_selector
return caps

Expand Down
5 changes: 5 additions & 0 deletions dlt/destinations/impl/bigquery/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_clone_table = True
caps.schema_supports_numeric_precision = False # no precision information in BigQuery
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/clickhouse/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_truncate_command = True

caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down
5 changes: 5 additions & 0 deletions dlt/destinations/impl/databricks/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_multiple_statements = False
caps.supports_clone_table = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]
return caps

@property
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/dremio/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_multiple_statements = False
caps.timestamp_precision = 3
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
return caps

@property
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/duckdb/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.alter_add_multi_column = False
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/filesystem/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
"reference",
]
caps.has_case_sensitive_identifiers = True
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
return caps

@property
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/lancedb/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:

caps.decimal_precision = (38, 18)
caps.timestamp_precision = 6
caps.supported_replace_strategies = ["truncate-and-insert"]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/motherduck/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.max_parallel_load_jobs = 8
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down
5 changes: 5 additions & 0 deletions dlt/destinations/impl/mssql/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.max_rows_per_insert = 1000
caps.timestamp_precision = 7
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]

return caps

Expand Down
5 changes: 5 additions & 0 deletions dlt/destinations/impl/postgres/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.is_max_text_data_type_length_in_bytes = True
caps.supports_ddl_transactions = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/qdrant/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.max_text_data_type_length = 8 * 1024 * 1024
caps.is_max_text_data_type_length_in_bytes = False
caps.supports_ddl_transactions = False
caps.supported_replace_strategies = ["truncate-and-insert"]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/redshift/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = True
caps.alter_add_multi_column = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down
5 changes: 5 additions & 0 deletions dlt/destinations/impl/snowflake/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.alter_add_multi_column = True
caps.supports_clone_table = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]
return caps

@property
Expand Down
26 changes: 23 additions & 3 deletions dlt/destinations/impl/sqlalchemy/db_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
Iterator,
Any,
Sequence,
ContextManager,
AnyStr,
Union,
Tuple,
List,
Dict,
Set,
)
from contextlib import contextmanager
from functools import wraps
Expand All @@ -19,6 +19,7 @@
from sqlalchemy.engine import Connection

from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import PreparedTableSchema
from dlt.destinations.exceptions import (
DatabaseUndefinedRelation,
DatabaseTerminalException,
Expand Down Expand Up @@ -122,6 +123,8 @@ def __init__(
self._current_connection: Optional[Connection] = None
self._current_transaction: Optional[SqlaTransactionWrapper] = None
self.metadata = sa.MetaData()
# Keep a list of datasets already attached on the current connection
self._sqlite_attached_datasets: Set[str] = set()

@property
def engine(self) -> sa.engine.Engine:
Expand Down Expand Up @@ -155,6 +158,7 @@ def close_connection(self) -> None:
self._current_connection.close()
self.engine.dispose()
finally:
self._sqlite_attached_datasets.clear()
self._current_connection = None
self._current_transaction = None

Expand Down Expand Up @@ -234,13 +238,17 @@ def _sqlite_create_dataset(self, dataset_name: str) -> None:
"""Mimic multiple schemas in sqlite using ATTACH DATABASE to
attach a new database file to the current connection.
"""
if dataset_name == "main":
# main always exists
return
if self._sqlite_is_memory_db():
new_db_fn = ":memory:"
else:
new_db_fn = self._sqlite_dataset_filename(dataset_name)

statement = "ATTACH DATABASE :fn AS :name"
self.execute_sql(statement, fn=new_db_fn, name=dataset_name)
self._sqlite_attached_datasets.add(dataset_name)

def _sqlite_drop_dataset(self, dataset_name: str) -> None:
"""Drop a dataset in sqlite by detaching the database file
Expand All @@ -252,13 +260,23 @@ def _sqlite_drop_dataset(self, dataset_name: str) -> None:
if dataset_name != "main": # main is the default database, it cannot be detached
statement = "DETACH DATABASE :name"
self.execute_sql(statement, name=dataset_name)
self._sqlite_attached_datasets.discard(dataset_name)

fn = dbs[dataset_name]
if not fn: # It's a memory database, nothing to do
return
# Delete the database file
Path(fn).unlink()

@contextmanager
def with_alternative_dataset_name(
self, dataset_name: str
) -> Iterator[SqlClientBase[Connection]]:
with super().with_alternative_dataset_name(dataset_name):
if self.dialect_name == "sqlite" and dataset_name not in self._sqlite_attached_datasets:
self._sqlite_reattach_dataset_if_exists(dataset_name)
yield self

def create_dataset(self) -> None:
if self.dialect_name == "sqlite":
return self._sqlite_create_dataset(self.dataset_name)
Expand Down Expand Up @@ -332,8 +350,10 @@ def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str

def fully_qualified_dataset_name(self, escape: bool = True, staging: bool = False) -> str:
if staging:
raise NotImplementedError("Staging not supported")
return self.dialect.identifier_preparer.format_schema(self.dataset_name) # type: ignore[attr-defined, no-any-return]
dataset_name = self.staging_dataset_name
else:
dataset_name = self.dataset_name
return self.dialect.identifier_preparer.format_schema(dataset_name) # type: ignore[attr-defined, no-any-return]

def alter_table_add_columns(self, columns: Sequence[sa.Column]) -> None:
if not columns:
Expand Down
6 changes: 5 additions & 1 deletion dlt/destinations/impl/sqlalchemy/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
if t.TYPE_CHECKING:
# from dlt.destinations.impl.sqlalchemy.sqlalchemy_client import SqlalchemyJobClient
from dlt.destinations.impl.sqlalchemy.sqlalchemy_job_client import SqlalchemyJobClient
from sqlalchemy.engine import Engine


class sqlalchemy(Destination[SqlalchemyClientConfiguration, "SqlalchemyJobClient"]):
Expand All @@ -45,7 +46,10 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = True
caps.max_query_parameters = 20_0000
caps.max_rows_per_insert = 10_000 # Set a default to avoid OOM on large datasets
# Multiple concatenated statements are not supported by all engines, so leave them off by default
caps.supports_multiple_statements = False
caps.type_mapper = SqlalchemyTypeMapper
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down Expand Up @@ -74,7 +78,7 @@ def client_class(self) -> t.Type["SqlalchemyJobClient"]:

def __init__(
self,
credentials: t.Union[SqlalchemyCredentials, t.Dict[str, t.Any], str] = None,
credentials: t.Union[SqlalchemyCredentials, t.Dict[str, t.Any], str, "Engine"] = None,
destination_name: t.Optional[str] = None,
environment: t.Optional[str] = None,
engine_args: t.Optional[t.Dict[str, t.Any]] = None,
Expand Down
Loading
Loading