Skip to content

Commit

Permalink
Add supported_replace_strategies capability
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Sep 19, 2024
1 parent bb03128 commit 9d6af27
Show file tree
Hide file tree
Showing 24 changed files with 74 additions and 20 deletions.
3 changes: 2 additions & 1 deletion dlt/common/destination/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
TTableSchema,
TLoaderMergeStrategy,
TTableFormat,
TLoaderReplaceStrategy,
)
from dlt.common.wei import EVM_DECIMAL_PRECISION

Expand Down Expand Up @@ -169,7 +170,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):

supported_merge_strategies: Sequence[TLoaderMergeStrategy] = None
merge_strategies_selector: MergeStrategySelector = None
# TODO: also add `supported_replace_strategies` capability
supported_replace_strategies: Sequence[TLoaderReplaceStrategy] = None

max_parallel_load_jobs: Optional[int] = None
"""The destination can set the maximum amount of parallel load jobs being executed"""
Expand Down
8 changes: 6 additions & 2 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
from dlt.common.metrics import LoadJobMetrics
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.schema import Schema, TSchemaTables
from dlt.common.schema.typing import C_DLT_LOAD_ID, _TTableSchemaBase, TWriteDisposition
from dlt.common.schema.typing import (
C_DLT_LOAD_ID,
_TTableSchemaBase,
TWriteDisposition,
TLoaderReplaceStrategy,
)
from dlt.common.schema.utils import fill_hints_from_parent_and_clone_table
from dlt.common.configuration import configspec, resolve_configuration, known_sections, NotResolved
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
Expand All @@ -48,7 +53,6 @@
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.storages.load_package import LoadJobInfo, TPipelineStateDoc

TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")
TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration")
Expand Down
1 change: 1 addition & 0 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class NormalizerInfo(TypedDict, total=True):

TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TLoaderMergeStrategy = Literal["delete-insert", "scd2", "upsert"]
TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]


WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/athena/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.timestamp_precision = 3
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
caps.merge_strategies_selector = athena_merge_strategies_selector
return caps

Expand Down
5 changes: 5 additions & 0 deletions dlt/destinations/impl/bigquery/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_clone_table = True
caps.schema_supports_numeric_precision = False # no precision information in BigQuery
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/clickhouse/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_truncate_command = True

caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down
5 changes: 5 additions & 0 deletions dlt/destinations/impl/databricks/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_multiple_statements = False
caps.supports_clone_table = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]
return caps

@property
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/dremio/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_multiple_statements = False
caps.timestamp_precision = 3
caps.supported_merge_strategies = ["delete-insert", "scd2"]
# Replace strategies declared as supported by this destination.
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
return caps

@property
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/duckdb/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.alter_add_multi_column = False
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/filesystem/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
"reference",
]
caps.has_case_sensitive_identifiers = True
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
return caps

@property
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/lancedb/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:

caps.decimal_precision = (38, 18)
caps.timestamp_precision = 6
caps.supported_replace_strategies = ["truncate-and-insert"]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/motherduck/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.max_parallel_load_jobs = 8
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down
5 changes: 5 additions & 0 deletions dlt/destinations/impl/mssql/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.max_rows_per_insert = 1000
caps.timestamp_precision = 7
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]

return caps

Expand Down
5 changes: 5 additions & 0 deletions dlt/destinations/impl/postgres/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.is_max_text_data_type_length_in_bytes = True
caps.supports_ddl_transactions = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/qdrant/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.max_text_data_type_length = 8 * 1024 * 1024
caps.is_max_text_data_type_length_in_bytes = False
caps.supports_ddl_transactions = False
caps.supported_replace_strategies = ["truncate-and-insert"]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/redshift/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_ddl_transactions = True
caps.alter_add_multi_column = False
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down
5 changes: 5 additions & 0 deletions dlt/destinations/impl/snowflake/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.alter_add_multi_column = True
caps.supports_clone_table = True
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
"truncate-and-insert",
"insert-from-staging",
"staging-optimized",
]
return caps

@property
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/sqlalchemy/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
# Multiple concatenated statements are not supported by all engines, so leave them off by default
caps.supports_multiple_statements = False
caps.type_mapper = SqlalchemyTypeMapper
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/synapse/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.timestamp_precision = 7

caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]

return caps

Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/weaviate/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.is_max_text_data_type_length_in_bytes = False
caps.supports_ddl_transactions = False
caps.naming_convention = "dlt.destinations.impl.weaviate.naming"
caps.supported_replace_strategies = ["truncate-and-insert"]

return caps

Expand Down
8 changes: 2 additions & 6 deletions tests/load/pipeline/test_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
destinations_configs,
DestinationTestConfiguration,
)
from tests.load.pipeline.utils import REPLACE_STRATEGIES
from tests.load.pipeline.utils import REPLACE_STRATEGIES, skip_if_unsupported_replace_strategy

# mark all tests as essential, do not remove
pytestmark = pytest.mark.essential
Expand Down Expand Up @@ -641,11 +641,7 @@ def test_dataset_name_change(destination_config: DestinationTestConfiguration) -
def test_pipeline_upfront_tables_two_loads(
destination_config: DestinationTestConfiguration, replace_strategy: str
) -> None:
if not destination_config.supports_merge and replace_strategy != "truncate-and-insert":
pytest.skip(
f"Destination {destination_config.name} does not support merge and thus"
f" {replace_strategy}"
)
skip_if_unsupported_replace_strategy(destination_config, replace_strategy)

# use staging tables for replace
os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy
Expand Down
14 changes: 3 additions & 11 deletions tests/load/pipeline/test_replace_disposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
destinations_configs,
DestinationTestConfiguration,
)
from tests.load.pipeline.utils import REPLACE_STRATEGIES
from tests.load.pipeline.utils import REPLACE_STRATEGIES, skip_if_unsupported_replace_strategy


@pytest.mark.essential
Expand All @@ -24,11 +24,7 @@
def test_replace_disposition(
destination_config: DestinationTestConfiguration, replace_strategy: str
) -> None:
if not destination_config.supports_merge and replace_strategy != "truncate-and-insert":
pytest.skip(
f"Destination {destination_config.name} does not support merge and thus"
f" {replace_strategy}"
)
skip_if_unsupported_replace_strategy(destination_config, replace_strategy)

# only allow 40 items per file
os.environ["DATA_WRITER__FILE_MAX_ITEMS"] = "40"
Expand Down Expand Up @@ -242,11 +238,7 @@ def load_items_none():
def test_replace_table_clearing(
destination_config: DestinationTestConfiguration, replace_strategy: str
) -> None:
if not destination_config.supports_merge and replace_strategy != "truncate-and-insert":
pytest.skip(
f"Destination {destination_config.name} does not support merge and thus"
f" {replace_strategy}"
)
skip_if_unsupported_replace_strategy(destination_config, replace_strategy)

# use staging tables for replace
os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy
Expand Down
18 changes: 18 additions & 0 deletions tests/load/pipeline/utils.py
Original file line number Diff line number Diff line change
@@ -1 +1,19 @@
import pytest

from tests.load.utils import DestinationTestConfiguration

REPLACE_STRATEGIES = ["truncate-and-insert", "insert-from-staging", "staging-optimized"]


def skip_if_unsupported_replace_strategy(
    destination_config: DestinationTestConfiguration, replace_strategy: str
):
    """Skip the current test unless the destination declares `replace_strategy` as supported.

    The list of supported strategies is read from the destination's raw capabilities.
    A missing or empty list is treated as "no strategy supported", so the test is skipped.
    """
    declared = destination_config.raw_capabilities().supported_replace_strategies
    # Guard clause: nothing to do when the strategy is explicitly listed.
    if declared and replace_strategy in declared:
        return
    pytest.skip(
        f"Destination {destination_config.name} does not support the replace strategy"
        f" {replace_strategy}"
    )
5 changes: 5 additions & 0 deletions tests/load/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
DestinationClientStagingConfiguration,
TDestinationReferenceArg,
WithStagingDataset,
DestinationCapabilitiesContext,
)
from dlt.common.destination import TLoaderFileFormat, Destination
from dlt.common.destination.reference import DEFAULT_FILE_LAYOUT
Expand Down Expand Up @@ -171,6 +172,10 @@ def destination_factory(self, **kwargs) -> Destination[Any, Any]:
self.setup()
return Destination.from_reference(dest_type, destination_name=dest_name, **kwargs)

def raw_capabilities(self) -> DestinationCapabilitiesContext:
    """Return the raw capabilities of the destination referenced by `self.destination_type`.

    The destination is resolved with `Destination.from_reference` using only the
    destination type, and its `_raw_capabilities()` result is returned as is.
    """
    return Destination.from_reference(self.destination_type)._raw_capabilities()

@property
def name(self) -> str:
name: str = self.destination_name or self.destination_type
Expand Down

0 comments on commit 9d6af27

Please sign in to comment.