Skip to content

Commit

Permalink
refine staging table indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorrit Sandbrink committed Jan 29, 2024
1 parent 6d14d57 commit b87dd1b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 9 deletions.
32 changes: 25 additions & 7 deletions dlt/destinations/impl/synapse/synapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,22 @@ def __init__(self, schema: Schema, config: SynapseClientConfiguration) -> None:
def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
) -> List[str]:
table = self.get_load_table(table_name)
table = self.get_load_table(table_name, staging=self.in_staging_mode)
if table is None:
table_index_type = self.config.default_table_index_type
else:
table_index_type = cast(TTableIndexType, table.get(TABLE_INDEX_TYPE_HINT))
if table_index_type == "clustered_columnstore_index":
if self.in_staging_mode:
final_table = self.get_load_table(table_name, staging=False)
final_table_index_type = cast(
TTableIndexType, final_table.get(TABLE_INDEX_TYPE_HINT)
)
else:
final_table_index_type = table_index_type
if final_table_index_type == "clustered_columnstore_index":
# Even if the staging table has index type "heap", we still adjust
# the column data types to prevent errors when writing into the
# final table that has index type "clustered_columnstore_index".
new_columns = self._get_columstore_valid_columns(new_columns)

_sql_result = SqlJobClientBase._get_table_update_sql(
Expand Down Expand Up @@ -129,12 +139,20 @@ def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema
table = super().get_load_table(table_name, staging)
if table is None:
return None
if table_name in self.schema.dlt_table_names():
# dlt tables should always be heap tables, regardless of the user
# configuration. Why? "For small lookup tables, less than 60 million rows,
# consider using HEAP or clustered index for faster query performance."
# https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index#heap-tables
if staging and self.config.replace_strategy == "insert-from-staging":
# Staging tables should always be heap tables, because "when you are
# temporarily landing data in dedicated SQL pool, you may find that
# using a heap table makes the overall process faster."
# "staging-optimized" is not included, because in that strategy the
# staging table becomes the final table, so we should already create
# it with the desired index type.
table[TABLE_INDEX_TYPE_HINT] = "heap" # type: ignore[typeddict-unknown-key]
elif table_name in self.schema.dlt_table_names():
# dlt tables should always be heap tables, because "for small lookup
# tables, less than 60 million rows, consider using HEAP or clustered
# index for faster query performance."
table[TABLE_INDEX_TYPE_HINT] = "heap" # type: ignore[typeddict-unknown-key]
# https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index#heap-tables
elif table_name in self.schema.data_table_names():
if TABLE_INDEX_TYPE_HINT not in table:
# If present in parent table, fetch hint from there.
Expand Down
3 changes: 2 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/synapse.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ Possible values:
>* **Set `default_table_index_type` to `"clustered_columnstore_index"` if you want to change the default** (see [additional destination options](#additional-destination-options)).
>* **CLUSTERED COLUMNSTORE INDEX tables do not support the `varchar(max)`, `nvarchar(max)`, and `varbinary(max)` data types.** If you don't specify the `precision` for columns that map to any of these types, `dlt` will use the maximum lengths `varchar(4000)`, `nvarchar(4000)`, and `varbinary(8000)`.
>* **While Synapse creates CLUSTERED COLUMNSTORE INDEXES by default, `dlt` creates HEAP tables by default.** HEAP is a more robust choice, because it supports all data types and doesn't require conversions.
>* **When using the `staging-optimized` [`replace` strategy](../../general-usage/full-loading.md), the staging tables are always created as HEAP tables**—any configuration of the table index types is ignored. The HEAP strategy makes sense
>* **When using the `insert-from-staging` [`replace` strategy](../../general-usage/full-loading.md), the staging tables are always created as HEAP tables**—any configuration of the table index types is ignored. The HEAP strategy makes sense
for staging tables for reasons explained [here](https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index#heap-tables).
>* **When using the `staging-optimized` [`replace` strategy](../../general-usage/full-loading.md), the staging tables are already created with the configured table index type**, because the staging table becomes the final table.
>* **`dlt` system tables are always created as HEAP tables, regardless of any configuration.** This is in line with Microsoft's recommendation that "for small lookup tables, less than 60 million rows, consider using HEAP or clustered index for faster query performance."
>* Child tables, if any, inherit the table index type of their parent table.
Expand Down
11 changes: 10 additions & 1 deletion tests/load/synapse/test_synapse_table_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,22 @@ def items_with_table_index_type_specified() -> Iterator[Any]:
@pytest.mark.parametrize(
"table_index_type,column_schema", TABLE_INDEX_TYPE_COLUMN_SCHEMA_PARAM_GRID
)
@pytest.mark.parametrize(
# Also test staging replace strategies, to make sure the final table index
# type is not affected by staging table index type adjustments.
"replace_strategy",
["insert-from-staging", "staging-optimized"],
)
def test_resource_table_index_type_configuration(
table_index_type: TTableIndexType,
column_schema: Union[List[TColumnSchema], None],
replace_strategy: str,
) -> None:
os.environ["DESTINATION__REPLACE_STRATEGY"] = replace_strategy

@dlt.resource(
name="items_with_table_index_type_specified",
write_disposition="append",
write_disposition="replace",
columns=column_schema,
)
def items_with_table_index_type_specified() -> Iterator[Any]:
Expand Down

0 comments on commit b87dd1b

Please sign in to comment.