remove some iceberg specific code
sh-rp committed Oct 9, 2023
1 parent 3f4fb1e commit 92613ec
Showing 9 changed files with 33 additions and 148 deletions.
1 change: 0 additions & 1 deletion .github/workflows/test_destination_athena.yml
@@ -21,7 +21,6 @@ env:
RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
ACTIVE_DESTINATIONS: "[\"athena\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-staging-iceberg\"]"

jobs:
get_docs_changes:
94 changes: 0 additions & 94 deletions .github/workflows/test_destination_athena_iceberg.yml

This file was deleted.

34 changes: 13 additions & 21 deletions dlt/destinations/athena/athena.py
@@ -303,13 +303,11 @@ def __init__(self, schema: Schema, config: AthenaClientConfiguration) -> None:
super().__init__(schema, config, sql_client)
self.sql_client: AthenaSQLClient = sql_client # type: ignore
self.config: AthenaClientConfiguration = config
self.iceberg_mode = not (not self.config.iceberg_bucket_url)
self.type_mapper = AthenaTypeMapper(self.capabilities, self.iceberg_mode)
self.type_mapper = AthenaTypeMapper(self.capabilities, True)

def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
# only truncate tables in iceberg mode
if not self.iceberg_mode or self.in_staging_mode:
truncate_tables = []
truncate_tables = []
super().initialize_storage(truncate_tables)

def _from_db_type(self, hive_t: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
@@ -320,23 +318,14 @@ def _get_column_def_sql(self, c: TColumnSchema) -> str:

def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool) -> List[str]:

create_data_iceberg_tables = self.iceberg_mode and not self.in_staging_mode

bucket = self.config.staging_config.bucket_url
dataset = self.sql_client.dataset_name

if create_data_iceberg_tables:
bucket = self.config.iceberg_bucket_url

# strip the staging portion from the dataset name if we are in iceberg mode
if self.iceberg_mode and self.in_staging_mode and dataset.endswith("_staging") :
dataset = dataset[:-len("_staging")]

sql: List[str] = []

# for the system tables we need to create empty iceberg tables to be able to run, DELETE and UPDATE queries
# or if we are in iceberg mode, we create iceberg tables for all tables
is_iceberg = create_data_iceberg_tables or (self.schema.tables[table_name].get("write_disposition", None) == "skip")
is_iceberg = self.schema.tables[table_name].get("write_disposition", None) == "skip"
columns = ", ".join([self._get_column_def_sql(c) for c in new_columns])

# this will fail if the table prefix is not properly defined
@@ -369,28 +358,31 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
)
job = super().start_file_load(table, file_path, load_id)
if not job:
job = DoNothingFollowupJob(file_path) if self.iceberg_mode else DoNothingJob(file_path)
job = DoNothingFollowupJob(file_path) if self._is_iceberg_table(table) else DoNothingJob(file_path)
return job

def _create_append_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
if self.iceberg_mode:
if self._is_iceberg_table(table_chain[0]):
return [SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": False})]
return super()._create_append_followup_jobs(table_chain)

def _create_replace_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
if self.iceberg_mode:
if self._is_iceberg_table(table_chain[0]):
return [SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": True})]
return super()._create_replace_followup_jobs(table_chain)

def _is_iceberg_table(self, table: TTableSchema) -> bool:
return False

def get_stage_dispositions(self) -> List[TWriteDisposition]:
# in iceberg mode, we always use staging tables
if self.iceberg_mode:
return ["append", "replace", "merge"]
# if self.iceberg_mode:
# return ["append", "replace", "merge"]
return super().get_stage_dispositions()

def get_truncate_staging_destination_table_dispositions(self) -> List[TWriteDisposition]:
if self.iceberg_mode:
return ["append", "replace", "merge"]
# if self.iceberg_mode:
# return ["append", "replace", "merge"]
return []

@staticmethod
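For context, a minimal standalone sketch (simplified stand-in classes, not the actual dlt job classes) of how load-job selection behaves with `_is_iceberg_table` stubbed to return `False`: every file load now falls back to `DoNothingJob`, so no staging-copy followup jobs are produced.

```python
# Minimal sketch with simplified stand-in classes (not the real dlt job classes).
# It mirrors the dispatch in start_file_load(): with _is_iceberg_table() always
# returning False, every load resolves to DoNothingJob.
from typing import Any, Dict


class DoNothingJob:
    def __init__(self, file_path: str) -> None:
        self.file_path = file_path


class DoNothingFollowupJob(DoNothingJob):
    pass


def _is_iceberg_table(table: Dict[str, Any]) -> bool:
    # stub added in this commit: no table is treated as an iceberg table
    return False


def pick_job(table: Dict[str, Any], file_path: str) -> DoNothingJob:
    return DoNothingFollowupJob(file_path) if _is_iceberg_table(table) else DoNothingJob(file_path)


print(type(pick_job({"name": "items"}, "items.0.parquet")).__name__)  # -> DoNothingJob
```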
1 change: 0 additions & 1 deletion dlt/destinations/athena/configuration.py
@@ -9,7 +9,6 @@
class AthenaClientConfiguration(DestinationClientDwhWithStagingConfiguration):
destination_name: Final[str] = "athena" # type: ignore[misc]
query_result_bucket: str = None
iceberg_bucket_url: Optional[str] = None
credentials: AwsCredentials = None
athena_work_group: Optional[str] = None
aws_data_catalog: Optional[str] = "awsdatacatalog"
19 changes: 6 additions & 13 deletions dlt/destinations/sql_jobs.py
@@ -100,17 +100,17 @@ def gen_key_table_clauses(cls, root_table_name: str, staging_root_table_name: st
A list of clauses may be returned for engines that do not support OR in subqueries. Like BigQuery
"""
return [f"FROM {root_table_name} WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} as s WHERE {' OR '.join([c.format(d=root_table_name,s='s') for c in key_clauses])})"]
return [f"FROM {root_table_name} as d WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} as s WHERE {' OR '.join([c.format(d='d',s='s') for c in key_clauses])})"]

@classmethod
def gen_delete_temp_table_sql(cls, unique_column: str, key_table_clauses: Sequence[str], root_table_name: str) -> Tuple[List[str], str]:
def gen_delete_temp_table_sql(cls, unique_column: str, key_table_clauses: Sequence[str]) -> Tuple[List[str], str]:
"""Generate sql that creates delete temp table and inserts `unique_column` from root table for all records to delete. May return several statements.
Returns temp table name for cases where special names are required like SQLServer.
"""
sql: List[str] = []
temp_table_name = cls._new_temp_table_name("delete")
select_statement = f"SELECT {root_table_name}.{unique_column} {key_table_clauses[0]}"
select_statement = f"SELECT d.{unique_column} {key_table_clauses[0]}"
sql.append(cls._to_temp_table(select_statement, temp_table_name))
for clause in key_table_clauses[1:]:
sql.append(f"INSERT INTO {temp_table_name} SELECT {unique_column} {clause};")
@@ -143,7 +143,7 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
Returns:
sql statement that inserts data from selects into temp table
"""
return f"CREATE TABLE {temp_table_name} AS {select_sql};"
return f"CREATE TEMP TABLE {temp_table_name} AS {select_sql};"

@classmethod
def gen_merge_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]) -> List[str]:
@@ -162,7 +162,6 @@ def gen_merge_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClien
unique_column: str = None
root_key_column: str = None
insert_temp_table_name: str = None
delete_temp_table_name: str = None


if len(table_chain) == 1:
@@ -184,7 +183,7 @@ def gen_merge_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClien
# get first unique column
unique_column = sql_client.capabilities.escape_identifier(unique_columns[0])
# create temp table with unique identifier
create_delete_temp_table_sql, delete_temp_table_name = cls.gen_delete_temp_table_sql(unique_column, key_table_clauses, root_table_name)
create_delete_temp_table_sql, delete_temp_table_name = cls.gen_delete_temp_table_sql(unique_column, key_table_clauses)
sql.extend(create_delete_temp_table_sql)
# delete top table
sql.append(f"DELETE FROM {root_table_name} WHERE {unique_column} IN (SELECT * FROM {delete_temp_table_name});")
@@ -230,10 +229,4 @@ def gen_merge_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClien
sql.append(insert_sql)
# -- DELETE FROM {staging_table_name} WHERE 1=1;

# clean up
if insert_temp_table_name:
sql.append(f"DROP TABLE {insert_temp_table_name};")
if delete_temp_table_name:
sql.append(f"DROP TABLE {delete_temp_table_name};")

return sql
return sql
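To make the new SQL shapes concrete, a hedged, self-contained sketch follows; the table, column, and temp-table names are assumed, whereas the real methods escape identifiers via the sql client and derive the temp-table name from `_new_temp_table_name`.

```python
# Illustrative sketch of the SQL produced after this change: the root table is
# aliased as "d" and the delete helper creates a TEMP table. Names are assumed.
from typing import List, Sequence, Tuple


def gen_key_table_clauses(root: str, staging: str, key_clauses: Sequence[str]) -> List[str]:
    return [
        f"FROM {root} as d WHERE EXISTS (SELECT 1 FROM {staging} as s WHERE "
        + " OR ".join(c.format(d="d", s="s") for c in key_clauses)
        + ")"
    ]


def gen_delete_temp_table_sql(unique_column: str, key_table_clauses: Sequence[str]) -> Tuple[List[str], str]:
    temp_table_name = "delete_items"  # the real code generates a unique name
    select_statement = f"SELECT d.{unique_column} {key_table_clauses[0]}"
    return [f"CREATE TEMP TABLE {temp_table_name} AS {select_statement};"], temp_table_name


clauses = gen_key_table_clauses("dataset.items", "dataset_staging.items", ["{d}.id = {s}.id"])
sql, tmp = gen_delete_temp_table_sql("_dlt_id", clauses)
print(sql[0])
# CREATE TEMP TABLE delete_items AS SELECT d._dlt_id FROM dataset.items as d
# WHERE EXISTS (SELECT 1 FROM dataset_staging.items as s WHERE d.id = s.id);
```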
Empty file.
@@ -1,6 +1,3 @@
"""
Temporary test file for iceberg
"""

import pytest
import os
@@ -22,14 +19,11 @@


def test_iceberg() -> None:

os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] = "s3://dlt-ci-test-bucket"
os.environ['DESTINATION__ATHENA__ICEBERG_BUCKET_URL'] = "s3://dlt-ci-test-bucket/iceberg"

pipeline = dlt.pipeline(pipeline_name="aaathena", destination="athena", staging="filesystem", full_refresh=True)
pipeline = dlt.pipeline(pipeline_name="aaathena-iceberg", destination="athena", staging="filesystem", full_refresh=True)

@dlt.resource(name="items", write_disposition="append")
def items():
def items() -> Iterator[Any]:
yield {
"id": 1,
"name": "item",
@@ -42,7 +36,17 @@ def items():
}]
}

print(pipeline.run(items))
@dlt.resource(name="items_normal", write_disposition="append")
def items_normal():
yield from items()

@dlt.resource(name="items_iceberg", write_disposition="append")
def items_iceberg():
yield from items()

print(pipeline.run([items_normal, items_iceberg]))

return

# see if we have athena tables with items
table_counts = load_table_counts(pipeline, *[t["name"] for t in pipeline.default_schema._schema_tables.values() ])
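The rewritten test wraps a single generator in two named resources and runs them in one call; a hedged, minimal version of that pattern is sketched below, using an assumed local "duckdb" destination so it runs without AWS credentials.

```python
# Minimal sketch of the resource-wrapping pattern used in the test.
# The "duckdb" destination is an assumption chosen to keep the example local.
import dlt


def base_items():
    yield {"id": 1, "name": "item"}


@dlt.resource(name="items_normal", write_disposition="append")
def items_normal():
    yield from base_items()


@dlt.resource(name="items_iceberg", write_disposition="append")
def items_iceberg():
    yield from base_items()


pipeline = dlt.pipeline(pipeline_name="demo", destination="duckdb", full_refresh=True)
print(pipeline.run([items_normal, items_iceberg]))
```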
8 changes: 1 addition & 7 deletions tests/load/utils.py
@@ -26,7 +26,7 @@
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.job_client_impl import SqlJobClientBase

from tests.utils import ACTIVE_DESTINATIONS, IMPLEMENTED_DESTINATIONS, SQL_DESTINATIONS, EXCLUDED_DESTINATION_CONFIGURATIONS
from tests.utils import ACTIVE_DESTINATIONS, IMPLEMENTED_DESTINATIONS, SQL_DESTINATIONS
from tests.cases import TABLE_UPDATE_COLUMNS_SCHEMA, TABLE_UPDATE, TABLE_ROW_ALL_DATA_TYPES, assert_all_data_types_row

# bucket urls
@@ -49,7 +49,6 @@ class DestinationTestConfiguration:
staging: Optional[str] = None
file_format: Optional[TLoaderFileFormat] = None
bucket_url: Optional[str] = None
iceberg_bucket_url: Optional[str] = None
stage_name: Optional[str] = None
staging_iam_role: Optional[str] = None
extra_info: Optional[str] = None
@@ -73,7 +72,6 @@ def setup(self) -> None:
os.environ['DESTINATION__FILESYSTEM__BUCKET_URL'] = self.bucket_url or ""
os.environ['DESTINATION__STAGE_NAME'] = self.stage_name or ""
os.environ['DESTINATION__STAGING_IAM_ROLE'] = self.staging_iam_role or ""
os.environ['DESTINATION__ATHENA__ICEBERG_BUCKET_URL'] = self.iceberg_bucket_url or ""

"""For the filesystem destinations we disable compression to make analyzing the result easier"""
if self.destination == "filesystem":
@@ -110,7 +108,6 @@ def destinations_configs(
destination_configs += [DestinationTestConfiguration(destination=destination) for destination in SQL_DESTINATIONS if destination != "athena"]
# athena needs filesystem staging, which will be automatically set, we have to supply a bucket url though
destination_configs += [DestinationTestConfiguration(destination="athena", supports_merge=False, bucket_url=AWS_BUCKET)]
destination_configs += [DestinationTestConfiguration(destination="athena", staging="filesystem", file_format="parquet", bucket_url=AWS_BUCKET, iceberg_bucket_url=AWS_BUCKET + "/iceberg", supports_merge=True, extra_info="iceberg")]

if default_vector_configs:
# for now only weaviate
@@ -154,9 +151,6 @@ def destinations_configs(
if exclude:
destination_configs = [conf for conf in destination_configs if conf.destination not in exclude]

# filter out destination configs as obtained from the env
destination_configs = [conf for conf in destination_configs if conf.name not in EXCLUDED_DESTINATION_CONFIGURATIONS]


return destination_configs

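With the env-driven `EXCLUDED_DESTINATION_CONFIGURATIONS` filter removed, only the explicit `subset`/`exclude` arguments prune the generated configurations; here is a simplified sketch (assumed names, not the real helper) of that remaining filtering.

```python
# Simplified sketch of the include/exclude filtering that remains in
# destinations_configs(); class and function names here are illustrative.
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class DestinationTestConfiguration:
    destination: str
    staging: Optional[str] = None


def filter_configs(
    configs: Sequence[DestinationTestConfiguration],
    subset: Sequence[str] = (),
    exclude: Sequence[str] = (),
) -> List[DestinationTestConfiguration]:
    if subset:
        configs = [c for c in configs if c.destination in subset]
    if exclude:
        configs = [c for c in configs if c.destination not in exclude]
    return list(configs)


configs = [DestinationTestConfiguration("athena"), DestinationTestConfiguration("duckdb")]
print([c.destination for c in filter_configs(configs, exclude=["athena"])])  # ['duckdb']
```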
2 changes: 0 additions & 2 deletions tests/utils.py
@@ -34,8 +34,6 @@
# filter out active destinations for current tests
ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS)

# exclude destination configs (for now used for athena and athena iceberg separation)
EXCLUDED_DESTINATION_CONFIGURATIONS = set(dlt.config.get("EXCLUDED_DESTINATION_CONFIGURATIONS", list) or set())

ACTIVE_SQL_DESTINATIONS = SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS)
ACTIVE_NON_SQL_DESTINATIONS = NON_SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS)
