Skip to content

Commit

Permalink
marks athena adapter tests essential
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed May 27, 2024
1 parent 2f4bf63 commit 593ef02
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 16 deletions.
22 changes: 9 additions & 13 deletions dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ def _from_db_type(
return self.type_mapper.from_db_type(hive_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
return f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}"
return (
f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}"
)

def _iceberg_partition_clause(self, partition_hints: Optional[Dict[str, str]]) -> str:
if not partition_hints:
Expand Down Expand Up @@ -444,27 +446,21 @@ def _get_table_update_sql(
partition_clause = self._iceberg_partition_clause(
cast(Optional[Dict[str, str]], table.get(PARTITION_HINT))
)
sql.append(
f"""CREATE TABLE {qualified_table_name}
sql.append(f"""CREATE TABLE {qualified_table_name}
({columns})
{partition_clause}
LOCATION '{location.rstrip('/')}'
TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');"""
)
TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""")
elif table_format == "jsonl":
sql.append(
f"""CREATE EXTERNAL TABLE {qualified_table_name}
sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}
({columns})
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION '{location}';"""
)
LOCATION '{location}';""")
else:
sql.append(
f"""CREATE EXTERNAL TABLE {qualified_table_name}
sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}
({columns})
STORED AS PARQUET
LOCATION '{location}';"""
)
LOCATION '{location}';""")
return sql

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
Expand Down
9 changes: 7 additions & 2 deletions tests/load/athena_iceberg/test_athena_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import dlt
from dlt.destinations import filesystem
from dlt.destinations.impl.athena.athena_adapter import athena_adapter, athena_partition
from tests.load.utils import destinations_configs, DestinationTestConfiguration

# mark all tests as essential, do not remove
pytestmark = pytest.mark.essential


def test_iceberg_partition_hints():
Expand Down Expand Up @@ -57,7 +59,10 @@ def not_partitioned_table():
)[0]

# Partition clause is generated with original order
expected_clause = "PARTITIONED BY (`category`, month(`created_at`), bucket(10, `product_id`), truncate(2, `name`))"
expected_clause = (
"PARTITIONED BY (`category`, month(`created_at`), bucket(10, `product_id`), truncate(2,"
" `name`))"
)
assert expected_clause in sql_partitioned

# No partition clause otherwise
Expand Down
3 changes: 2 additions & 1 deletion tests/load/pipeline/test_athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ def partitioned_table():
partition_keys = {r[0] for r in rows}

data_rows = sql_client.execute_sql(
f"SELECT id, category, created_at FROM {sql_client.make_qualified_table_name('partitioned_table')}"
"SELECT id, category, created_at FROM"
f" {sql_client.make_qualified_table_name('partitioned_table')}"
)
# data_rows = [(i, c, d.toisoformat()) for i, c, d in data_rows]

Expand Down

0 comments on commit 593ef02

Please sign in to comment.