diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 1a6add6d2a..8f043ba4d5 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -403,7 +403,9 @@ def _from_db_type( return self.type_mapper.from_db_type(hive_t, precision, scale) def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: - return f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}" + return ( + f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}" + ) def _iceberg_partition_clause(self, partition_hints: Optional[Dict[str, str]]) -> str: if not partition_hints: @@ -444,27 +446,21 @@ def _get_table_update_sql( partition_clause = self._iceberg_partition_clause( cast(Optional[Dict[str, str]], table.get(PARTITION_HINT)) ) - sql.append( - f"""CREATE TABLE {qualified_table_name} + sql.append(f"""CREATE TABLE {qualified_table_name} ({columns}) {partition_clause} LOCATION '{location.rstrip('/')}' - TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""" - ) + TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""") elif table_format == "jsonl": - sql.append( - f"""CREATE EXTERNAL TABLE {qualified_table_name} + sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name} ({columns}) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' - LOCATION '{location}';""" - ) + LOCATION '{location}';""") else: - sql.append( - f"""CREATE EXTERNAL TABLE {qualified_table_name} + sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name} ({columns}) STORED AS PARQUET - LOCATION '{location}';""" - ) + LOCATION '{location}';""") return sql def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: diff --git a/tests/load/athena_iceberg/test_athena_adapter.py b/tests/load/athena_iceberg/test_athena_adapter.py index 848a039205..3144eb9cc9 100644 --- a/tests/load/athena_iceberg/test_athena_adapter.py +++ b/tests/load/athena_iceberg/test_athena_adapter.py @@ -3,7 +3,9 @@ import dlt from dlt.destinations import filesystem from dlt.destinations.impl.athena.athena_adapter import athena_adapter, athena_partition -from tests.load.utils import destinations_configs, DestinationTestConfiguration + +# mark all tests as essential, do not remove +pytestmark = pytest.mark.essential def test_iceberg_partition_hints(): @@ -57,7 +59,10 @@ def not_partitioned_table(): )[0] # Partition clause is generated with original order - expected_clause = "PARTITIONED BY (`category`, month(`created_at`), bucket(10, `product_id`), truncate(2, `name`))" + expected_clause = ( + "PARTITIONED BY (`category`, month(`created_at`), bucket(10, `product_id`), truncate(2," + " `name`))" + ) assert expected_clause in sql_partitioned # No partition clause otherwise diff --git a/tests/load/pipeline/test_athena.py b/tests/load/pipeline/test_athena.py index fdc55e94ae..a5bb6efc0d 100644 --- a/tests/load/pipeline/test_athena.py +++ b/tests/load/pipeline/test_athena.py @@ -279,7 +279,8 @@ def partitioned_table(): partition_keys = {r[0] for r in rows} data_rows = sql_client.execute_sql( - f"SELECT id, category, created_at FROM {sql_client.make_qualified_table_name('partitioned_table')}" + "SELECT id, category, created_at FROM" + f" {sql_client.make_qualified_table_name('partitioned_table')}" ) # data_rows = [(i, c, d.toisoformat()) for i, c, d in data_rows]