athena iceberg #659
Conversation
dlt/destinations/sql_jobs.py
Outdated
```python
if insert_sql.strip()[-1] != ";":
    insert_sql += ";"
sql.append(insert_sql)
# -- DELETE FROM {staging_table_name} WHERE 1=1;

# clean up
if insert_temp_table_name:
```
iceberg does not support temp tables
this should happen only in athena. if we need a cleanup method that gets implemented in a subclass then let's do that.
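To make the idea concrete, a minimal sketch of such a subclass hook, assuming a hypothetical `_gen_cleanup_sql` method (not the actual dlt API):

```python
from typing import List


class SqlMergeJob:
    @classmethod
    def _gen_cleanup_sql(cls, temp_table_names: List[str]) -> List[str]:
        # default: nothing to clean up, session temp tables vanish on their own
        return []


class AthenaMergeJob(SqlMergeJob):
    @classmethod
    def _gen_cleanup_sql(cls, temp_table_names: List[str]) -> List[str]:
        # Athena/Iceberg has no session temp tables, so the merge job created
        # regular tables that must be dropped explicitly at the end
        return [f"DROP TABLE IF EXISTS {name};" for name in temp_table_names]
```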
dlt/destinations/sql_jobs.py
Outdated
```diff
@@ -93,17 +100,17 @@ def gen_key_table_clauses(cls, root_table_name: str, staging_root_table_name: st

     A list of clauses may be returned for engines that do not support OR in subqueries. Like BigQuery
     """
-    return [f"FROM {root_table_name} as d WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} as s WHERE {' OR '.join([c.format(d='d',s='s') for c in key_clauses])})"]
+    return [f"FROM {root_table_name} WHERE EXISTS (SELECT 1 FROM {staging_root_table_name} as s WHERE {' OR '.join([c.format(d=root_table_name,s='s') for c in key_clauses])})"]
```
iceberg does not support table aliases on the DELETE statement. I tried a few suggestions I found on Stack Overflow (for other sql dialects), but none worked.
AFAIK this will conflict with other destinations that do not work with non aliased tables. this is getting into spaghetti code mode. btw. I'm totally OK with what you did here - if other destinations still work.
EDIT: seems it was bigquery. see the CI
google.api_core.exceptions.BadRequest: 400 Query error: Unrecognized name: `chat-analytics-rasa-ci` at [1:64]
it can't work with non aliased tables in DELETE :/ I remember that because I spent a day going from the code you did above to the aliased one
There's a lot of good stuff (ie. how we create followup jobs is generalized, some method renames connected to that 👍 )
regarding the merge: if we are getting into something really complicated let's drop it from this ticket (or altogether). I had no idea that there are no temp tables on Iceberg. we may need a fully specialized merge job for it
some other things look hacky to me ie. that we make Athena connector aware of working on staging dataset, cutting staging from the name etc.
maybe we should do something different. what about this:
- iceberg tables must be marked as such in table schema (ie. `table_format` hint) so all the components in dlt know and can react to that. btw. typically only 1-2 tables should be iceberg, not all (ie. those that have GDPR data)
- having that we change how `filesystem` works. if it sees an `iceberg` table, it always does replace on such table so we keep only the most recent load id (`filesystem` knows if it is used as staging so it can do that selectively)
- the athena destination creates all iceberg tables in staging dataset and generates copy/merge jobs to move data to iceberg tables. it has only the new data in the stage (due to replace on iceberg tables). we can also use `load_id` to selectively copy data but then the staging dataset will grow all the time
this will require a refactor but I think we'll get simpler, cleaner code
maybe if we make `get_load_table(schema: Schema, file_name: str) -> TTableSchema` a method of the JobClient, then the job client will produce a transient table schema with write disposition and table format properly set and possibly other settings. it may also get an additional flag if the table is destined for the staging dataset or not
also if we replace `job_client.get_stage_dispositions()` with a more selective method - if it gets a table schema it decides whether it goes to the staging dataset or not. then we may just move this logic out of load.py altogether
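A rough sketch of that proposal, with simplified stand-in types (the real dlt signatures may differ):

```python
from copy import deepcopy
from typing import Any, Dict

TTableSchema = Dict[str, Any]  # simplified stand-in for dlt's table schema type


class JobClientBase:
    def __init__(self, schema_tables: Dict[str, TTableSchema]) -> None:
        self.schema_tables = schema_tables

    def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
        # return a transient copy: the client may adjust write disposition,
        # table format, column precision etc. per load without mutating
        # the stored schema
        return deepcopy(self.schema_tables[table_name])
```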
dlt/destinations/athena/athena.py
Outdated
```python
def to_db_integer_type(self, precision: Optional[int]) -> str:
    if precision is None:
        return "bigint"
    # iceberg does not support smallint and tinyint
```
FYI: TIMESTAMP is precision 6 on iceberg, 3 on parquet
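For context, a hedged sketch of the integer widening implied by this diff and the tinyint change further down; the exact precision thresholds are assumptions:

```python
from typing import Optional


def to_db_integer_type(precision: Optional[int], is_iceberg: bool) -> str:
    if precision is None:
        return "bigint"
    if precision <= 8:
        # iceberg has no tinyint
        return "int" if is_iceberg else "tinyint"
    if precision <= 16:
        # iceberg has no smallint either
        return "int" if is_iceberg else "smallint"
    if precision <= 32:
        return "int"
    return "bigint"
```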
dlt/destinations/sql_jobs.py
Outdated
```diff
@@ -136,7 +143,7 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
     Returns:
         sql statement that inserts data from selects into temp table
     """
-    return f"CREATE TEMP TABLE {temp_table_name} AS {select_sql};"
+    return f"CREATE TABLE {temp_table_name} AS {select_sql};"
```
this should happen only in Athena? we should use temp tables everywhere it is possible. If it is really very complicated we can give up on merge in this ticket
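A sketch of the subclass route this comment suggests, so the base class keeps real TEMP tables; the override placement is an assumption:

```python
class SqlMergeJob:
    @classmethod
    def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
        # default: a session-scoped temp table
        return f"CREATE TEMP TABLE {temp_table_name} AS {select_sql};"


class AthenaMergeJob(SqlMergeJob):
    @classmethod
    def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
        # iceberg has no TEMP tables: create a regular table and let the
        # cleanup step drop it afterwards
        return f"CREATE TABLE {temp_table_name} AS {select_sql};"
```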
dlt/destinations/athena/athena.py
Outdated
```diff
 sql: List[str] = []

 # for the system tables we need to create empty iceberg tables to be able to run DELETE and UPDATE queries
-is_iceberg = self.schema.tables[table_name].get("write_disposition", None) == "skip"
+# or if we are in iceberg mode, we create iceberg tables for all tables
+is_iceberg = (self.schema.tables[table_name].get("write_disposition", None) == "skip") or (self._is_iceberg_table(self.schema.tables[table_name]) and not self.in_staging_mode)
```
this "in staging mode" is still here unfortunately.. i don't know how to do it any other way.
if you move the `get_load_table` to JobClient you can modify the table format to keep iceberg (destination dataset) or not (staging dataset). OFC you need to pass an is_staging flag to the method. I do not like it but probably it is a good compromise
as discussed: use get_load_table() to
- adjust the precision
- set the table format
and pass it to the code below
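Putting both adjustments together, roughly; the timestamp handling encodes the precision note above, the rest is an assumption about how the override could look:

```python
from copy import deepcopy
from typing import Any, Dict

TTableSchema = Dict[str, Any]  # simplified stand-in


class AthenaClient:
    schema_tables: Dict[str, TTableSchema]

    def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
        table = deepcopy(self.schema_tables[table_name])
        if staging:
            # the staging dataset holds plain parquet tables, not iceberg
            table.pop("table_format", None)
        elif table.get("table_format") == "iceberg":
            for column in table.get("columns", {}).values():
                if column.get("data_type") == "timestamp":
                    # iceberg timestamps use precision 6, parquet defaults to 3
                    column["precision"] = 6
        return table
```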
```python
def table_needs_staging(self, table: TTableSchema) -> bool:
    # not so nice, how to do it better, collect this info from the main destination as before?
    if table["table_format"] == "iceberg":
```
this is hardcoded in here now, i think this should be configured in the main destination as I did before...
I'd rather say that this table needs truncation! We do not need to keep it in staging dataset, we just need to truncate it.
To me it makes sense... if you have iceberg for sure you need to copy/move data and keeping full history does not make sense...
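In code, the truncation idea could be as small as this (the hook name is hypothetical):

```python
from typing import Any, Dict

TTableSchema = Dict[str, Any]  # simplified stand-in


def should_truncate_staging_table(table: TTableSchema) -> bool:
    # iceberg data is always copied onward from the staging dataset,
    # so only the most recent load needs to be kept there
    return table.get("table_format") == "iceberg"
```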
I really like how load.py works now. And I think we can way simplify the implementation by moving get_load_table into the JobClient.
dlt/destinations/athena/athena.py
Outdated
```diff
@@ -69,13 +69,18 @@ class AthenaTypeMapper(TypeMapper):
     "int": "bigint",
 }

+    def __init__(self, capabilities: DestinationCapabilitiesContext, iceberg_mode: bool):
+        super().__init__(capabilities)
+        self.iceberg_mode = iceberg_mode
```
I do not think we need `iceberg_mode` - you just set it up per table
dlt/common/schema/utils.py
Outdated
```diff
@@ -525,6 +536,18 @@ def get_top_level_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
         return get_top_level_table(tables, parent)
     return table

+def get_load_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
```
this should be IMO part of JobClient
dlt/destinations/athena/athena.py
Outdated
```diff
     if precision <= 8:
-        return "tinyint"
+        return "int" if self.iceberg_mode else "tinyint"
```
that's why JobClient should create/modify table schema. so you can modify precision there and do not hack the type mapper...
Wouldn't it be the cleanest to have a subclass for iceberg and then set that before the table sql is generated? I don't feel like changing the type mapper is hacking at all; that is what it is there for: changing the mapping of the types depending on the database / table format you are storing into.
Actually we could extend the type mapper to have the info which table_format is currently being processed. that might be nice?
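That extension is what later shows up in the diff below as `to_db_type(self, column, table_format=None)`; a sketch of how it could thread through, with simplified stand-in types:

```python
from typing import Any, Dict, Optional

TColumnSchema = Dict[str, Any]  # simplified stand-ins
TTableFormat = str


class AthenaTypeMapper:
    sct_to_dbt = {"text": "varchar", "double": "double", "bool": "boolean"}

    def to_db_type(self, column: TColumnSchema, table_format: Optional[TTableFormat] = None) -> str:
        # the table format rides along so every mapping rule can react to it
        if column["data_type"] == "bigint":
            return self.to_db_integer_type(column.get("precision"), table_format)
        return self.sct_to_dbt[column["data_type"]]

    def to_db_integer_type(self, precision: Optional[int], table_format: Optional[TTableFormat] = None) -> str:
        if precision is None:
            return "bigint"
        if precision <= 8:
            # iceberg has no tinyint
            return "int" if table_format == "iceberg" else "tinyint"
        return "bigint"
```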
dlt/destinations/job_client_impl.py
Outdated
```python
finally:
    self.in_staging_mode = False

def table_needs_staging(self, table: TTableSchema) -> bool:
```
👍
dlt/common/schema/utils.py
Outdated
```python
def get_load_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
    try:
        # make a copy of the schema so modifications do not affect the original document
        table = copy(tables[table_name])
```
we'll need a deepcopy because we will modify columns
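i.e. the fix, in isolation (simplified stand-in types):

```python
from copy import deepcopy
from typing import Any, Dict

TTableSchema = Dict[str, Any]
TSchemaTables = Dict[str, TTableSchema]


def get_load_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
    # deepcopy, not copy: the nested "columns" dicts get modified downstream,
    # and a shallow copy would leak those changes into the stored schema
    return deepcopy(tables[table_name])
```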
dlt/destinations/athena/athena.py
Outdated
```diff
@@ -293,11 +303,12 @@ def __init__(self, schema: Schema, config: AthenaClientConfiguration) -> None:
         super().__init__(schema, config, sql_client)
         self.sql_client: AthenaSQLClient = sql_client  # type: ignore
         self.config: AthenaClientConfiguration = config
-        self.type_mapper = AthenaTypeMapper(self.capabilities)
+        self.type_mapper = AthenaTypeMapper(self.capabilities, True)
```
some tables are iceberg, some not. you can't always set True here, see below
```python
columns = ", ".join([self._get_column_def_sql(c) for c in new_columns])

# this will fail if the table prefix is not properly defined
table_prefix = self.table_prefix_layout.format(table_name=table_name)
location = f"{bucket}/{dataset}/{table_prefix}"
```
one thing (maybe helpful): when we create a parquet file we are setting the precision of various fields, ie. datetime. make sure we do not have problems here (probably you should take into account both capabilities and table format, but I think our current implementation is good - I hope it works)
dlt/destinations/athena/athena.py
Outdated
```python
def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
    table = super().get_load_table(table_name, staging)
    # if staging and table.get("table_format", None) == "iceberg":
```
heh I think you need to uncomment that and also modify the precision
```python
def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
    current_dataset_path = self.dataset_path
    try:
        self.dataset_path = posixpath.join(self.fs_path, self.config.normalize_dataset_name(self.schema)) + "_staging"
```
```python
self.config.normalize_dataset_name(SqlClientBase.make_staging_dataset_name(self.schema))
```
are you sure you want to use a method from the sqlclientbase in the filesystem destination? this does not seem quite right..
so we need to move it to client base? or to `WithStagingDataset`?
@sh-rp we need to fix this one. at least do not add `_staging` to the normalized string
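The point is that the whole staging dataset name, suffix included, has to pass through the naming convention; roughly (the helper shape is an assumption):

```python
import posixpath
from typing import Callable


def staging_dataset_path(fs_path: str, dataset_name: str, normalize: Callable[[str], str]) -> str:
    # wrong: the suffix bypasses the naming convention
    #   posixpath.join(fs_path, normalize(dataset_name)) + "_staging"
    # right: normalize the complete staging dataset name
    return posixpath.join(fs_path, normalize(dataset_name + "_staging"))
```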
```python
@abstractmethod
def with_staging_dataset(self) -> ContextManager["JobClientBase"]:
    """Executes job client methods on staging dataset"""
    return self  # type: ignore


class SupportsStagingDestination():
```
With this the main destination can control the behavior of the staging destination, this was kind of undefined before, so we were lucky that athena worked properly, but now it is well defined and we will also need this on other datalakes.
good! you could use that in tests to generate test configs automatically, no?
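A plausible shape for that interface; anything beyond `table_needs_staging` here is an assumption:

```python
from typing import Any, Dict

TTableSchema = Dict[str, Any]  # simplified stand-in


class SupportsStagingDestination:
    """Mixin for destinations that can be the final destination in a
    staging setup and need to control the staging destination's behavior."""

    def table_needs_staging(self, table: TTableSchema) -> bool:
        # the final destination decides, per table, whether data must pass
        # through the staging destination first (e.g. iceberg on Athena)
        return False
```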
```diff
     # Override in subclass if db supports other integer types (e.g. smallint, integer, tinyint, etc.)
     return self.sct_to_unbound_dbt["bigint"]

-    def to_db_type(self, column: TColumnSchema) -> str:
+    def to_db_type(self, column: TColumnSchema, table_format: TTableFormat = None) -> str:
```
to me manipulating the load table to achieve the correct mapping feels hacky. I would say that the type mapper should be in charge of mapping schema types to db types and in the case of athena (and maybe others in the future) it has to take the chosen table format into account.
this is very good now!
- see one comment regarding `_staging` (we must normalize the whole name)
- dbt problems: it is clear from the error that we are copying timestamps from iceberg table (6) to regular table (precision 3). and this fails
10:05:28 NOT_SUPPORTED: Incorrect timestamp precision for timestamp(6); the configured precision is MILLISECONDS; column name: end_time. You may need to manually clean the data at location 's3://dlt-athena-output/tables/850461bf-aa78-4d8c-8be7-36c1630da099' before retrying. Athena will not delete data in your account.
my take: disable dbt chess tests on athena + iceberg and leave just jaffle shop - just mention the error in docs maybe
or you mess around with `table_type='iceberg'` in the materialization in chess
LGTM!
Description
Implements iceberg tables on Athena.