athena iceberg #659

Merged: 38 commits, Oct 16, 2023

Commits (38)

0e25102  first iceberg prototype (sh-rp, Sep 28, 2023)
26f9e41  fix linting and clearing of staging tables (sh-rp, Sep 28, 2023)
e199bd1  disable tests (sh-rp, Sep 28, 2023)
b768a63  enable iceberg tests for athena (sh-rp, Sep 29, 2023)
119ad6e  Merge branch 'devel' into d#/athena-iceberg (sh-rp, Oct 4, 2023)
29a6d06  fix iceberg detection (sh-rp, Oct 4, 2023)
439c72f  move athena tests to default sql configs (sh-rp, Oct 4, 2023)
b964388  finally fix regular athena tests... (sh-rp, Oct 4, 2023)
2b5f004  some more work (sh-rp, Oct 4, 2023)
7e82de7  fix replace disposition (sh-rp, Oct 5, 2023)
202466f  fix datatype support (sh-rp, Oct 5, 2023)
bd9744c  fix append for merge in iceberg (sh-rp, Oct 5, 2023)
f627a0f  fix merge jobs for iceberg (sh-rp, Oct 5, 2023)
9a94d4a  clean up followup jobs code (sh-rp, Oct 5, 2023)
8682350  set iceberg tests to merge supported (sh-rp, Oct 5, 2023)
1560768  fix sql merge syntax for iceberg (sh-rp, Oct 6, 2023)
3f4fb1e  separate regular athena and iceberg tests (sh-rp, Oct 6, 2023)
92613ec  remove some iceberg specific code (sh-rp, Oct 9, 2023)
0924bc5  new iceberg approach (sh-rp, Oct 9, 2023)
122d035  PR changes (sh-rp, Oct 11, 2023)
7750318  small changes (sh-rp, Oct 11, 2023)
0deecda  small changes (sh-rp, Oct 11, 2023)
702fd4b  fix two tests (sh-rp, Oct 11, 2023)
d70985d  add missing athena fixes (sh-rp, Oct 11, 2023)
95adc93  Merge branch 'devel' into d#/athena-iceberg (sh-rp, Oct 11, 2023)
06dbaeb  small changes (sh-rp, Oct 12, 2023)
baa5e44  fixes (sh-rp, Oct 12, 2023)
ad8dc9b  update (sh-rp, Oct 12, 2023)
00e474c  fix some tests (sh-rp, Oct 13, 2023)
acfcd16  small changes (sh-rp, Oct 13, 2023)
0707629  small changes (sh-rp, Oct 13, 2023)
4692e37  make type mapper table format sensitive (sh-rp, Oct 14, 2023)
10131e4  disable dbt tests for athena iceberg (sh-rp, Oct 14, 2023)
243246e  update doc (sh-rp, Oct 14, 2023)
ba0c593  small fix (sh-rp, Oct 14, 2023)
1e8605c  pr changes (sh-rp, Oct 16, 2023)
78fc17a  updates athena dbt docs (rudolfix, Oct 16, 2023)
918c4d5  adds docsting on table format to decorators (rudolfix, Oct 16, 2023)
4 changes: 4 additions & 0 deletions dlt/common/destination/reference.py
@@ -248,6 +248,10 @@ def get_truncate_destination_table_dispositions(self) -> List[TWriteDisposition]
# in the base job, all replace strategies are treated the same, see filesystem for example
return ["replace"]

def get_truncate_staging_destination_table_dispositions(self) -> List[TWriteDisposition]:
# some clients need to additionally be able to get the staging destination to truncate tables
return []

def create_table_chain_completed_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
"""Creates a list of followup jobs that should be executed after a table chain is completed"""
return []
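The hook above returns an empty list by default; a destination that loads through staging tables can override it so the loader also truncates tables on the staging destination for the listed write dispositions. A minimal sketch of such an override, using a hypothetical client class (the Athena override later in this PR is the real one):

```python
# Minimal sketch, assuming a hypothetical client class; the Athena client
# further down in this PR contains the real override.
from typing import List

from dlt.common.schema.typing import TWriteDisposition


class StagingTruncatingClient:
    def get_truncate_staging_destination_table_dispositions(self) -> List[TWriteDisposition]:
        # ask the loader to also truncate staging-destination tables for these dispositions
        return ["append", "replace", "merge"]
```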
1 change: 1 addition & 0 deletions dlt/destinations/athena/__init__.py
@@ -36,6 +36,7 @@ def capabilities() -> DestinationCapabilitiesContext:
caps.alter_add_multi_column = True
caps.schema_supports_numeric_precision = False
caps.timestamp_precision = 3
caps.supports_truncate_command = False
return caps


77 changes: 63 additions & 14 deletions dlt/destinations/athena/athena.py
@@ -16,21 +16,21 @@
from dlt.common.utils import without_none
from dlt.common.data_types import TDataType
from dlt.common.schema import TColumnSchema, Schema
from dlt.common.schema.typing import TTableSchema, TColumnType
from dlt.common.schema.typing import TTableSchema, TColumnType, TWriteDisposition
from dlt.common.schema.utils import table_schema_has_type
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import LoadJob
from dlt.common.destination.reference import TLoadJobState
from dlt.common.destination.reference import LoadJob, FollowupJob
from dlt.common.destination.reference import TLoadJobState, NewLoadJob
from dlt.common.storages import FileStorage
from dlt.common.data_writers.escape import escape_bigquery_identifier

from dlt.destinations.sql_jobs import SqlStagingCopyJob

from dlt.destinations.typing import DBApi, DBTransaction
from dlt.destinations.exceptions import DatabaseTerminalException, DatabaseTransientException, DatabaseUndefinedRelation, LoadJobTerminalException
from dlt.destinations.athena import capabilities
from dlt.destinations.sql_client import SqlClientBase, DBApiCursorImpl, raise_database_error, raise_open_connection_error
from dlt.destinations.typing import DBApiCursor
from dlt.destinations.job_client_impl import SqlJobClientBase, StorageSchemaInfo
from dlt.destinations.job_client_impl import SqlJobClientWithStaging
from dlt.destinations.athena.configuration import AthenaClientConfiguration
from dlt.destinations.type_mapping import TypeMapper
from dlt.destinations import path_utils
@@ -69,13 +69,18 @@ class AthenaTypeMapper(TypeMapper):
"int": "bigint",
}

def __init__(self, capabilities: DestinationCapabilitiesContext, iceberg_mode: bool):
super().__init__(capabilities)
self.iceberg_mode = iceberg_mode
Review comment (Collaborator): I do not think we need iceberg_mode; you can just set it up per table.

def to_db_integer_type(self, precision: Optional[int]) -> str:
if precision is None:
return "bigint"
# iceberg does not support smallint and tinyint
Review comment (Collaborator): FYI: TIMESTAMP is precision 6 on iceberg, 3 on parquet.

if precision <= 8:
return "tinyint"
return "int" if self.iceberg_mode else "tinyint"
Review comment (Collaborator): That's why the JobClient should create/modify the table schema; you can adjust the precision there and not hack the type mapper...

Review comment (Collaborator, PR author): Wouldn't it be cleanest to have a subclass for iceberg and set it before the table SQL is generated? I don't feel that changing the type mapper is hacking at all; that is what it is there for: changing the mapping of types depending on the database / table format you are storing into.

Review comment (Collaborator, PR author): Actually, we could extend the type mapper to carry the info about which table_format is currently being processed. That might be nice?

elif precision <= 16:
return "smallint"
return "int" if self.iceberg_mode else "smallint"
elif precision <= 32:
return "int"
return "bigint"
@@ -135,6 +140,11 @@ def exception(self) -> str:
# this part of code should be never reached
raise NotImplementedError()

class DoNothingFollowupJob(DoNothingJob, FollowupJob):
"""The second most lazy class of dlt"""
pass


class AthenaSQLClient(SqlClientBase[Connection]):

capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
@@ -276,7 +286,7 @@ def has_dataset(self) -> bool:
return len(rows) > 0


class AthenaClient(SqlJobClientBase):
class AthenaClient(SqlJobClientWithStaging):

capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

@@ -293,11 +303,14 @@ def __init__(self, schema: Schema, config: AthenaClientConfiguration) -> None:
super().__init__(schema, config, sql_client)
self.sql_client: AthenaSQLClient = sql_client # type: ignore
self.config: AthenaClientConfiguration = config
self.type_mapper = AthenaTypeMapper(self.capabilities)
self.iceberg_mode = not (not self.config.iceberg_bucket_url)
self.type_mapper = AthenaTypeMapper(self.capabilities, self.iceberg_mode)

def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
# never truncate tables in athena
super().initialize_storage([])
# only truncate tables in iceberg mode
if not self.iceberg_mode or self.in_staging_mode:
truncate_tables = []
super().initialize_storage(truncate_tables)

def _from_db_type(self, hive_t: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
return self.type_mapper.from_db_type(hive_t, precision, scale)
@@ -307,12 +320,19 @@ def _get_column_def_sql(self, c: TColumnSchema) -> str:

def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool) -> List[str]:

create_data_iceberg_tables = self.iceberg_mode and not self.in_staging_mode

bucket = self.config.staging_config.bucket_url
dataset = self.sql_client.dataset_name
if create_data_iceberg_tables:
bucket = self.config.iceberg_bucket_url

# TODO: we need to strip the staging layout from the table name, find a better way!
dataset = self.sql_client.dataset_name.replace("_staging", "")
sql: List[str] = []

# for the system tables we need to create empty iceberg tables to be able to run, DELETE and UPDATE queries
is_iceberg = self.schema.tables[table_name].get("write_disposition", None) == "skip"
# or if we are in iceberg mode, we create iceberg tables for all tables
is_iceberg = create_data_iceberg_tables or (self.schema.tables[table_name].get("write_disposition", None) == "skip")
columns = ", ".join([self._get_column_def_sql(c) for c in new_columns])

# this will fail if the table prefix is not properly defined
@@ -345,9 +365,38 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
)
job = super().start_file_load(table, file_path, load_id)
if not job:
job = DoNothingJob(file_path)
job = DoNothingFollowupJob(file_path) if self.iceberg_mode else DoNothingJob(file_path)
return job

def create_table_chain_completed_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
"""Creates a list of followup jobs for merge write disposition and staging replace strategies"""
jobs = super().create_table_chain_completed_followup_jobs(table_chain)

# add some additional jobs
write_disposition = table_chain[0]["write_disposition"]
if write_disposition == "append":
jobs.append(self._create_staging_copy_job(table_chain, False))
elif write_disposition == "replace" and self.config.replace_strategy == "truncate-and-insert":
jobs.append(self._create_staging_copy_job(table_chain, False))
return jobs

def _create_staging_copy_job(self, table_chain: Sequence[TTableSchema], replace: bool) -> NewLoadJob:
"""update destination tables from staging tables"""
if self.iceberg_mode:
return SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": replace})
return super()._create_staging_copy_job(table_chain, replace=replace)

def get_stage_dispositions(self) -> List[TWriteDisposition]:
# in iceberg mode, we always use staging tables
if self.iceberg_mode:
return ["append", "replace", "merge"]
return super().get_stage_dispositions()

def get_truncate_staging_destination_table_dispositions(self) -> List[TWriteDisposition]:
if self.iceberg_mode:
return ["append", "replace", "merge"]
return []

@staticmethod
def is_dbapi_exception(ex: Exception) -> bool:
return isinstance(ex, Error)
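In iceberg mode every write disposition is routed through staging: files are loaded into staging tables and a SqlStagingCopyJob then copies the rows into the iceberg tables. A rough sketch of the kind of SQL such a copy job could emit, with illustrative dataset and table names and the replace flag standing in for the new SqlJobParams:

```python
# Rough sketch only; the real statements are produced by SqlStagingCopyJob
# using the client's qualified table names.
from typing import List


def staging_copy_sql(dataset: str, staging_dataset: str, table: str, replace: bool) -> List[str]:
    destination = f'"{dataset}"."{table}"'
    staging = f'"{staging_dataset}"."{table}"'
    sql: List[str] = []
    if replace:
        # replace disposition: clear the destination iceberg table before copying
        sql.append(f"DELETE FROM {destination};")
    # append and replace both insert the staged rows into the destination table
    sql.append(f"INSERT INTO {destination} SELECT * FROM {staging};")
    return sql


# e.g. staging_copy_sql("analytics", "analytics_staging", "events", replace=False)
```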
2 changes: 2 additions & 0 deletions dlt/destinations/athena/configuration.py
@@ -9,9 +9,11 @@
class AthenaClientConfiguration(DestinationClientDwhWithStagingConfiguration):
destination_name: Final[str] = "athena" # type: ignore[misc]
query_result_bucket: str = None
iceberg_bucket_url: Optional[str] = None
credentials: AwsCredentials = None
athena_work_group: Optional[str] = None
aws_data_catalog: Optional[str] = "awsdatacatalog"
supports_truncate_command: bool = False

__config_gen_annotations__: ClassVar[List[str]] = ["athena_work_group"]
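For illustration, the new iceberg_bucket_url option could be supplied like any other destination setting, assuming dlt's usual SECTION__SUBSECTION__KEY environment-variable convention (the exact key paths below are an assumption, not taken from this PR):

```python
import os

# Assumed key paths following dlt's config naming convention; verify against
# the Athena destination docs before relying on them.
os.environ["DESTINATION__ATHENA__ICEBERG_BUCKET_URL"] = "s3://example-bucket/iceberg"
os.environ["DESTINATION__ATHENA__QUERY_RESULT_BUCKET"] = "s3://example-bucket/athena-results"
os.environ["DESTINATION__ATHENA__ATHENA_WORK_GROUP"] = "primary"
```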

4 changes: 2 additions & 2 deletions dlt/destinations/bigquery/bigquery.py
@@ -19,7 +19,7 @@
from dlt.destinations.bigquery import capabilities
from dlt.destinations.bigquery.configuration import BigQueryClientConfiguration
from dlt.destinations.bigquery.sql_client import BigQuerySqlClient, BQ_TERMINAL_REASONS
from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob
from dlt.destinations.sql_jobs import SqlMergeJob, SqlStagingCopyJob, SqlJobParams
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.type_mapping import TypeMapper
@@ -138,7 +138,7 @@ def gen_key_table_clauses(cls, root_table_name: str, staging_root_table_name: st
class BigqueryStagingCopyJob(SqlStagingCopyJob):

@classmethod
def generate_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]) -> List[str]:
def generate_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any], params: Optional[SqlJobParams] = None) -> List[str]:
sql: List[str] = []
for table in table_chain:
with sql_client.with_staging_dataset(staging=True):
23 changes: 16 additions & 7 deletions dlt/destinations/job_client_impl.py
@@ -148,24 +148,26 @@ def get_truncate_destination_table_dispositions(self) -> List[TWriteDisposition]
def _create_merge_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
return SqlMergeJob.from_table_chain(table_chain, self.sql_client)

def _create_staging_copy_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
def _create_staging_copy_job(self, table_chain: Sequence[TTableSchema], replace: bool) -> NewLoadJob:
"""update destination tables from staging tables"""
return SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client)
return SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": True})

def _create_optimized_replace_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
"""optimized replace strategy, defaults to _create_staging_copy_job for the basic client
for some destinations there are much faster destination updates at the cost of
dropping tables possible"""
return self._create_staging_copy_job(table_chain)
return self._create_staging_copy_job(table_chain, True)

def create_table_chain_completed_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
"""Creates a list of followup jobs for merge write disposition and staging replace strategies"""
jobs = super().create_table_chain_completed_followup_jobs(table_chain)
write_disposition = table_chain[0]["write_disposition"]
if write_disposition == "merge":
if write_disposition == "append":
pass
elif write_disposition == "merge":
jobs.append(self._create_merge_job(table_chain))
elif write_disposition == "replace" and self.config.replace_strategy == "insert-from-staging":
jobs.append(self._create_staging_copy_job(table_chain))
jobs.append(self._create_staging_copy_job(table_chain, True))
elif write_disposition == "replace" and self.config.replace_strategy == "staging-optimized":
jobs.append(self._create_optimized_replace_job(table_chain))
return jobs
@@ -431,10 +433,17 @@ def _commit_schema_update(self, schema: Schema, schema_str: str) -> None:


class SqlJobClientWithStaging(SqlJobClientBase, WithStagingDataset):

in_staging_mode: bool = False

@contextlib.contextmanager
def with_staging_dataset(self)-> Iterator["SqlJobClientBase"]:
with self.sql_client.with_staging_dataset(True):
yield self
try:
with self.sql_client.with_staging_dataset(True):
self.in_staging_mode = True
yield self
finally:
self.in_staging_mode = False

def get_stage_dispositions(self) -> List[TWriteDisposition]:
"""Returns a list of dispositions that require staging tables to be populated"""
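The change above is a flag-plus-context-manager pattern: in_staging_mode is only true while the with_staging_dataset() block is active, and the try/finally resets it even if a job raises. A stripped-down illustration:

```python
import contextlib
from typing import Iterator


class StagingAwareClient:  # simplified stand-in for SqlJobClientWithStaging
    in_staging_mode: bool = False

    @contextlib.contextmanager
    def with_staging_dataset(self) -> Iterator["StagingAwareClient"]:
        try:
            self.in_staging_mode = True
            yield self
        finally:
            self.in_staging_mode = False


client = StagingAwareClient()
with client.with_staging_dataset():
    assert client.in_staging_mode  # e.g. skip truncating iceberg data tables here
assert not client.in_staging_mode  # always reset, even if the block raised
```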
4 changes: 2 additions & 2 deletions dlt/destinations/mssql/mssql.py
@@ -8,7 +8,7 @@
from dlt.common.schema.typing import TTableSchema, TColumnType
from dlt.common.utils import uniq_id

from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlMergeJob
from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlMergeJob, SqlJobParams

from dlt.destinations.insert_job_client import InsertValuesJobClient

@@ -83,7 +83,7 @@ def from_db_type(self, db_type: str, precision: Optional[int], scale: Optional[i
class MsSqlStagingCopyJob(SqlStagingCopyJob):

@classmethod
def generate_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]) -> List[str]:
def generate_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any], params: Optional[SqlJobParams] = None) -> List[str]:
sql: List[str] = []
for table in table_chain:
with sql_client.with_staging_dataset(staging=True):
4 changes: 2 additions & 2 deletions dlt/destinations/postgres/postgres.py
@@ -7,7 +7,7 @@
from dlt.common.schema import TColumnSchema, TColumnHint, Schema
from dlt.common.schema.typing import TTableSchema, TColumnType

from dlt.destinations.sql_jobs import SqlStagingCopyJob
from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlJobParams

from dlt.destinations.insert_job_client import InsertValuesJobClient

@@ -79,7 +79,7 @@ def from_db_type(self, db_type: str, precision: Optional[int] = None, scale: Opt
class PostgresStagingCopyJob(SqlStagingCopyJob):

@classmethod
def generate_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]) -> List[str]:
def generate_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any], params: Optional[SqlJobParams] = None) -> List[str]:
sql: List[str] = []
for table in table_chain:
with sql_client.with_staging_dataset(staging=True):
7 changes: 3 additions & 4 deletions dlt/destinations/snowflake/snowflake.py
@@ -17,7 +17,7 @@
from dlt.destinations.snowflake import capabilities
from dlt.destinations.snowflake.configuration import SnowflakeClientConfiguration
from dlt.destinations.snowflake.sql_client import SnowflakeSqlClient
from dlt.destinations.sql_jobs import SqlStagingCopyJob
from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlJobParams
from dlt.destinations.snowflake.sql_client import SnowflakeSqlClient
from dlt.destinations.job_impl import NewReferenceJob
from dlt.destinations.sql_client import SqlClientBase
@@ -157,13 +157,12 @@ def exception(self) -> str:
class SnowflakeStagingCopyJob(SqlStagingCopyJob):

@classmethod
def generate_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any]) -> List[str]:
def generate_sql(cls, table_chain: Sequence[TTableSchema], sql_client: SqlClientBase[Any], params: Optional[SqlJobParams] = None) -> List[str]:
sql: List[str] = []
for table in table_chain:
with sql_client.with_staging_dataset(staging=True):
staging_table_name = sql_client.make_qualified_table_name(table["name"])
table_name = sql_client.make_qualified_table_name(table["name"])
# drop destination table
sql.append(f"DROP TABLE IF EXISTS {table_name};")
# recreate destination table with data cloned from staging table
sql.append(f"CREATE TABLE {table_name} CLONE {staging_table_name};")
@@ -206,7 +205,7 @@ def _make_add_column_sql(self, new_columns: Sequence[TColumnSchema]) -> List[str
return ["ADD COLUMN\n" + ",\n".join(self._get_column_def_sql(c) for c in new_columns)]

def _create_optimized_replace_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob:
return SnowflakeStagingCopyJob.from_table_chain(table_chain, self.sql_client)
return SnowflakeStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": True})

def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool, separate_alters: bool = False) -> List[str]:
sql = super()._get_table_update_sql(table_name, new_columns, generate_alter)