-
Notifications
You must be signed in to change notification settings - Fork 185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
athena iceberg #659
athena iceberg #659
Changes from 19 commits
0e25102
26f9e41
e199bd1
b768a63
119ad6e
29a6d06
439c72f
b964388
2b5f004
7e82de7
202466f
bd9744c
f627a0f
9a94d4a
8682350
1560768
3f4fb1e
92613ec
0924bc5
122d035
7750318
0deecda
702fd4b
d70985d
95adc93
06dbaeb
baa5e44
ad8dc9b
00e474c
acfcd16
0707629
4692e37
10131e4
243246e
ba0c593
1e8605c
78fc17a
918c4d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,10 +15,10 @@ | |
from dlt.common.validation import TCustomValidator, validate_dict, validate_dict_ignoring_xkeys | ||
from dlt.common.schema import detections | ||
from dlt.common.schema.typing import (COLUMN_HINTS, SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TPartialTableSchema, TSchemaTables, TSchemaUpdate, | ||
TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, | ||
TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TTableFormat, | ||
TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition) | ||
from dlt.common.schema.exceptions import (CannotCoerceColumnException, ParentTableNotFoundException, SchemaEngineNoUpgradePathException, SchemaException, | ||
TablePropertiesConflictException, InvalidSchemaName) | ||
TablePropertiesConflictException, InvalidSchemaName, UnknownTableException) | ||
|
||
from dlt.common.normalizers.utils import import_normalizers | ||
from dlt.common.schema.typing import TAnySchemaColumns | ||
|
@@ -493,18 +493,29 @@ def merge_schema_updates(schema_updates: Sequence[TSchemaUpdate]) -> TSchemaTabl | |
return aggregated_update | ||
|
||
|
||
def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDisposition: | ||
"""Returns write disposition of a table if present. If not, looks up into parent table""" | ||
def get_inherited_table_hint(tables: TSchemaTables, table_name: str, table_hint_name: str, allow_none: bool = False) -> Any: | ||
table = tables[table_name] | ||
w_d = table.get("write_disposition") | ||
if w_d: | ||
return w_d | ||
hint = table.get(table_hint_name) | ||
if hint: | ||
return hint | ||
|
||
parent = table.get("parent") | ||
if parent: | ||
return get_write_disposition(tables, parent) | ||
return get_inherited_table_hint(tables, parent, table_hint_name, allow_none) | ||
|
||
if allow_none: | ||
return None | ||
|
||
raise ValueError(f"No table hint '{table_hint_name} found in the chain of tables for '{table_name}'.") | ||
|
||
|
||
def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDisposition:
    """Returns the write disposition of a table if present. If not, looks it up in the parent table chain."""
    return get_inherited_table_hint(tables, table_name, "write_disposition", allow_none=False)
|
||
|
||
raise ValueError(f"No write disposition found in the chain of tables for '{table_name}'.") | ||
def get_table_format(tables: TSchemaTables, table_name: str) -> TTableFormat:
    """Returns the table format of a table (inherited from the parent chain); None if no table in the chain sets it."""
    return get_inherited_table_hint(tables, table_name, "table_format", allow_none=True)
|
||
|
||
def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool: | ||
|
@@ -525,6 +536,18 @@ def get_top_level_table(tables: TSchemaTables, table_name: str) -> TTableSchema: | |
return get_top_level_table(tables, parent) | ||
return table | ||
|
||
def get_load_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
    """Returns a copy of the table schema prepared for loading.

    Resolves inherited hints (`write_disposition`, `table_format`) that child
    tables do not set explicitly.

    Raises:
        UnknownTableException: if `table_name` is not present in `tables`.
    """
    # deep copy is required (not a shallow copy) because callers may modify
    # nested column dicts, which must not affect the original schema document
    from copy import deepcopy
    try:
        table = deepcopy(tables[table_name])
        # add write disposition if not specified - in child tables
        if "write_disposition" not in table:
            table["write_disposition"] = get_write_disposition(tables, table_name)
        if "table_format" not in table:
            table["table_format"] = get_table_format(tables, table_name)
        return table
    except KeyError:
        raise UnknownTableException(table_name)
|
||
def get_child_tables(tables: TSchemaTables, table_name: str) -> List[TTableSchema]: | ||
"""Get child tables for table name and return a list of tables ordered by ancestry so the child tables are always after their parents""" | ||
|
@@ -637,7 +660,8 @@ def new_table( | |
write_disposition: TWriteDisposition = None, | ||
columns: Sequence[TColumnSchema] = None, | ||
validate_schema: bool = False, | ||
resource: str = None | ||
resource: str = None, | ||
table_format: TTableFormat = None | ||
) -> TTableSchema: | ||
|
||
table: TTableSchema = { | ||
|
@@ -652,6 +676,8 @@ def new_table( | |
# set write disposition only for root tables | ||
table["write_disposition"] = write_disposition or DEFAULT_WRITE_DISPOSITION | ||
table["resource"] = resource or table_name | ||
if table_format: | ||
table["table_format"] = table_format | ||
if validate_schema: | ||
validate_dict_ignoring_xkeys( | ||
spec=TColumnSchema, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,21 +16,21 @@ | |
from dlt.common.utils import without_none | ||
from dlt.common.data_types import TDataType | ||
from dlt.common.schema import TColumnSchema, Schema | ||
from dlt.common.schema.typing import TTableSchema, TColumnType | ||
from dlt.common.schema.utils import table_schema_has_type | ||
from dlt.common.schema.typing import TTableSchema, TColumnType, TWriteDisposition | ||
from dlt.common.schema.utils import table_schema_has_type, get_table_format | ||
from dlt.common.destination import DestinationCapabilitiesContext | ||
from dlt.common.destination.reference import LoadJob | ||
from dlt.common.destination.reference import TLoadJobState | ||
from dlt.common.destination.reference import LoadJob, FollowupJob | ||
from dlt.common.destination.reference import TLoadJobState, NewLoadJob | ||
from dlt.common.storages import FileStorage | ||
from dlt.common.data_writers.escape import escape_bigquery_identifier | ||
|
||
from dlt.destinations.sql_jobs import SqlStagingCopyJob | ||
|
||
from dlt.destinations.typing import DBApi, DBTransaction | ||
from dlt.destinations.exceptions import DatabaseTerminalException, DatabaseTransientException, DatabaseUndefinedRelation, LoadJobTerminalException | ||
from dlt.destinations.athena import capabilities | ||
from dlt.destinations.sql_client import SqlClientBase, DBApiCursorImpl, raise_database_error, raise_open_connection_error | ||
from dlt.destinations.typing import DBApiCursor | ||
from dlt.destinations.job_client_impl import SqlJobClientBase, StorageSchemaInfo | ||
from dlt.destinations.job_client_impl import SqlJobClientWithStaging | ||
from dlt.destinations.athena.configuration import AthenaClientConfiguration | ||
from dlt.destinations.type_mapping import TypeMapper | ||
from dlt.destinations import path_utils | ||
|
@@ -69,13 +69,18 @@ class AthenaTypeMapper(TypeMapper): | |
"int": "bigint", | ||
} | ||
|
||
def __init__(self, capabilities: DestinationCapabilitiesContext, iceberg_mode: bool):
    """Initializes the Athena type mapper.

    iceberg_mode: when True, integer types are mapped with iceberg's
    restrictions in mind (see `to_db_integer_type`).
    """
    super().__init__(capabilities)
    # remembered so integer mapping can widen types for iceberg tables
    self.iceberg_mode = iceberg_mode
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not think we need |
||
|
||
def to_db_integer_type(self, precision: Optional[int]) -> str:
    """Maps an integer precision (in bits) to an Athena/Hive integer type name.

    Iceberg does not support `smallint` and `tinyint`, so in iceberg mode
    precisions up to 16 bits are widened to `int`. No precision defaults to
    `bigint`.
    """
    if precision is None:
        return "bigint"
    if precision <= 8:
        return "int" if self.iceberg_mode else "tinyint"
    if precision <= 16:
        return "int" if self.iceberg_mode else "smallint"
    if precision <= 32:
        return "int"
    return "bigint"
|
@@ -135,6 +140,11 @@ def exception(self) -> str: | |
# this part of code should be never reached | ||
raise NotImplementedError() | ||
|
||
class DoNothingFollowupJob(DoNothingJob, FollowupJob):
    """The second most lazy class of dlt"""
    # a no-op load job that additionally participates in followup job creation;
    # used for iceberg tables whose data is moved by a staging copy job instead
    pass
|
||
|
||
class AthenaSQLClient(SqlClientBase[Connection]): | ||
|
||
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() | ||
|
@@ -276,7 +286,7 @@ def has_dataset(self) -> bool: | |
return len(rows) > 0 | ||
|
||
|
||
class AthenaClient(SqlJobClientBase): | ||
class AthenaClient(SqlJobClientWithStaging): | ||
|
||
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() | ||
|
||
|
@@ -293,11 +303,12 @@ def __init__(self, schema: Schema, config: AthenaClientConfiguration) -> None: | |
super().__init__(schema, config, sql_client) | ||
self.sql_client: AthenaSQLClient = sql_client # type: ignore | ||
self.config: AthenaClientConfiguration = config | ||
self.type_mapper = AthenaTypeMapper(self.capabilities) | ||
self.type_mapper = AthenaTypeMapper(self.capabilities, True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. some tables are iceberg, some not. you can't set always True here, see below |
||
|
||
def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
    """Initializes storage without truncating any tables.

    The `truncate_tables` argument is deliberately ignored and an empty list is
    passed to the base implementation.
    NOTE(review): the previous inline comment said "only truncate tables in
    iceberg mode" but no iceberg filtering was implemented and the parameter was
    just reassigned to [] — confirm the intended behavior for iceberg tables.
    """
    super().initialize_storage([])
|
||
def _from_db_type(self, hive_t: str, precision: Optional[int], scale: Optional[int]) -> TColumnType:
    # delegate db -> dlt type resolution to the Athena type mapper
    return self.type_mapper.from_db_type(hive_t, precision, scale)
|
@@ -309,15 +320,18 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc | |
|
||
bucket = self.config.staging_config.bucket_url | ||
dataset = self.sql_client.dataset_name | ||
|
||
sql: List[str] = [] | ||
|
||
# for the system tables we need to create empty iceberg tables to be able to run, DELETE and UPDATE queries | ||
is_iceberg = self.schema.tables[table_name].get("write_disposition", None) == "skip" | ||
# or if we are in iceberg mode, we create iceberg tables for all tables | ||
is_iceberg = (self.schema.tables[table_name].get("write_disposition", None) == "skip") or (self._is_iceberg_table(self.schema.tables[table_name]) and not self.in_staging_mode) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this "in staging mode" is still here unfortunately.. i don't know how to do it any other way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if you move the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as discussed: use get_load_table() to
|
||
columns = ", ".join([self._get_column_def_sql(c) for c in new_columns]) | ||
|
||
# this will fail if the table prefix is not properly defined | ||
table_prefix = self.table_prefix_layout.format(table_name=table_name) | ||
location = f"{bucket}/{dataset}/{table_prefix}" | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. one thing (maybe helpful) when we create parquet file we are setting precision of various fields ie datetime. make sure we do not have problems here (probably you should take into account both capabilities and table format but I think our current implementation is good - I hope it works) |
||
# use qualified table names | ||
qualified_table_name = self.sql_client.make_qualified_ddl_table_name(table_name) | ||
if is_iceberg and not generate_alter: | ||
|
@@ -345,9 +359,29 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> | |
) | ||
job = super().start_file_load(table, file_path, load_id) | ||
if not job: | ||
job = DoNothingJob(file_path) | ||
job = DoNothingFollowupJob(file_path) if self._is_iceberg_table(table) else DoNothingJob(file_path) | ||
return job | ||
|
||
def _create_append_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
    """For iceberg root tables, appends run as a staging copy (replace=False);
    other tables use the base client's followup jobs."""
    root_table = table_chain[0]
    if not self._is_iceberg_table(root_table):
        return super()._create_append_followup_jobs(table_chain)
    copy_job = SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": False})
    return [copy_job]
|
||
def _create_replace_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
    """For iceberg root tables, replaces run as a staging copy (replace=True);
    other tables use the base client's followup jobs."""
    root_table = table_chain[0]
    if not self._is_iceberg_table(root_table):
        return super()._create_replace_followup_jobs(table_chain)
    copy_job = SqlStagingCopyJob.from_table_chain(table_chain, self.sql_client, {"replace": True})
    return [copy_job]
|
||
def _is_iceberg_table(self, table: TTableSchema) -> bool:
    """Tells whether `table` resolves (possibly via its parent chain) to the iceberg table format."""
    return get_table_format(self.schema.tables, table["name"]) == "iceberg"
|
||
def table_needs_staging(self, table: TTableSchema) -> bool:
    """All iceberg tables are loaded via staging; other tables follow the base client's rules."""
    # short-circuit preserves the original early-return for iceberg tables
    return self._is_iceberg_table(table) or super().table_needs_staging(table)
|
||
@staticmethod
def is_dbapi_exception(ex: Exception) -> bool:
    # the dbapi driver signals its errors as subclasses of `Error`;
    # anything else is not a dbapi-level exception
    return isinstance(ex, Error)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be IMO part of JobClient