
Commit

small changes
sh-rp committed Oct 13, 2023
1 parent 00e474c commit acfcd16
Showing 3 changed files with 14 additions and 12 deletions.
23 changes: 13 additions & 10 deletions dlt/destinations/athena/athena.py
@@ -325,7 +325,8 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc

         # for the system tables we need to create empty iceberg tables to be able to run, DELETE and UPDATE queries
         # or if we are in iceberg mode, we create iceberg tables for all tables
-        is_iceberg = (self.schema.tables[table_name].get("write_disposition", None) == "skip") or (self._is_iceberg_table(self.schema.tables[table_name]) and not self.in_staging_mode)
+        table = self.get_load_table(table_name, self.in_staging_mode)
+        is_iceberg = self._is_iceberg_table(table) or table.get("write_disposition", None) == "skip"
         columns = ", ".join([self._get_column_def_sql(c) for c in new_columns])

         # this will fail if the table prefix is not properly defined
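
The iceberg decision for DDL generation now goes through `get_load_table` (resolved for the current staging mode) instead of reading `self.schema.tables` directly, so the table-format adjustments made there apply consistently. A minimal illustration of the new condition, with a plain dict standing in for `TTableSchema`; the helper name is made up for this sketch and is not a dlt API:

```python
# Illustration only: a plain dict stands in for TTableSchema; not the dlt implementation.
def is_iceberg_for_ddl(table: dict) -> bool:
    # system tables (e.g. dlt's version/state tables) use the "skip" write disposition
    # and must be iceberg so Athena can run DELETE and UPDATE statements against them
    return table.get("table_format") == "iceberg" or table.get("write_disposition", None) == "skip"

print(is_iceberg_for_ddl({"name": "_dlt_version", "write_disposition": "skip"}))                         # True
print(is_iceberg_for_ddl({"name": "events", "write_disposition": "append"}))                             # False
print(is_iceberg_for_ddl({"name": "events", "write_disposition": "append", "table_format": "iceberg"}))  # True
```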
@@ -359,7 +360,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
             )
         job = super().start_file_load(table, file_path, load_id)
         if not job:
-            job = DoNothingFollowupJob(file_path) if self._is_iceberg_table(table) else DoNothingJob(file_path)
+            job = DoNothingFollowupJob(file_path) if self._is_iceberg_table(self.get_load_table(table["name"])) else DoNothingJob(file_path)
         return job

     def _create_append_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
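
For context on this branch: data for iceberg tables lands in the staging dataset and still needs a SQL copy/merge step, so the file load itself becomes a no-op that triggers follow-up jobs, while plain Athena tables are read directly from the bucket and need no follow-up. A rough sketch of the distinction, with simplified stand-ins rather than the real dlt job classes:

```python
# Sketch under assumptions: these tiny classes only illustrate why the branch exists;
# they are not the actual dlt job implementations.
class DoNothingJob:
    """Marks the file as loaded; Athena reads the parquet file straight from the bucket."""
    def __init__(self, file_path: str) -> None:
        self.file_path = file_path

class DoNothingFollowupJob(DoNothingJob):
    """Additionally signals the loader to run follow-up SQL jobs (copy/merge from staging)."""

def pick_job(file_path: str, table_is_iceberg: bool) -> DoNothingJob:
    return DoNothingFollowupJob(file_path) if table_is_iceberg else DoNothingJob(file_path)

print(type(pick_job("events.parquet", table_is_iceberg=True)).__name__)   # DoNothingFollowupJob
print(type(pick_job("events.parquet", table_is_iceberg=False)).__name__)  # DoNothingJob
```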
@@ -371,36 +372,38 @@ def _create_replace_followup_jobs(self, table_chain: Sequence[TTableSchema]) ->
         if self._is_iceberg_table(table_chain[0]):
             return [SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": True})]
         return super()._create_replace_followup_jobs(table_chain)

     def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
         # fall back to append jobs for merge
         return self._create_append_followup_jobs(table_chain)

     def _is_iceberg_table(self, table: TTableSchema) -> bool:
-        table_format = get_table_format(self.schema.tables, table["name"])
-        return table_format == "iceberg" or self.config.force_iceberg
+        table_format = table.get("table_format")
+        return table_format == "iceberg"

     def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
         # all iceberg tables need staging
-        if self._is_iceberg_table(table):
+        if self._is_iceberg_table(self.get_load_table(table["name"])):
             return True
         return super().should_load_data_to_staging_dataset(table)

     def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
         # on athena we only truncate replace tables that are not iceberg
         table = self.get_load_table(table["name"])
-        if table["write_disposition"] == "replace" and not self._is_iceberg_table(table):
+        if table["write_disposition"] == "replace" and not self._is_iceberg_table(self.get_load_table(table["name"])):
             return True
         return False

     def should_load_data_to_staging_dataset_on_staging_destination(self, table: TTableSchema) -> bool:
         """iceberg table data goes into staging on staging destination"""
-        return self._is_iceberg_table(table)
+        return self._is_iceberg_table(self.get_load_table(table["name"]))

     def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
         table = super().get_load_table(table_name, staging)
-        # if staging and table.get("table_format", None) == "iceberg":
-        #     table.pop("table_format")
+        if staging and table.get("table_format", None) == "iceberg":
+            table.pop("table_format")
+        elif self.config.force_iceberg:
+            table["table_format"] = "iceberg"
         return table

     @staticmethod
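
Taken together, `get_load_table` is now the single place where the effective table format is decided (iceberg is dropped for staging copies and forced on when `force_iceberg` is set), and the predicates above all resolve the table through it. A self-contained sketch of how those pieces combine, again with plain dicts and simplified stand-ins rather than the real `AthenaClient`:

```python
# Simplified stand-ins that mirror the hunks above; not the actual AthenaClient.
def resolve_load_table(table: dict, staging: bool = False, force_iceberg: bool = False) -> dict:
    t = dict(table)  # work on a copy, never mutate the stored schema table
    if staging and t.get("table_format") == "iceberg":
        t.pop("table_format")          # the staging copy is created as a regular table
    elif force_iceberg:
        t["table_format"] = "iceberg"  # force_iceberg now flows through here
    return t

def is_iceberg(t: dict) -> bool:
    return t.get("table_format") == "iceberg"

def needs_staging_dataset(t: dict) -> bool:
    # mirrors should_load_data_to_staging_dataset (ignoring the super() fallback)
    return is_iceberg(resolve_load_table(t))

def truncate_on_staging_destination(t: dict) -> bool:
    # mirrors should_truncate_table_before_load_on_staging_destination
    return t["write_disposition"] == "replace" and not is_iceberg(resolve_load_table(t))

iceberg_table = {"name": "events", "write_disposition": "replace", "table_format": "iceberg"}
plain_table = {"name": "events", "write_disposition": "replace"}
print(needs_staging_dataset(iceberg_table), truncate_on_staging_destination(iceberg_table))  # True False
print(needs_staging_dataset(plain_table), truncate_on_staging_destination(plain_table))      # False True
```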
1 change: 0 additions & 1 deletion dlt/destinations/filesystem/configuration.py
@@ -10,7 +10,6 @@
 @configspec
 class FilesystemDestinationClientConfiguration(FilesystemConfiguration, DestinationClientStagingConfiguration): # type: ignore[misc]
     destination_name: Final[str] = "filesystem" # type: ignore
-    force_iceberg: Optional[bool] = False

     @resolve_type('credentials')
     def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
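
With this removal, `force_iceberg` is no longer a filesystem staging option; it is only read from the Athena client configuration (`self.config.force_iceberg` in `get_load_table` above). A hypothetical way to set it, assuming dlt's usual environment-variable config layout; the exact key is an assumption and not shown in this diff:

```python
# Assumption: dlt resolves destination options from env vars shaped like
# DESTINATION__<DESTINATION_NAME>__<OPTION>; the key below is illustrative only.
import os

os.environ["DESTINATION__ATHENA__FORCE_ICEBERG"] = "true"
# A pipeline created afterwards, e.g. dlt.pipeline(destination="athena", staging="filesystem"),
# would then create iceberg tables for all data tables.
```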
2 changes: 1 addition & 1 deletion dlt/load/load.py
@@ -277,7 +277,7 @@ def _get_table_chain_tables_with_filter(schema: Schema, f: Callable[[TTableSchem
                 continue
             for table in get_child_tables(schema.tables, top_job_table["name"]):
                 # only add tables for tables that have jobs unless the disposition is replace
-                # TODO: this is a (formerly used) hack to make test_merge_on_keys_in_schema,
+                # TODO: this is a (formerly used) hack to make test_merge_on_keys_in_schema,
                 # we should change that test
                 if not table["name"] in tables_with_jobs and top_job_table["write_disposition"] != "replace":
                     continue
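
The touched lines filter child tables of a top-level job table: a child table is kept only when it received jobs in this load, unless the top table uses the replace disposition (presumably so replace can still act on child tables that got no data). A compact sketch of that rule; the child-table lookup is taken as an explicit list here instead of dlt's `get_child_tables`:

```python
# Sketch with plain dicts; not the real dlt loader code.
from typing import Dict, List, Set

def filter_chain_tables(
    top_table: Dict[str, str],
    child_tables: List[Dict[str, str]],
    tables_with_jobs: Set[str],
) -> List[Dict[str, str]]:
    result = []
    for table in child_tables:
        # only keep tables that have jobs, unless the top table uses "replace"
        if table["name"] not in tables_with_jobs and top_table["write_disposition"] != "replace":
            continue
        result.append(table)
    return result

top = {"name": "events", "write_disposition": "append"}
children = [{"name": "events", "write_disposition": "append"},
            {"name": "events__tags", "write_disposition": "append"}]
print([t["name"] for t in filter_chain_tables(top, children, {"events"})])  # ['events']
```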
