From b87dd1b744bc4c9fe3f2b6ac1cbea08c58296eb5 Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Mon, 29 Jan 2024 16:08:22 +0100 Subject: [PATCH] refine staging table indexing --- dlt/destinations/impl/synapse/synapse.py | 32 +++++++++++++++---- .../dlt-ecosystem/destinations/synapse.md | 3 +- .../synapse/test_synapse_table_indexing.py | 11 ++++++- 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/dlt/destinations/impl/synapse/synapse.py b/dlt/destinations/impl/synapse/synapse.py index 268ffad933..33e6194602 100644 --- a/dlt/destinations/impl/synapse/synapse.py +++ b/dlt/destinations/impl/synapse/synapse.py @@ -70,12 +70,22 @@ def __init__(self, schema: Schema, config: SynapseClientConfiguration) -> None: def _get_table_update_sql( self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool ) -> List[str]: - table = self.get_load_table(table_name) + table = self.get_load_table(table_name, staging=self.in_staging_mode) if table is None: table_index_type = self.config.default_table_index_type else: table_index_type = cast(TTableIndexType, table.get(TABLE_INDEX_TYPE_HINT)) - if table_index_type == "clustered_columnstore_index": + if self.in_staging_mode: + final_table = self.get_load_table(table_name, staging=False) + final_table_index_type = cast( + TTableIndexType, final_table.get(TABLE_INDEX_TYPE_HINT) + ) + else: + final_table_index_type = table_index_type + if final_table_index_type == "clustered_columnstore_index": + # Even if the staging table has index type "heap", we still adjust + # the column data types to prevent errors when writing into the + # final table that has index type "clustered_columnstore_index". new_columns = self._get_columstore_valid_columns(new_columns) _sql_result = SqlJobClientBase._get_table_update_sql( @@ -129,12 +139,20 @@ def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema table = super().get_load_table(table_name, staging) if table is None: return None - if table_name in self.schema.dlt_table_names(): - # dlt tables should always be heap tables, regardless of the user - # configuration. Why? "For small lookup tables, less than 60 million rows, - # consider using HEAP or clustered index for faster query performance." - # https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index#heap-tables + if staging and self.config.replace_strategy == "insert-from-staging": + # Staging tables should always be heap tables, because "when you are + # temporarily landing data in dedicated SQL pool, you may find that + # using a heap table makes the overall process faster." + # "staging-optimized" is not included, because in that strategy the + # staging table becomes the final table, so we should already create + # it with the desired index type. + table[TABLE_INDEX_TYPE_HINT] = "heap" # type: ignore[typeddict-unknown-key] + elif table_name in self.schema.dlt_table_names(): + # dlt tables should always be heap tables, because "for small lookup + # tables, less than 60 million rows, consider using HEAP or clustered + # index for faster query performance." table[TABLE_INDEX_TYPE_HINT] = "heap" # type: ignore[typeddict-unknown-key] + # https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index#heap-tables elif table_name in self.schema.data_table_names(): if TABLE_INDEX_TYPE_HINT not in table: # If present in parent table, fetch hint from there. diff --git a/docs/website/docs/dlt-ecosystem/destinations/synapse.md b/docs/website/docs/dlt-ecosystem/destinations/synapse.md index 4d66714ce3..dcfd92b9fb 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/synapse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/synapse.md @@ -135,8 +135,9 @@ Possible values: >* **Set `default_table_index_type` to `"clustered_columnstore_index"` if you want to change the default** (see [additional destination options](#additional-destination-options)). >* **CLUSTERED COLUMNSTORE INDEX tables do not support the `varchar(max)`, `nvarchar(max)`, and `varbinary(max)` data types.** If you don't specify the `precision` for columns that map to any of these types, `dlt` will use the maximum lengths `varchar(4000)`, `nvarchar(4000)`, and `varbinary(8000)`. >* **While Synapse creates CLUSTERED COLUMNSTORE INDEXES by default, `dlt` creates HEAP tables by default.** HEAP is a more robust choice, because it supports all data types and doesn't require conversions. ->* **When using the `staging-optimized` [`replace` strategy](../../general-usage/full-loading.md), the staging tables are always created as HEAP tables**—any configuration of the table index types is ignored. The HEAP strategy makes sense +>* **When using the `insert-from-staging` [`replace` strategy](../../general-usage/full-loading.md), the staging tables are always created as HEAP tables**—any configuration of the table index types is ignored. The HEAP strategy makes sense for staging tables for reasons explained [here](https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index#heap-tables). +>* **When using the `staging-optimized` [`replace` strategy](../../general-usage/full-loading.md), the staging tables are already created with the configured table index type**, because the staging table becomes the final table. >* **`dlt` system tables are always created as HEAP tables, regardless of any configuration.** This is in line with Microsoft's recommendation that "for small lookup tables, less than 60 million rows, consider using HEAP or clustered index for faster query performance." >* Child tables, if any, inherent the table index type of their parent table. diff --git a/tests/load/synapse/test_synapse_table_indexing.py b/tests/load/synapse/test_synapse_table_indexing.py index e87b83fa3f..df90933de4 100644 --- a/tests/load/synapse/test_synapse_table_indexing.py +++ b/tests/load/synapse/test_synapse_table_indexing.py @@ -98,13 +98,22 @@ def items_with_table_index_type_specified() -> Iterator[Any]: @pytest.mark.parametrize( "table_index_type,column_schema", TABLE_INDEX_TYPE_COLUMN_SCHEMA_PARAM_GRID ) +@pytest.mark.parametrize( + # Also test staging replace strategies, to make sure the final table index + # type is not affected by staging table index type adjustments. + "replace_strategy", + ["insert-from-staging", "staging-optimized"], +) def test_resource_table_index_type_configuration( table_index_type: TTableIndexType, column_schema: Union[List[TColumnSchema], None], + replace_strategy: str, ) -> None: + os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy + @dlt.resource( name="items_with_table_index_type_specified", - write_disposition="append", + write_disposition="replace", columns=column_schema, ) def items_with_table_index_type_specified() -> Iterator[Any]: