Skip to content

Commit

Permalink
moved test function into tests folder and renamed test file
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorrit Sandbrink committed Jan 26, 2024
1 parent 7868ca6 commit b4cdd36
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
18 changes: 0 additions & 18 deletions dlt/destinations/impl/synapse/synapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,24 +144,6 @@ def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema
table[TABLE_INDEX_TYPE_HINT] = self.config.default_table_index_type # type: ignore[typeddict-unknown-key]
return table

def get_storage_table_index_type(self, table_name: str) -> TTableIndexType:
"""Returns table index type of table in storage destination."""
with self.sql_client as sql_client:
schema_name = sql_client.fully_qualified_dataset_name(escape=False)
sql = dedent(f"""
SELECT
CASE i.type_desc
WHEN 'HEAP' THEN 'heap'
WHEN 'CLUSTERED COLUMNSTORE' THEN 'clustered_columnstore_index'
END AS table_index_type
FROM sys.indexes i
INNER JOIN sys.tables t ON t.object_id = i.object_id
INNER JOIN sys.schemas s ON s.schema_id = t.schema_id
WHERE s.name = '{schema_name}' AND t.name = '{table_name}'
""")
table_index_type = sql_client.execute_sql(sql)[0][0]
return cast(TTableIndexType, table_index_type)

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
job = super().start_file_load(table, file_path, load_id)
if not job:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from dlt.destinations.impl.synapse.synapse_adapter import TTableIndexType

from tests.load.utils import TABLE_UPDATE, TABLE_ROW_ALL_DATA_TYPES
from tests.load.synapse.utils import get_storage_table_index_type


TABLE_INDEX_TYPE_COLUMN_SCHEMA_PARAM_GRID = [
Expand Down Expand Up @@ -60,7 +61,7 @@ def items_without_table_index_type_specified() -> Iterator[Any]:
# Child tables, if any, inherit the index type of their parent.
tables = pipeline.default_schema.tables
for table_name in tables:
applied_table_index_type = job_client.get_storage_table_index_type(table_name) # type: ignore[attr-defined]
applied_table_index_type = get_storage_table_index_type(job_client.sql_client, table_name) # type: ignore[attr-defined]
if table_name in pipeline.default_schema.data_table_names():
# For data tables, the applied table index type should be the default value.
assert applied_table_index_type == job_client.config.default_table_index_type # type: ignore[attr-defined]
Expand All @@ -82,8 +83,9 @@ def items_with_table_index_type_specified() -> Iterator[Any]:
pipeline.run(
synapse_adapter(items_with_table_index_type_specified, "clustered_columnstore_index")
)
applied_table_index_type = job_client.get_storage_table_index_type( # type: ignore[attr-defined]
"items_with_table_index_type_specified"
applied_table_index_type = get_storage_table_index_type(
job_client.sql_client, # type: ignore[attr-defined]
"items_with_table_index_type_specified",
)
# While the default is "heap", the applied index type should be "clustered_columnstore_index"
# because it was provided as argument to the resource.
Expand Down Expand Up @@ -124,7 +126,7 @@ def items_with_table_index_type_specified() -> Iterator[Any]:
job_client = pipeline.destination_client()
tables = pipeline.default_schema.tables
for table_name in tables:
applied_table_index_type = job_client.get_storage_table_index_type(table_name) # type: ignore[attr-defined]
applied_table_index_type = get_storage_table_index_type(job_client.sql_client, table_name) # type: ignore[attr-defined]
if table_name in pipeline.default_schema.data_table_names():
# For data tables, the applied table index type should be the type
# configured in the resource.
Expand Down
24 changes: 24 additions & 0 deletions tests/load/synapse/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import cast
from textwrap import dedent

from dlt.destinations.impl.synapse.sql_client import SynapseSqlClient
from dlt.destinations.impl.synapse.synapse_adapter import TTableIndexType


def get_storage_table_index_type(sql_client: SynapseSqlClient, table_name: str) -> TTableIndexType:
"""Returns table index type of table in storage destination."""
with sql_client:
schema_name = sql_client.fully_qualified_dataset_name(escape=False)
sql = dedent(f"""
SELECT
CASE i.type_desc
WHEN 'HEAP' THEN 'heap'
WHEN 'CLUSTERED COLUMNSTORE' THEN 'clustered_columnstore_index'
END AS table_index_type
FROM sys.indexes i
INNER JOIN sys.tables t ON t.object_id = i.object_id
INNER JOIN sys.schemas s ON s.schema_id = t.schema_id
WHERE s.name = '{schema_name}' AND t.name = '{table_name}'
""")
table_index_type = sql_client.execute_sql(sql)[0][0]
return cast(TTableIndexType, table_index_type)

0 comments on commit b4cdd36

Please sign in to comment.