From 9d6af275dc399f3315308943eefc79acdb0f750b Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Thu, 19 Sep 2024 15:12:15 -0400 Subject: [PATCH] Add supported_replace_strategies capability --- dlt/common/destination/capabilities.py | 3 ++- dlt/common/destination/reference.py | 8 ++++++-- dlt/common/schema/typing.py | 1 + dlt/destinations/impl/athena/factory.py | 1 + dlt/destinations/impl/bigquery/factory.py | 5 +++++ dlt/destinations/impl/clickhouse/factory.py | 1 + dlt/destinations/impl/databricks/factory.py | 5 +++++ dlt/destinations/impl/dremio/factory.py | 1 + dlt/destinations/impl/duckdb/factory.py | 1 + dlt/destinations/impl/filesystem/factory.py | 1 + dlt/destinations/impl/lancedb/factory.py | 1 + dlt/destinations/impl/motherduck/factory.py | 1 + dlt/destinations/impl/mssql/factory.py | 5 +++++ dlt/destinations/impl/postgres/factory.py | 5 +++++ dlt/destinations/impl/qdrant/factory.py | 1 + dlt/destinations/impl/redshift/factory.py | 1 + dlt/destinations/impl/snowflake/factory.py | 5 +++++ dlt/destinations/impl/sqlalchemy/factory.py | 1 + dlt/destinations/impl/synapse/factory.py | 1 + dlt/destinations/impl/weaviate/factory.py | 1 + tests/load/pipeline/test_pipelines.py | 8 ++------ .../load/pipeline/test_replace_disposition.py | 14 +++----------- tests/load/pipeline/utils.py | 18 ++++++++++++++++++ tests/load/utils.py | 5 +++++ 24 files changed, 74 insertions(+), 20 deletions(-) diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index 8f0dce79ce..9ef8fad96e 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -33,6 +33,7 @@ TTableSchema, TLoaderMergeStrategy, TTableFormat, + TLoaderReplaceStrategy, ) from dlt.common.wei import EVM_DECIMAL_PRECISION @@ -169,7 +170,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): supported_merge_strategies: Sequence[TLoaderMergeStrategy] = None merge_strategies_selector: MergeStrategySelector = None - # TODO: 
also add `supported_replace_strategies` capability + supported_replace_strategies: Sequence[TLoaderReplaceStrategy] = None max_parallel_load_jobs: Optional[int] = None """The destination can set the maximum amount of parallel load jobs being executed""" diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 9e27b66335..f0d950f0e9 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -31,7 +31,12 @@ from dlt.common.metrics import LoadJobMetrics from dlt.common.normalizers.naming import NamingConvention from dlt.common.schema import Schema, TSchemaTables -from dlt.common.schema.typing import C_DLT_LOAD_ID, _TTableSchemaBase, TWriteDisposition +from dlt.common.schema.typing import ( + C_DLT_LOAD_ID, + _TTableSchemaBase, + TWriteDisposition, + TLoaderReplaceStrategy, +) from dlt.common.schema.utils import fill_hints_from_parent_and_clone_table from dlt.common.configuration import configspec, resolve_configuration, known_sections, NotResolved from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration @@ -48,7 +53,6 @@ from dlt.common.storages.load_storage import ParsedLoadJobFileName from dlt.common.storages.load_package import LoadJobInfo, TPipelineStateDoc -TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"] TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration") TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase") TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration") diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 9221cca7ff..2247358331 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -218,6 +218,7 @@ class NormalizerInfo(TypedDict, total=True): TWriteDisposition = Literal["skip", "append", "replace", "merge"] TLoaderMergeStrategy = Literal["delete-insert", 
"scd2", "upsert"] +TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"] WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition)) diff --git a/dlt/destinations/impl/athena/factory.py b/dlt/destinations/impl/athena/factory.py index 5a7ae1ba8c..1749c135cc 100644 --- a/dlt/destinations/impl/athena/factory.py +++ b/dlt/destinations/impl/athena/factory.py @@ -138,6 +138,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.timestamp_precision = 3 caps.supports_truncate_command = False caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] caps.merge_strategies_selector = athena_merge_strategies_selector return caps diff --git a/dlt/destinations/impl/bigquery/factory.py b/dlt/destinations/impl/bigquery/factory.py index 7f4fd74825..32a6eb6f82 100644 --- a/dlt/destinations/impl/bigquery/factory.py +++ b/dlt/destinations/impl/bigquery/factory.py @@ -118,6 +118,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_clone_table = True caps.schema_supports_numeric_precision = False # no precision information in BigQuery caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] return caps diff --git a/dlt/destinations/impl/clickhouse/factory.py b/dlt/destinations/impl/clickhouse/factory.py index 696c2783ca..7a9e16464f 100644 --- a/dlt/destinations/impl/clickhouse/factory.py +++ b/dlt/destinations/impl/clickhouse/factory.py @@ -139,6 +139,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_truncate_command = True caps.supported_merge_strategies = ["delete-insert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git 
a/dlt/destinations/impl/databricks/factory.py b/dlt/destinations/impl/databricks/factory.py index b02f191423..a73a575901 100644 --- a/dlt/destinations/impl/databricks/factory.py +++ b/dlt/destinations/impl/databricks/factory.py @@ -134,6 +134,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_multiple_statements = False caps.supports_clone_table = True caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] return caps @property diff --git a/dlt/destinations/impl/dremio/factory.py b/dlt/destinations/impl/dremio/factory.py index 29ec6257e6..14ddac852e 100644 --- a/dlt/destinations/impl/dremio/factory.py +++ b/dlt/destinations/impl/dremio/factory.py @@ -109,6 +109,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_multiple_statements = False caps.timestamp_precision = 3 caps.supported_merge_strategies = ["delete-insert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps @property diff --git a/dlt/destinations/impl/duckdb/factory.py b/dlt/destinations/impl/duckdb/factory.py index 6c2011c549..e3d261d9d6 100644 --- a/dlt/destinations/impl/duckdb/factory.py +++ b/dlt/destinations/impl/duckdb/factory.py @@ -148,6 +148,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.alter_add_multi_column = False caps.supports_truncate_command = False caps.supported_merge_strategies = ["delete-insert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git a/dlt/destinations/impl/filesystem/factory.py b/dlt/destinations/impl/filesystem/factory.py index c5218f14a3..2463da58fa 100644 --- a/dlt/destinations/impl/filesystem/factory.py +++ b/dlt/destinations/impl/filesystem/factory.py @@ -51,6 +51,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: 
"reference", ] caps.has_case_sensitive_identifiers = True + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps @property diff --git a/dlt/destinations/impl/lancedb/factory.py b/dlt/destinations/impl/lancedb/factory.py index 339453133f..8ce2217007 100644 --- a/dlt/destinations/impl/lancedb/factory.py +++ b/dlt/destinations/impl/lancedb/factory.py @@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.decimal_precision = (38, 18) caps.timestamp_precision = 6 + caps.supported_replace_strategies = ["truncate-and-insert"] return caps diff --git a/dlt/destinations/impl/motherduck/factory.py b/dlt/destinations/impl/motherduck/factory.py index ac5dc70b57..fec1049584 100644 --- a/dlt/destinations/impl/motherduck/factory.py +++ b/dlt/destinations/impl/motherduck/factory.py @@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_truncate_command = False caps.supported_merge_strategies = ["delete-insert", "scd2"] caps.max_parallel_load_jobs = 8 + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git a/dlt/destinations/impl/mssql/factory.py b/dlt/destinations/impl/mssql/factory.py index 2fd668bdb6..1dbac8e8f5 100644 --- a/dlt/destinations/impl/mssql/factory.py +++ b/dlt/destinations/impl/mssql/factory.py @@ -109,6 +109,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.max_rows_per_insert = 1000 caps.timestamp_precision = 7 caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] return caps diff --git a/dlt/destinations/impl/postgres/factory.py b/dlt/destinations/impl/postgres/factory.py index 1a33d44577..bde0e35f3d 100644 --- a/dlt/destinations/impl/postgres/factory.py +++ b/dlt/destinations/impl/postgres/factory.py @@ -142,6 +142,11 @@ def 
_raw_capabilities(self) -> DestinationCapabilitiesContext: caps.is_max_text_data_type_length_in_bytes = True caps.supports_ddl_transactions = True caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] return caps diff --git a/dlt/destinations/impl/qdrant/factory.py b/dlt/destinations/impl/qdrant/factory.py index f994948d91..49c4511c8d 100644 --- a/dlt/destinations/impl/qdrant/factory.py +++ b/dlt/destinations/impl/qdrant/factory.py @@ -25,6 +25,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.max_text_data_type_length = 8 * 1024 * 1024 caps.is_max_text_data_type_length_in_bytes = False caps.supports_ddl_transactions = False + caps.supported_replace_strategies = ["truncate-and-insert"] return caps diff --git a/dlt/destinations/impl/redshift/factory.py b/dlt/destinations/impl/redshift/factory.py index 20b7df859f..cab30f8e33 100644 --- a/dlt/destinations/impl/redshift/factory.py +++ b/dlt/destinations/impl/redshift/factory.py @@ -135,6 +135,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_ddl_transactions = True caps.alter_add_multi_column = False caps.supported_merge_strategies = ["delete-insert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git a/dlt/destinations/impl/snowflake/factory.py b/dlt/destinations/impl/snowflake/factory.py index 6c2369a5aa..3ed4b39276 100644 --- a/dlt/destinations/impl/snowflake/factory.py +++ b/dlt/destinations/impl/snowflake/factory.py @@ -121,6 +121,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.alter_add_multi_column = True caps.supports_clone_table = True caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] + caps.supported_replace_strategies = [ + "truncate-and-insert", + "insert-from-staging", + "staging-optimized", + ] return caps 
@property diff --git a/dlt/destinations/impl/sqlalchemy/factory.py b/dlt/destinations/impl/sqlalchemy/factory.py index 74214f8b75..360dd89192 100644 --- a/dlt/destinations/impl/sqlalchemy/factory.py +++ b/dlt/destinations/impl/sqlalchemy/factory.py @@ -49,6 +49,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: # Multiple concatenated statements are not supported by all engines, so leave them off by default caps.supports_multiple_statements = False caps.type_mapper = SqlalchemyTypeMapper + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git a/dlt/destinations/impl/synapse/factory.py b/dlt/destinations/impl/synapse/factory.py index f035f2f713..14ce622f8b 100644 --- a/dlt/destinations/impl/synapse/factory.py +++ b/dlt/destinations/impl/synapse/factory.py @@ -100,6 +100,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.timestamp_precision = 7 caps.supported_merge_strategies = ["delete-insert", "scd2"] + caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"] return caps diff --git a/dlt/destinations/impl/weaviate/factory.py b/dlt/destinations/impl/weaviate/factory.py index a5c1e9f2a1..7cb71d4944 100644 --- a/dlt/destinations/impl/weaviate/factory.py +++ b/dlt/destinations/impl/weaviate/factory.py @@ -61,6 +61,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.is_max_text_data_type_length_in_bytes = False caps.supports_ddl_transactions = False caps.naming_convention = "dlt.destinations.impl.weaviate.naming" + caps.supported_replace_strategies = ["truncate-and-insert"] return caps diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py index 659bca6cb9..d064456c0d 100644 --- a/tests/load/pipeline/test_pipelines.py +++ b/tests/load/pipeline/test_pipelines.py @@ -48,7 +48,7 @@ destinations_configs, DestinationTestConfiguration, ) -from tests.load.pipeline.utils import REPLACE_STRATEGIES +from 
tests.load.pipeline.utils import REPLACE_STRATEGIES, skip_if_unsupported_replace_strategy # mark all tests as essential, do not remove pytestmark = pytest.mark.essential @@ -641,11 +641,7 @@ def test_dataset_name_change(destination_config: DestinationTestConfiguration) - def test_pipeline_upfront_tables_two_loads( destination_config: DestinationTestConfiguration, replace_strategy: str ) -> None: - if not destination_config.supports_merge and replace_strategy != "truncate-and-insert": - pytest.skip( - f"Destination {destination_config.name} does not support merge and thus" - f" {replace_strategy}" - ) + skip_if_unsupported_replace_strategy(destination_config, replace_strategy) # use staging tables for replace os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy diff --git a/tests/load/pipeline/test_replace_disposition.py b/tests/load/pipeline/test_replace_disposition.py index 82cef83019..569bb8ce33 100644 --- a/tests/load/pipeline/test_replace_disposition.py +++ b/tests/load/pipeline/test_replace_disposition.py @@ -9,7 +9,7 @@ destinations_configs, DestinationTestConfiguration, ) -from tests.load.pipeline.utils import REPLACE_STRATEGIES +from tests.load.pipeline.utils import REPLACE_STRATEGIES, skip_if_unsupported_replace_strategy @pytest.mark.essential @@ -24,11 +24,7 @@ def test_replace_disposition( destination_config: DestinationTestConfiguration, replace_strategy: str ) -> None: - if not destination_config.supports_merge and replace_strategy != "truncate-and-insert": - pytest.skip( - f"Destination {destination_config.name} does not support merge and thus" - f" {replace_strategy}" - ) + skip_if_unsupported_replace_strategy(destination_config, replace_strategy) # only allow 40 items per file os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "40" @@ -242,11 +238,7 @@ def load_items_none(): def test_replace_table_clearing( destination_config: DestinationTestConfiguration, replace_strategy: str ) -> None: - if not destination_config.supports_merge and 
replace_strategy != "truncate-and-insert": - pytest.skip( - f"Destination {destination_config.name} does not support merge and thus" - f" {replace_strategy}" - ) + skip_if_unsupported_replace_strategy(destination_config, replace_strategy) # use staging tables for replace os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy diff --git a/tests/load/pipeline/utils.py b/tests/load/pipeline/utils.py index 679c2d6da9..1a1324e59a 100644 --- a/tests/load/pipeline/utils.py +++ b/tests/load/pipeline/utils.py @@ -1 +1,19 @@ +import pytest + +from tests.load.utils import DestinationTestConfiguration + REPLACE_STRATEGIES = ["truncate-and-insert", "insert-from-staging", "staging-optimized"] + + +def skip_if_unsupported_replace_strategy( + destination_config: DestinationTestConfiguration, replace_strategy: str +): + """Skip test if destination does not support the given replace strategy.""" + supported_replace_strategies = ( + destination_config.raw_capabilities().supported_replace_strategies + ) + if not supported_replace_strategies or replace_strategy not in supported_replace_strategies: + pytest.skip( + f"Destination {destination_config.name} does not support the replace strategy" + f" {replace_strategy}" + ) diff --git a/tests/load/utils.py b/tests/load/utils.py index f443748f8e..19601f2cf1 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -35,6 +35,7 @@ DestinationClientStagingConfiguration, TDestinationReferenceArg, WithStagingDataset, + DestinationCapabilitiesContext, ) from dlt.common.destination import TLoaderFileFormat, Destination from dlt.common.destination.reference import DEFAULT_FILE_LAYOUT @@ -171,6 +172,10 @@ def destination_factory(self, **kwargs) -> Destination[Any, Any]: self.setup() return Destination.from_reference(dest_type, destination_name=dest_name, **kwargs) + def raw_capabilities(self) -> DestinationCapabilitiesContext: + dest = Destination.from_reference(self.destination_type) + return dest._raw_capabilities() + @property def 
name(self) -> str: name: str = self.destination_name or self.destination_type