diff --git a/README.md b/README.md index b8692afd..bb80a84c 100644 --- a/README.md +++ b/README.md @@ -47,16 +47,19 @@ stored login info. You can configure the AWS profile name to use via `aws_profil A dbt profile can be configured to run against AWS Athena using the following configuration: -| Option | Description | Required? | Example | -|---------------- |-------------------------------------------------------------------------------- |----------- |-------------------- | -| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | -| region_name | AWS region of your Athena instance | Required | `eu-west-1` | -| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | -| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | -| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | -| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` | -| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` | -| num_retries| Number of times to retry a failing query | Optional | `3` | `5` +| Option | Description | Required? | Example | +|---------------- |--------------------------------------------------------------------------------|----------- |-----------------------| +| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | +| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` | +| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` | +| region_name | AWS region of your Athena instance | Required | `eu-west-1` | +| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | +| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | +| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | +| aws_profile_name| Profile to use from your AWS shared credentials file. | Optional | `my-profile` | +| work_group| Identifier of Athena workgroup | Optional | `my-custom-workgroup` | +| num_retries| Number of times to retry a failing query | Optional | `3` | `5` + **Example profiles.yml entry:** ```yaml @@ -66,6 +69,8 @@ athena: dev: type: athena s3_staging_dir: s3://athena-query-results/dbt/ + s3_data_dir: s3://your_s3_bucket/dbt/ + s3_data_naming: schema_table region_name: eu-west-1 schema: dbt database: awsdatacatalog @@ -84,9 +89,7 @@ _Additional information_ #### Table Configuration * `external_location` (`default=none`) - * The location where Athena saves your table in Amazon S3 - * If `none` then it will default to `{s3_staging_dir}/tables` - * If you are using a static value, when your table/partition is recreated underlying data will be cleaned up and overwritten by new data + * If set, the full S3 path in which the table will be saved. * `partitioned_by` (`default=none`) * An array list of columns by which the table will be partitioned * Limited to creation of 100 partitions (_currently_) @@ -104,11 +107,23 @@ _Additional information_ * `table_properties`: table properties to add to the table, valid for Iceberg only * `strict_location` (`default=True`): when working with iceberg it's possible to rename tables, in order to do so, tables need to avoid to have same location. Setting up `strict_location` to *false* allow a table creation on an unique location -More information: [CREATE TABLE AS][create-table-as] +#### Table location +The location in which a table is saved is determined by: + +1. If `external_location` is defined, that value is used. +2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming` +3. If `s3_data_dir` is not defined data is stored under `s3_staging_dir/tables/` + +Here all the options available for `s3_data_naming`: +* `uuid`: `{s3_data_dir}/{uuid4()}/` +* `table_table`: `{s3_data_dir}/{table}/` +* `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/` +* `schema_table`: `{s3_data_dir}/{schema}/{table}/` +* `s3_data_naming=schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/` + +It's possible to set the `s3_data_naming` globally in the target profile, or overwrite the value in the table config, +or setting up the value for groups of model in dbt_project.yml -[run_started_at]: https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at -[invocation_id]: https://docs.getdbt.com/reference/dbt-jinja-functions/invocation_id -[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html #### Supported functionality @@ -129,7 +144,6 @@ To get started just add this as your model: materialized='table', format='iceberg', partitioned_by=['bucket(5, user_id)'], - strict_location=false, table_properties={ 'optimize_rewrite_delete_file_threshold': '2' } @@ -193,6 +207,12 @@ You can run the tests using `make`: make run_tests ``` +### Helpful Resources + +* [Athena CREATE TABLE AS](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html) +* [dbt run_started_at](https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at) +* [dbt invocation_id](https://docs.getdbt.com/reference/dbt-jinja-functions/invocation_id) + ### Community * [fishtown-analytics/dbt][fishtown-analytics/dbt] diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py index 0835ca41..bd0772fc 100644 --- a/dbt/adapters/athena/connections.py +++ b/dbt/adapters/athena/connections.py @@ -45,6 +45,8 @@ class AthenaCredentials(Credentials): poll_interval: float = 1.0 _ALIASES = {"catalog": "database"} num_retries: Optional[int] = 5 + s3_data_dir: Optional[str] = None + s3_data_naming: Optional[str] = "schema_table_unique" @property def type(self) -> str: @@ -64,6 +66,8 @@ def _connection_keys(self) -> Tuple[str, ...]: "poll_interval", "aws_profile_name", "endpoing_url", + "s3_data_dir", + "s3_data_naming", ) diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index 27c3d808..37ad06e9 100755 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -46,26 +46,41 @@ def convert_datetime_type(cls, agate_table: agate.Table, col_idx: int) -> str: return "timestamp" @available - def s3_uuid_table_location(self): + def s3_table_prefix(self, s3_data_dir: str) -> str: + """ + Returns the root location for storing tables in S3. + This is `s3_data_dir`, if set, and `s3_staging_dir/tables/` if not. + We generate a value here even if `s3_data_dir` is not set, + since creating a seed table requires a non-default location. + """ conn = self.connections.get_thread_connection() - client = conn.handle - return f"{client.s3_staging_dir}tables/{str(uuid4())}/" + creds = conn.credentials + if s3_data_dir is not None: + return s3_data_dir + else: + return path.join(creds.s3_staging_dir, "tables") @available - def s3_unique_location(self, external_location, strict_location, staging_dir, relation_name): + def s3_table_location(self, s3_data_dir: str, s3_data_naming: str, schema_name: str, table_name: str) -> str: """ - Generate a unique not overlapping location. + Returns either a UUID or database/table prefix for storing a table, + depending on the value of s3_table """ - unique_id = str(uuid4()) - if external_location is not None: - if not strict_location: - if external_location.endswith("/"): - external_location = external_location[:-1] - external_location = f"{external_location}_{unique_id}/" - else: - base_path = path.join(staging_dir, f"{relation_name}_{unique_id}") - external_location = f"{base_path}/" - return external_location + mapping = { + "uuid": path.join(self.s3_table_prefix(s3_data_dir), str(uuid4())) + "/", + "table": path.join(self.s3_table_prefix(s3_data_dir), table_name) + "/", + "table_unique": path.join(self.s3_table_prefix(s3_data_dir), table_name, str(uuid4())) + "/", + "schema_table": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name) + "/", + "schema_table_unique": path.join(self.s3_table_prefix(s3_data_dir), schema_name, table_name, str(uuid4())) + + "/", + } + + table_location = mapping.get(s3_data_naming) + + if table_location is None: + raise ValueError(f"Unknown value for s3_data_naming: {s3_data_naming}") + + return table_location @available def clean_up_partitions(self, database_name: str, table_name: str, where_condition: str): diff --git a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql index 7a6b8920..1dc10522 100644 --- a/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql +++ b/dbt/include/athena/macros/materializations/models/helpers_iceberg.sql @@ -50,12 +50,13 @@ {% macro create_iceberg_table_definition(relation, dest_columns) -%} {%- set external_location = config.get('external_location', default=none) -%} - {%- set strict_location = config.get('strict_location', default=true) -%} {%- set partitioned_by = config.get('partitioned_by', default=none) -%} {%- set table_properties = config.get('table_properties', default={}) -%} {%- set _ = table_properties.update({'table_type': 'ICEBERG'}) -%} {%- set table_properties_formatted = [] -%} {%- set dest_columns_with_type = [] -%} + {%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%} + {%- set s3_data_naming = config.get('s3_data_naming', default=target.s3_data_naming) -%} {%- for k in table_properties -%} {% set _ = table_properties_formatted.append("'" + k + "'='" + table_properties[k] + "'") -%} @@ -63,7 +64,9 @@ {%- set table_properties_csv= table_properties_formatted | join(', ') -%} - {%- set external_location = adapter.s3_unique_location(external_location, strict_location, target.s3_staging_dir, relation.name) -%} + {%- if external_location is none %} + {%- set external_location = adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier) -%} + {%- endif %} {%- for col in dest_columns -%} {% set dtype = iceberg_data_type(col.dtype) -%} diff --git a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql index 70157c40..82cf391a 100644 --- a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql +++ b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql @@ -6,6 +6,8 @@ {%- set field_delimiter = config.get('field_delimiter', default=none) -%} {%- set format = config.get('format', default='parquet') -%} {%- set write_compression = config.get('write_compression', default=none) -%} + {%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%} + {%- set s3_data_naming = config.get('s3_data_naming', default=target.s3_data_naming) -%} create table {{ relation }} @@ -13,6 +15,8 @@ with ( {%- if external_location is not none and not temporary %} external_location='{{ external_location }}', + {%- else -%} + external_location='{{ adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier) }}', {%- endif %} {%- if partitioned_by is not none %} partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }}, diff --git a/dbt/include/athena/macros/materializations/seeds/helpers.sql b/dbt/include/athena/macros/materializations/seeds/helpers.sql index bbc0e0e0..2ce2956c 100644 --- a/dbt/include/athena/macros/materializations/seeds/helpers.sql +++ b/dbt/include/athena/macros/materializations/seeds/helpers.sql @@ -10,6 +10,8 @@ {% macro athena__create_csv_table(model, agate_table) %} {%- set column_override = model['config'].get('column_types', {}) -%} {%- set quote_seed_column = model['config'].get('quote_columns', None) -%} + {%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%} + {%- set s3_data_naming = model['config'].get('s3_data_naming', target.s3_data_naming) -%} {% set sql %} create external table {{ this.render() }} ( @@ -21,7 +23,7 @@ {%- endfor -%} ) stored as parquet - location '{{ adapter.s3_uuid_table_location() }}' + location '{{ adapter.s3_table_location(s3_data_dir, s3_data_naming, model["schema"], model["alias"]) }}' tblproperties ('classification'='parquet') {% endset %}