diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index 8f0dce79ce..9ef8fad96e 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -33,6 +33,7 @@ TTableSchema, TLoaderMergeStrategy, TTableFormat, + TLoaderReplaceStrategy, ) from dlt.common.wei import EVM_DECIMAL_PRECISION @@ -169,7 +170,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): supported_merge_strategies: Sequence[TLoaderMergeStrategy] = None merge_strategies_selector: MergeStrategySelector = None - # TODO: also add `supported_replace_strategies` capability + supported_replace_strategies: Sequence[TLoaderReplaceStrategy] = None max_parallel_load_jobs: Optional[int] = None """The destination can set the maximum amount of parallel load jobs being executed""" diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 9e27b66335..f0d950f0e9 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -31,7 +31,12 @@ from dlt.common.metrics import LoadJobMetrics from dlt.common.normalizers.naming import NamingConvention from dlt.common.schema import Schema, TSchemaTables -from dlt.common.schema.typing import C_DLT_LOAD_ID, _TTableSchemaBase, TWriteDisposition +from dlt.common.schema.typing import ( + C_DLT_LOAD_ID, + _TTableSchemaBase, + TWriteDisposition, + TLoaderReplaceStrategy, +) from dlt.common.schema.utils import fill_hints_from_parent_and_clone_table from dlt.common.configuration import configspec, resolve_configuration, known_sections, NotResolved from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration @@ -48,7 +53,6 @@ from dlt.common.storages.load_storage import ParsedLoadJobFileName from dlt.common.storages.load_package import LoadJobInfo, TPipelineStateDoc -TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"] TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration") TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase") TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration") diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 9221cca7ff..2247358331 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -218,6 +218,7 @@ class NormalizerInfo(TypedDict, total=True): TWriteDisposition = Literal["skip", "append", "replace", "merge"] TLoaderMergeStrategy = Literal["delete-insert", "scd2", "upsert"] +TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"] WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition)) diff --git a/dlt/destinations/impl/athena/factory.py b/dlt/destinations/impl/athena/factory.py index 5a7ae1ba8c..1749c135cc 100644 --- a/dlt/destinations/impl/athena/factory.py +++ b/dlt/destinations/impl/athena/factory.py @@ -138,6 +138,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.timestamp_precision = 3 caps.supports_truncate_command = False caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] caps.merge_strategies_selector = athena_merge_strategies_selector return caps diff --git a/dlt/destinations/impl/bigquery/factory.py b/dlt/destinations/impl/bigquery/factory.py index 7f4fd74825..32a6eb6f82 100644 --- a/dlt/destinations/impl/bigquery/factory.py +++ b/dlt/destinations/impl/bigquery/factory.py @@ -118,6 +118,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_clone_table = True caps.schema_supports_numeric_precision = False # no precision information in BigQuery caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] return caps diff --git a/dlt/destinations/impl/clickhouse/factory.py b/dlt/destinations/impl/clickhouse/factory.py index 696c2783ca..7a9e16464f 100644 --- a/dlt/destinations/impl/clickhouse/factory.py +++ b/dlt/destinations/impl/clickhouse/factory.py @@ -139,6 +139,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_truncate_command = True caps.supported_merge_strategies = ["delete-insert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git a/dlt/destinations/impl/databricks/factory.py b/dlt/destinations/impl/databricks/factory.py index b02f191423..a73a575901 100644 --- a/dlt/destinations/impl/databricks/factory.py +++ b/dlt/destinations/impl/databricks/factory.py @@ -134,6 +134,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_multiple_statements = False caps.supports_clone_table = True caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] return caps @property diff --git a/dlt/destinations/impl/dremio/factory.py b/dlt/destinations/impl/dremio/factory.py index 29ec6257e6..997ab419c1 100644 --- a/dlt/destinations/impl/dremio/factory.py +++ b/dlt/destinations/impl/dremio/factory.py @@ -109,6 +109,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_multiple_statements = False caps.timestamp_precision = 3 caps.supported_merge_strategies = ["delete-insert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps @property diff --git a/dlt/destinations/impl/duckdb/factory.py b/dlt/destinations/impl/duckdb/factory.py index 6c2011c549..e3d261d9d6 100644 --- a/dlt/destinations/impl/duckdb/factory.py +++ b/dlt/destinations/impl/duckdb/factory.py @@ -148,6 +148,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.alter_add_multi_column = False caps.supports_truncate_command = False caps.supported_merge_strategies = ["delete-insert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git a/dlt/destinations/impl/filesystem/factory.py b/dlt/destinations/impl/filesystem/factory.py index c5218f14a3..2463da58fa 100644 --- a/dlt/destinations/impl/filesystem/factory.py +++ b/dlt/destinations/impl/filesystem/factory.py @@ -51,6 +51,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: "reference", ] caps.has_case_sensitive_identifiers = True + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps @property diff --git a/dlt/destinations/impl/lancedb/factory.py b/dlt/destinations/impl/lancedb/factory.py index 339453133f..8ce2217007 100644 --- a/dlt/destinations/impl/lancedb/factory.py +++ b/dlt/destinations/impl/lancedb/factory.py @@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.decimal_precision = (38, 18) caps.timestamp_precision = 6 + caps.supported_replace_strategies = ["truncate-and-insert"] return caps diff --git a/dlt/destinations/impl/motherduck/factory.py b/dlt/destinations/impl/motherduck/factory.py index ac5dc70b57..fec1049584 100644 --- a/dlt/destinations/impl/motherduck/factory.py +++ b/dlt/destinations/impl/motherduck/factory.py @@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_truncate_command = False caps.supported_merge_strategies = ["delete-insert", "scd2"] caps.max_parallel_load_jobs = 8 + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git a/dlt/destinations/impl/mssql/factory.py b/dlt/destinations/impl/mssql/factory.py index 2fd668bdb6..1dbac8e8f5 100644 --- a/dlt/destinations/impl/mssql/factory.py +++ b/dlt/destinations/impl/mssql/factory.py @@ -109,6 +109,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.max_rows_per_insert = 1000 caps.timestamp_precision = 7 caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] return caps diff --git a/dlt/destinations/impl/postgres/factory.py b/dlt/destinations/impl/postgres/factory.py index 1a33d44577..bde0e35f3d 100644 --- a/dlt/destinations/impl/postgres/factory.py +++ b/dlt/destinations/impl/postgres/factory.py @@ -142,6 +142,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.is_max_text_data_type_length_in_bytes = True caps.supports_ddl_transactions = True caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] return caps diff --git a/dlt/destinations/impl/qdrant/factory.py b/dlt/destinations/impl/qdrant/factory.py index f994948d91..49c4511c8d 100644 --- a/dlt/destinations/impl/qdrant/factory.py +++ b/dlt/destinations/impl/qdrant/factory.py @@ -25,6 +25,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.max_text_data_type_length = 8 * 1024 * 1024 caps.is_max_text_data_type_length_in_bytes = False caps.supports_ddl_transactions = False + caps.supported_replace_strategies = ["truncate-and-insert"] return caps diff --git a/dlt/destinations/impl/redshift/factory.py b/dlt/destinations/impl/redshift/factory.py index 20b7df859f..cab30f8e33 100644 --- a/dlt/destinations/impl/redshift/factory.py +++ b/dlt/destinations/impl/redshift/factory.py @@ -135,6 +135,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_ddl_transactions = True caps.alter_add_multi_column = False caps.supported_merge_strategies = ["delete-insert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git a/dlt/destinations/impl/snowflake/factory.py b/dlt/destinations/impl/snowflake/factory.py index 6c2369a5aa..3ed4b39276 100644 --- a/dlt/destinations/impl/snowflake/factory.py +++ b/dlt/destinations/impl/snowflake/factory.py @@ -121,6 +121,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.alter_add_multi_column = True caps.supports_clone_table = True caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] return caps @property diff --git a/dlt/destinations/impl/sqlalchemy/db_api_client.py b/dlt/destinations/impl/sqlalchemy/db_api_client.py index c6c8ba53d6..829fe8db82 100644 --- a/dlt/destinations/impl/sqlalchemy/db_api_client.py +++ b/dlt/destinations/impl/sqlalchemy/db_api_client.py @@ -3,12 +3,12 @@ Iterator, Any, Sequence, - ContextManager, AnyStr, Union, Tuple, List, Dict, + Set, ) from contextlib import contextmanager from functools import wraps @@ -19,6 +19,7 @@ from sqlalchemy.engine import Connection from dlt.common.destination import DestinationCapabilitiesContext +from dlt.common.destination.reference import PreparedTableSchema from dlt.destinations.exceptions import ( DatabaseUndefinedRelation, DatabaseTerminalException, @@ -122,6 +123,8 @@ def __init__( self._current_connection: Optional[Connection] = None self._current_transaction: Optional[SqlaTransactionWrapper] = None self.metadata = sa.MetaData() + # Keep a list of datasets already attached on the current connection + self._sqlite_attached_datasets: Set[str] = set() @property def engine(self) -> sa.engine.Engine: @@ -155,6 +158,7 @@ def close_connection(self) -> None: self._current_connection.close() self.engine.dispose() finally: + self._sqlite_attached_datasets.clear() self._current_connection = None self._current_transaction = None @@ -234,6 +238,9 @@ def _sqlite_create_dataset(self, dataset_name: str) -> None: """Mimic multiple schemas in sqlite using ATTACH DATABASE to attach a new database file to the current connection. """ + if dataset_name == "main": + # main always exists + return if self._sqlite_is_memory_db(): new_db_fn = ":memory:" else: @@ -241,6 +248,7 @@ def _sqlite_create_dataset(self, dataset_name: str) -> None: statement = "ATTACH DATABASE :fn AS :name" self.execute_sql(statement, fn=new_db_fn, name=dataset_name) + self._sqlite_attached_datasets.add(dataset_name) def _sqlite_drop_dataset(self, dataset_name: str) -> None: """Drop a dataset in sqlite by detaching the database file @@ -252,6 +260,7 @@ def _sqlite_drop_dataset(self, dataset_name: str) -> None: if dataset_name != "main": # main is the default database, it cannot be detached statement = "DETACH DATABASE :name" self.execute_sql(statement, name=dataset_name) + self._sqlite_attached_datasets.discard(dataset_name) fn = dbs[dataset_name] if not fn: # It's a memory database, nothing to do @@ -259,6 +268,15 @@ def _sqlite_drop_dataset(self, dataset_name: str) -> None: # Delete the database file Path(fn).unlink() + @contextmanager + def with_alternative_dataset_name( + self, dataset_name: str + ) -> Iterator[SqlClientBase[Connection]]: + with super().with_alternative_dataset_name(dataset_name): + if self.dialect_name == "sqlite" and dataset_name not in self._sqlite_attached_datasets: + self._sqlite_reattach_dataset_if_exists(dataset_name) + yield self + def create_dataset(self) -> None: if self.dialect_name == "sqlite": return self._sqlite_create_dataset(self.dataset_name) @@ -332,8 +350,10 @@ def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str def fully_qualified_dataset_name(self, escape: bool = True, staging: bool = False) -> str: if staging: - raise NotImplementedError("Staging not supported") - return self.dialect.identifier_preparer.format_schema(self.dataset_name) # type: ignore[attr-defined, no-any-return] + dataset_name = self.staging_dataset_name + else: + dataset_name = self.dataset_name + return self.dialect.identifier_preparer.format_schema(dataset_name) # type: ignore[attr-defined, no-any-return] def alter_table_add_columns(self, columns: Sequence[sa.Column]) -> None: if not columns: diff --git a/dlt/destinations/impl/sqlalchemy/factory.py b/dlt/destinations/impl/sqlalchemy/factory.py index 10372cda34..360dd89192 100644 --- a/dlt/destinations/impl/sqlalchemy/factory.py +++ b/dlt/destinations/impl/sqlalchemy/factory.py @@ -21,6 +21,7 @@ if t.TYPE_CHECKING: # from dlt.destinations.impl.sqlalchemy.sqlalchemy_client import SqlalchemyJobClient from dlt.destinations.impl.sqlalchemy.sqlalchemy_job_client import SqlalchemyJobClient + from sqlalchemy.engine import Engine class sqlalchemy(Destination[SqlalchemyClientConfiguration, "SqlalchemyJobClient"]): @@ -45,7 +46,10 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_ddl_transactions = True caps.max_query_parameters = 20_0000 caps.max_rows_per_insert = 10_000 # Set a default to avoid OOM on large datasets + # Multiple concatenated statements are not supported by all engines, so leave them off by default + caps.supports_multiple_statements = False caps.type_mapper = SqlalchemyTypeMapper + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps @@ -74,7 +78,7 @@ def client_class(self) -> t.Type["SqlalchemyJobClient"]: def __init__( self, - credentials: t.Union[SqlalchemyCredentials, t.Dict[str, t.Any], str] = None, + credentials: t.Union[SqlalchemyCredentials, t.Dict[str, t.Any], str, "Engine"] = None, destination_name: t.Optional[str] = None, environment: t.Optional[str] = None, engine_args: t.Optional[t.Dict[str, t.Any]] = None, diff --git a/dlt/destinations/impl/sqlalchemy/load_jobs.py b/dlt/destinations/impl/sqlalchemy/load_jobs.py new file mode 100644 index 0000000000..c8486dc0f0 --- /dev/null +++ b/dlt/destinations/impl/sqlalchemy/load_jobs.py @@ -0,0 +1,136 @@ +from typing import IO, Any, Dict, Iterator, List, Sequence, TYPE_CHECKING, Optional +import math + +import sqlalchemy as sa + +from dlt.common.destination.reference import ( + RunnableLoadJob, + HasFollowupJobs, + PreparedTableSchema, +) +from dlt.common.storages import FileStorage +from dlt.common.json import json, PY_DATETIME_DECODERS +from dlt.destinations.sql_jobs import SqlFollowupJob, SqlJobParams + +from dlt.destinations.impl.sqlalchemy.db_api_client import SqlalchemyClient + +if TYPE_CHECKING: + from dlt.destinations.impl.sqlalchemy.sqlalchemy_job_client import SqlalchemyJobClient + + +class SqlalchemyJsonLInsertJob(RunnableLoadJob, HasFollowupJobs): + def __init__(self, file_path: str, table: sa.Table) -> None: + super().__init__(file_path) + self._job_client: "SqlalchemyJobClient" = None + self.table = table + + def _open_load_file(self) -> IO[bytes]: + return FileStorage.open_zipsafe_ro(self._file_path, "rb") + + def _iter_data_items(self) -> Iterator[Dict[str, Any]]: + all_cols = {col.name: None for col in self.table.columns} + with FileStorage.open_zipsafe_ro(self._file_path, "rb") as f: + for line in f: + # Decode date/time to py datetime objects. Some drivers have issues with pendulum objects + for item in json.typed_loadb(line, decoders=PY_DATETIME_DECODERS): + # Fill any missing columns in item with None. Bulk insert fails when items have different keys + if item.keys() != all_cols.keys(): + yield {**all_cols, **item} + else: + yield item + + def _iter_data_item_chunks(self) -> Iterator[Sequence[Dict[str, Any]]]: + max_rows = self._job_client.capabilities.max_rows_per_insert or math.inf + # Limit by max query length should not be needed, + # bulk insert generates an INSERT template with a single VALUES tuple of placeholders + # If any dialects don't do that we need to check the str length of the query + # TODO: Max params may not be needed. Limits only apply to placeholders in sql string (mysql/sqlite) + max_params = self._job_client.capabilities.max_query_parameters or math.inf + chunk: List[Dict[str, Any]] = [] + params_count = 0 + for item in self._iter_data_items(): + if len(chunk) + 1 == max_rows or params_count + len(item) > max_params: + # Rotate chunk + yield chunk + chunk = [] + params_count = 0 + params_count += len(item) + chunk.append(item) + + if chunk: + yield chunk + + def run(self) -> None: + _sql_client = self._job_client.sql_client + # Copy the table to the current dataset (i.e. staging) if needed + # This is a no-op if the table is already in the correct schema + table = self.table.to_metadata( + self.table.metadata, schema=_sql_client.dataset_name # type: ignore[attr-defined] + ) + + with _sql_client.begin_transaction(): + for chunk in self._iter_data_item_chunks(): + _sql_client.execute_sql(table.insert(), chunk) + + +class SqlalchemyParquetInsertJob(SqlalchemyJsonLInsertJob): + def _iter_data_item_chunks(self) -> Iterator[Sequence[Dict[str, Any]]]: + from dlt.common.libs.pyarrow import ParquetFile + + num_cols = len(self.table.columns) + max_rows = self._job_client.capabilities.max_rows_per_insert or None + max_params = self._job_client.capabilities.max_query_parameters or None + read_limit = None + + with ParquetFile(self._file_path) as reader: + if max_params is not None: + read_limit = math.floor(max_params / num_cols) + + if max_rows is not None: + if read_limit is None: + read_limit = max_rows + else: + read_limit = min(read_limit, max_rows) + + if read_limit is None: + yield reader.read().to_pylist() + return + + for chunk in reader.iter_batches(batch_size=read_limit): + yield chunk.to_pylist() + + +class SqlalchemyStagingCopyJob(SqlFollowupJob): + @classmethod + def generate_sql( + cls, + table_chain: Sequence[PreparedTableSchema], + sql_client: SqlalchemyClient, # type: ignore[override] + params: Optional[SqlJobParams] = None, + ) -> List[str]: + statements: List[str] = [] + for table in table_chain: + # Tables must have already been created in metadata + table_obj = sql_client.get_existing_table(table["name"]) + staging_table_obj = table_obj.to_metadata( + sql_client.metadata, schema=sql_client.staging_dataset_name + ) + if params["replace"]: + stmt = str(table_obj.delete().compile(dialect=sql_client.dialect)) + if not stmt.endswith(";"): + stmt += ";" + statements.append(stmt) + + stmt = str( + table_obj.insert() + .from_select( + [col.name for col in staging_table_obj.columns], staging_table_obj.select() + ) + .compile(dialect=sql_client.dialect) + ) + if not stmt.endswith(";"): + stmt += ";" + + statements.append(stmt) + + return statements diff --git a/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py b/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py index c51d3cbe3a..a2514a43e0 100644 --- a/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py +++ b/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py @@ -1,112 +1,35 @@ -from typing import Iterable, Optional, Dict, Any, Iterator, Sequence, List, Tuple, IO +from typing import Iterable, Optional, Sequence, List, Tuple from contextlib import suppress -import math import sqlalchemy as sa +from dlt.common.json import json from dlt.common import logger from dlt.common import pendulum from dlt.common.destination.reference import ( JobClientBase, LoadJob, - RunnableLoadJob, StorageSchemaInfo, StateInfo, PreparedTableSchema, + FollowupJobRequest, ) -from dlt.destinations.job_client_impl import SqlJobClientBase +from dlt.destinations.job_client_impl import SqlJobClientWithStagingDataset, SqlLoadJob from dlt.common.destination.capabilities import DestinationCapabilitiesContext from dlt.common.schema import Schema, TTableSchema, TColumnSchema, TSchemaTables from dlt.common.schema.typing import TColumnType, TTableSchemaColumns from dlt.common.schema.utils import pipeline_state_table, normalize_table_identifiers -from dlt.common.storages import FileStorage -from dlt.common.json import json, PY_DATETIME_DECODERS from dlt.destinations.exceptions import DatabaseUndefinedRelation - - -# from dlt.destinations.impl.sqlalchemy.sql_client import SqlalchemyClient from dlt.destinations.impl.sqlalchemy.db_api_client import SqlalchemyClient from dlt.destinations.impl.sqlalchemy.configuration import SqlalchemyClientConfiguration +from dlt.destinations.impl.sqlalchemy.load_jobs import ( + SqlalchemyJsonLInsertJob, + SqlalchemyParquetInsertJob, + SqlalchemyStagingCopyJob, +) -class SqlalchemyJsonLInsertJob(RunnableLoadJob): - def __init__(self, file_path: str, table: sa.Table) -> None: - super().__init__(file_path) - self._job_client: "SqlalchemyJobClient" = None - self.table = table - - def _open_load_file(self) -> IO[bytes]: - return FileStorage.open_zipsafe_ro(self._file_path, "rb") - - def _iter_data_items(self) -> Iterator[Dict[str, Any]]: - all_cols = {col.name: None for col in self.table.columns} - with FileStorage.open_zipsafe_ro(self._file_path, "rb") as f: - for line in f: - # Decode date/time to py datetime objects. Some drivers have issues with pendulum objects - for item in json.typed_loadb(line, decoders=PY_DATETIME_DECODERS): - # Fill any missing columns in item with None. Bulk insert fails when items have different keys - if item.keys() != all_cols.keys(): - yield {**all_cols, **item} - else: - yield item - - def _iter_data_item_chunks(self) -> Iterator[Sequence[Dict[str, Any]]]: - max_rows = self._job_client.capabilities.max_rows_per_insert or math.inf - # Limit by max query length should not be needed, - # bulk insert generates an INSERT template with a single VALUES tuple of placeholders - # If any dialects don't do that we need to check the str length of the query - # TODO: Max params may not be needed. Limits only apply to placeholders in sql string (mysql/sqlite) - max_params = self._job_client.capabilities.max_query_parameters or math.inf - chunk: List[Dict[str, Any]] = [] - params_count = 0 - for item in self._iter_data_items(): - if len(chunk) + 1 == max_rows or params_count + len(item) > max_params: - # Rotate chunk - yield chunk - chunk = [] - params_count = 0 - params_count += len(item) - chunk.append(item) - - if chunk: - yield chunk - - def run(self) -> None: - _sql_client = self._job_client.sql_client - - with _sql_client.begin_transaction(): - for chunk in self._iter_data_item_chunks(): - _sql_client.execute_sql(self.table.insert(), chunk) - - -class SqlalchemyParquetInsertJob(SqlalchemyJsonLInsertJob): - def _iter_data_item_chunks(self) -> Iterator[Sequence[Dict[str, Any]]]: - from dlt.common.libs.pyarrow import ParquetFile - - num_cols = len(self.table.columns) - max_rows = self._job_client.capabilities.max_rows_per_insert or None - max_params = self._job_client.capabilities.max_query_parameters or None - read_limit = None - - with ParquetFile(self._file_path) as reader: - if max_params is not None: - read_limit = math.floor(max_params / num_cols) - - if max_rows is not None: - if read_limit is None: - read_limit = max_rows - else: - read_limit = min(read_limit, max_rows) - - if read_limit is None: - yield reader.read().to_pylist() - return - - for chunk in reader.iter_batches(batch_size=read_limit): - yield chunk.to_pylist() - - -class SqlalchemyJobClient(SqlJobClientBase): +class SqlalchemyJobClient(SqlJobClientWithStagingDataset): sql_client: SqlalchemyClient # type: ignore[assignment] def __init__( @@ -117,7 +40,7 @@ def __init__( ) -> None: self.sql_client = SqlalchemyClient( config.normalize_dataset_name(schema), - None, + config.normalize_staging_dataset_name(schema), config.credentials, capabilities, engine_args=config.engine_args, @@ -157,9 +80,37 @@ def _to_column_object( unique=schema_column.get("unique", False), ) + def _create_replace_followup_jobs( + self, table_chain: Sequence[PreparedTableSchema] + ) -> List[FollowupJobRequest]: + if self.config.replace_strategy in ["insert-from-staging", "staging-optimized"]: + # Make sure all tables are generated in metadata before creating the job + for table in table_chain: + self._to_table_object(table) + return [ + SqlalchemyStagingCopyJob.from_table_chain( + table_chain, self.sql_client, {"replace": True} + ) + ] + return [] + + def _create_merge_followup_jobs( + self, table_chain: Sequence[PreparedTableSchema] + ) -> List[FollowupJobRequest]: + for table in table_chain: + self._to_table_object(table) + return [ + SqlalchemyStagingCopyJob.from_table_chain( + table_chain, self.sql_client, {"replace": False} + ) + ] + def create_load_job( self, table: PreparedTableSchema, file_path: str, load_id: str, restore: bool = False ) -> LoadJob: + job = super().create_load_job(table, file_path, load_id, restore) + if job is not None: + return job if file_path.endswith(".typed-jsonl"): table_obj = self._to_table_object(table) return SqlalchemyJsonLInsertJob(file_path, table_obj) diff --git a/dlt/destinations/impl/synapse/factory.py b/dlt/destinations/impl/synapse/factory.py index f035f2f713..14ce622f8b 100644 --- a/dlt/destinations/impl/synapse/factory.py +++ b/dlt/destinations/impl/synapse/factory.py @@ -100,6 +100,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.timestamp_precision = 7 caps.supported_merge_strategies = ["delete-insert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git a/dlt/destinations/impl/weaviate/factory.py b/dlt/destinations/impl/weaviate/factory.py index a5c1e9f2a1..7cb71d4944 100644 --- a/dlt/destinations/impl/weaviate/factory.py +++ b/dlt/destinations/impl/weaviate/factory.py @@ -61,6 +61,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.is_max_text_data_type_length_in_bytes = False caps.supports_ddl_transactions = False caps.naming_convention = "dlt.destinations.impl.weaviate.naming" + caps.supported_replace_strategies = ["truncate-and-insert"] return caps diff --git a/docs/website/docs/dlt-ecosystem/destinations/sqlalchemy.md b/docs/website/docs/dlt-ecosystem/destinations/sqlalchemy.md new file mode 100644 index 0000000000..e5e82831da --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/destinations/sqlalchemy.md @@ -0,0 +1,158 @@ +--- +title: SQL databases (powered by SQLAlchemy) +description: SQLAlchemy destination +keywords: [sql, sqlalchemy, database, destination] +--- + +# SQLAlchemy destination + +The SQLAlchemy destination allows you to use any database which has an [SQLAlchemy dialect](https://docs.sqlalchemy.org/en/20/dialects/) implemented as a destination. + +Currently mysql and SQLite are considered to have full support and are tested as part of the `dlt` CI suite. Other dialects are not tested but should generally work. + +## Install dlt with SQLAlchemy + +Install dlt with the `sqlalchemy` extra dependency: + +```sh +pip install "dlt[sqlalchemy]" +``` + +Note that database drivers are not included and need to be installed separately for +the database you plan on using. For example for MySQL: + +```sh +pip install mysqlclient +``` + +Refer to the [SQLAlchemy documentation on dialects](https://docs.sqlalchemy.org/en/20/dialects/) for info about client libraries required for supported databases. + +### Create a pipeline + +**1. Initialize a project with a pipeline that loads to MS SQL by running:** +```sh +dlt init chess sqlalchemy +``` + +**2. Install the necessary dependencies for SQLAlchemy by running:** +```sh +pip install -r requirements.txt +``` +or run: +```sh +pip install "dlt[sqlalchemy]" +``` + +**3. Install your database client library.** + +E.g. for MySQL: +```sh +pip install mysqlclient +``` + +**4. Enter your credentials into `.dlt/secrets.toml`.** + +For example, replace with your database connection info: +```toml +[destination.sqlalchemy.credentials] +database = "dlt_data" +username = "loader" +password = "" +host = "localhost" +port = 3306 +driver_name = "mysql" +``` + +Alternatively a valid SQLAlchemy database URL can be used, either in `secrets.toml` or as an environment variable. +E.g. + +```toml +[destination.sqlalchemy] +credentials = "mysql://loader:@localhost:3306/dlt_data" +``` + +or + +```sh +export DESTINATION__SQLALCHEMY__CREDENTIALS="mysql://loader:@localhost:3306/dlt_data" +``` + +An SQLAlchemy `Engine` can also be passed directly by creating an instance of the destination: + +```py +import sqlalchemy as sa +import dlt + +engine = sa.create_engine('sqlite:///chess_data.db') + +pipeline = dlt.pipeline( + pipeline_name='chess', + destination=dlt.destinations.sqlalchemy(engine), + dataset_name='main' +) +``` + +## Notes on SQLite + +### Dataset files +When using an SQLite database file each dataset is stored in a separate file since SQLite does not support multiple schemas in a single database file. +Under the hood this uses [`ATTACH DATABASE`](https://www.sqlite.org/lang_attach.html). + +The file is stored in the same directory as the main database file (provided by your database URL) + +E.g. if your SQLite URL is `sqlite:////home/me/data/chess_data.db` and you `dataset_name` is `games`, the data +is stored in `/home/me/data/chess_data__games.db` + +**Note**: if dataset name is `main` no additional file is created as this is the default SQLite database. + +### In-memory databases +In-memory databases require a persistent connection as the database is destroyed when the connection is closed. +Normally connections are opened and closed for each load job and in other stages during the pipeline run. +To make sure the database persists throughout the pipeline run you need to pass in an SQLAlchemy `Engine` object instead of credentials. +This engine is not disposed of automatically by `dlt`. Example: + +```py +import dlt +import sqlalchemy as sa + +# Create the sqlite engine +engine = sa.create_engine('sqlite:///:memory:') + +# Configure the destination instance and create pipeline +pipeline = dlt.pipeline('my_pipeline', destination=dlt.destinations.sqlalchemy(engine), dataset_name='main') + +# Run the pipeline with some data +pipeline.run([1,2,3], table_name='my_table') + +# engine is still open and you can query the database +with engine.connect() as conn: + result = conn.execute(sa.text('SELECT * FROM my_table')) + print(result.fetchall()) +``` + +## Write dispositions + +The following write dispositions are supported: + +- `append` +- `replace` with `truncate-and-insert` and `insert-from-staging` replace strategies. `staging-optimized` falls back to `insert-from-staging`. + +`merge` disposition is not supported and falls back to `append`. + +## Data loading +Data is loaded in a dialect agnostic with an `insert` statement generated with SQLAlchemy's core API. +Rows are inserted in batches as long as the underlying database driver supports it. By default the batch size 10,000 rows. + +## Syncing of `dlt` state +This destination fully supports [dlt state sync](../../general-usage/state#syncing-state-with-destination). + +### Data types +All `dlt` data types are supported, but how they are stored in the database depends on the SQLAlchemy dialect. +For example SQLite does not have a `DATETIME`/`TIMESTAMP` type so `timestamp` columns are stored as `TEXT` in ISO 8601 format. + +## Supported file formats +* [typed-jsonl](../file-formats/jsonl.md) is used by default. JSON encoded data with typing information included. +* [parquet](../file-formats/parquet.md) is supported + +## Supported column hints +* `unique` hints are translated to `UNIQUE` constraints via SQLAlchemy (granted the database supports it) diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index 659bca6cb9..d064456c0d 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -48,7 +48,7 @@ destinations_configs, DestinationTestConfiguration, ) -from tests.load.pipeline.utils import REPLACE_STRATEGIES +from tests.load.pipeline.utils import REPLACE_STRATEGIES, skip_if_unsupported_replace_strategy # mark all tests as essential, do not remove pytestmark = pytest.mark.essential @@ -641,11 +641,7 @@ def test_dataset_name_change(destination_config: DestinationTestConfiguration) - def test_pipeline_upfront_tables_two_loads( destination_config: DestinationTestConfiguration, replace_strategy: str ) -> None: - if not destination_config.supports_merge and replace_strategy != "truncate-and-insert": - pytest.skip( - f"Destination {destination_config.name} does not support merge and thus" - f" {replace_strategy}" - ) + skip_if_unsupported_replace_strategy(destination_config, replace_strategy) # use staging tables for replace os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy diff --git a/tests/load/pipeline/test_replace_disposition.py b/tests/load/pipeline/test_replace_disposition.py index 82cef83019..569bb8ce33 100644 --- a/tests/load/pipeline/test_replace_disposition.py +++ b/tests/load/pipeline/test_replace_disposition.py @@ -9,7 +9,7 @@ destinations_configs, DestinationTestConfiguration, ) -from tests.load.pipeline.utils import REPLACE_STRATEGIES +from tests.load.pipeline.utils import REPLACE_STRATEGIES, skip_if_unsupported_replace_strategy @pytest.mark.essential @@ -24,11 +24,7 @@ def test_replace_disposition( destination_config: DestinationTestConfiguration, replace_strategy: str ) -> None: - if not destination_config.supports_merge and replace_strategy != "truncate-and-insert": - pytest.skip( - f"Destination {destination_config.name} does not support merge and thus" - f" {replace_strategy}" - ) + skip_if_unsupported_replace_strategy(destination_config, replace_strategy) # only allow 40 items per file os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "40" @@ -242,11 +238,7 @@ def load_items_none(): def test_replace_table_clearing( destination_config: DestinationTestConfiguration, replace_strategy: str ) -> None: - if not destination_config.supports_merge and replace_strategy != "truncate-and-insert": - pytest.skip( - f"Destination {destination_config.name} does not support merge and thus" - f" {replace_strategy}" - ) + skip_if_unsupported_replace_strategy(destination_config, replace_strategy) # use staging tables for replace os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy diff --git a/tests/load/pipeline/utils.py b/tests/load/pipeline/utils.py index 679c2d6da9..1a1324e59a 100644 --- a/tests/load/pipeline/utils.py +++ b/tests/load/pipeline/utils.py @@ -1 +1,19 @@ +import pytest + +from tests.load.utils import DestinationTestConfiguration + REPLACE_STRATEGIES = ["truncate-and-insert", "insert-from-staging", "staging-optimized"] + + +def skip_if_unsupported_replace_strategy( + destination_config: DestinationTestConfiguration, replace_strategy: str +): + """Skip test if destination does not support the given replace strategy.""" + supported_replace_strategies = ( + destination_config.raw_capabilities().supported_replace_strategies + ) + if not supported_replace_strategies or replace_strategy not in supported_replace_strategies: + pytest.skip( + f"Destination {destination_config.name} does not support the replace strategy" + f" {replace_strategy}" + ) diff --git a/tests/load/utils.py b/tests/load/utils.py index f443748f8e..19601f2cf1 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -35,6 +35,7 @@ DestinationClientStagingConfiguration, TDestinationReferenceArg, WithStagingDataset, + DestinationCapabilitiesContext, ) from dlt.common.destination import TLoaderFileFormat, Destination from dlt.common.destination.reference import DEFAULT_FILE_LAYOUT @@ -171,6 +172,10 @@ def destination_factory(self, **kwargs) -> Destination[Any, Any]: self.setup() return Destination.from_reference(dest_type, destination_name=dest_name, **kwargs) + def raw_capabilities(self) -> DestinationCapabilitiesContext: + dest = Destination.from_reference(self.destination_type) + return dest._raw_capabilities() + @property def name(self) -> str: name: str = self.destination_name or self.destination_type