diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 59f13b30b9..1c28dffa8c 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -34,7 +34,7 @@ from dlt.common.schema import Schema, TTableSchema, TSchemaTables from dlt.common.schema.typing import TWriteDisposition from dlt.common.schema.exceptions import InvalidDatasetName -from dlt.common.schema.utils import get_write_disposition, get_table_format, get_table_index_type +from dlt.common.schema.utils import get_write_disposition, get_table_format from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration from dlt.common.configuration.accessors import config @@ -372,8 +372,6 @@ def get_load_table(self, table_name: str, prepare_for_staging: bool = False) -> table["write_disposition"] = get_write_disposition(self.schema.tables, table_name) if "table_format" not in table: table["table_format"] = get_table_format(self.schema.tables, table_name) - if "table_index_type" not in table: - table["table_index_type"] = get_table_index_type(self.schema.tables, table_name) return table except KeyError: raise UnknownTableException(table_name) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 351d666553..9a27cbe4bb 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -62,8 +62,6 @@ """Known hints of a column used to declare hint regexes.""" TWriteDisposition = Literal["skip", "append", "replace", "merge"] TTableFormat = Literal["iceberg"] -TTableIndexType = Literal["heap", "clustered_columnstore_index"] -"Table index type. Currently only used for Synapse destination." TTypeDetections = Literal[ "timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double" ] @@ -167,7 +165,6 @@ class TTableSchema(TypedDict, total=False): columns: TTableSchemaColumns resource: Optional[str] table_format: Optional[TTableFormat] - table_index_type: Optional[TTableIndexType] class TPartialTableSchema(TTableSchema): diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 5ea244148e..dc243f50dd 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -32,7 +32,6 @@ TColumnSchema, TColumnProp, TTableFormat, - TTableIndexType, TColumnHint, TTypeDetectionFunc, TTypeDetections, @@ -619,14 +618,6 @@ def get_table_format(tables: TSchemaTables, table_name: str) -> TTableFormat: ) -def get_table_index_type(tables: TSchemaTables, table_name: str) -> TTableIndexType: - """Returns table index type of a table if present. If not, looks up into parent table.""" - return cast( - TTableIndexType, - get_inherited_table_hint(tables, table_name, "table_index_type", allow_none=True), - ) - - def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool: """Checks if `table` schema contains column with type _typ""" return any(c.get("data_type") == _typ for c in table["columns"].values()) @@ -733,7 +724,6 @@ def new_table( resource: str = None, schema_contract: TSchemaContract = None, table_format: TTableFormat = None, - table_index_type: TTableIndexType = None, ) -> TTableSchema: table: TTableSchema = { "name": table_name, @@ -752,8 +742,6 @@ def new_table( table["schema_contract"] = schema_contract if table_format: table["table_format"] = table_format - if table_index_type is not None: - table["table_index_type"] = table_index_type if validate_schema: validate_dict_ignoring_xkeys( spec=TColumnSchema, diff --git a/dlt/destinations/adapters.py b/dlt/destinations/adapters.py index b8f12599dc..22c98d4f5a 100644 --- a/dlt/destinations/adapters.py +++ b/dlt/destinations/adapters.py @@ -2,5 +2,6 @@ from dlt.destinations.impl.weaviate import weaviate_adapter from dlt.destinations.impl.qdrant import qdrant_adapter +from dlt.destinations.impl.synapse import synapse_adapter -__all__ = ["weaviate_adapter", "qdrant_adapter"] +__all__ = ["weaviate_adapter", "qdrant_adapter", "synapse_adapter"] diff --git a/dlt/destinations/impl/qdrant/qdrant_adapter.py b/dlt/destinations/impl/qdrant/qdrant_adapter.py index 243cbd6c5b..215d87a920 100644 --- a/dlt/destinations/impl/qdrant/qdrant_adapter.py +++ b/dlt/destinations/impl/qdrant/qdrant_adapter.py @@ -2,6 +2,7 @@ from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns from dlt.extract import DltResource, resource as make_resource +from dlt.destinations.utils import ensure_resource VECTORIZE_HINT = "x-qdrant-embed" @@ -31,15 +32,7 @@ def qdrant_adapter( >>> qdrant_adapter(data, embed="description") [DltResource with hints applied] """ - # wrap `data` in a resource if not an instance already - resource: DltResource - if not isinstance(data, DltResource): - resource_name: str = None - if not hasattr(data, "__name__"): - resource_name = "content" - resource = make_resource(data, name=resource_name) - else: - resource = data + resource = ensure_resource(data) column_hints: TTableSchemaColumns = {} diff --git a/dlt/destinations/impl/synapse/__init__.py b/dlt/destinations/impl/synapse/__init__.py index 639d8a598f..53dbabc090 100644 --- a/dlt/destinations/impl/synapse/__init__.py +++ b/dlt/destinations/impl/synapse/__init__.py @@ -3,6 +3,8 @@ from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE from dlt.common.wei import EVM_DECIMAL_PRECISION +from dlt.destinations.impl.synapse.synapse_adapter import synapse_adapter + def capabilities() -> DestinationCapabilitiesContext: caps = DestinationCapabilitiesContext() diff --git a/dlt/destinations/impl/synapse/configuration.py b/dlt/destinations/impl/synapse/configuration.py index 34b227a2ac..cc0e40114b 100644 --- a/dlt/destinations/impl/synapse/configuration.py +++ b/dlt/destinations/impl/synapse/configuration.py @@ -2,7 +2,7 @@ from dlt.common import logger from dlt.common.configuration import configspec -from dlt.common.schema.typing import TTableIndexType, TSchemaTables +from dlt.common.schema.typing import TSchemaTables from dlt.common.schema.utils import get_write_disposition from dlt.destinations.impl.mssql.configuration import ( @@ -11,6 +11,8 @@ ) from dlt.destinations.impl.mssql.configuration import MsSqlCredentials +from dlt.destinations.impl.synapse.synapse_adapter import TTableIndexType + @configspec class SynapseCredentials(MsSqlCredentials): diff --git a/dlt/destinations/impl/synapse/factory.py b/dlt/destinations/impl/synapse/factory.py index 3d951f3d4a..0ac58001ca 100644 --- a/dlt/destinations/impl/synapse/factory.py +++ b/dlt/destinations/impl/synapse/factory.py @@ -1,13 +1,13 @@ import typing as t from dlt.common.destination import Destination, DestinationCapabilitiesContext -from dlt.common.schema.typing import TTableIndexType -from dlt.destinations.impl.synapse import capabilities +from dlt.destinations.impl.synapse import capabilities from dlt.destinations.impl.synapse.configuration import ( SynapseCredentials, SynapseClientConfiguration, ) +from dlt.destinations.impl.synapse.synapse_adapter import TTableIndexType if t.TYPE_CHECKING: from dlt.destinations.impl.synapse.synapse import SynapseClient diff --git a/dlt/destinations/impl/synapse/synapse.py b/dlt/destinations/impl/synapse/synapse.py index c29c0df3f5..d34fef1ab4 100644 --- a/dlt/destinations/impl/synapse/synapse.py +++ b/dlt/destinations/impl/synapse/synapse.py @@ -12,8 +12,8 @@ ) from dlt.common.schema import TTableSchema, TColumnSchema, Schema, TColumnHint -from dlt.common.schema.utils import table_schema_has_type -from dlt.common.schema.typing import TTableSchemaColumns, TTableIndexType +from dlt.common.schema.utils import table_schema_has_type, get_inherited_table_hint +from dlt.common.schema.typing import TTableSchemaColumns from dlt.common.configuration.specs import AzureCredentialsWithoutDefaults @@ -34,6 +34,10 @@ from dlt.destinations.impl.synapse import capabilities from dlt.destinations.impl.synapse.sql_client import SynapseSqlClient from dlt.destinations.impl.synapse.configuration import SynapseClientConfiguration +from dlt.destinations.impl.synapse.synapse_adapter import ( + TABLE_INDEX_TYPE_HINT, + TTableIndexType, +) HINT_TO_SYNAPSE_ATTR: Dict[TColumnHint, str] = { @@ -68,7 +72,7 @@ def _get_table_update_sql( if table is None: table_index_type = self.config.default_table_index_type else: - table_index_type = table.get("table_index_type") + table_index_type = cast(TTableIndexType, table.get(TABLE_INDEX_TYPE_HINT)) if table_index_type == "clustered_columnstore_index": new_columns = self._get_columstore_valid_columns(new_columns) @@ -128,9 +132,16 @@ def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema # configuration. Why? "For small lookup tables, less than 60 million rows, # consider using HEAP or clustered index for faster query performance." # https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index#heap-tables - table["table_index_type"] = "heap" - if table["table_index_type"] is None: - table["table_index_type"] = self.config.default_table_index_type + table[TABLE_INDEX_TYPE_HINT] = "heap" # type: ignore[typeddict-unknown-key] + elif table_name in self.schema.data_table_names(): + if TABLE_INDEX_TYPE_HINT not in table: + # If present in parent table, fetch hint from there. + table[TABLE_INDEX_TYPE_HINT] = get_inherited_table_hint( # type: ignore[typeddict-unknown-key] + self.schema.tables, table_name, TABLE_INDEX_TYPE_HINT, allow_none=True + ) + if table[TABLE_INDEX_TYPE_HINT] is None: # type: ignore[typeddict-item] + # Hint still not defined, fall back to default. + table[TABLE_INDEX_TYPE_HINT] = self.config.default_table_index_type # type: ignore[typeddict-unknown-key] return table def get_storage_table_index_type(self, table_name: str) -> TTableIndexType: diff --git a/dlt/destinations/impl/synapse/synapse_adapter.py b/dlt/destinations/impl/synapse/synapse_adapter.py new file mode 100644 index 0000000000..f135dd967a --- /dev/null +++ b/dlt/destinations/impl/synapse/synapse_adapter.py @@ -0,0 +1,50 @@ +from typing import Any, Literal, Set, get_args, Final + +from dlt.extract import DltResource, resource as make_resource +from dlt.extract.typing import TTableHintTemplate +from dlt.extract.hints import TResourceHints +from dlt.destinations.utils import ensure_resource + +TTableIndexType = Literal["heap", "clustered_columnstore_index"] +""" +Table [index type](https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index) used when creating the Synapse table. +This regards indexes specified at the table level, not the column level. +""" +TABLE_INDEX_TYPES: Set[TTableIndexType] = set(get_args(TTableIndexType)) + +TABLE_INDEX_TYPE_HINT: Literal["x-table-index-type"] = "x-table-index-type" + + +def synapse_adapter(data: Any, table_index_type: TTableIndexType = None) -> DltResource: + """Prepares data for the Synapse destination by specifying which table index + type should be used. + + Args: + data (Any): The data to be transformed. It can be raw data or an instance + of DltResource. If raw data, the function wraps it into a DltResource + object. + table_index_type (TTableIndexType, optional): The table index type used when creating + the Synapse table. + + Returns: + DltResource: A resource with applied Synapse-specific hints. + + Raises: + ValueError: If input for `table_index_type` is invalid. + + Examples: + >>> data = [{"name": "Anush", "description": "Integrations Hacker"}] + >>> synapse_adapter(data, table_index_type="clustered_columnstore_index") + [DltResource with hints applied] + """ + resource = ensure_resource(data) + + if table_index_type is not None: + if table_index_type not in TABLE_INDEX_TYPES: + allowed_types = ", ".join(TABLE_INDEX_TYPES) + raise ValueError( + f"Table index type {table_index_type} is invalid. Allowed table index" + f" types are: {allowed_types}." + ) + resource._hints[TABLE_INDEX_TYPE_HINT] = table_index_type # type: ignore[typeddict-unknown-key] + return resource diff --git a/dlt/destinations/impl/weaviate/weaviate_adapter.py b/dlt/destinations/impl/weaviate/weaviate_adapter.py index 2d5161d9e9..a290ac65b4 100644 --- a/dlt/destinations/impl/weaviate/weaviate_adapter.py +++ b/dlt/destinations/impl/weaviate/weaviate_adapter.py @@ -2,6 +2,7 @@ from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns from dlt.extract import DltResource, resource as make_resource +from dlt.destinations.utils import ensure_resource TTokenizationTMethod = Literal["word", "lowercase", "whitespace", "field"] TOKENIZATION_METHODS: Set[TTokenizationTMethod] = set(get_args(TTokenizationTMethod)) @@ -53,15 +54,7 @@ def weaviate_adapter( >>> weaviate_adapter(data, vectorize="description", tokenization={"description": "word"}) [DltResource with hints applied] """ - # wrap `data` in a resource if not an instance already - resource: DltResource - if not isinstance(data, DltResource): - resource_name: str = None - if not hasattr(data, "__name__"): - resource_name = "content" - resource = make_resource(data, name=resource_name) - else: - resource = data + resource = ensure_resource(data) column_hints: TTableSchemaColumns = {} if vectorize: diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py new file mode 100644 index 0000000000..d4b945a840 --- /dev/null +++ b/dlt/destinations/utils.py @@ -0,0 +1,16 @@ +from typing import Any + +from dlt.extract import DltResource, resource as make_resource + + +def ensure_resource(data: Any) -> DltResource: + """Wraps `data` in a DltResource if it's not a DltResource already.""" + resource: DltResource + if not isinstance(data, DltResource): + resource_name: str = None + if not hasattr(data, "__name__"): + resource_name = "content" + resource = make_resource(data, name=resource_name) + else: + resource = data + return resource diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index 573d3d3ad0..cf7426e683 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -36,7 +36,6 @@ TAnySchemaColumns, TSchemaContract, TTableFormat, - TTableIndexType, ) from dlt.extract.utils import ( ensure_table_schema_columns_hint, @@ -257,7 +256,6 @@ def resource( merge_key: TTableHintTemplate[TColumnNames] = None, schema_contract: TTableHintTemplate[TSchemaContract] = None, table_format: TTableHintTemplate[TTableFormat] = None, - table_index_type: TTableHintTemplate[TTableIndexType] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, ) -> DltResource: ... @@ -275,7 +273,6 @@ def resource( merge_key: TTableHintTemplate[TColumnNames] = None, schema_contract: TTableHintTemplate[TSchemaContract] = None, table_format: TTableHintTemplate[TTableFormat] = None, - table_index_type: TTableHintTemplate[TTableIndexType] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, ) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: ... @@ -293,7 +290,6 @@ def resource( merge_key: TTableHintTemplate[TColumnNames] = None, schema_contract: TTableHintTemplate[TSchemaContract] = None, table_format: TTableHintTemplate[TTableFormat] = None, - table_index_type: TTableHintTemplate[TTableIndexType] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, standalone: Literal[True] = True, @@ -312,7 +308,6 @@ def resource( merge_key: TTableHintTemplate[TColumnNames] = None, schema_contract: TTableHintTemplate[TSchemaContract] = None, table_format: TTableHintTemplate[TTableFormat] = None, - table_index_type: TTableHintTemplate[TTableIndexType] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, ) -> DltResource: ... @@ -329,7 +324,6 @@ def resource( merge_key: TTableHintTemplate[TColumnNames] = None, schema_contract: TTableHintTemplate[TSchemaContract] = None, table_format: TTableHintTemplate[TTableFormat] = None, - table_index_type: TTableHintTemplate[TTableIndexType] = None, selected: bool = True, spec: Type[BaseConfiguration] = None, standalone: bool = False, @@ -409,7 +403,6 @@ def make_resource( merge_key=merge_key, schema_contract=schema_contract, table_format=table_format, - table_index_type=table_index_type, ) return DltResource.from_data( _data, diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 36354eb0da..437dbbc6bd 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -12,7 +12,6 @@ TWriteDisposition, TAnySchemaColumns, TTableFormat, - TTableIndexType, TSchemaContract, ) from dlt.common.typing import TDataItem @@ -275,7 +274,6 @@ def new_table_template( merge_key: TTableHintTemplate[TColumnNames] = None, schema_contract: TTableHintTemplate[TSchemaContract] = None, table_format: TTableHintTemplate[TTableFormat] = None, - table_index_type: TTableHintTemplate[TTableIndexType] = None, ) -> TResourceHints: validator, schema_contract = create_item_validator(columns, schema_contract) clean_columns = columns @@ -291,7 +289,6 @@ def new_table_template( columns=clean_columns, # type: ignore schema_contract=schema_contract, # type: ignore table_format=table_format, # type: ignore - table_index_type=table_index_type, # type: ignore ) if not table_name: new_template.pop("name") diff --git a/tests/load/pipeline/test_table_indexing.py b/tests/load/synapse/test_table_indexing.py similarity index 81% rename from tests/load/pipeline/test_table_indexing.py rename to tests/load/synapse/test_table_indexing.py index 5f62cddfee..097bde09f9 100644 --- a/tests/load/pipeline/test_table_indexing.py +++ b/tests/load/synapse/test_table_indexing.py @@ -5,16 +5,13 @@ import dlt from dlt.common.schema import TColumnSchema -from dlt.common.schema.typing import TTableIndexType, TSchemaTables -from dlt.common.schema.utils import get_table_index_type from dlt.destinations.sql_client import SqlClientBase +from dlt.destinations.impl.synapse import synapse_adapter +from dlt.destinations.impl.synapse.synapse_adapter import TTableIndexType + from tests.load.utils import TABLE_UPDATE, TABLE_ROW_ALL_DATA_TYPES -from tests.load.pipeline.utils import ( - destinations_configs, - DestinationTestConfiguration, -) TABLE_INDEX_TYPE_COLUMN_SCHEMA_PARAM_GRID = [ @@ -27,16 +24,10 @@ ] -@pytest.mark.parametrize( - "destination_config", - destinations_configs(default_sql_configs=True, subset=["synapse"]), - ids=lambda x: x.name, -) @pytest.mark.parametrize( "table_index_type,column_schema", TABLE_INDEX_TYPE_COLUMN_SCHEMA_PARAM_GRID ) def test_default_table_index_type_configuration( - destination_config: DestinationTestConfiguration, table_index_type: TTableIndexType, column_schema: Union[List[TColumnSchema], None], ) -> None: @@ -51,10 +42,13 @@ def test_default_table_index_type_configuration( def items_without_table_index_type_specified() -> Iterator[Any]: yield TABLE_ROW_ALL_DATA_TYPES - pipeline = destination_config.setup_pipeline( - f"test_default_table_index_type_{table_index_type}", + pipeline = dlt.pipeline( + pipeline_name=f"test_default_table_index_type_{table_index_type}", + destination="synapse", + dataset_name=f"test_default_table_index_type_{table_index_type}", full_refresh=True, ) + job_client = pipeline.destination_client() # Assert configuration value gets properly propagated to job client configuration. assert job_client.config.default_table_index_type == table_index_type # type: ignore[attr-defined] @@ -80,13 +74,14 @@ def items_without_table_index_type_specified() -> Iterator[Any]: @dlt.resource( name="items_with_table_index_type_specified", write_disposition="append", - table_index_type="clustered_columnstore_index", columns=column_schema, ) def items_with_table_index_type_specified() -> Iterator[Any]: yield TABLE_ROW_ALL_DATA_TYPES - pipeline.run(items_with_table_index_type_specified) + pipeline.run( + synapse_adapter(items_with_table_index_type_specified, "clustered_columnstore_index") + ) applied_table_index_type = job_client.get_storage_table_index_type( # type: ignore[attr-defined] "items_with_table_index_type_specified" ) @@ -95,35 +90,34 @@ def items_with_table_index_type_specified() -> Iterator[Any]: assert applied_table_index_type == "clustered_columnstore_index" -@pytest.mark.parametrize( - "destination_config", - destinations_configs(default_sql_configs=True, subset=["synapse"]), - ids=lambda x: x.name, -) @pytest.mark.parametrize( "table_index_type,column_schema", TABLE_INDEX_TYPE_COLUMN_SCHEMA_PARAM_GRID ) def test_resource_table_index_type_configuration( - destination_config: DestinationTestConfiguration, table_index_type: TTableIndexType, column_schema: Union[List[TColumnSchema], None], ) -> None: @dlt.resource( name="items_with_table_index_type_specified", write_disposition="append", - table_index_type=table_index_type, columns=column_schema, ) def items_with_table_index_type_specified() -> Iterator[Any]: yield TABLE_ROW_ALL_DATA_TYPES - pipeline = destination_config.setup_pipeline( - f"test_table_index_type_{table_index_type}", + pipeline = dlt.pipeline( + pipeline_name=f"test_table_index_type_{table_index_type}", + destination="synapse", + dataset_name=f"test_table_index_type_{table_index_type}", full_refresh=True, ) + # An invalid value for `table_index_type` should raise a ValueError. + with pytest.raises(ValueError): + pipeline.run(synapse_adapter(items_with_table_index_type_specified, "foo")) # type: ignore[arg-type] + # Run the pipeline and create the tables. - pipeline.run(items_with_table_index_type_specified) + pipeline.run(synapse_adapter(items_with_table_index_type_specified, table_index_type)) # For all tables, assert the applied index type equals the expected index type. # Child tables, if any, inherit the index type of their parent.