From acfcd166b93eed8ff12e80e8bbc3dc91816cb562 Mon Sep 17 00:00:00 2001 From: Dave Date: Fri, 13 Oct 2023 11:22:04 +0200 Subject: [PATCH] small changes --- dlt/destinations/athena/athena.py | 23 +++++++++++--------- dlt/destinations/filesystem/configuration.py | 1 - dlt/load/load.py | 2 +- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/dlt/destinations/athena/athena.py b/dlt/destinations/athena/athena.py index c78b9ba2d7..9aa0493a4e 100644 --- a/dlt/destinations/athena/athena.py +++ b/dlt/destinations/athena/athena.py @@ -325,7 +325,8 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc # for the system tables we need to create empty iceberg tables to be able to run, DELETE and UPDATE queries # or if we are in iceberg mode, we create iceberg tables for all tables - is_iceberg = (self.schema.tables[table_name].get("write_disposition", None) == "skip") or (self._is_iceberg_table(self.schema.tables[table_name]) and not self.in_staging_mode) + table = self.get_load_table(table_name, self.in_staging_mode) + is_iceberg = self._is_iceberg_table(table) or table.get("write_disposition", None) == "skip" columns = ", ".join([self._get_column_def_sql(c) for c in new_columns]) # this will fail if the table prefix is not properly defined @@ -359,7 +360,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> ) job = super().start_file_load(table, file_path, load_id) if not job: - job = DoNothingFollowupJob(file_path) if self._is_iceberg_table(table) else DoNothingJob(file_path) + job = DoNothingFollowupJob(file_path) if self._is_iceberg_table(self.get_load_table(table["name"])) else DoNothingJob(file_path) return job def _create_append_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: @@ -371,36 +372,38 @@ def _create_replace_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> if self._is_iceberg_table(table_chain[0]): return [SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": True})] return super()._create_replace_followup_jobs(table_chain) - + def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]: # fall back to append jobs for merge return self._create_append_followup_jobs(table_chain) def _is_iceberg_table(self, table: TTableSchema) -> bool: - table_format = get_table_format(self.schema.tables, table["name"]) - return table_format == "iceberg" or self.config.force_iceberg + table_format = table.get("table_format") + return table_format == "iceberg" def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool: # all iceberg tables need staging - if self._is_iceberg_table(table): + if self._is_iceberg_table(self.get_load_table(table["name"])): return True return super().should_load_data_to_staging_dataset(table) def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: # on athena we only truncate replace tables that are not iceberg table = self.get_load_table(table["name"]) - if table["write_disposition"] == "replace" and not self._is_iceberg_table(table): + if table["write_disposition"] == "replace" and not self._is_iceberg_table(self.get_load_table(table["name"])): return True return False def should_load_data_to_staging_dataset_on_staging_destination(self, table: TTableSchema) -> bool: """iceberg table data goes into staging on staging destination""" - return self._is_iceberg_table(table) + return self._is_iceberg_table(self.get_load_table(table["name"])) def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema: table = super().get_load_table(table_name, staging) - # if staging and table.get("table_format", None) == "iceberg": - # table.pop("table_format") + if staging and table.get("table_format", None) == "iceberg": + table.pop("table_format") + elif self.config.force_iceberg: + table["table_format"] = "iceberg" return table @staticmethod diff --git a/dlt/destinations/filesystem/configuration.py b/dlt/destinations/filesystem/configuration.py index 8939060231..174dfafb1a 100644 --- a/dlt/destinations/filesystem/configuration.py +++ b/dlt/destinations/filesystem/configuration.py @@ -10,7 +10,6 @@ @configspec class FilesystemDestinationClientConfiguration(FilesystemConfiguration, DestinationClientStagingConfiguration): # type: ignore[misc] destination_name: Final[str] = "filesystem" # type: ignore - force_iceberg: Optional[bool] = False @resolve_type('credentials') def resolve_credentials_type(self) -> Type[CredentialsConfiguration]: diff --git a/dlt/load/load.py b/dlt/load/load.py index b32156ae6e..ca8fff66df 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -277,7 +277,7 @@ def _get_table_chain_tables_with_filter(schema: Schema, f: Callable[[TTableSchem continue for table in get_child_tables(schema.tables, top_job_table["name"]): # only add tables for tables that have jobs unless the disposition is replace - # TODO: this is a (formerly used) hack to make test_merge_on_keys_in_schema, + # TODO: this is a (formerly used) hack to make test_merge_on_keys_in_schema, # we should change that test if not table["name"] in tables_with_jobs and top_job_table["write_disposition"] != "replace": continue