From b4cdd36e41af7e13849e255133a2654dde79ac7e Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink Date: Fri, 26 Jan 2024 22:06:11 +0100 Subject: [PATCH] moved test function into tests folder and renamed test file --- dlt/destinations/impl/synapse/synapse.py | 18 -------------- ...xing.py => test_synapse_table_indexing.py} | 10 ++++---- tests/load/synapse/utils.py | 24 +++++++++++++++++++ 3 files changed, 30 insertions(+), 22 deletions(-) rename tests/load/synapse/{test_table_indexing.py => test_synapse_table_indexing.py} (91%) create mode 100644 tests/load/synapse/utils.py diff --git a/dlt/destinations/impl/synapse/synapse.py b/dlt/destinations/impl/synapse/synapse.py index d34fef1ab4..eb6eae3f20 100644 --- a/dlt/destinations/impl/synapse/synapse.py +++ b/dlt/destinations/impl/synapse/synapse.py @@ -144,24 +144,6 @@ def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema table[TABLE_INDEX_TYPE_HINT] = self.config.default_table_index_type # type: ignore[typeddict-unknown-key] return table - def get_storage_table_index_type(self, table_name: str) -> TTableIndexType: - """Returns table index type of table in storage destination.""" - with self.sql_client as sql_client: - schema_name = sql_client.fully_qualified_dataset_name(escape=False) - sql = dedent(f""" - SELECT - CASE i.type_desc - WHEN 'HEAP' THEN 'heap' - WHEN 'CLUSTERED COLUMNSTORE' THEN 'clustered_columnstore_index' - END AS table_index_type - FROM sys.indexes i - INNER JOIN sys.tables t ON t.object_id = i.object_id - INNER JOIN sys.schemas s ON s.schema_id = t.schema_id - WHERE s.name = '{schema_name}' AND t.name = '{table_name}' - """) - table_index_type = sql_client.execute_sql(sql)[0][0] - return cast(TTableIndexType, table_index_type) - def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: job = super().start_file_load(table, file_path, load_id) if not job: diff --git a/tests/load/synapse/test_table_indexing.py b/tests/load/synapse/test_synapse_table_indexing.py similarity index 91% rename from tests/load/synapse/test_table_indexing.py rename to tests/load/synapse/test_synapse_table_indexing.py index 097bde09f9..af4786af9f 100644 --- a/tests/load/synapse/test_table_indexing.py +++ b/tests/load/synapse/test_synapse_table_indexing.py @@ -12,6 +12,7 @@ from dlt.destinations.impl.synapse.synapse_adapter import TTableIndexType from tests.load.utils import TABLE_UPDATE, TABLE_ROW_ALL_DATA_TYPES +from tests.load.synapse.utils import get_storage_table_index_type TABLE_INDEX_TYPE_COLUMN_SCHEMA_PARAM_GRID = [ @@ -60,7 +61,7 @@ def items_without_table_index_type_specified() -> Iterator[Any]: # Child tables, if any, inherit the index type of their parent. tables = pipeline.default_schema.tables for table_name in tables: - applied_table_index_type = job_client.get_storage_table_index_type(table_name) # type: ignore[attr-defined] + applied_table_index_type = get_storage_table_index_type(job_client.sql_client, table_name) # type: ignore[attr-defined] if table_name in pipeline.default_schema.data_table_names(): # For data tables, the applied table index type should be the default value. assert applied_table_index_type == job_client.config.default_table_index_type # type: ignore[attr-defined] @@ -82,8 +83,9 @@ def items_with_table_index_type_specified() -> Iterator[Any]: pipeline.run( synapse_adapter(items_with_table_index_type_specified, "clustered_columnstore_index") ) - applied_table_index_type = job_client.get_storage_table_index_type( # type: ignore[attr-defined] - "items_with_table_index_type_specified" + applied_table_index_type = get_storage_table_index_type( + job_client.sql_client, # type: ignore[attr-defined] + "items_with_table_index_type_specified", ) # While the default is "heap", the applied index type should be "clustered_columnstore_index" # because it was provided as argument to the resource. @@ -124,7 +126,7 @@ def items_with_table_index_type_specified() -> Iterator[Any]: job_client = pipeline.destination_client() tables = pipeline.default_schema.tables for table_name in tables: - applied_table_index_type = job_client.get_storage_table_index_type(table_name) # type: ignore[attr-defined] + applied_table_index_type = get_storage_table_index_type(job_client.sql_client, table_name) # type: ignore[attr-defined] if table_name in pipeline.default_schema.data_table_names(): # For data tables, the applied table index type should be the type # configured in the resource. diff --git a/tests/load/synapse/utils.py b/tests/load/synapse/utils.py new file mode 100644 index 0000000000..cd53716878 --- /dev/null +++ b/tests/load/synapse/utils.py @@ -0,0 +1,24 @@ +from typing import cast +from textwrap import dedent + +from dlt.destinations.impl.synapse.sql_client import SynapseSqlClient +from dlt.destinations.impl.synapse.synapse_adapter import TTableIndexType + + +def get_storage_table_index_type(sql_client: SynapseSqlClient, table_name: str) -> TTableIndexType: + """Returns table index type of table in storage destination.""" + with sql_client: + schema_name = sql_client.fully_qualified_dataset_name(escape=False) + sql = dedent(f""" + SELECT + CASE i.type_desc + WHEN 'HEAP' THEN 'heap' + WHEN 'CLUSTERED COLUMNSTORE' THEN 'clustered_columnstore_index' + END AS table_index_type + FROM sys.indexes i + INNER JOIN sys.tables t ON t.object_id = i.object_id + INNER JOIN sys.schemas s ON s.schema_id = t.schema_id + WHERE s.name = '{schema_name}' AND t.name = '{table_name}' + """) + table_index_type = sql_client.execute_sql(sql)[0][0] + return cast(TTableIndexType, table_index_type)