Sqlalchemy staging dataset support and docs (#1841)
* Sqlalchemy: support replace with staging dataset

* Remember sqlite attached datasets

* Merge fallback to append

* Sqlalchemy destination docs

* Docs snippet language

* Add supported_replace_strategies capability

* Typo
steinitzu authored Sep 20, 2024
1 parent acf1e36 commit 04e2f50
Showing 28 changed files with 434 additions and 112 deletions.
3 changes: 2 additions & 1 deletion dlt/common/destination/capabilities.py
@@ -33,6 +33,7 @@
TTableSchema,
TLoaderMergeStrategy,
TTableFormat,
TLoaderReplaceStrategy,
)
from dlt.common.wei import EVM_DECIMAL_PRECISION

@@ -169,7 +170,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):

supported_merge_strategies: Sequence[TLoaderMergeStrategy] = None
merge_strategies_selector: MergeStrategySelector = None
# TODO: also add `supported_replace_strategies` capability
supported_replace_strategies: Sequence[TLoaderReplaceStrategy] = None

max_parallel_load_jobs: Optional[int] = None
"""The destination can set the maximum amount of parallel load jobs being executed"""
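The new supported_replace_strategies field lets each destination declare which replace strategies it can execute, replacing the old TODO. As a minimal sketch of how this can be inspected (it relies on the factory _raw_capabilities method shown in the per-destination diffs below; constructing the factory without credentials is assumed to work for introspection):

    from dlt.destinations import postgres

    # Sketch: each destination factory reports its raw capabilities,
    # including the replace strategies it supports.
    caps = postgres()._raw_capabilities()
    print(caps.supported_replace_strategies)
    # ['truncate-and-insert', 'insert-from-staging', 'staging-optimized']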
8 changes: 6 additions & 2 deletions dlt/common/destination/reference.py
@@ -31,7 +31,12 @@
from dlt.common.metrics import LoadJobMetrics
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.schema import Schema, TSchemaTables
from dlt.common.schema.typing import C_DLT_LOAD_ID, _TTableSchemaBase, TWriteDisposition
from dlt.common.schema.typing import (
C_DLT_LOAD_ID,
_TTableSchemaBase,
TWriteDisposition,
TLoaderReplaceStrategy,
)
from dlt.common.schema.utils import fill_hints_from_parent_and_clone_table
from dlt.common.configuration import configspec, resolve_configuration, known_sections, NotResolved
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
@@ -48,7 +53,6 @@
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.storages.load_package import LoadJobInfo, TPipelineStateDoc

TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")
TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration")
1 change: 1 addition & 0 deletions dlt/common/schema/typing.py
@@ -218,6 +218,7 @@ class NormalizerInfo(TypedDict, total=True):

TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TLoaderMergeStrategy = Literal["delete-insert", "scd2", "upsert"]
TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]


WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))
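The three replace strategies now live in a shared TLoaderReplaceStrategy type (moved here from dlt.common.destination.reference). As a hedged usage sketch, not part of this diff: dlt is assumed to read the strategy from the destination.replace_strategy config key (here supplied via an environment variable), and a pipeline then loads with write_disposition="replace":

    import os
    import dlt

    # Assumed config mechanism: replace_strategy under the [destination] section,
    # provided here as an environment variable.
    os.environ["DESTINATION__REPLACE_STRATEGY"] = "insert-from-staging"

    pipeline = dlt.pipeline("replace_demo", destination="duckdb")
    pipeline.run(
        [{"id": 1}, {"id": 2}],
        table_name="items",
        write_disposition="replace",  # uses the configured replace strategy
    )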
1 change: 1 addition & 0 deletions dlt/destinations/impl/athena/factory.py
@@ -138,6 +138,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.timestamp_precision = 3
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
caps.merge_strategies_selector = athena_merge_strategies_selector
return caps

5 changes: 5 additions & 0 deletions dlt/destinations/impl/bigquery/factory.py
@@ -118,6 +118,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_clone_table = True
caps.schema_supports_numeric_precision = False # no precision information in BigQuery
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/clickhouse/factory.py
@@ -139,6 +139,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_truncate_command = True

caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

5 changes: 5 additions & 0 deletions dlt/destinations/impl/databricks/factory.py
@@ -134,6 +134,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_multiple_statements = False
caps.supports_clone_table = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]
return caps

@property
1 change: 1 addition & 0 deletions dlt/destinations/impl/dremio/factory.py
@@ -109,6 +109,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_multiple_statements = False
caps.timestamp_precision = 3
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
return caps

@property
1 change: 1 addition & 0 deletions dlt/destinations/impl/duckdb/factory.py
@@ -148,6 +148,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.alter_add_multi_column = False
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/filesystem/factory.py
@@ -51,6 +51,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
"reference",
]
caps.has_case_sensitive_identifiers = True
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
return caps

@property
1 change: 1 addition & 0 deletions dlt/destinations/impl/lancedb/factory.py
@@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:

caps.decimal_precision = (38, 18)
caps.timestamp_precision = 6
caps.supported_replace_strategies = ["truncate-and-insert"]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/motherduck/factory.py
@@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.max_parallel_load_jobs = 8
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

5 changes: 5 additions & 0 deletions dlt/destinations/impl/mssql/factory.py
@@ -109,6 +109,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.max_rows_per_insert = 1000
caps.timestamp_precision = 7
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]

return caps

5 changes: 5 additions & 0 deletions dlt/destinations/impl/postgres/factory.py
@@ -142,6 +142,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.is_max_text_data_type_length_in_bytes = True
caps.supports_ddl_transactions = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/qdrant/factory.py
@@ -25,6 +25,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.max_text_data_type_length = 8 * 1024 * 1024
caps.is_max_text_data_type_length_in_bytes = False
caps.supports_ddl_transactions = False
caps.supported_replace_strategies = ["truncate-and-insert"]

return caps

1 change: 1 addition & 0 deletions dlt/destinations/impl/redshift/factory.py
@@ -135,6 +135,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = True
caps.alter_add_multi_column = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

5 changes: 5 additions & 0 deletions dlt/destinations/impl/snowflake/factory.py
@@ -121,6 +121,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.alter_add_multi_column = True
caps.supports_clone_table = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]
return caps

@property
26 changes: 23 additions & 3 deletions dlt/destinations/impl/sqlalchemy/db_api_client.py
@@ -3,12 +3,12 @@
Iterator,
Any,
Sequence,
ContextManager,
AnyStr,
Union,
Tuple,
List,
Dict,
Set,
)
from contextlib import contextmanager
from functools import wraps
@@ -19,6 +19,7 @@
from sqlalchemy.engine import Connection

from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import PreparedTableSchema
from dlt.destinations.exceptions import (
DatabaseUndefinedRelation,
DatabaseTerminalException,
@@ -122,6 +123,8 @@ def __init__(
self._current_connection: Optional[Connection] = None
self._current_transaction: Optional[SqlaTransactionWrapper] = None
self.metadata = sa.MetaData()
# Keep a list of datasets already attached on the current connection
self._sqlite_attached_datasets: Set[str] = set()

@property
def engine(self) -> sa.engine.Engine:
@@ -155,6 +158,7 @@ def close_connection(self) -> None:
self._current_connection.close()
self.engine.dispose()
finally:
self._sqlite_attached_datasets.clear()
self._current_connection = None
self._current_transaction = None

@@ -234,13 +238,17 @@ def _sqlite_create_dataset(self, dataset_name: str) -> None:
"""Mimic multiple schemas in sqlite using ATTACH DATABASE to
attach a new database file to the current connection.
"""
if dataset_name == "main":
# main always exists
return
if self._sqlite_is_memory_db():
new_db_fn = ":memory:"
else:
new_db_fn = self._sqlite_dataset_filename(dataset_name)

statement = "ATTACH DATABASE :fn AS :name"
self.execute_sql(statement, fn=new_db_fn, name=dataset_name)
self._sqlite_attached_datasets.add(dataset_name)

def _sqlite_drop_dataset(self, dataset_name: str) -> None:
"""Drop a dataset in sqlite by detaching the database file
@@ -252,13 +260,23 @@ def _sqlite_drop_dataset(self, dataset_name: str) -> None:
if dataset_name != "main": # main is the default database, it cannot be detached
statement = "DETACH DATABASE :name"
self.execute_sql(statement, name=dataset_name)
self._sqlite_attached_datasets.discard(dataset_name)

fn = dbs[dataset_name]
if not fn: # It's a memory database, nothing to do
return
# Delete the database file
Path(fn).unlink()

@contextmanager
def with_alternative_dataset_name(
self, dataset_name: str
) -> Iterator[SqlClientBase[Connection]]:
with super().with_alternative_dataset_name(dataset_name):
if self.dialect_name == "sqlite" and dataset_name not in self._sqlite_attached_datasets:
self._sqlite_reattach_dataset_if_exists(dataset_name)
yield self

def create_dataset(self) -> None:
if self.dialect_name == "sqlite":
return self._sqlite_create_dataset(self.dataset_name)
@@ -332,8 +350,10 @@ def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str

def fully_qualified_dataset_name(self, escape: bool = True, staging: bool = False) -> str:
if staging:
raise NotImplementedError("Staging not supported")
return self.dialect.identifier_preparer.format_schema(self.dataset_name) # type: ignore[attr-defined, no-any-return]
dataset_name = self.staging_dataset_name
else:
dataset_name = self.dataset_name
return self.dialect.identifier_preparer.format_schema(dataset_name) # type: ignore[attr-defined, no-any-return]

def alter_table_add_columns(self, columns: Sequence[sa.Column]) -> None:
if not columns:
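For context, the sqlite-specific paths above emulate datasets (schemas) with ATTACH DATABASE, which is why the client now remembers which datasets are attached on the current connection and reattaches them when switching dataset names. A standalone illustration of that mechanism using only the standard library (the table and dataset names are invented for the demo):

    import sqlite3

    conn = sqlite3.connect(":memory:")  # the "main" database always exists
    # Attach a second database under a schema-like name, as the client does per dataset
    conn.execute("ATTACH DATABASE ':memory:' AS staging_demo")
    conn.execute("CREATE TABLE staging_demo.items (id INTEGER)")
    conn.execute("INSERT INTO staging_demo.items VALUES (1)")
    print(conn.execute("SELECT * FROM staging_demo.items").fetchall())  # [(1,)]
    conn.execute("DETACH DATABASE staging_demo")  # mirrors _sqlite_drop_dataset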
6 changes: 5 additions & 1 deletion dlt/destinations/impl/sqlalchemy/factory.py
@@ -21,6 +21,7 @@
if t.TYPE_CHECKING:
# from dlt.destinations.impl.sqlalchemy.sqlalchemy_client import SqlalchemyJobClient
from dlt.destinations.impl.sqlalchemy.sqlalchemy_job_client import SqlalchemyJobClient
from sqlalchemy.engine import Engine


class sqlalchemy(Destination[SqlalchemyClientConfiguration, "SqlalchemyJobClient"]):
@@ -45,7 +46,10 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = True
caps.max_query_parameters = 20_0000
caps.max_rows_per_insert = 10_000 # Set a default to avoid OOM on large datasets
# Multiple concatenated statements are not supported by all engines, so leave them off by default
caps.supports_multiple_statements = False
caps.type_mapper = SqlalchemyTypeMapper
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

@@ -74,7 +78,7 @@ def client_class(self) -> t.Type["SqlalchemyJobClient"]:

def __init__(
self,
credentials: t.Union[SqlalchemyCredentials, t.Dict[str, t.Any], str] = None,
credentials: t.Union[SqlalchemyCredentials, t.Dict[str, t.Any], str, "Engine"] = None,
destination_name: t.Optional[str] = None,
environment: t.Optional[str] = None,
engine_args: t.Optional[t.Dict[str, t.Any]] = None,
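The widened credentials annotation means the factory now also accepts a ready-made SQLAlchemy Engine, in addition to a URL string, dict, or SqlalchemyCredentials. A brief usage sketch (the pipeline name, database file, and table are illustrative):

    import dlt
    import sqlalchemy as sa

    engine = sa.create_engine("sqlite:///demo.db")
    # Pass the Engine directly as credentials instead of a connection string
    pipeline = dlt.pipeline(
        "engine_demo", destination=dlt.destinations.sqlalchemy(engine)
    )
    pipeline.run([{"id": 1}], table_name="items", write_disposition="replace")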